tokio/runtime/metrics/
batch.rs

1use crate::runtime::metrics::WorkerMetrics;
2
3cfg_unstable_metrics! {
4    use crate::runtime::metrics::HistogramBatch;
5}
6
7use std::sync::atomic::Ordering::Relaxed;
8use std::time::{Duration, Instant};
9
/// Per-worker metric counters accumulated in plain integers.
///
/// The values live here until `submit` copies them into the shared
/// `WorkerMetrics` using relaxed atomic stores. Fields gated on
/// `tokio_unstable` exist only when that cfg is set.
pub(crate) struct MetricsBatch {
    /// Accumulated busy time, in nanoseconds. Grows each time
    /// `end_processing_scheduled_tasks` is called.
    busy_duration_total: u64,

    /// Start of the current busy interval. Set at construction and reset by
    /// `start_processing_scheduled_tasks` (i.e. when work resumes after a park).
    processing_scheduled_tasks_started_at: Instant,

    /// How many times the worker has parked.
    park_count: u64,

    /// Combined number of park and unpark transitions; bumped by both
    /// `about_to_park` and `unparked`.
    park_unpark_count: u64,

    /// How many times the worker parked again without having polled any task
    /// since the previous park (a wake-up that did no work).
    #[cfg(tokio_unstable)]
    noop_count: u64,

    /// Total number of tasks stolen.
    #[cfg(tokio_unstable)]
    steal_count: u64,

    /// Number of times tasks were stolen (steal operations, not tasks).
    #[cfg(tokio_unstable)]
    steal_operations: u64,

    /// Number of task polls started by the worker.
    #[cfg(tokio_unstable)]
    poll_count: u64,

    /// Value of `poll_count` recorded when the worker last entered park.
    /// Comparing against it is how `noop_count` is derived.
    #[cfg(tokio_unstable)]
    poll_count_on_last_park: u64,

    /// Number of tasks that were scheduled locally on this worker.
    #[cfg(tokio_unstable)]
    local_schedule_count: u64,

    /// Number of tasks moved to the global queue to make space in the local
    /// queue.
    #[cfg(tokio_unstable)]
    overflow_count: u64,

