1use crate::loom::sync::atomic::AtomicBool;
2use crate::loom::sync::Arc;
3use crate::runtime::driver::{self, Driver};
4use crate::runtime::scheduler::{self, Defer, Inject};
5use crate::runtime::task::{
6 self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
7};
8use crate::runtime::{
9 blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10};
11use crate::sync::notify::Notify;
12use crate::util::atomic_cell::AtomicCell;
13use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::future::{poll_fn, Future};
18use std::sync::atomic::Ordering::{AcqRel, Release};
19use std::task::Poll::{Pending, Ready};
20use std::task::Waker;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::{fmt, thread};
24
/// Scheduler that runs its tasks on whichever thread currently holds the
/// scheduler core inside `block_on`.
pub(crate) struct CurrentThread {
    /// Slot holding the scheduler `Core`. `take_core` empties it for the
    /// duration of a `block_on` or `shutdown` call; dropping the resulting
    /// `CoreGuard` stores the core back.
    core: AtomicCell<Core>,

    /// Signalled with `notify_one` when a `CoreGuard` returns the core, so a
    /// `block_on` caller that found the slot empty can retry taking it.
    notify: Notify,
}
34
/// Handle to the current-thread scheduler. It is always used behind an `Arc`
/// and doubles as the scheduler's `Schedule` and `Wake` implementation.
pub(crate) struct Handle {
    /// Scheduler state reachable from every holder of the handle.
    shared: Shared,

    /// Driver handle. Used to unpark the driver on wake-ups and remote
    /// schedules, and passed to the driver when parking or shutting down.
    pub(crate) driver: driver::Handle,

    /// Spawner for the blocking pool; the unstable metrics accessors query it.
    pub(crate) blocking_spawner: blocking::Spawner,

    /// RNG seed generator handed to `CurrentThread::new`. It is not read
    /// anywhere in this file.
    pub(crate) seed_generator: RngSeedGenerator,

    /// Task lifecycle callbacks, cloned out of the `Config` in
    /// `CurrentThread::new`.
    pub(crate) task_hooks: TaskHooks,

    /// Thread id handed to `CurrentThread::new`. It is not read in this file;
    /// presumably it identifies the owning thread of a thread-local runtime.
    /// TODO: confirm against callers.
    pub(crate) local_tid: Option<ThreadId>,
}
55
/// State needed to run the scheduler loop. A single boxed `Core` is created in
/// `CurrentThread::new` and then passed by value through the scheduling
/// functions, each of which hands it back to its caller.
struct Core {
    /// Local run queue: tasks are pushed at the back and popped from the front.
    tasks: VecDeque<Notified>,

    /// Counter advanced (with wrap-around) by `Core::tick`; `next_task`
    /// compares it against `global_queue_interval`.
    tick: u32,

    /// The runtime driver. `park` and `park_yield` take it out while the
    /// thread is parked and store it back afterwards.
    driver: Option<Driver>,

    /// Batched worker metrics, flushed by `submit_metrics`.
    metrics: MetricsBatch,

    /// `next_task` looks at the remote queue before the local one whenever
    /// `tick % global_queue_interval == 0`.
    global_queue_interval: u32,

    /// Set by `Schedule::unhandled_panic` when the config asks for the
    /// runtime to shut down; `CoreGuard::block_on` checks it before running
    /// each task and bails out when it is set.
    unhandled_panic: bool,
}
80
/// Scheduler state stored inside `Handle` and therefore reachable from any
/// thread holding the handle.
struct Shared {
    /// Queue receiving tasks that are scheduled when the calling thread's
    /// scheduler context does not belong to this handle.
    inject: Inject<Arc<Handle>>,

    /// Collection of the tasks bound to this scheduler by `spawn` and
    /// `spawn_local`.
    owned: OwnedTasks<Arc<Handle>>,

    /// Set to `true` by the handle's waker (and by `Handle::waker_ref`);
    /// swapped back to `false` by `reset_woken`, whose return value decides
    /// whether `CoreGuard::block_on` polls the outer future.
    woken: AtomicBool,

    /// Runtime configuration passed to `CurrentThread::new`.
    config: Config,

    /// Scheduler-level metrics; this file only bumps the remote schedule
    /// count on it.
    scheduler_metrics: SchedulerMetrics,

