tokio/runtime/metrics/
batch.rs1use crate::runtime::metrics::WorkerMetrics;
2
3cfg_unstable_metrics! {
4 use crate::runtime::metrics::HistogramBatch;
5}
6
7use std::sync::atomic::Ordering::Relaxed;
8use std::time::{Duration, Instant};
9
10pub(crate) struct MetricsBatch {
11 busy_duration_total: u64,
13
14 processing_scheduled_tasks_started_at: Option<Instant>,
16
17 park_count: u64,
19
20 park_unpark_count: u64,
22
23 #[cfg(tokio_unstable)]
24 noop_count: u64,
26
27 #[cfg(tokio_unstable)]
28 steal_count: u64,
30
31 #[cfg(tokio_unstable)]
32 steal_operations: u64,
34
35 #[cfg(tokio_unstable)]
36 poll_count: u64,
38
39 #[cfg(tokio_unstable)]
40 poll_count_on_last_park: u64,
43
44 #[cfg(tokio_unstable)]
45 local_schedule_count: u64,
47
48 #[cfg(tokio_unstable)]
49 overflow_count: u64,
52
53 #[cfg(tokio_unstable)]
54 poll_timer: Option<PollTimer>,
56}
57
58cfg_unstable_metrics! {
59 struct PollTimer {
60 poll_counts: HistogramBatch,
62
63 poll_started_at: Instant,
65 }
66}
67
68impl MetricsBatch {
69 pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch {
70 let maybe_now = now();
71 Self::new_unstable(worker_metrics, maybe_now)
72 }
73
74 cfg_metrics_variant! {
75 stable: {
76 #[inline(always)]
77 fn new_unstable(_worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
78 MetricsBatch {
79 busy_duration_total: 0,
80 processing_scheduled_tasks_started_at: maybe_now,
81 park_count: 0,
82 park_unpark_count: 0,
83 }
84 }
85 },
86 unstable: {
87 #[inline(always)]
88 fn new_unstable(worker_metrics: &WorkerMetrics, maybe_now: Option<Instant>) -> MetricsBatch {
89 let poll_timer = maybe_now.and_then(|now| {
90 worker_metrics
91 .poll_count_histogram
92 .as_ref()
93 .map(|worker_poll_counts| PollTimer {
94 poll_counts: HistogramBatch::from_histogram(worker_poll_counts),
95 poll_started_at: now,
96 })
97 });
98 MetricsBatch {
99 park_count: 0,
100 park_unpark_count: 0,
101 noop_count: 0,
102 steal_count: 0,
103 steal_operations: 0,
104 poll_count: 0,
105 poll_count_on_last_park: 0,
106 local_schedule_count: 0,
107 overflow_count: 0,
108 busy_duration_total: 0,
109 processing_scheduled_tasks_started_at: maybe_now,
110 poll_timer,
111 }
112 }
113 }
114 }
115
116 pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
117 worker
118 .busy_duration_total
119 .store(self.busy_duration_total, Relaxed);
120
121 self.submit_unstable(worker, mean_poll_time);
122 }
123
124 cfg_metrics_variant! {
125 stable: {
126 #[inline(always)]
127 fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
128 worker.park_count.store(self.park_count, Relaxed);
129 worker
130 .park_unpark_count
131 .store(self.park_unpark_count, Relaxed);
132 }
133 },
134 unstable: {
135 #[inline(always)]
136 fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
137 worker.mean_poll_time.store(mean_poll_time, Relaxed);
138 worker.park_count.store(self.park_count, Relaxed);
139 worker
140 .park_unpark_count
141 .store(self.park_unpark_count, Relaxed);
142 worker.noop_count.store(self.noop_count, Relaxed);
143 worker.steal_count.store(self.steal_count, Relaxed);
144 worker
145 .steal_operations
146 .store(self.steal_operations, Relaxed);
147 worker.poll_count.store(self.poll_count, Relaxed);
148
149 worker
150 .local_schedule_count
151 .store(self.local_schedule_count, Relaxed);
152 worker.overflow_count.store(self.overflow_count, Relaxed);
153
154 if let Some(poll_timer) = &self.poll_timer {
155 let dst = worker.poll_count_histogram.as_ref().unwrap();
156 poll_timer.poll_counts.submit(dst);
157 }
158 }
159 }
160 }
161
162 cfg_metrics_variant! {
163 stable: {
164 pub(crate) fn about_to_park(&mut self) {
166 self.park_count += 1;
167 self.park_unpark_count += 1;
168 }
169 },
170 unstable: {
171 pub(crate) fn about_to_park(&mut self) {
173 {
174 self.park_count += 1;
175 self.park_unpark_count += 1;
176
177 if self.poll_count_on_last_park == self.poll_count {
178 self.noop_count += 1;
179 } else {
180 self.poll_count_on_last_park = self.poll_count;
181 }
182 }
183 }
184 }
185 }
186 pub(crate) fn unparked(&mut self) {
188 self.park_unpark_count += 1;
189 }
190
191 pub(crate) fn start_processing_scheduled_tasks(&mut self) {
193 self.processing_scheduled_tasks_started_at = now();
194 }
195
196 pub(crate) fn end_processing_scheduled_tasks(&mut self) {
198 if let Some(processing_scheduled_tasks_started_at) =
199 self.processing_scheduled_tasks_started_at
200 {
201 let busy_duration = processing_scheduled_tasks_started_at.elapsed();
202 self.busy_duration_total += duration_as_u64(busy_duration);
203 }
204 }
205
206 cfg_metrics_variant! {
207 stable: {
208 pub(crate) fn start_poll(&mut self) {}
210 },
211 unstable: {
212 pub(crate) fn start_poll(&mut self) {
214 self.poll_count += 1;
215 if let Some(poll_timer) = &mut self.poll_timer {
216 poll_timer.poll_started_at = Instant::now();
217 }
218 }
219 }
220 }
221
222 cfg_metrics_variant! {
223 stable: {
224 pub(crate) fn end_poll(&mut self) {}
226 },
227 unstable: {
228 pub(crate) fn end_poll(&mut self) {
230 if let Some(poll_timer) = &mut self.poll_timer {
231 let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
232 poll_timer.poll_counts.measure(elapsed, 1);
233 }
234 }
235 }
236 }
237
238 cfg_metrics_variant! {
239 stable: {
240 pub(crate) fn inc_local_schedule_count(&mut self) {}
241 },
242 unstable: {
243 pub(crate) fn inc_local_schedule_count(&mut self) {
244 self.local_schedule_count += 1;
245 }
246 }
247 }
248}
249
250cfg_rt_multi_thread! {
251 impl MetricsBatch {
252 cfg_metrics_variant! {
253 stable: {
254 pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
255 },
256 unstable: {
257 pub(crate) fn incr_steal_count(&mut self, by: u16) {
258 self.steal_count += by as u64;
259 }
260 }
261 }
262
263 cfg_metrics_variant! {
264 stable: {
265 pub(crate) fn incr_steal_operations(&mut self) {}
266 },
267 unstable: {
268 pub(crate) fn incr_steal_operations(&mut self) {
269 self.steal_operations += 1;
270 }
271 }
272 }
273
274 cfg_metrics_variant! {
275 stable: {
276 pub(crate) fn incr_overflow_count(&mut self) {}
277 },
278 unstable: {
279 pub(crate) fn incr_overflow_count(&mut self) {
280 self.overflow_count += 1;
281 }
282 }
283 }
284 }
285}
286
287pub(crate) fn duration_as_u64(dur: Duration) -> u64 {
288 u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
289}
290
291fn now() -> Option<Instant> {
294 if cfg!(all(
295 target_arch = "wasm32",
296 target_os = "unknown",
297 target_vendor = "unknown"
298 )) {
299 None
300 } else {
301 Some(Instant::now())
302 }
303}