tokio/runtime/metrics/
batch.rs

1use crate::runtime::metrics::WorkerMetrics;
2
3cfg_unstable_metrics! {
4    use crate::runtime::metrics::HistogramBatch;
5}
6
7use std::sync::atomic::Ordering::Relaxed;
8use std::time::{Duration, Instant};
9
10pub(crate) struct MetricsBatch {
11    /// The total busy duration in nanoseconds.
12    busy_duration_total: u64,
13
14    /// Instant at which work last resumed (continued after park).
15    processing_scheduled_tasks_started_at: Option<Instant>,
16
17    /// Number of times the worker parked.
18    park_count: u64,
19
20    /// Number of times the worker parked and unparked.
21    park_unpark_count: u64,
22
23    #[cfg(tokio_unstable)]
24    /// Number of times the worker woke w/o doing work.
25    noop_count: u64,
26
27    #[cfg(tokio_unstable)]
28    /// Number of tasks stolen.
29    steal_count: u64,
30
31    #[cfg(tokio_unstable)]
32    /// Number of times tasks where stolen.
33    steal_operations: u64,
34
35    #[cfg(tokio_unstable)]
36    /// Number of tasks that were polled by the worker.
37    poll_count: u64,
38
39    #[cfg(tokio_unstable)]
40    /// Number of tasks polled when the worker entered park. This is used to
41    /// track the noop count.
42    poll_count_on_last_park: u64,
43
44    #[cfg(tokio_unstable)]
45    /// Number of tasks that were scheduled locally on this worker.
46    local_schedule_count: u64,
47
48    #[cfg(tokio_unstable)]
49    /// Number of tasks moved to the global queue to make space in the local
50    /// queue
51    overflow_count: u64,
52
53    #[cfg(tokio_unstable)]
54    /// If `Some`, tracks poll times in nanoseconds
55    poll_timer: Option<PollTimer>,
56}
57
58cfg_unstable_metrics! {
59    struct PollTimer {
60        /// Histogram of poll counts within each band.
61        poll_counts: HistogramBatch,
62
63        /// Instant when the most recent task started polling.
64        poll_started_at: Instant,
65    }
66}
67
68impl MetricsBatch {
69    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
70        let maybe_now = now();
71        Self::new_unstable(worker_metrics, maybe_now)
72    }
73
74    cfg_metrics_variant! {
75        stable: {
76            #[inline(always)]
77            fn new_unstable(_worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
78                MetricsBatch {
79                    busy_duration_total: 0,
80                    processing_scheduled_tasks_started_at: maybe_now,
81                    park_count: 0,
82                    park_unpark_count: 0,
83                }
84            }
85        },
86        unstable: {
87            #[inline(always)]
88            fn new_unstable(worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
89                let poll_timer = maybe_now.and_then(|now| {
90                    worker_metrics
91                        .poll_count_histogram
92                        .as_ref()
93                        .map(|worker_poll_counts| PollTimer {
94                            poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
95                            poll_started_at: now,
96                        })
97                });
98                MetricsBatch {
99                    park_count: 0,
100                    park_unpark_count: 0,
101                    noop_count: 0,
102                    steal_count: 0,
103                    steal_operations: 0,
104                    poll_count: 0,
105                    poll_count_on_last_park: 0,
106                    local_schedule_count: 0,
107                    overflow_count: 0,
108                    busy_duration_total: 0,
109                    processing_scheduled_tasks_started_at: maybe_now,
110                    poll_timer,
111                }
112            }
113        }
114    }
115
116    pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
117        worker
118            .busy_duration_total
119            .store(self.busy_duration_total, Relaxed);
120
121        self.submit_unstable(worker, mean_poll_time);
122    }
123
124    cfg_metrics_variant! {
125        stable: {
126            #[inline(always)]
127            fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
128                worker.park_count.store(self.park_count, Relaxed);
129                worker
130                    .park_unpark_count
131                    .store(self.park_unpark_count, Relaxed);
132            }
133        },
134        unstable: {
135            #[inline(always)]
136            fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
137                worker.mean_poll_time.store(mean_poll_time, Relaxed);
138                worker.park_count.store(self.park_count, Relaxed);
139                worker
140                    .park_unpark_count
141                    .store(self.park_unpark_count, Relaxed);
142                worker.noop_count.store(self.noop_count, Relaxed);
143                worker.steal_count.store(self.steal_count, Relaxed);
144                worker
145                    .steal_operations
146                    .store(self.steal_operations, Relaxed);
147                worker.poll_count.store(self.poll_count, Relaxed);
148
149                worker
150                    .local_schedule_count
151                    .store(self.local_schedule_count, Relaxed);
152                worker.overflow_count.store(self.overflow_count, Relaxed);
153
154                if let Some(poll_timer) = &self.poll_timer {
155                    let dst = worker.poll_count_histogram.as_ref().unwrap();
156                    poll_timer.poll_counts.submit(dst);
157                }
158            }
159        }
160    }
161
162    cfg_metrics_variant! {
163        stable: {
164            /// The worker is about to park.
165            pub(crate) fn about_to_park(&mut self) {
166                self.park_count += 1;
167                self.park_unpark_count += 1;
168            }
169        },
170        unstable: {
171            /// The worker is about to park.
172            pub(crate) fn about_to_park(&mut self) {
173                {
174                    self.park_count += 1;
175                    self.park_unpark_count += 1;
176
177                    if self.poll_count_on_last_park == self.poll_count {
178                        self.noop_count += 1;
179                    } else {
180                        self.poll_count_on_last_park = self.poll_count;
181                    }
182                }
183            }
184        }
185    }
186    /// The worker was unparked.
187    pub(crate) fn unparked(&mut self) {
188        self.park_unpark_count += 1;
189    }
190
191    /// Start processing a batch of tasks
192    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
193        self.processing_scheduled_tasks_started_at = now();
194    }
195
196    /// Stop processing a batch of tasks
197    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
198        if let Some(processing_scheduled_tasks_started_at) =
199            self.processing_scheduled_tasks_started_at
200        {
201            let busy_duration = processing_scheduled_tasks_started_at.elapsed();
202            self.busy_duration_total += duration_as_u64(busy_duration);
203        }
204    }
205
206    cfg_metrics_variant! {
207        stable: {
208            /// Start polling an individual task
209            pub(crate) fn start_poll(&mut self) {}
210        },
211        unstable: {
212            /// Start polling an individual task
213            pub(crate) fn start_poll(&mut self) {
214                self.poll_count += 1;
215                if let Some(poll_timer) = &mut self.poll_timer {
216                    poll_timer.poll_started_at = Instant::now();
217                }
218            }
219        }
220    }
221
222    cfg_metrics_variant! {
223        stable: {
224            /// Stop polling an individual task
225            pub(crate) fn end_poll(&mut self) {}
226        },
227        unstable: {
228            /// Stop polling an individual task
229            pub(crate) fn end_poll(&mut self) {
230                if let Some(poll_timer) = &mut self.poll_timer {
231                    let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
232                    poll_timer.poll_counts.measure(elapsed, 1);
233                }
234            }
235        }
236    }
237
238    cfg_metrics_variant! {
239        stable: {
240            pub(crate) fn inc_local_schedule_count(&mut self) {}
241        },
242        unstable: {
243            pub(crate) fn inc_local_schedule_count(&mut self) {
244                self.local_schedule_count += 1;
245            }
246        }
247    }
248}
249
250cfg_rt_multi_thread! {
251    impl MetricsBatch {
252        cfg_metrics_variant! {
253            stable: {
254                pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
255            },
256            unstable: {
257                pub(crate) fn incr_steal_count(&mut self, by: u16) {
258                    self.steal_count += by as u64;
259                }
260            }
261        }
262
263        cfg_metrics_variant! {
264            stable: {
265                pub(crate) fn incr_steal_operations(&mut self) {}
266            },
267            unstable: {
268                pub(crate) fn incr_steal_operations(&mut self) {
269                    self.steal_operations += 1;
270                }
271            }
272        }
273
274        cfg_metrics_variant! {
275            stable: {
276                pub(crate) fn incr_overflow_count(&mut self) {}
277            },
278            unstable: {
279                pub(crate) fn incr_overflow_count(&mut self) {
280                    self.overflow_count += 1;
281                }
282            }
283        }
284    }
285}
286
287pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
288    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
289}
290
291/// Gate unsupported time metrics for `wasm32-unknown-unknown`
292/// <https://github.com/tokio-rs/tokio/issues/7319>
293fn now() -> Option<Instant> {
294    if cfg!(all(
295        target_arch = "wasm32",
296        target_os = "unknown",
297        target_vendor = "unknown"
298    )) {
299        None
300    } else {
301        Some(Instant::now())
302    }
303}