    /// When `Some`, individual poll durations (in nanoseconds) are recorded
    /// into a histogram batch.
    #[cfg(tokio_unstable)]
    poll_timer: Option<PollTimer>,
}
57
58cfg_unstable_metrics! {
59    struct PollTimer {
60        /// Histogram of poll counts within each band.
61        poll_counts: HistogramBatch,
62
63        /// Instant when the most recent task started polling.
64        poll_started_at: Instant,
65    }
66}
67
68impl MetricsBatch {
69    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
70        let now = Instant::now();
71        Self::new_unstable(worker_metrics, now)
72    }
73
74    cfg_metrics_variant! {
75        stable: {
76            #[inline(always)]
77            fn new_unstable(_worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
78                MetricsBatch {
79                    busy_duration_total: 0,
80                    processing_scheduled_tasks_started_at: now,
81                    park_count: 0,
82                    park_unpark_count: 0,
83                }
84            }
85        },
86        unstable: {
87            #[inline(always)]
88            fn new_unstable(worker_metrics: &WorkerMetrics, now: Instant) -> MetricsBatch {
89                MetricsBatch {
90                    park_count: 0,
91                    park_unpark_count: 0,
92                    noop_count: 0,
93                    steal_count: 0,
94                    steal_operations: 0,
95                    poll_count: 0,
96                    poll_count_on_last_park: 0,
97                    local_schedule_count: 0,
98                    overflow_count: 0,
99                    busy_duration_total: 0,
100                    processing_scheduled_tasks_started_at: now,
101                    poll_timer: worker_metrics.poll_count_histogram.as_ref().map(
102                        |worker_poll_counts| PollTimer {
103                            poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
104                            poll_started_at: now,
105                        },
106                    ),
107                }
108            }
109        }
110    }
111
112    pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
113        worker
114            .busy_duration_total
115            .store(self.busy_duration_total, Relaxed);
116
117        self.submit_unstable(worker, mean_poll_time);
118    }
119
120    cfg_metrics_variant! {
121        stable: {
122            #[inline(always)]
123            fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
124                worker.park_count.store(self.park_count, Relaxed);
125                worker
126                    .park_unpark_count
127                    .store(self.park_unpark_count, Relaxed);
128            }
129        },
130        unstable: {
131            #[inline(always)]
132            fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
133                worker.mean_poll_time.store(mean_poll_time, Relaxed);
134                worker.park_count.store(self.park_count, Relaxed);
135                worker
136                    .park_unpark_count
137                    .store(self.park_unpark_count, Relaxed);
138                worker.noop_count.store(self.noop_count, Relaxed);
139                worker.steal_count.store(self.steal_count, Relaxed);
140                worker
141                    .steal_operations
142                    .store(self.steal_operations, Relaxed);
143                worker.poll_count.store(self.poll_count, Relaxed);
144
145                worker
146                    .local_schedule_count
147                    .store(self.local_schedule_count, Relaxed);
148                worker.overflow_count.store(self.overflow_count, Relaxed);
149
150                if let Some(poll_timer) = &self.poll_timer {
151                    let dst = worker.poll_count_histogram.as_ref().unwrap();
152                    poll_timer.poll_counts.submit(dst);
153                }
154            }
155        }
156    }
157
158    cfg_metrics_variant! {
159        stable: {
160            /// The worker is about to park.
161            pub(crate) fn about_to_park(&mut self) {
162                self.park_count += 1;
163                self.park_unpark_count += 1;
164            }
165        },
166        unstable: {
167            /// The worker is about to park.
168            pub(crate) fn about_to_park(&mut self) {
169                {
170                    self.park_count += 1;
171                    self.park_unpark_count += 1;
172
173                    if self.poll_count_on_last_park == self.poll_count {
174                        self.noop_count += 1;
175                    } else {
176                        self.poll_count_on_last_park = self.poll_count;
177                    }
178                }
179            }
180        }
181    }
182    /// The worker was unparked.
183    pub(crate) fn unparked(&mut self) {
184        self.park_unpark_count += 1;
185    }
186
187    /// Start processing a batch of tasks
188    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
189        self.processing_scheduled_tasks_started_at = Instant::now();
190    }
191
192    /// Stop processing a batch of tasks
193    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
194        let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
195        self.busy_duration_total += duration_as_u64(busy_duration);
196    }
197
198    cfg_metrics_variant! {
199        stable: {
200            /// Start polling an individual task
201            pub(crate) fn start_poll(&mut self) {}
202        },
203        unstable: {
204            /// Start polling an individual task
205            pub(crate) fn start_poll(&mut self) {
206                self.poll_count += 1;
207                if let Some(poll_timer) = &mut self.poll_timer {
208                    poll_timer.poll_started_at = Instant::now();
209                }
210            }
211        }
212    }
213
214    cfg_metrics_variant! {
215        stable: {
216            /// Stop polling an individual task
217            pub(crate) fn end_poll(&mut self) {}
218        },
219        unstable: {
220            /// Stop polling an individual task
221            pub(crate) fn end_poll(&mut self) {
222                if let Some(poll_timer) = &mut self.poll_timer {
223                    let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
224                    poll_timer.poll_counts.measure(elapsed, 1);
225                }
226            }
227        }
228    }
229
230    cfg_metrics_variant! {
231        stable: {
232            pub(crate) fn inc_local_schedule_count(&mut self) {}
233        },
234        unstable: {
235            pub(crate) fn inc_local_schedule_count(&mut self) {
236                self.local_schedule_count += 1;
237            }
238        }
239    }
240}
241
242cfg_rt_multi_thread! {
243    impl MetricsBatch {
244        cfg_metrics_variant! {
245            stable: {
246                pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
247            },
248            unstable: {
249                pub(crate) fn incr_steal_count(&mut self, by: u16) {
250                    self.steal_count += by as u64;
251                }
252            }
253        }
254
255        cfg_metrics_variant! {
256            stable: {
257                pub(crate) fn incr_steal_operations(&mut self) {}
258            },
259            unstable: {
260                pub(crate) fn incr_steal_operations(&mut self) {
261                    self.steal_operations += 1;
262                }
263            }
264        }
265
266        cfg_metrics_variant! {
267            stable: {
268                pub(crate) fn incr_overflow_count(&mut self) {}
269            },
270            unstable: {
271                pub(crate) fn incr_overflow_count(&mut self) {
272                    self.overflow_count += 1;
273                }
274            }
275        }
276    }
277}
278
/// Converts a `Duration` to whole nanoseconds as a `u64`.
///
/// `Duration::as_nanos` yields a `u128`; values that do not fit in a `u64`
/// are clamped to `u64::MAX` instead of being truncated.
pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
    match u64::try_from(dur.as_nanos()) {
        Ok(nanos) => nanos,
        // More nanoseconds than a u64 can hold: saturate.
        Err(_) => u64::MAX,
    }
}