// tokio/runtime/metrics/batch.rs
use crate::runtime::metrics::WorkerMetrics;

cfg_unstable_metrics! {
    use crate::runtime::metrics::HistogramBatch;
}

use std::sync::atomic::Ordering::Relaxed;
use std::time::{Duration, Instant};
9
/// Locally accumulated metric counters that `submit` copies into a
/// `WorkerMetrics`.
///
/// Only the first four fields exist in every build; the remaining ones are
/// compiled in when `tokio_unstable` is set.
pub(crate) struct MetricsBatch {
    /// Accumulated busy time in nanoseconds. Grown by
    /// `end_processing_scheduled_tasks`.
    busy_duration_total: u64,

    /// Instant recorded by the most recent `start_processing_scheduled_tasks`
    /// call (or at construction); the busy duration is measured from here.
    processing_scheduled_tasks_started_at: Instant,

    /// Incremented once per `about_to_park` call.
    park_count: u64,

    /// Incremented once per `about_to_park` call and once per `unparked` call.
    park_unpark_count: u64,

    /// Incremented when `about_to_park` finds that `poll_count` has not
    /// changed since the previous `about_to_park` call.
    #[cfg(tokio_unstable)]
    noop_count: u64,

    /// Running sum of the `by` arguments passed to `incr_steal_count`.
    #[cfg(tokio_unstable)]
    steal_count: u64,

    /// Incremented once per `incr_steal_operations` call.
    #[cfg(tokio_unstable)]
    steal_operations: u64,

    /// Incremented once per `start_poll` call.
    #[cfg(tokio_unstable)]
    poll_count: u64,

    /// Snapshot of `poll_count` taken by `about_to_park`; compared against
    /// the live `poll_count` on the next park to maintain `noop_count`.
    #[cfg(tokio_unstable)]
    poll_count_on_last_park: u64,

    /// Incremented once per `inc_local_schedule_count` call.
    #[cfg(tokio_unstable)]
    local_schedule_count: u64,

    /// Incremented once per `incr_overflow_count` call.
    #[cfg(tokio_unstable)]
    overflow_count: u64,

