tokio/task/coop/mod.rs
1#![cfg_attr(not(feature = "full"), allow(dead_code))]
2#![cfg_attr(not(feature = "rt"), allow(unreachable_pub))]
3
4//! Utilities for improved cooperative scheduling.
5//!
6//! ### Cooperative scheduling
7//!
8//! A single call to [`poll`] on a top-level task may potentially do a lot of
9//! work before it returns `Poll::Pending`. If a task runs for a long period of
10//! time without yielding back to the executor, it can starve other tasks
11//! waiting on that executor to execute them, or drive underlying resources.
12//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
13//! long-running task. Instead, this module provides an opt-in mechanism for
14//! futures to collaborate with the executor to avoid starvation.
15//!
16//! Consider a future like this one:
17//!
18//! ```
19//! # use tokio_stream::{Stream, StreamExt};
20//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
21//! while let Some(_) = input.next().await {}
22//! }
23//! ```
24//!
25//! It may look harmless, but consider what happens under heavy load if the
26//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
27//! yield, and will starve other tasks and resources on the same executor.
28//!
29//! To account for this, Tokio has explicit yield points in a number of library
30//! functions, which force tasks to return to the executor periodically.
31//!
32//!
33//! #### unconstrained
34//!
35//! If necessary, [`task::unconstrained`] lets you opt a future out of Tokio's cooperative
36//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
37//! Tokio. For example:
38//!
39//! ```
40//! # #[tokio::main]
41//! # async fn main() {
42//! use tokio::{task, sync::mpsc};
43//!
44//! let fut = async {
45//! let (tx, mut rx) = mpsc::unbounded_channel();
46//!
47//! for i in 0..1000 {
48//! let _ = tx.send(());
49//! // This will always be ready. If coop was in effect, this code would be forced to yield
50//! // periodically. However, if left unconstrained, then this code will never yield.
51//! rx.recv().await;
52//! }
53//! };
54//!
55//! task::coop::unconstrained(fut).await;
56//! # }
57//! ```
58//! [`poll`]: method@std::future::Future::poll
59//! [`task::unconstrained`]: crate::task::unconstrained()
60
// Items wrapped in `cfg_rt!`: each submodule is declared privately and the
// named items are re-exported from this module.
cfg_rt! {
    // Re-exports `consume_budget` from its own submodule.
    mod consume_budget;
    pub use consume_budget::consume_budget;

    // Re-exports `unconstrained` and `Unconstrained` from their submodule.
    mod unconstrained;
    pub use unconstrained::{unconstrained, Unconstrained};
}
68
69// ```ignore
70// # use tokio_stream::{Stream, StreamExt};
71// async fn drop_all<I: Stream + Unpin>(mut input: I) {
72// while let Some(_) = input.next().await {
73// tokio::coop::proceed().await;
74// }
75// }
76// ```
77//
78// The `proceed` future will coordinate with the executor to make sure that
79// every so often control is yielded back to the executor so it can run other
80// tasks.
81//
82// # Placing yield points
83//
84// Voluntary yield points should be placed _after_ at least some work has been
85// done. If they are not, a future sufficiently deep in the task hierarchy may
86// end up _never_ getting to run because of the number of yield points that
87// inevitably appear before it is reached. In general, you will want yield
88// points to only appear in "leaf" futures -- those that do not themselves poll
89// other futures. By doing this, you avoid double-counting each iteration of
90// the outer future against the cooperating budget.
91
92use crate::runtime::context;
93
/// Opaque type tracking the amount of "work" a task may still do before
/// yielding back to the scheduler.
///
/// The wrapped value is `Some(n)` when `n` units of budget remain and `None`
/// when the budget is unconstrained (operations are not limited).
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);
98
/// Outcome of an attempt to decrement a [`Budget`].
pub(crate) struct BudgetDecrement {
    /// `true` when the decrement was allowed: either a unit of budget was
    /// consumed or the budget is unconstrained. `false` when no budget was
    /// left to consume.
    success: bool,
    /// `true` when this decrement consumed the last remaining unit, i.e. the
    /// budget went from one to zero.
    hit_zero: bool,
}
103
104impl Budget {
105 /// Budget assigned to a task on each poll.
106 ///
107 /// The value itself is chosen somewhat arbitrarily. It needs to be high
108 /// enough to amortize wakeup and scheduling costs, but low enough that we
109 /// do not starve other tasks for too long. The value also needs to be high
110 /// enough that particularly deep tasks are able to do at least some useful
111 /// work at all.
112 ///
113 /// Note that as more yield points are added in the ecosystem, this value
114 /// will probably also have to be raised.
115 const fn initial() -> Budget {
116 Budget(Some(128))
117 }
118
119 /// Returns an unconstrained budget. Operations will not be limited.
120 pub(crate) const fn unconstrained() -> Budget {
121 Budget(None)
122 }
123
124 fn has_remaining(self) -> bool {
125 self.0.map_or(true, |budget| budget > 0)
126 }
127}
128
129/// Runs the given closure with a cooperative task budget. When the function
130/// returns, the budget is reset to the value prior to calling the function.
131#[inline(always)]
132pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
133 with_budget(Budget::initial(), f)
134}
135
136/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
137/// is reset to the value prior to calling the function.
138#[inline(always)]
139pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
140 with_budget(Budget::unconstrained(), f)
141}
142
143#[inline(always)]
144fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
145 struct ResetGuard {
146 prev: Budget,
147 }
148
149 impl Drop for ResetGuard {
150 fn drop(&mut self) {
151 let _ = context::budget(|cell| {
152 cell.set(self.prev);
153 });
154 }
155 }
156
157 #[allow(unused_variables)]
158 let maybe_guard = context::budget(|cell| {
159 let prev = cell.get();
160 cell.set(budget);
161
162 ResetGuard { prev }
163 });
164
165 // The function is called regardless even if the budget is not successfully
166 // set due to the thread-local being destroyed.
167 f()
168}
169
170/// Returns `true` if there is still budget left on the task.
171///
172/// # Examples
173///
174/// This example defines a `Timeout` future that requires a given `future` to complete before the
175/// specified duration elapses. If it does, its result is returned; otherwise, an error is returned
176/// and the future is canceled.
177///
178/// Note that the future could exhaust the budget before we evaluate the timeout. Using `has_budget_remaining`,
179/// we can detect this scenario and ensure the timeout is always checked.
180///
181/// ```
182/// # use std::future::Future;
183/// # use std::pin::{pin, Pin};
184/// # use std::task::{ready, Context, Poll};
185/// # use tokio::task::coop;
186/// # use tokio::time::Sleep;
187/// pub struct Timeout<T> {
188/// future: T,
189/// delay: Pin<Box<Sleep>>,
190/// }
191///
192/// impl<T> Future for Timeout<T>
193/// where
194/// T: Future + Unpin,
195/// {
196/// type Output = Result<T::Output, ()>;
197///
198/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199/// let this = Pin::into_inner(self);
200/// let future = Pin::new(&mut this.future);
201/// let delay = Pin::new(&mut this.delay);
202///
203/// // check if the future is ready
204/// let had_budget_before = coop::has_budget_remaining();
205/// if let Poll::Ready(v) = future.poll(cx) {
206/// return Poll::Ready(Ok(v));
207/// }
208/// let has_budget_now = coop::has_budget_remaining();
209///
210/// // evaluate the timeout
211/// if let (true, false) = (had_budget_before, has_budget_now) {
212/// // it is the underlying future that exhausted the budget
213/// ready!(pin!(coop::unconstrained(delay)).poll(cx));
214/// } else {
215/// ready!(delay.poll(cx));
216/// }
217/// return Poll::Ready(Err(()));
218/// }
219/// }
220///```
221#[inline(always)]
222#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
223pub fn has_budget_remaining() -> bool {
224 // If the current budget cannot be accessed due to the thread-local being
225 // shutdown, then we assume there is budget remaining.
226 context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
227}
228
229cfg_rt_multi_thread! {
230 /// Sets the current task's budget.
231 pub(crate) fn set(budget: Budget) {
232 let _ = context::budget(|cell| cell.set(budget));
233 }
234}
235
236cfg_rt! {
237 /// Forcibly removes the budgeting constraints early.
238 ///
239 /// Returns the remaining budget
240 pub(crate) fn stop() -> Budget {
241 context::budget(|cell| {
242 let prev = cell.get();
243 cell.set(Budget::unconstrained());
244 prev
245 }).unwrap_or(Budget::unconstrained())
246 }
247}
248
249cfg_coop! {
250 use pin_project_lite::pin_project;
251 use std::cell::Cell;
252 use std::future::Future;
253 use std::marker::PhantomData;
254 use std::pin::Pin;
255 use std::task::{ready, Context, Poll};
256
    /// Value returned by the [`poll_proceed`] method.
    ///
    /// The first field remembers the budget that is written back to the task when
    /// this value is dropped; an unconstrained remembered budget means nothing is
    /// written back. The `PhantomData<*mut ()>` marker makes the type neither
    /// `Send` nor `Sync`.
    // NOTE(review): presumably the marker keeps the guard on the thread whose
    // budget it captured -- confirm.
    #[derive(Debug)]
    #[must_use]
    pub struct RestoreOnPending(Cell<Budget>, PhantomData<*mut ()>);
261
262 impl RestoreOnPending {
263 fn new(budget: Budget) -> Self {
264 RestoreOnPending(
265 Cell::new(budget),
266 PhantomData,
267 )
268 }
269
270 /// Signals that the task that obtained this `RestoreOnPending` was able to make
271 /// progress. This prevents the task budget from being restored to the value
272 /// it had prior to obtaining this instance when it is dropped.
273 pub fn made_progress(&self) {
274 self.0.set(Budget::unconstrained());
275 }
276 }
277
278 impl Drop for RestoreOnPending {
279 fn drop(&mut self) {
280 // Don't reset if budget was unconstrained or if we made progress.
281 // They are both represented as the remembered budget being unconstrained.
282 let budget = self.0.get();
283 if !budget.is_unconstrained() {
284 let _ = context::budget(|cell| {
285 cell.set(budget);
286 });
287 }
288 }
289 }
290
291 /// Decrements the task budget and returns [`Poll::Pending`] if the budget is depleted.
292 /// This indicates that the task should yield to the scheduler. Otherwise, returns
293 /// [`RestoreOnPending`] which can be used to commit the budget consumption.
294 ///
295 /// The returned [`RestoreOnPending`] will revert the budget to its former
296 /// value when dropped unless [`RestoreOnPending::made_progress`]
297 /// is called. It is the caller's responsibility to do so when it _was_ able to
298 /// make progress after the call to [`poll_proceed`].
299 /// Restoring the budget automatically ensures the task can try to make progress in some other
300 /// way.
301 ///
302 /// Note that [`RestoreOnPending`] restores the budget **as it was before [`poll_proceed`]**.
303 /// Therefore, if the budget is _further_ adjusted between when [`poll_proceed`] returns and
304 /// [`RestoreOnPending`] is dropped, those adjustments are erased unless the caller indicates
305 /// that progress was made.
306 ///
307 /// # Examples
308 ///
309 /// This example shows a simple countdown latch that uses [`poll_proceed`] to participate in
310 /// cooperative scheduling.
311 ///
312 /// ```
313 /// use std::future::{Future};
314 /// use std::pin::Pin;
315 /// use std::task::{ready, Context, Poll, Waker};
316 /// use tokio::task::coop;
317 ///
318 /// struct CountdownLatch<T> {
319 /// counter: usize,
320 /// value: Option<T>,
321 /// waker: Option<Waker>
322 /// }
323 ///
324 /// impl<T> CountdownLatch<T> {
325 /// fn new(value: T, count: usize) -> Self {
326 /// CountdownLatch {
327 /// counter: count,
328 /// value: Some(value),
329 /// waker: None
330 /// }
331 /// }
332 /// fn count_down(&mut self) {
333 /// if self.counter <= 0 {
334 /// return;
335 /// }
336 ///
337 /// self.counter -= 1;
338 /// if self.counter == 0 {
339 /// if let Some(w) = self.waker.take() {
340 /// w.wake();
341 /// }
342 /// }
343 /// }
344 /// }
345 ///
346 /// impl<T> Future for CountdownLatch<T> {
347 /// type Output = T;
348 ///
349 /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350 /// // `poll_proceed` checks with the runtime if this task is still allowed to proceed
351 /// // with performing work.
352 /// // If not, `Pending` is returned and `ready!` ensures this function returns.
353 /// // If we are allowed to proceed, coop now represents the budget consumption
354 /// let coop = ready!(coop::poll_proceed(cx));
355 ///
356 /// // Get a mutable reference to the CountdownLatch
357 /// let this = Pin::get_mut(self);
358 ///
359 /// // Next we check if the latch is ready to release its value
360 /// if this.counter == 0 {
361 /// let t = this.value.take();
362 /// // The latch made progress so call `made_progress` to ensure the budget
363 /// // is not reverted.
364 /// coop.made_progress();
365 /// Poll::Ready(t.unwrap())
366 /// } else {
367 /// // If the latch is not ready so return pending and simply drop `coop`.
368 /// // This will restore the budget making it available again to perform any
369 /// // other work.
370 /// this.waker = Some(cx.waker().clone());
371 /// Poll::Pending
372 /// }
373 /// }
374 /// }
375 ///
376 /// impl<T> Unpin for CountdownLatch<T> {}
377 /// ```
378 #[inline]
379 pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
380 context::budget(|cell| {
381 let mut budget = cell.get();
382
383 let decrement = budget.decrement();
384
385 if decrement.success {
386 let restore = RestoreOnPending::new(cell.get());
387 cell.set(budget);
388
389 // avoid double counting
390 if decrement.hit_zero {
391 inc_budget_forced_yield_count();
392 }
393
394 Poll::Ready(restore)
395 } else {
396 register_waker(cx);
397 Poll::Pending
398 }
399 }).unwrap_or(Poll::Ready(RestoreOnPending::new(Budget::unconstrained())))
400 }
401
402 /// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise.
403 ///
404 /// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when
405 /// polling for budget availability.
406 #[inline]
407 pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
408 if has_budget_remaining() {
409 Poll::Ready(())
410 } else {
411 register_waker(cx);
412
413 Poll::Pending
414 }
415 }
416
    cfg_rt! {
        cfg_unstable_metrics! {
            /// Calls `inc_budget_forced_yield_count` on the scheduler metrics of the
            /// current runtime handle. The result of `context::with_current` is
            /// ignored.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {
                let _ = context::with_current(|handle| {
                    handle.scheduler_metrics().inc_budget_forced_yield_count();
                });
            }
        }

        cfg_not_unstable_metrics! {
            /// No-op variant of the metric hook, defined under
            /// `cfg_not_unstable_metrics!`.
            #[inline(always)]
            fn inc_budget_forced_yield_count() {}
        }

        /// Passes the task's waker to `context::defer`.
        // NOTE(review): presumably this defers the wakeup until the task has
        // yielded -- confirm against `context::defer`.
        fn register_waker(cx: &mut Context<'_>) {
            context::defer(cx.waker());
        }
    }
