tokio/runtime/scheduler/
mod.rs

1cfg_rt! {
2    pub(crate) mod current_thread;
3    pub(crate) use current_thread::CurrentThread;
4
5    mod defer;
6    use defer::Defer;
7
8    pub(crate) mod inject;
9    pub(crate) use inject::Inject;
10
11    use crate::runtime::TaskHooks;
12
13    use crate::runtime::WorkerMetrics;
14}
15
16cfg_rt_multi_thread! {
17    mod block_in_place;
18    pub(crate) use block_in_place::block_in_place;
19
20    mod lock;
21    use lock::Lock;
22
23    pub(crate) mod multi_thread;
24    pub(crate) use multi_thread::MultiThread;
25}
26
27use crate::runtime::driver;
28
/// Flavor-dispatching scheduler handle.
///
/// Each runtime-backed variant wraps an `Arc` around that flavor's own
/// `Handle` type. Which variants exist depends on the enabled cargo features.
#[derive(Clone, Debug)]
pub(crate) enum Handle {
    /// Wraps the `current_thread` scheduler's handle (requires `rt`).
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Wraps the `multi_thread` scheduler's handle (requires `rt-multi-thread`).
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    // TODO: This variant exists only to avoid triggering "dead code" warnings
    // in many other places in the codebase. Remove it during a later cleanup.
    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}
43
/// Per-flavor scheduler context, with one variant for each scheduler flavor
/// that is compiled in. Only defined when the `rt` feature is enabled.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context belonging to the `current_thread` scheduler.
    CurrentThread(current_thread::Context),

    /// Context belonging to the `multi_thread` scheduler
    /// (requires `rt-multi-thread`).
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),
}
51
52impl Handle {
53    #[cfg_attr(not(feature = "full"), allow(dead_code))]
54    pub(crate) fn driver(&self) -> &driver::Handle {
55        match *self {
56            #[cfg(feature = "rt")]
57            Handle::CurrentThread(ref h) => &h.driver,
58
59            #[cfg(feature = "rt-multi-thread")]
60            Handle::MultiThread(ref h) => &h.driver,
61
62            #[cfg(not(feature = "rt"))]
63            Handle::Disabled => unreachable!(),
64        }
65    }
66}
67
68cfg_rt! {
69    use crate::future::Future;
70    use crate::loom::sync::Arc;
71    use crate::runtime::{blocking, task::{Id, SpawnLocation}};
72    use crate::runtime::context;
73    use crate::task::JoinHandle;
74    use crate::util::RngSeedGenerator;
75    use std::task::Waker;
76
77    macro_rules! match_flavor {
78        ($self:expr, $ty:ident($h:ident) => $e:expr) => {
79            match $self {
80                $ty::CurrentThread($h) => $e,
81
82                #[cfg(feature = "rt-multi-thread")]
83                $ty::MultiThread($h) => $e,
84            }
85        }
86    }
87
88    impl Handle {
89        #[track_caller]
90        pub(crate) fn current() -> Handle {
91            match context::with_current(Clone::clone) {
92                Ok(handle) => handle,
93                Err(e) => panic!("{}", e),
94            }
95        }
96
97        pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98            match_flavor!(self, Handle(h) => &h.blocking_spawner)
99        }
100
101        pub(crate) fn is_local(&self) -> bool {
102            match self {
103                Handle::CurrentThread(h) => h.local_tid.is_some(),
104
105                #[cfg(feature = "rt-multi-thread")]
106                Handle::MultiThread(_) => false,
107            }
108        }
109
110        /// Returns true if this is a local runtime and the runtime is owned by the current thread.
111        pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
112            match self {
113                Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
114
115                #[cfg(feature = "rt-multi-thread")]
116                Handle::MultiThread(_) => false,
117            }
118        }
119
120        pub(crate) fn spawn<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
121        where
122            F: Future + Send + 'static,
123            F::Output: Send + 'static,
124        {
125            match self {
126                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id, spawned_at),
127
128                #[cfg(feature = "rt-multi-thread")]
129                Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id, spawned_at),
130            }
131        }
132
133        /// Spawn a local task
134        ///
135        /// # Safety
136        /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
137        /// by the current thread.
138        #[allow(irrefutable_let_patterns)]
139        #[track_caller]
140        pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
141        where
142            F: Future + 'static,
143            F::Output: 'static,
144        {
145            if let Handle::CurrentThread(h) = self {
146                current_thread::Handle::spawn_local(h, future, id, spawned_at)
147            } else {
148                panic!("Only current_thread and LocalSet have spawn_local internals implemented")
149            }
150        }
151
152        pub(crate) fn shutdown(&self) {
153            match *self {
154                Handle::CurrentThread(_) => {},
155
156                #[cfg(feature = "rt-multi-thread")]
157                Handle::MultiThread(ref h) => h.shutdown(),
158            }
159        }
160
161        pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
162            match_flavor!(self, Handle(h) => &h.seed_generator)
163        }
164
165        pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
166            match self {
167                Handle::CurrentThread(handle) => handle,
168                #[cfg(feature = "rt-multi-thread")]
169                _ => panic!("not a CurrentThread handle"),
170            }
171        }
172
173        pub(crate) fn hooks(&self) -> &TaskHooks {
174            match self {
175                Handle::CurrentThread(h) => &h.task_hooks,
176                #[cfg(feature = "rt-multi-thread")]
177                Handle::MultiThread(h) => &h.task_hooks,
178            }
179        }
180    }
181
182    impl Handle {
183        pub(crate) fn num_workers(&self) -> usize {
184            match self {
185                Handle::CurrentThread(_) => 1,
186                #[cfg(feature = "rt-multi-thread")]
187                Handle::MultiThread(handle) => handle.num_workers(),
188            }
189        }
190
191        pub(crate) fn num_alive_tasks(&self) -> usize {
192            match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
193        }
194
195        pub(crate) fn injection_queue_depth(&self) -> usize {
196            match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
197        }
198
199        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
200            match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
201        }
202    }
203
204    cfg_unstable_metrics! {
205        use crate::runtime::SchedulerMetrics;
206
207        impl Handle {
208            cfg_64bit_metrics! {
209                pub(crate) fn spawned_tasks_count(&self) -> u64 {
210                    match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
211                }
212            }
213
214            pub(crate) fn num_blocking_threads(&self) -> usize {
215                match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
216            }
217
218            pub(crate) fn num_idle_blocking_threads(&self) -> usize {
219                match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
220            }
221
222            pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
223                match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
224            }
225
226            pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
227                match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
228            }
229
230            pub(crate) fn blocking_queue_depth(&self) -> usize {
231                match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
232            }
233        }
234    }
235
236    impl Context {
237        #[track_caller]
238        pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
239            match self {
240                Context::CurrentThread(context) => context,
241                #[cfg(feature = "rt-multi-thread")]
242                _ => panic!("expected `CurrentThread::Context`")
243            }
244        }
245
246        pub(crate) fn defer(&self, waker: &Waker) {
247            match_flavor!(self, Context(context) => context.defer(waker));
248        }
249
250        cfg_rt_multi_thread! {
251            #[track_caller]
252            pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
253                match self {
254                    Context::MultiThread(context) => context,
255                    _ => panic!("expected `MultiThread::Context`")
256                }
257            }
258        }
259    }
260}
261
262cfg_not_rt! {
263    #[cfg(any(
264        feature = "net",
265        all(unix, feature = "process"),
266        all(unix, feature = "signal"),
267        feature = "time",
268    ))]
269    impl Handle {
270        #[track_caller]
271        pub(crate) fn current() -> Handle {
272            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
273        }
274    }
275}