tokio/runtime/scheduler/
mod.rs
1cfg_rt! {
2 pub(crate) mod current_thread;
3 pub(crate) use current_thread::CurrentThread;
4
5 mod defer;
6 use defer::Defer;
7
8 pub(crate) mod inject;
9 pub(crate) use inject::Inject;
10
11 use crate::runtime::TaskHooks;
12
13 use crate::runtime::WorkerMetrics;
14}
15
16cfg_rt_multi_thread! {
17 mod block_in_place;
18 pub(crate) use block_in_place::block_in_place;
19
20 mod lock;
21 use lock::Lock;
22
23 pub(crate) mod multi_thread;
24 pub(crate) use multi_thread::MultiThread;
25}
26
27use crate::runtime::driver;
28
/// Handle to whichever scheduler flavor is compiled in.
///
/// Each runtime-backed variant stores the flavor-specific handle behind an
/// `Arc`, so cloning a `Handle` clones that `Arc`.
#[derive(Clone, Debug)]
pub(crate) enum Handle {
    /// Handle to the current-thread scheduler.
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Handle to the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    /// Placeholder variant compiled only when the `rt` feature is off.
    // NOTE(review): presumably kept so the enum still has a variant without
    // `rt`; `dead_code` is allowed because nothing visible in this module
    // constructs it — confirm before removing.
    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}
