tokio_util/
future.rs

1//! An extension trait for Futures that provides a variety of convenient adapters.
2
3mod with_cancellation_token;
4use with_cancellation_token::{WithCancellationTokenFuture, WithCancellationTokenFutureOwned};
5
6use std::future::Future;
7
8use crate::sync::CancellationToken;
9
10/// A trait which contains a variety of convenient adapters and utilities for `Future`s.
11pub trait FutureExt: Future {
12    cfg_time! {
13        /// A wrapper around [`tokio::time::timeout`], with the advantage that it is easier to write
14        /// fluent call chains.
15        ///
16        /// # Examples
17        ///
18        /// ```rust
19        /// use tokio::{sync::oneshot, time::Duration};
20        /// use tokio_util::future::FutureExt;
21        ///
22        /// # async fn dox() {
23        /// let (_tx, rx) = oneshot::channel::<()>();
24        ///
25        /// let res = rx.timeout(Duration::from_millis(10)).await;
26        /// assert!(res.is_err());
27        /// # }
28        /// ```
29        fn timeout(self, timeout: std::time::Duration) -> tokio::time::Timeout<Self>
30        where
31            Self: Sized,
32        {
33            tokio::time::timeout(timeout, self)
34        }
35
36        /// A wrapper around [`tokio::time::timeout_at`], with the advantage that it is easier to write
37        /// fluent call chains.
38        ///
39        /// # Examples
40        ///
41        /// ```rust
42        /// use tokio::{sync::oneshot, time::{Duration, Instant}};
43        /// use tokio_util::future::FutureExt;
44        ///
45        /// # async fn dox() {
46        /// let (_tx, rx) = oneshot::channel::<()>();
47        /// let deadline = Instant::now() + Duration::from_millis(10);
48        ///
49        /// let res = rx.timeout_at(deadline).await;
50        /// assert!(res.is_err());
51        /// # }
52        /// ```
53        fn timeout_at(self, deadline: tokio::time::Instant) -> tokio::time::Timeout<Self>
54        where
55            Self: Sized,
56        {
57            tokio::time::timeout_at(deadline, self)
58        }
59    }
60
61    /// Similar to [`CancellationToken::run_until_cancelled`],
62    /// but with the advantage that it is easier to write fluent call chains,
63    /// and biased towards waiting for [`CancellationToken`] to complete.
64    ///
65    /// # Fairness
66    ///
67    /// Calling this on an already-cancelled token directly returns `None`.
68    /// For all subsequent polls, in case of concurrent completion and
69    /// cancellation, this is biased towards the future completion.
70    ///
71    /// # Examples
72    ///
73    /// ```rust
74    /// use tokio::sync::oneshot;
75    /// use tokio_util::future::FutureExt;
76    /// use tokio_util::sync::CancellationToken;
77    ///
78    /// # async fn dox() {
79    /// let (_tx, rx) = oneshot::channel::<()>();
80    /// let token = CancellationToken::new();
81    /// let token_clone = token.clone();
82    /// tokio::spawn(async move {
83    ///     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
84    ///     token.cancel();
85    /// });
86    /// assert!(rx.with_cancellation_token(&token_clone).await.is_none())
87    /// # }
88    /// ```
89    fn with_cancellation_token(
90        self,
91        cancellation_token: &CancellationToken,
92    ) -> WithCancellationTokenFuture<'_, Self>
93    where
94        Self: Sized,
95    {
96        WithCancellationTokenFuture::new(cancellation_token, self)
97    }
98
99    /// Similar to [`CancellationToken::run_until_cancelled_owned`],
100    /// but with the advantage that it is easier to write fluent call chains,
101    /// and biased towards waiting for [`CancellationToken`] to complete.
102    ///
103    /// # Fairness
104    ///
105    /// Calling this on an already-cancelled token directly returns `None`.
106    /// For all subsequent polls, in case of concurrent completion and
107    /// cancellation, this is biased towards the future completion.
108    ///
109    /// # Examples
110    ///
111    /// ```rust
112    /// use tokio::sync::oneshot;
113    /// use tokio_util::future::FutureExt;
114    /// use tokio_util::sync::CancellationToken;
115    ///
116    /// # async fn dox() {
117    /// let (_tx, rx) = oneshot::channel::<()>();
118    /// let token = CancellationToken::new();
119    /// let token_clone = token.clone();
120    /// tokio::spawn(async move {
121    ///     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
122    ///     token.cancel();
123    /// });
124    /// assert!(rx.with_cancellation_token_owned(token_clone).await.is_none())
125    /// # }
126    /// ```
127    fn with_cancellation_token_owned(
128        self,
129        cancellation_token: CancellationToken,
130    ) -> WithCancellationTokenFutureOwned<Self>
131    where
132        Self: Sized,
133    {
134        WithCancellationTokenFutureOwned::new(cancellation_token, self)
135    }
136}
137
138impl<T: Future + ?Sized> FutureExt for T {}