1#![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3use crate::loom::sync::atomic::AtomicUsize;
4use crate::loom::sync::{Arc, Condvar, Mutex};
5use crate::util::{waker, Wake};
6
7use std::sync::atomic::Ordering::SeqCst;
8use std::time::Duration;
9
/// Handle used to block a thread.
///
/// All parking logic lives in the shared [`Inner`]; this type only owns a
/// reference to it and hands out [`UnparkThread`] handles that share it.
#[derive(Debug)]
pub(crate) struct ParkThread {
    /// State shared with every `UnparkThread` produced by `unpark`.
    inner: Arc<Inner>,
}
14
/// Cloneable handle that wakes the thread blocked on the shared [`Inner`].
#[derive(Clone, Debug)]
pub(crate) struct UnparkThread {
    /// Same shared state as the `ParkThread` this handle was created from.
    inner: Arc<Inner>,
}
20
/// Shared state behind `ParkThread` and `UnparkThread`.
#[derive(Debug)]
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Guards no data; it is only used together with `condvar` to order the
    /// parking thread's transition to `PARKED` against `unpark`.
    mutex: Mutex<()>,
    /// Condition variable the parked thread waits on.
    condvar: Condvar,
}
27
/// Initial state: no thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// Set by `park` / `park_timeout` right before waiting on the condvar.
const PARKED: usize = 1;
/// Set by `unpark`; consumed (reset to `EMPTY`) by the next park call.
const NOTIFIED: usize = 2;
31
// Per-thread `ParkThread` that `CachedParkThread` accesses through
// `with_current`.
tokio_thread_local! {
    static CURRENT_PARKER: ParkThread = ParkThread::new();
}
35
// Per-thread counter, compiled only under `cfg(loom)`. It is incremented by
// `ParkThread::park` and `ParkThread::park_timeout` and read back through
// `current_thread_park_count`.
#[cfg(loom)]
tokio_thread_local! {
    pub(crate) static CURRENT_THREAD_PARK_COUNT: AtomicUsize = AtomicUsize::new(0);
}
41
42impl ParkThread {
45 pub(crate) fn new() -> Self {
46 Self {
47 inner: Arc::new(Inner {
48 state: AtomicUsize::new(EMPTY),
49 mutex: Mutex::new(()),
50 condvar: Condvar::new(),
51 }),
52 }
53 }
54
55 pub(crate) fn unpark(&self) -> UnparkThread {
56 let inner = self.inner.clone();
57 UnparkThread { inner }
58 }
59
60 pub(crate) fn park(&mut self) {
61 #[cfg(loom)]
62 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
63 self.inner.park();
64 }
65
66 pub(crate) fn park_timeout(&mut self, duration: Duration) {
67 #[cfg(loom)]
68 CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
69 self.inner.park_timeout(duration);
70 }
71
72 pub(crate) fn shutdown(&mut self) {
73 self.inner.shutdown();
74 }
75}
76
impl Inner {
    /// Blocks the calling thread until a notification is consumed.
    ///
    /// Returns immediately when `state` is already `NOTIFIED`. Otherwise the
    /// thread marks itself `PARKED` and waits on `condvar` until it observes
    /// `NOTIFIED`, resetting `state` to `EMPTY` before returning.
    ///
    /// # Panics
    ///
    /// Panics if `state` is neither `EMPTY` nor `NOTIFIED` once the mutex is
    /// held (for example `PARKED`), or if the condvar wait returns an error.
    fn park(&self) {
        // Fast path: a notification is already pending, so consume it and
        // return without taking the mutex.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // Slow path: hold the mutex while moving to `PARKED`. `unpark`
        // acquires the same mutex before notifying, so it cannot notify
        // between this state change and the wait below.
        let mut m = self.mutex.lock();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // A notification arrived after the fast-path check. Consume
                // it with a swap; the debug assertion records the
                // expectation that the state is still `NOTIFIED`.
                let old = self.state.swap(EMPTY, SeqCst);
                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");

                return;
            }
            Err(actual) => panic!("inconsistent park state; actual = {actual}"),
        }

        loop {
            // `wait` releases the mutex while blocked and reacquires it
            // before returning.
            m = self.condvar.wait(m).unwrap();

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // Woken by a real notification, which is now consumed.
                return;
            }

            // Woken without a notification (spurious wakeup or `shutdown`'s
            // `notify_all`): `state` is not `NOTIFIED`, so wait again.
        }
    }

    /// Blocks the calling thread for at most `dur`.
    ///
    /// Returns immediately when a notification is already pending or when
    /// `dur` is zero. Unlike `park`, this waits at most once: whatever ends
    /// the wait (notification, timeout or any other wakeup), `state` is reset
    /// to `EMPTY` and the function returns.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value before waiting or after
    /// waking, or if the condvar wait returns an error.
    fn park_timeout(&self, dur: Duration) {
        // Fast path: consume an already-pending notification.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // A zero timeout never blocks, so skip the mutex entirely.
        if dur == Duration::from_millis(0) {
            return;
        }

        let m = self.mutex.lock();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // Same as in `park`: a notification arrived after the
                // fast-path check, so consume it and return.
                let old = self.state.swap(EMPTY, SeqCst);
                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");

                return;
            }
            Err(actual) => panic!("inconsistent park_timeout state; actual = {actual}"),
        }

        // Wait once with a timeout. The result is deliberately ignored: the
        // swap below resets `state` no matter why the wait ended.
        #[cfg(not(all(target_family = "wasm", not(target_feature = "atomics"))))]
        let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();

        // On wasm targets without the `atomics` feature, sleep for the full
        // duration (keeping the guard alive) instead of waiting on the
        // condvar.
        #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))]
        {
            let _m = m;
            std::thread::sleep(dur);
        }

        // Either consume the notification (`NOTIFIED`) or clear our own
        // `PARKED` flag; any other value is a bug.
        match self.state.swap(EMPTY, SeqCst) {
            NOTIFIED => {}
            PARKED => {}
            n => panic!("inconsistent park_timeout state: {n}"),
        }
    }

    /// Records a notification and wakes the parked thread, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if the previous `state` is not one of `EMPTY`, `NOTIFIED` or
    /// `PARKED`.
    fn unpark(&self) {
        // Unconditionally store `NOTIFIED`; the previous value tells us
        // whether a thread needs to be woken.
        match self.state.swap(NOTIFIED, SeqCst) {
            // No thread was parked; the next park call consumes the flag.
            EMPTY => return,
            // A notification was already pending.
            NOTIFIED => return,
            // A thread is parked (or about to wait): wake it below.
            PARKED => {}
            _ => panic!("inconsistent state in unpark"),
        }

        // The parking thread holds `mutex` from before it stores `PARKED`
        // until its condvar wait releases it. Acquiring the mutex here
        // therefore cannot succeed until that thread is actually waiting, so
        // the notification below cannot be lost. The guard is dropped right
        // away so the woken thread does not have to wait for us to release
        // the mutex.
        drop(self.mutex.lock());

        self.condvar.notify_one();
    }

    /// Wakes every thread waiting on `condvar` without modifying `state`.
    ///
    /// A thread blocked in `park` that wakes this way finds no notification
    /// and waits again; a thread blocked in `park_timeout` returns.
    fn shutdown(&self) {
        self.condvar.notify_all();
    }
}
211
212impl Default for ParkThread {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218impl UnparkThread {
221 pub(crate) fn unpark(&self) {
222 self.inner.unpark();
223 }
224}
225
226use crate::loom::thread::AccessError;
227use std::future::Future;
228use std::marker::PhantomData;
229use std::rc::Rc;
230use std::task::Waker;
231
/// Zero-sized handle that parks the current thread through the thread-local
/// `CURRENT_PARKER`.
#[derive(Debug)]
pub(crate) struct CachedParkThread {
    /// `Rc<()>` is neither `Send` nor `Sync`, so this marker keeps the handle
    /// from being moved to or shared with another thread.
    _anchor: PhantomData<Rc<()>>,
}
237
238impl CachedParkThread {
239 pub(crate) fn new() -> CachedParkThread {
244 CachedParkThread {
245 _anchor: PhantomData,
246 }
247 }
248
249 pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
250 self.unpark().map(UnparkThread::into_waker)
251 }
252
253 fn unpark(&self) -> Result<UnparkThread, AccessError> {
254 self.with_current(ParkThread::unpark)
255 }
256
257 pub(crate) fn park(&mut self) {
258 self.with_current(|park_thread| park_thread.inner.park())
259 .unwrap();
260 }
261
262 pub(crate) fn park_timeout(&mut self, duration: Duration) {
263 self.with_current(|park_thread| park_thread.inner.park_timeout(duration))
264 .unwrap();
265 }
266
267 fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
269 where
270 F: FnOnce(&ParkThread) -> R,
271 {
272 CURRENT_PARKER.try_with(|inner| f(inner))
273 }
274
275 pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
276 use std::task::Context;
277 use std::task::Poll::Ready;
278
279 let waker = self.waker()?;
280 let mut cx = Context::from_waker(&waker);
281
282 pin!(f);
283
284 loop {
285 if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
286 return Ok(v);
287 }
288
289 self.park();
290 }
291 }
292}
293
294impl UnparkThread {
295 pub(crate) fn into_waker(self) -> Waker {
296 waker(self.inner)
297 }
298}
299
300impl Wake for Inner {
301 fn wake(arc_self: Arc<Self>) {
302 arc_self.unpark();
303 }
304
305 fn wake_by_ref(arc_self: &Arc<Self>) {
306 arc_self.unpark();
307 }
308}
309
/// Returns the calling thread's `CURRENT_THREAD_PARK_COUNT` value.
///
/// Only compiled under `cfg(loom)`.
#[cfg(loom)]
pub(crate) fn current_thread_park_count() -> usize {
    let parks = CURRENT_THREAD_PARK_COUNT.with(|counter| counter.load(SeqCst));
    parks
}