tokio/runtime/
park.rs

1#![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::loom::sync::{Arc, Condvar, Mutex};
5use crate::util::{waker, Wake};
6
7use std::sync::atomic::Ordering::SeqCst;
8use std::time::Duration;
9
10#[derive(Debug)]
11pub(crate) struct ParkThread {
12    inner: Arc<Inner>,
13}
14
15/// Unblocks a thread that was blocked by `ParkThread`.
16#[derive(Clone, Debug)]
17pub(crate) struct UnparkThread {
18    inner: Arc<Inner>,
19}
20
21#[derive(Debug)]
22struct Inner {
23    state: AtomicUsize,
24    mutex: Mutex<()>,
25    condvar: Condvar,
26}
27
/// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// A thread has committed to sleeping and is waiting (or about to wait) on
/// the condition variable.
const PARKED: usize = 1;
/// `unpark` has been called; the next park attempt consumes this and returns
/// without blocking.
const NOTIFIED: usize = 2;
31
32tokio_thread_local! {
33    static CURRENT_PARKER: ParkThread = ParkThread::new();
34}
35
// Loom-only bookkeeping (a bit of a hack): counts how many times the current
// thread has gone through `ParkThread::park` / `ParkThread::park_timeout`.
#[cfg(loom)]
tokio_thread_local! {
    pub(crate) static CURRENT_THREAD_PARK_COUNT: AtomicUsize = AtomicUsize::new(0);
}
41
42// ==== impl ParkThread ====
43
44impl ParkThread {
45    pub(crate) fn new() -> Self {
46        Self {
47            inner: Arc::new(Inner {
48                state: AtomicUsize::new(EMPTY),
49                mutex: Mutex::new(()),
50                condvar: Condvar::new(),
51            }),
52        }
53    }
54
55    pub(crate) fn unpark(&self) -> UnparkThread {
56        let inner = self.inner.clone();
57        UnparkThread { inner }
58    }
59
60    pub(crate) fn park(&mut self) {
61        #[cfg(loom)]
62        CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
63        self.inner.park();
64    }
65
66    pub(crate) fn park_timeout(&mut self, duration: Duration) {
67        #[cfg(loom)]
68        CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
69        self.inner.park_timeout(duration);
70    }
71
72    pub(crate) fn shutdown(&mut self) {
73        self.inner.shutdown();
74    }
75}
76
77// ==== impl Inner ====
78
79impl Inner {
80    fn park(&self) {
81        // If we were previously notified then we consume this notification and
82        // return quickly.
83        if self
84            .state
85            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
86            .is_ok()
87        {
88            return;
89        }
90
91        // Otherwise we need to coordinate going to sleep
92        let mut m = self.mutex.lock();
93
94        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
95            Ok(_) => {}
96            Err(NOTIFIED) => {
97                // We must read here, even though we know it will be `NOTIFIED`.
98                // This is because `unpark` may have been called again since we read
99                // `NOTIFIED` in the `compare_exchange` above. We must perform an
100                // acquire operation that synchronizes with that `unpark` to observe
101                // any writes it made before the call to unpark. To do that we must
102                // read from the write it made to `state`.
103                let old = self.state.swap(EMPTY, SeqCst);
104                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
105
106                return;
107            }
108            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
109        }
110
111        loop {
112            m = self.condvar.wait(m).unwrap();
113
114            if self
115                .state
116                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
117                .is_ok()
118            {
119                // got a notification
120                return;
121            }
122
123            // spurious wakeup, go back to sleep
124        }
125    }
126
127    /// Parks the current thread for at most `dur`.
128    fn park_timeout(&self, dur: Duration) {
129        // Like `park` above we have a fast path for an already-notified thread,
130        // and afterwards we start coordinating for a sleep. Return quickly.
131        if self
132            .state
133            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
134            .is_ok()
135        {
136            return;
137        }
138
139        if dur == Duration::from_millis(0) {
140            return;
141        }
142
143        let m = self.mutex.lock();
144
145        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
146            Ok(_) => {}
147            Err(NOTIFIED) => {
148                // We must read again here, see `park`.
149                let old = self.state.swap(EMPTY, SeqCst);
150                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
151
152                return;
153            }
154            Err(actual) => panic!("inconsistent park_timeout state; actual = {actual}"),
155        }
156
157        #[cfg(not(all(target_family = "wasm", not(target_feature = "atomics"))))]
158        // Wait with a timeout, and if we spuriously wake up or otherwise wake up
159        // from a notification, we just want to unconditionally set the state back to
160        // empty, either consuming a notification or un-flagging ourselves as
161        // parked.
162        let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();
163
164        #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))]
165        // Wasm without atomics doesn't have threads, so just sleep.
166        {
167            let _m = m;
168            std::thread::sleep(dur);
169        }
170
171        match self.state.swap(EMPTY, SeqCst) {
172            NOTIFIED => {} // got a notification, hurray!
173            PARKED => {}   // no notification, alas
174            n => panic!("inconsistent park_timeout state: {n}"),
175        }
176    }
177
178    fn unpark(&self) {
179        // To ensure the unparked thread will observe any writes we made before
180        // this call, we must perform a release operation that `park` can
181        // synchronize with. To do that we must write `NOTIFIED` even if `state`
182        // is already `NOTIFIED`. That is why this must be a swap rather than a
183        // compare-and-swap that returns if it reads `NOTIFIED` on failure.
184        match self.state.swap(NOTIFIED, SeqCst) {
185            EMPTY => return,    // no one was waiting
186            NOTIFIED => return, // already unparked
187            PARKED => {}        // gotta go wake someone up
188            _ => panic!("inconsistent state in unpark"),
189        }
190
191        // There is a period between when the parked thread sets `state` to
192        // `PARKED` (or last checked `state` in the case of a spurious wake
193        // up) and when it actually waits on `cvar`. If we were to notify
194        // during this period it would be ignored and then when the parked
195        // thread went to sleep it would never wake up. Fortunately, it has
196        // `lock` locked at this stage so we can acquire `lock` to wait until
197        // it is ready to receive the notification.
198        //
199        // Releasing `lock` before the call to `notify_one` means that when the
200        // parked thread wakes it doesn't get woken only to have to wait for us
201        // to release `lock`.
202        drop(self.mutex.lock());
203
204        self.condvar.notify_one();
205    }
206
207    fn shutdown(&self) {
208        self.condvar.notify_all();
209    }
210}
211
212impl Default for ParkThread {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218// ===== impl UnparkThread =====
219
220impl UnparkThread {
221    pub(crate) fn unpark(&self) {
222        self.inner.unpark();
223    }
224}
225
226use crate::loom::thread::AccessError;
227use std::future::Future;
228use std::marker::PhantomData;
229use std::rc::Rc;
230use std::task::Waker;
231
/// Blocks the current thread using a condition variable.
///
/// The handle stores no parker of its own; every operation goes through the
/// thread-local `CURRENT_PARKER`.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
    /// `Rc` is neither `Send` nor `Sync`, so this marker keeps the handle on
    /// the thread that created it.
    _anchor: PhantomData<Rc<()>>,
}
237
238impl CachedParkThread {
239    /// Creates a new `ParkThread` handle for the current thread.
240    ///
241    /// This type cannot be moved to other threads, so it should be created on
242    /// the thread that the caller intends to park.
243    pub(crate) fn new() -> CachedParkThread {
244        CachedParkThread {
245            _anchor: PhantomData,
246        }
247    }
248
249    pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
250        self.unpark().map(UnparkThread::into_waker)
251    }
252
253    fn unpark(&self) -> Result<UnparkThread, AccessError> {
254        self.with_current(ParkThread::unpark)
255    }
256
257    pub(crate) fn park(&mut self) {
258        self.with_current(|park_thread| park_thread.inner.park())
259            .unwrap();
260    }
261
262    pub(crate) fn park_timeout(&mut self, duration: Duration) {
263        self.with_current(|park_thread| park_thread.inner.park_timeout(duration))
264            .unwrap();
265    }
266
267    /// Gets a reference to the `ParkThread` handle for this thread.
268    fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
269    where
270        F: FnOnce(&ParkThread) -> R,
271    {
272        CURRENT_PARKER.try_with(|inner| f(inner))
273    }
274
275    pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
276        use std::task::Context;
277        use std::task::Poll::Ready;
278
279        let waker = self.waker()?;
280        let mut cx = Context::from_waker(&waker);
281
282        pin!(f);
283
284        loop {
285            if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
286                return Ok(v);
287            }
288
289            self.park();
290        }
291    }
292}
293
294impl UnparkThread {
295    pub(crate) fn into_waker(self) -> Waker {
296        waker(self.inner)
297    }
298}
299
300impl Wake for Inner {
301    fn wake(arc_self: Arc<Self>) {
302        arc_self.unpark();
303    }
304
305    fn wake_by_ref(arc_self: &Arc<Self>) {
306        arc_self.unpark();
307    }
308}
309
/// Returns the value of the current thread's loom-only park counter.
#[cfg(loom)]
pub(crate) fn current_thread_park_count() -> usize {
    CURRENT_THREAD_PARK_COUNT.with(|parks| parks.load(SeqCst))
}