    /// Metrics of the scheduler's single worker (`worker_metrics` only
    /// accepts index 0).
    worker_metrics: WorkerMetrics,
}
101
/// Per-`CoreGuard` scheduler context. It is wrapped in
/// `scheduler::Context::CurrentThread` and installed with
/// `context::set_scheduler` while the guard is entered.
pub(crate) struct Context {
    /// Handle of the scheduler this context belongs to; `schedule` compares
    /// it by pointer to decide between the local and the remote queue.
    handle: Arc<Handle>,

    /// Slot for the scheduler core. `Context::enter` stores the core here
    /// while a closure (task poll, park, hook) runs, which lets `schedule`
    /// reach the local queue, and takes it out again afterwards.
    core: RefCell<Option<Box<Core>>>,

    /// Wakers registered through `Context::defer`; they are woken right after
    /// every driver park.
    pub(crate) defer: Defer,
}
116
/// A notified task whose scheduler type is this module's `Arc<Handle>`.
type Notified = task::Notified<Arc<Handle>>;

/// Capacity the local run queue is allocated with in `CurrentThread::new`.
const INITIAL_CAPACITY: usize = 64;

/// Value used for `Core::global_queue_interval` when the `Config` does not
/// specify one.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126
127impl CurrentThread {
128 pub(crate) fn new(
129 driver: Driver,
130 driver_handle: driver::Handle,
131 blocking_spawner: blocking::Spawner,
132 seed_generator: RngSeedGenerator,
133 config: Config,
134 local_tid: Option<ThreadId>,
135 ) -> (CurrentThread, Arc<Handle>) {
136 let worker_metrics = WorkerMetrics::from_config(&config);
137 worker_metrics.set_thread_id(thread::current().id());
138
139 let global_queue_interval = config
141 .global_queue_interval
142 .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
143
144 let handle = Arc::new(Handle {
145 task_hooks: TaskHooks {
146 task_spawn_callback: config.before_spawn.clone(),
147 task_terminate_callback: config.after_termination.clone(),
148 #[cfg(tokio_unstable)]
149 before_poll_callback: config.before_poll.clone(),
150 #[cfg(tokio_unstable)]
151 after_poll_callback: config.after_poll.clone(),
152 },
153 shared: Shared {
154 inject: Inject::new(),
155 owned: OwnedTasks::new(1),
156 woken: AtomicBool::new(false),
157 config,
158 scheduler_metrics: SchedulerMetrics::new(),
159 worker_metrics,
160 },
161 driver: driver_handle,
162 blocking_spawner,
163 seed_generator,
164 local_tid,
165 });
166
167 let core = AtomicCell::new(Some(Box::new(Core {
168 tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
169 tick: 0,
170 driver: Some(driver),
171 metrics: MetricsBatch::new(&handle.shared.worker_metrics),
172 global_queue_interval,
173 unhandled_panic: false,
174 })));
175
176 let scheduler = CurrentThread {
177 core,
178 notify: Notify::new(),
179 };
180
181 (scheduler, handle)
182 }
183
184 #[track_caller]
185 pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
186 pin!(future);
187
188 crate::runtime::context::enter_runtime(handle, false, |blocking| {
189 let handle = handle.as_current_thread();
190
191 loop {
195 if let Some(core) = self.take_core(handle) {
196 handle
197 .shared
198 .worker_metrics
199 .set_thread_id(thread::current().id());
200 return core.block_on(future);
201 } else {
202 let notified = self.notify.notified();
203 pin!(notified);
204
205 if let Some(out) = blocking
206 .block_on(poll_fn(|cx| {
207 if notified.as_mut().poll(cx).is_ready() {
208 return Ready(None);
209 }
210
211 if let Ready(out) = future.as_mut().poll(cx) {
212 return Ready(Some(out));
213 }
214
215 Pending
216 }))
217 .expect("Failed to `Enter::block_on`")
218 {
219 return out;
220 }
221 }
222 }
223 })
224 }
225
226 fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
227 let core = self.core.take()?;
228
229 Some(CoreGuard {
230 context: scheduler::Context::CurrentThread(Context {
231 handle: handle.clone(),
232 core: RefCell::new(Some(core)),
233 defer: Defer::new(),
234 }),
235 scheduler: self,
236 })
237 }
238
239 pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
240 let handle = handle.as_current_thread();
241
242 let core = match self.take_core(handle) {
246 Some(core) => core,
247 None if std::thread::panicking() => return,
248 None => panic!("Oh no! We never placed the Core back, this is a bug!"),
249 };
250
251 let tls_available = context::with_current(|_| ()).is_ok();
253
254 if tls_available {
255 core.enter(|core, _context| {
256 let core = shutdown2(core, handle);
257 (core, ())
258 });
259 } else {
260 let context = core.context.expect_current_thread();
264 let core = context.core.borrow_mut().take().unwrap();
265
266 let core = shutdown2(core, handle);
267 *context.core.borrow_mut() = Some(core);
268 }
269 }
270}
271
272fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
273 handle.shared.owned.close_and_shutdown_all(0);
277
278 while let Some(task) = core.next_local_task(handle) {
281 drop(task);
282 }
283
284 handle.shared.inject.close();
286
287 while let Some(task) = handle.shared.inject.pop() {
289 drop(task);
290 }
291
292 assert!(handle.shared.owned.is_empty());
293
294 core.submit_metrics(handle);
296
297 if let Some(driver) = core.driver.as_mut() {
299 driver.shutdown(&handle.driver);
300 }
301
302 core
303}
304
305impl fmt::Debug for CurrentThread {
306 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
307 fmt.debug_struct("CurrentThread").finish()
308 }
309}
310
311impl Core {
314 fn tick(&mut self) {
316 self.tick = self.tick.wrapping_add(1);
317 }
318
319 fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
320 if self.tick % self.global_queue_interval == 0 {
321 handle
322 .next_remote_task()
323 .or_else(|| self.next_local_task(handle))
324 } else {
325 self.next_local_task(handle)
326 .or_else(|| handle.next_remote_task())
327 }
328 }
329
330 fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
331 let ret = self.tasks.pop_front();
332 handle
333 .shared
334 .worker_metrics
335 .set_queue_depth(self.tasks.len());
336 ret
337 }
338
339 fn push_task(&mut self, handle: &Handle, task: Notified) {
340 self.tasks.push_back(task);
341 self.metrics.inc_local_schedule_count();
342 handle
343 .shared
344 .worker_metrics
345 .set_queue_depth(self.tasks.len());
346 }
347
348 fn submit_metrics(&mut self, handle: &Handle) {
349 self.metrics.submit(&handle.shared.worker_metrics, 0);
350 }
351}
352
/// Takes every deferred waker out of `context.defer` and wakes each one.
///
/// NOTE(review): the name suggests `take_deferred` also releases the list's
/// allocation; confirm in `Defer`.
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    context
        .defer
        .take_deferred()
        .into_iter()
        .for_each(|waker| waker.wake());
}
360
361impl Context {
364 fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
367 core.metrics.start_poll();
368 let mut ret = self.enter(core, || crate::task::coop::budget(f));
369 ret.0.metrics.end_poll();
370 ret
371 }
372
373 fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
376 let mut driver = core.driver.take().expect("driver missing");
377
378 if let Some(f) = &handle.shared.config.before_park {
379 let (c, ()) = self.enter(core, || f());
380 core = c;
381 }
382
383 if core.tasks.is_empty() {
386 core.metrics.about_to_park();
388 core.submit_metrics(handle);
389
390 let (c, ()) = self.enter(core, || {
391 driver.park(&handle.driver);
392 self.defer.wake();
393 });
394
395 core = c;
396
397 core.metrics.unparked();
398 core.submit_metrics(handle);
399 }
400
401 if let Some(f) = &handle.shared.config.after_unpark {
402 let (c, ()) = self.enter(core, || f());
403 core = c;
404 }
405
406 core.driver = Some(driver);
407 core
408 }
409
410 fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
412 let mut driver = core.driver.take().expect("driver missing");
413
414 core.submit_metrics(handle);
415
416 let (mut core, ()) = self.enter(core, || {
417 driver.park_timeout(&handle.driver, Duration::from_millis(0));
418 self.defer.wake();
419 });
420
421 core.driver = Some(driver);
422 core
423 }
424
425 fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
426 *self.core.borrow_mut() = Some(core);
430
431 let ret = f();
433
434 let core = self.core.borrow_mut().take().expect("core missing");
436 (core, ret)
437 }
438
439 pub(crate) fn defer(&self, waker: &Waker) {
440 self.defer.defer(waker);
441 }
442}
443
444impl Handle {
447 #[track_caller]
449 pub(crate) fn spawn<F>(
450 me: &Arc<Self>,
451 future: F,
452 id: crate::runtime::task::Id,
453 spawned_at: SpawnLocation,
454 ) -> JoinHandle<F::Output>
455 where
456 F: crate::future::Future + Send + 'static,
457 F::Output: Send + 'static,
458 {
459 let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);
460
461 me.task_hooks.spawn(&TaskMeta {
462 id,
463 spawned_at,
464 _phantom: Default::default(),
465 });
466
467 if let Some(notified) = notified {
468 me.schedule(notified);
469 }
470
471 handle
472 }
473
474 #[track_caller]
481 pub(crate) unsafe fn spawn_local<F>(
482 me: &Arc<Self>,
483 future: F,
484 id: crate::runtime::task::Id,
485 spawned_at: SpawnLocation,
486 ) -> JoinHandle<F::Output>
487 where
488 F: crate::future::Future + 'static,
489 F::Output: 'static,
490 {
491 let (handle, notified) = me
492 .shared
493 .owned
494 .bind_local(future, me.clone(), id, spawned_at);
495
496 me.task_hooks.spawn(&TaskMeta {
497 id,
498 spawned_at,
499 _phantom: Default::default(),
500 });
501
502 if let Some(notified) = notified {
503 me.schedule(notified);
504 }
505
506 handle
507 }
508
509 #[cfg(all(
511 tokio_unstable,
512 tokio_taskdump,
513 target_os = "linux",
514 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
515 ))]
516 pub(crate) fn dump(&self) -> crate::runtime::Dump {
517 use crate::runtime::dump;
518 use task::trace::trace_current_thread;
519
520 let mut traces = vec![];
521
522 context::with_scheduler(|maybe_context| {
524 let context = if let Some(context) = maybe_context {
526 context.expect_current_thread()
527 } else {
528 return;
529 };
530 let mut maybe_core = context.core.borrow_mut();
531 let core = if let Some(core) = maybe_core.as_mut() {
532 core
533 } else {
534 return;
535 };
536 let local = &mut core.tasks;
537
538 if self.shared.inject.is_closed() {
539 return;
540 }
541
542 traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
543 .into_iter()
544 .map(|(id, trace)| dump::Task::new(id, trace))
545 .collect();
546
547 drop(maybe_core);
549
550 wake_deferred_tasks_and_free(context);
554 });
555
556 dump::Dump::new(traces)
557 }
558
559 fn next_remote_task(&self) -> Option<Notified> {
560 self.shared.inject.pop()
561 }
562
563 fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
564 me.shared.woken.store(true, Release);
567 waker_ref(me)
568 }
569
570 pub(crate) fn reset_woken(&self) -> bool {
572 self.shared.woken.swap(false, AcqRel)
573 }
574
575 pub(crate) fn num_alive_tasks(&self) -> usize {
576 self.shared.owned.num_alive_tasks()
577 }
578
579 pub(crate) fn injection_queue_depth(&self) -> usize {
580 self.shared.inject.len()
581 }
582
583 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
584 assert_eq!(0, worker);
585 &self.shared.worker_metrics
586 }
587}
588
cfg_unstable_metrics! {
    impl Handle {
        /// Returns the scheduler-level metrics.
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        /// Returns the queue depth recorded in the worker metrics. Panics if
        /// `worker` is not `0` (see `worker_metrics`).
        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        /// Thread count reported by the blocking spawner.
        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        /// Idle thread count reported by the blocking spawner.
        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        /// Queue depth reported by the blocking spawner.
        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            /// Spawned-task count reported by the owned-task collection.
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}
618
cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        /// Returns the `id` field of this scheduler's owned-task collection.
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}
628
629impl fmt::Debug for Handle {
630 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
631 fmt.debug_struct("current_thread::Handle { ... }").finish()
632 }
633}
634
635impl Schedule for Arc<Handle> {
638 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
639 self.shared.owned.remove(task)
640 }
641
642 fn schedule(&self, task: task::Notified<Self>) {
643 use scheduler::Context::CurrentThread;
644
645 context::with_scheduler(|maybe_cx| match maybe_cx {
646 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
647 let mut core = cx.core.borrow_mut();
648
649 if let Some(core) = core.as_mut() {
652 core.push_task(self, task);
653 }
654 }
655 _ => {
656 self.shared.scheduler_metrics.inc_remote_schedule_count();
658
659 self.shared.inject.push(task);
661 self.driver.unpark();
662 }
663 });
664 }
665
666 fn hooks(&self) -> TaskHarnessScheduleHooks {
667 TaskHarnessScheduleHooks {
668 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
669 }
670 }
671
672 cfg_unstable! {
673 fn unhandled_panic(&self) {
674 use crate::runtime::UnhandledPanic;
675
676 match self.shared.config.unhandled_panic {
677 UnhandledPanic::Ignore => {
678 }
680 UnhandledPanic::ShutdownRuntime => {
681 use scheduler::Context::CurrentThread;
682
683 context::with_scheduler(|maybe_cx| match maybe_cx {
688 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
689 let mut core = cx.core.borrow_mut();
690
691 if let Some(core) = core.as_mut() {
693 core.unhandled_panic = true;
694 self.shared.owned.close_and_shutdown_all(0);
695 }
696 }
697 _ => unreachable!("runtime core not set in CURRENT thread-local"),
698 })
699 }
700 }
701 }
702 }
703}
704
705impl Wake for Handle {
706 fn wake(arc_self: Arc<Self>) {
707 Wake::wake_by_ref(&arc_self);
708 }
709
710 fn wake_by_ref(arc_self: &Arc<Self>) {
712 arc_self.shared.woken.store(true, Release);
713 arc_self.driver.unpark();
714 }
715}
716
/// Guard holding the scheduler core (inside `context`) while a thread runs
/// `block_on` or `shutdown`. Its `Drop` implementation stores the core back
/// into the `CurrentThread` slot and notifies one waiter.
struct CoreGuard<'a> {
    /// Scheduler context created by `take_core`; it owns the core whenever
    /// scheduler code is not holding it by value.
    context: scheduler::Context,
    /// Scheduler the core was taken from and is returned to on drop.
    scheduler: &'a CurrentThread,
}
725
726impl CoreGuard<'_> {
727 #[track_caller]
728 fn block_on<F: Future>(self, future: F) -> F::Output {
729 let ret = self.enter(|mut core, context| {
730 let waker = Handle::waker_ref(&context.handle);
731 let mut cx = std::task::Context::from_waker(&waker);
732
733 pin!(future);
734
735 core.metrics.start_processing_scheduled_tasks();
736
737 'outer: loop {
738 let handle = &context.handle;
739
740 if handle.reset_woken() {
741 let (c, res) = context.enter(core, || {
742 crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
743 });
744
745 core = c;
746
747 if let Ready(v) = res {
748 return (core, Some(v));
749 }
750 }
751
752 for _ in 0..handle.shared.config.event_interval {
753 if core.unhandled_panic {
755 return (core, None);
756 }
757
758 core.tick();
759
760 let entry = core.next_task(handle);
761
762 let task = match entry {
763 Some(entry) => entry,
764 None => {
765 core.metrics.end_processing_scheduled_tasks();
766
767 core = if !context.defer.is_empty() {
768 context.park_yield(core, handle)
769 } else {
770 context.park(core, handle)
771 };
772
773 core.metrics.start_processing_scheduled_tasks();
774
775 continue 'outer;
777 }
778 };
779
780 let task = context.handle.shared.owned.assert_owner(task);
781
782 #[cfg(tokio_unstable)]
783 let task_meta = task.task_meta();
784
785 let (c, ()) = context.run_task(core, || {
786 #[cfg(tokio_unstable)]
787 context.handle.task_hooks.poll_start_callback(&task_meta);
788
789 task.run();
790
791 #[cfg(tokio_unstable)]
792 context.handle.task_hooks.poll_stop_callback(&task_meta);
793 });
794
795 core = c;
796 }
797
798 core.metrics.end_processing_scheduled_tasks();
799
800 core = context.park_yield(core, handle);
803
804 core.metrics.start_processing_scheduled_tasks();
805 }
806 });
807
808 match ret {
809 Some(ret) => ret,
810 None => {
811 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
813 }
814 }
815 }
816
817 fn enter<F, R>(self, f: F) -> R
820 where
821 F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
822 {
823 let context = self.context.expect_current_thread();
824
825 let core = context.core.borrow_mut().take().expect("core missing");
827
828 let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
830
831 *context.core.borrow_mut() = Some(core);
832
833 ret
834 }
835}
836
837impl Drop for CoreGuard<'_> {
838 fn drop(&mut self) {
839 let context = self.context.expect_current_thread();
840
841 if let Some(core) = context.core.borrow_mut().take() {
842 self.scheduler.core.set(core);
845
846 self.scheduler.notify.notify_one();
848 }
849 }
850}