tokio_util/future/
with_cancellation_token.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use pin_project_lite::pin_project;
8
9use crate::sync::{CancellationToken, RunUntilCancelledFuture, RunUntilCancelledFutureOwned};
10
11pin_project! {
12    /// A [`Future`] that is resolved once the corresponding [`CancellationToken`]
13    /// is cancelled or a given [`Future`] gets resolved.
14    ///
15    /// This future is immediately resolved if the corresponding [`CancellationToken`]
16    /// is already cancelled, otherwise, in case of concurrent completion and
17    /// cancellation, this is biased towards the future completion.
18    #[must_use = "futures do nothing unless polled"]
19    pub struct WithCancellationTokenFuture<'a, F: Future> {
20        #[pin]
21        run_until_cancelled: Option<RunUntilCancelledFuture<'a, F>>
22    }
23}
24
25impl<'a, F: Future> WithCancellationTokenFuture<'a, F> {
26    pub(crate) fn new(cancellation_token: &'a CancellationToken, future: F) -> Self {
27        Self {
28            run_until_cancelled: (!cancellation_token.is_cancelled())
29                .then(|| RunUntilCancelledFuture::new(cancellation_token, future)),
30        }
31    }
32}
33
34impl<'a, F: Future> Future for WithCancellationTokenFuture<'a, F> {
35    type Output = Option<F::Output>;
36
37    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38        let this = self.project();
39        match this.run_until_cancelled.as_pin_mut() {
40            Some(fut) => fut.poll(cx),
41            None => Poll::Ready(None),
42        }
43    }
44}
45
46pin_project! {
47    /// A [`Future`] that is resolved once the corresponding [`CancellationToken`]
48    /// is cancelled or a given [`Future`] gets resolved.
49    ///
50    /// This future is immediately resolved if the corresponding [`CancellationToken`]
51    /// is already cancelled, otherwise, in case of concurrent completion and
52    /// cancellation, this is biased towards the future completion.
53    #[must_use = "futures do nothing unless polled"]
54    pub struct WithCancellationTokenFutureOwned<F: Future> {
55        #[pin]
56        run_until_cancelled: Option<RunUntilCancelledFutureOwned<F>>
57    }
58}
59
60impl<F: Future> WithCancellationTokenFutureOwned<F> {
61    pub(crate) fn new(cancellation_token: CancellationToken, future: F) -> Self {
62        Self {
63            run_until_cancelled: (!cancellation_token.is_cancelled())
64                .then(|| RunUntilCancelledFutureOwned::new(cancellation_token, future)),
65        }
66    }
67}
68
69impl<F: Future> Future for WithCancellationTokenFutureOwned<F> {
70    type Output = Option<F::Output>;
71
72    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73        let this = self.project();
74        match this.run_until_cancelled.as_pin_mut() {
75            Some(fut) => fut.poll(cx),
76            None => Poll::Ready(None),
77        }
78    }
79}