436
    cfg_not_rt! {
        /// No-op variant of the metric hook, defined under `cfg_not_rt!`.
        #[inline(always)]
        fn inc_budget_forced_yield_count() {}

        /// Wakes the task's waker immediately by reference, instead of going
        /// through `context::defer` as the `cfg_rt!` variant does.
        fn register_waker(cx: &mut Context<'_>) {
            cx.waker().wake_by_ref()
        }
    }
445
446 impl Budget {
447 /// Decrements the budget. Returns `true` if successful. Decrementing fails
448 /// when there is not enough remaining budget.
449 fn decrement(&mut self) -> BudgetDecrement {
450 if let Some(num) = &mut self.0 {
451 if *num > 0 {
452 *num -= 1;
453
454 let hit_zero = *num == 0;
455
456 BudgetDecrement { success: true, hit_zero }
457 } else {
458 BudgetDecrement { success: false, hit_zero: false }
459 }
460 } else {
461 BudgetDecrement { success: true, hit_zero: false }
462 }
463 }
464
465 fn is_unconstrained(self) -> bool {
466 self.0.is_none()
467 }
468 }
469
    pin_project! {
        /// Future wrapper to ensure cooperative scheduling created by [`cooperative`].
        ///
        /// Polling the wrapper first calls [`poll_proceed`]; the wrapped future is only
        /// polled when that call returns `Poll::Ready`.
        #[must_use = "futures do nothing unless polled"]
        pub struct Coop<F: Future> {
            // The wrapped future; structurally pinned so it can be polled in place.
            #[pin]
            pub(crate) fut: F,
        }
    }