    /// `Some` when the `WorkerMetrics` passed to `new` carried a poll-count
    /// histogram; in that case each poll's elapsed time is recorded here.
    #[cfg(tokio_unstable)]
    poll_timer: Option<PollTimer>,
}
57
58cfg_unstable_metrics! {
59 struct PollTimer {
60 poll_counts: HistogramBatch,
62
63 poll_started_at: Instant,
65 }
66}
67
68impl MetricsBatch {
69 pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
70 let now = Instant::now();
71 Self::new_unstable(worker_metrics, now)
72 }
73
74 cfg_metrics_variant! {
75 stable: {
76 #[inline(always)]
77 fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
78 MetricsBatch {
79 busy_duration_total: 0,
80 processing_scheduled_tasks_started_at: now,
81 park_count: 0,
82 park_unpark_count: 0,
83 }
84 }
85 },
86 unstable: {
87 #[inline(always)]
88 fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
89 MetricsBatch {
90 park_count: 0,
91 park_unpark_count: 0,
92 noop_count: 0,
93 steal_count: 0,
94 steal_operations: 0,
95 poll_count: 0,
96 poll_count_on_last_park: 0,
97 local_schedule_count: 0,
98 overflow_count: 0,
99 busy_duration_total: 0,
100 processing_scheduled_tasks_started_at: now,
101 poll_timer: worker_metrics.poll_count_histogram.as_ref().map(
102 |worker_poll_counts| PollTimer {
103 poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
104 poll_started_at: now,
105 },
106 ),
107 }
108 }
109 }
110 }
111
112 pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
113 worker
114 .busy_duration_total
115 .store(self.busy_duration_total, Relaxed);
116
117 self.submit_unstable(worker, mean_poll_time);
118 }
119
120 cfg_metrics_variant! {
121 stable: {
122 #[inline(always)]
123 fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
124 worker.park_count.store(self.park_count, Relaxed);
125 worker
126 .park_unpark_count
127 .store(self.park_unpark_count, Relaxed);
128 }
129 },
130 unstable: {
131 #[inline(always)]
132 fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
133 worker.mean_poll_time.store(mean_poll_time, Relaxed);
134 worker.park_count.store(self.park_count, Relaxed);
135 worker
136 .park_unpark_count
137 .store(self.park_unpark_count, Relaxed);
138 worker.noop_count.store(self.noop_count, Relaxed);
139 worker.steal_count.store(self.steal_count, Relaxed);
140 worker
141 .steal_operations
142 .store(self.steal_operations, Relaxed);
143 worker.poll_count.store(self.poll_count, Relaxed);
144
145 worker
146 .local_schedule_count
147 .store(self.local_schedule_count, Relaxed);
148 worker.overflow_count.store(self.overflow_count, Relaxed);
149
150 if let Some(poll_timer) = &self.poll_timer {
151 let dst = worker.poll_count_histogram.as_ref().unwrap();
152 poll_timer.poll_counts.submit(dst);
153 }
154 }
155 }
156 }
157
158 cfg_metrics_variant! {
159 stable: {
160 pub(crate) fn about_to_park(&mut self) {
162 self.park_count += 1;
163 self.park_unpark_count += 1;
164 }
165 },
166 unstable: {
167 pub(crate) fn about_to_park(&mut self) {
169 {
170 self.park_count += 1;
171 self.park_unpark_count += 1;
172
173 if self.poll_count_on_last_park == self.poll_count {
174 self.noop_count += 1;
175 } else {
176 self.poll_count_on_last_park = self.poll_count;
177 }
178 }
179 }
180 }
181 }
182 pub(crate) fn unparked(&mut self) {
184 self.park_unpark_count += 1;
185 }
186
187 pub(crate) fn start_processing_scheduled_tasks(&mut self) {
189 self.processing_scheduled_tasks_started_at = Instant::now();
190 }
191
192 pub(crate) fn end_processing_scheduled_tasks(&mut self) {
194 let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
195 self.busy_duration_total += duration_as_u64(busy_duration);
196 }
197
198 cfg_metrics_variant! {
199 stable: {
200 pub(crate) fn start_poll(&mut self) {}
202 },
203 unstable: {
204 pub(crate) fn start_poll(&mut self) {
206 self.poll_count += 1;
207 if let Some(poll_timer) = &mut self.poll_timer {
208 poll_timer.poll_started_at = Instant::now();
209 }
210 }
211 }
212 }
213
214 cfg_metrics_variant! {
215 stable: {
216 pub(crate) fn end_poll(&mut self) {}
218 },
219 unstable: {
220 pub(crate) fn end_poll(&mut self) {
222 if let Some(poll_timer) = &mut self.poll_timer {
223 let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
224 poll_timer.poll_counts.measure(elapsed, 1);
225 }
226 }
227 }
228 }
229
230 cfg_metrics_variant! {
231 stable: {
232 pub(crate) fn inc_local_schedule_count(&mut self) {}
233 },
234 unstable: {
235 pub(crate) fn inc_local_schedule_count(&mut self) {
236 self.local_schedule_count += 1;
237 }
238 }
239 }
240}
241
242cfg_rt_multi_thread! {
243 impl MetricsBatch {
244 cfg_metrics_variant! {
245 stable: {
246 pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
247 },
248 unstable: {
249 pub(crate) fn incr_steal_count(&mut self, by: u16) {
250 self.steal_count += by as u64;
251 }
252 }
253 }
254
255 cfg_metrics_variant! {
256 stable: {
257 pub(crate) fn incr_steal_operations(&mut self) {}
258 },
259 unstable: {
260 pub(crate) fn incr_steal_operations(&mut self) {
261 self.steal_operations += 1;
262 }
263 }
264 }
265
266 cfg_metrics_variant! {
267 stable: {
268 pub(crate) fn incr_overflow_count(&mut self) {}
269 },
270 unstable: {
271 pub(crate) fn incr_overflow_count(&mut self) {
272 self.overflow_count += 1;
273 }
274 }
275 }
276 }
277}
278
/// Converts `dur` to whole nanoseconds as a `u64`.
///
/// `Duration::as_nanos` returns a `u128`; a value too large for `u64` is
/// clamped to `u64::MAX` rather than truncated.
pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
}