43
/// Scheduler context, with one variant per compiled-in scheduler flavor.
///
/// Only exists when the `rt` feature is enabled.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context of the current-thread scheduler.
    CurrentThread(current_thread::Context),

    /// Context of the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),
}
51
52impl Handle {
53 #[cfg_attr(not(feature = "full"), allow(dead_code))]
54 pub(crate) fn driver(&self) -> &driver::Handle {
55 match *self {
56 #[cfg(feature = "rt")]
57 Handle::CurrentThread(ref h) => &h.driver,
58
59 #[cfg(feature = "rt-multi-thread")]
60 Handle::MultiThread(ref h) => &h.driver,
61
62 #[cfg(not(feature = "rt"))]
63 Handle::Disabled => unreachable!(),
64 }
65 }
66}
67
68cfg_rt! {
69 use crate::future::Future;
70 use crate::loom::sync::Arc;
71 use crate::runtime::{blocking, task::{Id, SpawnLocation}};
72 use crate::runtime::context;
73 use crate::task::JoinHandle;
74 use crate::util::RngSeedGenerator;
75 use std::task::Waker;
76
/// Evaluates `$body` against the inner value of whichever flavor variant of
/// `$kind` the scrutinee holds, binding that value to `$inner`.
///
/// The `MultiThread` arm is only compiled with the `rt-multi-thread` feature.
macro_rules! match_flavor {
    ($scrutinee:expr, $kind:ident($inner:ident) => $body:expr) => {
        match $scrutinee {
            $kind::CurrentThread($inner) => $body,

            #[cfg(feature = "rt-multi-thread")]
            $kind::MultiThread($inner) => $body,
        }
    }
}
87
88 impl Handle {
89 #[track_caller]
90 pub(crate) fn current() -> Handle {
91 match context::with_current(Clone::clone) {
92 Ok(handle) => handle,
93 Err(e) => panic!("{}", e),
94 }
95 }
96
97 pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
98 match_flavor!(self, Handle(h) => &h.blocking_spawner)
99 }
100
101 pub(crate) fn is_local(&self) -> bool {
102 match self {
103 Handle::CurrentThread(h) => h.local_tid.is_some(),
104
105 #[cfg(feature = "rt-multi-thread")]
106 Handle::MultiThread(_) => false,
107 }
108 }
109
110 pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool {
112 match self {
113 Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false),
114
115 #[cfg(feature = "rt-multi-thread")]
116 Handle::MultiThread(_) => false,
117 }
118 }
119
120 pub(crate) fn spawn<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
121 where
122 F: Future + Send + 'static,
123 F::Output: Send + 'static,
124 {
125 match self {
126 Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id, spawned_at),
127
128 #[cfg(feature = "rt-multi-thread")]
129 Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id, spawned_at),
130 }
131 }
132
133 #[allow(irrefutable_let_patterns)]
139 #[track_caller]
140 pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
141 where
142 F: Future + 'static,
143 F::Output: 'static,
144 {
145 if let Handle::CurrentThread(h) = self {
146 current_thread::Handle::spawn_local(h, future, id, spawned_at)
147 } else {
148 panic!("Only current_thread and LocalSet have spawn_local internals implemented")
149 }
150 }
151
152 pub(crate) fn shutdown(&self) {
153 match *self {
154 Handle::CurrentThread(_) => {},
155
156 #[cfg(feature = "rt-multi-thread")]
157 Handle::MultiThread(ref h) => h.shutdown(),
158 }
159 }
160
161 pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
162 match_flavor!(self, Handle(h) => &h.seed_generator)
163 }
164
165 pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
166 match self {
167 Handle::CurrentThread(handle) => handle,
168 #[cfg(feature = "rt-multi-thread")]
169 _ => panic!("not a CurrentThread handle"),
170 }
171 }
172
173 pub(crate) fn hooks(&self) -> &TaskHooks {
174 match self {
175 Handle::CurrentThread(h) => &h.task_hooks,
176 #[cfg(feature = "rt-multi-thread")]
177 Handle::MultiThread(h) => &h.task_hooks,
178 }
179 }
180 }
181
182 impl Handle {
183 pub(crate) fn num_workers(&self) -> usize {
184 match self {
185 Handle::CurrentThread(_) => 1,
186 #[cfg(feature = "rt-multi-thread")]
187 Handle::MultiThread(handle) => handle.num_workers(),
188 }
189 }
190
191 pub(crate) fn num_alive_tasks(&self) -> usize {
192 match_flavor!(self, Handle(handle) => handle.num_alive_tasks())
193 }
194
195 pub(crate) fn injection_queue_depth(&self) -> usize {
196 match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
197 }
198
199 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
200 match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
201 }
202 }
203
204 cfg_unstable_metrics! {
205 use crate::runtime::SchedulerMetrics;
206
207 impl Handle {
208 cfg_64bit_metrics! {
209 pub(crate) fn spawned_tasks_count(&self) -> u64 {
210 match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
211 }
212 }
213
214 pub(crate) fn num_blocking_threads(&self) -> usize {
215 match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
216 }
217
218 pub(crate) fn num_idle_blocking_threads(&self) -> usize {
219 match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
220 }
221
222 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
223 match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
224 }
225
226 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
227 match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
228 }
229
230 pub(crate) fn blocking_queue_depth(&self) -> usize {
231 match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
232 }
233 }
234 }
235
236 impl Context {
237 #[track_caller]
238 pub(crate) fn expect_current_thread(&self) -> ¤t_thread::Context {
239 match self {
240 Context::CurrentThread(context) => context,
241 #[cfg(feature = "rt-multi-thread")]
242 _ => panic!("expected `CurrentThread::Context`")
243 }
244 }
245
246 pub(crate) fn defer(&self, waker: &Waker) {
247 match_flavor!(self, Context(context) => context.defer(waker));
248 }
249
250 cfg_rt_multi_thread! {
251 #[track_caller]
252 pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
253 match self {
254 Context::MultiThread(context) => context,
255 _ => panic!("expected `MultiThread::Context`")
256 }
257 }
258 }
259 }
260}
261
262cfg_not_rt! {
#[cfg(any(
    feature = "net",
    all(unix, feature = "process"),
    all(unix, feature = "signal"),
    feature = "time",
))]
impl Handle {
    /// Stand-in for `Handle::current` in builds where this block is compiled
    /// (inside `cfg_not_rt!`): it never returns a handle.
    ///
    /// # Panics
    ///
    /// Always panics with `CONTEXT_MISSING_ERROR`.
    #[track_caller]
    pub(crate) fn current() -> Handle {
        use crate::util::error::CONTEXT_MISSING_ERROR;

        panic!("{}", CONTEXT_MISSING_ERROR)
    }
}
275}