478
479 impl<F: Future> Future for Coop<F> {
480 type Output = F::Output;
481
482 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
483 let coop = ready!(poll_proceed(cx));
484 let me = self.project();
485 if let Poll::Ready(ret) = me.fut.poll(cx) {
486 coop.made_progress();
487 Poll::Ready(ret)
488 } else {
489 Poll::Pending
490 }
491 }
492 }
493
    /// Creates a wrapper future that makes the inner future cooperate with the Tokio scheduler.
    ///
    /// When polled, the wrapper will first call [`poll_proceed`] to consume task budget, and
    /// immediately yield if the budget has been depleted. If budget was available, the inner future
    /// is polled. The budget consumption will be made final using [`RestoreOnPending::made_progress`]
    /// if the inner future resolves to its final value.
    ///
    /// # Examples
    ///
    /// When you call `recv` on the `Receiver` of a [`tokio::sync::mpsc`](crate::sync::mpsc)
    /// channel, task budget will automatically be consumed when the next value is returned.
    /// This makes tasks that use Tokio mpsc channels automatically cooperative.
    ///
    /// If you're using [`futures::channel::mpsc`](https://docs.rs/futures/latest/futures/channel/mpsc/index.html)
    /// instead, automatic task budget consumption will not happen. This example shows how you can
    /// use `cooperative` to make `futures::channel::mpsc` channels cooperate with the scheduler in
    /// the same way Tokio channels do.
    ///
    /// ```
    /// use tokio::task::coop::cooperative;
    /// use futures::channel::mpsc::Receiver;
    /// use futures::stream::StreamExt;
    ///
    /// async fn receive_next<T>(receiver: &mut Receiver<T>) -> Option<T> {
    ///     // Use `StreamExt::next` to obtain a `Future` that resolves to the next value
    ///     let recv_future = receiver.next();
    ///     // Wrap it in a cooperative wrapper
    ///     let coop_future = cooperative(recv_future);
    ///     // And await
    ///     coop_future.await
    /// }
    /// ```
    #[inline]
    pub fn cooperative<F: Future>(fut: F) -> Coop<F> {
        Coop { fut }
    }
529}
530
// Unit tests for the budget bookkeeping done by `budget`, `poll_proceed` and
// `RestoreOnPending`.
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the current budget, falling back to an unconstrained budget when the
    /// budget cell cannot be accessed.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    /// Walks through unconstrained, constrained, nested and exhausted budgets and
    /// checks the value observed after each `poll_proceed` / drop / `made_progress`.
    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        // Outside of `budget(..)` the budget is expected to be unconstrained.
        assert!(get().0.is_none());

        let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Proceeding and dropping the guard must leave it unconstrained.
        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            // Inside `budget(..)` the initial budget is installed.
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);

            // A nested `budget(..)` starts again from the initial budget ...
            budget(|| {
                assert_eq!(get().0, Budget::initial().0);

                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            });

            // ... and the outer budget is back once the nested scope ends.
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
        });

        // Leaving the outermost `budget(..)` restores the unconstrained budget.
        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Consume the entire budget, committing every decrement.
            for _ in 0..n {
                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                coop.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = std::task::ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With no budget left, the next `poll_proceed` must return `Pending`.
            assert_pending!(task.poll());
        });
    }
}
609}