tokio/runtime/scheduler/current_thread/mod.rs

use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
    self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
    blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::thread::ThreadId;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the
    /// driver.
    notify: Notify,
}

/// Handle to the current thread scheduler
pub(crate) struct Handle {
    /// Scheduler state shared across threads
    shared: Shared,

    /// Resource driver handles
    pub(crate) driver: driver::Handle,

    /// Blocking pool spawner
    pub(crate) blocking_spawner: blocking::Spawner,

    /// Current random number generator seed
    pub(crate) seed_generator: RngSeedGenerator,

    /// User-supplied hooks to invoke at task lifecycle events (spawn,
    /// termination, and, when enabled, before/after each poll).
    pub(crate) task_hooks: TaskHooks,

    /// If this is a `LocalRuntime`, flags the owning thread ID.
    pub(crate) local_tid: Option<ThreadId>,
}

/// Data required for executing the scheduler. The struct is passed around to
/// a function that will perform the scheduling work and acts as a capability token.
struct Core {
    /// Scheduler run queue
    tasks: VecDeque<Notified>,

    /// Current tick
    tick: u32,

    /// Runtime driver
    ///
    /// The driver is removed before starting to park the thread
    driver: Option<Driver>,

    /// Metrics batch
    metrics: MetricsBatch,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// True if a task panicked without being handled and the runtime is
    /// configured to shut down on unhandled panic.
    unhandled_panic: bool,
}

/// Scheduler state shared between threads.
struct Shared {
    /// Remote run queue
    inject: Inject<Arc<Handle>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Handle>>,

    /// Indicates whether the thread blocked on `block_on` was woken.
    woken: AtomicBool,

    /// Scheduler configuration options
    config: Config,

    /// Keeps track of various runtime metrics.
    scheduler_metrics: SchedulerMetrics,

    /// This scheduler only has one worker.
    worker_metrics: WorkerMetrics,
}

/// Thread-local context.
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
    /// Scheduler handle
    handle: Arc<Handle>,

    /// Scheduler core, enabling the holder of `Context` to execute the
    /// scheduler.
    core: RefCell<Option<Box<Core>>>,

    /// Deferred tasks, usually ones that called `task::yield_now()`.
    pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;

/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Used if none is specified. This is a temporary constant and will be removed
/// as we unify tuning logic between the multi-thread and current-thread
/// schedulers.
const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;

impl CurrentThread {
    pub(crate) fn new(
        driver: Driver,
        driver_handle: driver::Handle,
        blocking_spawner: blocking::Spawner,
        seed_generator: RngSeedGenerator,
        config: Config,
        local_tid: Option<ThreadId>,
    ) -> (CurrentThread, Arc<Handle>) {
        let worker_metrics = WorkerMetrics::from_config(&config);
        worker_metrics.set_thread_id(thread::current().id());

        // Get the configured global queue interval, or use the default.
        let global_queue_interval = config
            .global_queue_interval
            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);

        let handle = Arc::new(Handle {
            task_hooks: TaskHooks {
                task_spawn_callback: config.before_spawn.clone(),
                task_terminate_callback: config.after_termination.clone(),
                #[cfg(tokio_unstable)]
                before_poll_callback: config.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll_callback: config.after_poll.clone(),
            },
            shared: Shared {
                inject: Inject::new(),
                owned: OwnedTasks::new(1),
                woken: AtomicBool::new(false),
                config,
                scheduler_metrics: SchedulerMetrics::new(),
                worker_metrics,
            },
            driver: driver_handle,
            blocking_spawner,
            seed_generator,
            local_tid,
        });

        let core = AtomicCell::new(Some(Box::new(Core {
            tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
            tick: 0,
            driver: Some(driver),
            metrics: MetricsBatch::new(&handle.shared.worker_metrics),
            global_queue_interval,
            unhandled_panic: false,
        })));

        let scheduler = CurrentThread {
            core,
            notify: Notify::new(),
        };

        (scheduler, handle)
    }

    #[track_caller]
    pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
        pin!(future);

        crate::runtime::context::enter_runtime(handle, false, |blocking| {
            let handle = handle.as_current_thread();

            // Attempt to steal the scheduler core and `block_on` the future
            // there if we can. Otherwise, select on a notification that the
            // core is available or on the future completing.
            loop {
                if let Some(core) = self.take_core(handle) {
                    handle
                        .shared
                        .worker_metrics
                        .set_thread_id(thread::current().id());
                    return core.block_on(future);
                } else {
                    let notified = self.notify.notified();
                    pin!(notified);

                    if let Some(out) = blocking
                        .block_on(poll_fn(|cx| {
                            if notified.as_mut().poll(cx).is_ready() {
                                return Ready(None);
                            }

                            if let Ready(out) = future.as_mut().poll(cx) {
                                return Ready(Some(out));
                            }

                            Pending
                        }))
                        .expect("Failed to `Enter::block_on`")
                    {
                        return out;
                    }
                }
            }
        })
    }

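    /// Attempts to take ownership of the scheduler core.
    ///
    /// Returns `None` if the core is currently held elsewhere, typically by
    /// another thread that is inside `block_on`.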
    fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
        let core = self.core.take()?;

        Some(CoreGuard {
            context: scheduler::Context::CurrentThread(Context {
                handle: handle.clone(),
                core: RefCell::new(Some(core)),
                defer: Defer::new(),
            }),
            scheduler: self,
        })
    }

    pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
        let handle = handle.as_current_thread();

        // Avoid a double panic if we are currently panicking and
        // the lock may be poisoned.

        let core = match self.take_core(handle) {
            Some(core) => core,
            None if std::thread::panicking() => return,
            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
        };

        // Check that the thread-local is not being destroyed
        let tls_available = context::with_current(|_| ()).is_ok();

        if tls_available {
            core.enter(|core, _context| {
                let core = shutdown2(core, handle);
                (core, ())
            });
        } else {
            // Shutdown without setting the context. `tokio::spawn` calls will
            // fail, but those will fail either way because the thread-local is
            // not available anymore.
            let context = core.context.expect_current_thread();
            let core = context.core.borrow_mut().take().unwrap();

            let core = shutdown2(core, handle);
            *context.core.borrow_mut() = Some(core);
        }
    }
}

fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
    // Drain the OwnedTasks collection. This call also closes the
    // collection, ensuring that no tasks are ever pushed after this
    // call returns.
    handle.shared.owned.close_and_shutdown_all(0);

    // Drain local queue
    // We already shut down every task, so we just need to drop the task.
    while let Some(task) = core.next_local_task(handle) {
        drop(task);
    }

    // Close the injection queue
    handle.shared.inject.close();

    // Drain remote queue
    while let Some(task) = handle.shared.inject.pop() {
        drop(task);
    }

    assert!(handle.shared.owned.is_empty());

    // Submit metrics
    core.submit_metrics(handle);

    // Shutdown the resource drivers
    if let Some(driver) = core.driver.as_mut() {
        driver.shutdown(&handle.driver);
    }

    core
}

impl fmt::Debug for CurrentThread {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("CurrentThread").finish()
    }
}

// ===== impl Core =====

impl Core {
    /// Increment the current tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

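    /// Picks the next task to run.
    ///
    /// Every `global_queue_interval`-th tick the remote (injection) queue is
    /// checked first so that tasks scheduled from outside the runtime are not
    /// starved by a busy local queue; otherwise the local queue is preferred.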
    fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            handle
                .next_remote_task()
                .or_else(|| self.next_local_task(handle))
        } else {
            self.next_local_task(handle)
                .or_else(|| handle.next_remote_task())
        }
    }

    fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
        let ret = self.tasks.pop_front();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
        ret
    }

    fn push_task(&mut self, handle: &Handle, task: Notified) {
        self.tasks.push_back(task);
        self.metrics.inc_local_schedule_count();
        handle
            .shared
            .worker_metrics
            .set_queue_depth(self.tasks.len());
    }

    fn submit_metrics(&mut self, handle: &Handle) {
        self.metrics.submit(&handle.shared.worker_metrics, 0);
    }
}

#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    let wakers = context.defer.take_deferred();
    for waker in wakers {
        waker.wake();
    }
}

// ===== impl Context =====

impl Context {
    /// Execute the closure with the given scheduler core stored in the
    /// thread-local context.
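    ///
    /// The closure runs under a coop budget (`crate::task::coop::budget`),
    /// and poll timing is recorded in the metrics batch.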
    fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        core.metrics.start_poll();
        let mut ret = self.enter(core, || crate::task::coop::budget(f));
        ret.0.metrics.end_poll();
        ret
    }

    /// Blocks the current thread until an event is received by the driver,
    /// including I/O events, timer events, ...
    fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        if let Some(f) = &handle.shared.config.before_park {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        // This check will fail if `before_park` spawns a task for us to run
        // instead of parking the thread
        if core.tasks.is_empty() {
            // Park until the thread is signaled
            core.metrics.about_to_park();
            core.submit_metrics(handle);

            let (c, ()) = self.enter(core, || {
                driver.park(&handle.driver);
                self.defer.wake();
            });

            core = c;

            core.metrics.unparked();
            core.submit_metrics(handle);
        }

        if let Some(f) = &handle.shared.config.after_unpark {
            let (c, ()) = self.enter(core, || f());
            core = c;
        }

        core.driver = Some(driver);
        core
    }

    /// Checks the driver for new events without blocking the thread.
    fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
        let mut driver = core.driver.take().expect("driver missing");

        core.submit_metrics(handle);

        let (mut core, ()) = self.enter(core, || {
            driver.park_timeout(&handle.driver, Duration::from_millis(0));
            self.defer.wake();
        });

        core.driver = Some(driver);
        core
    }

    fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
        // Store the scheduler core in the thread-local context
        //
        // A drop-guard is employed at a higher level.
        *self.core.borrow_mut() = Some(core);

        // Execute the closure while tracking the execution budget
        let ret = f();

        // Take the scheduler core back
        let core = self.core.borrow_mut().take().expect("core missing");
        (core, ret)
    }

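    /// Defers a waker so it is invoked the next time the scheduler yields to
    /// the driver (see `park` / `park_yield`), rather than immediately.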
    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

// ===== impl Handle =====

impl Handle {
    /// Spawns a future onto the `CurrentThread` scheduler
    #[track_caller]
    pub(crate) fn spawn<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
    ///
    /// # Safety
    /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
    /// provably cannot be driven from or moved to different threads from the one on which the task
    /// is spawned.
    #[track_caller]
    pub(crate) unsafe fn spawn_local<F>(
        me: &Arc<Self>,
        future: F,
        id: crate::runtime::task::Id,
        spawned_at: SpawnLocation,
    ) -> JoinHandle<F::Output>
    where
        F: crate::future::Future + 'static,
        F::Output: 'static,
    {
        let (handle, notified) = me
            .shared
            .owned
            .bind_local(future, me.clone(), id, spawned_at);

        me.task_hooks.spawn(&TaskMeta {
            id,
            spawned_at,
            _phantom: Default::default(),
        });

        if let Some(notified) = notified {
            me.schedule(notified);
        }

        handle
    }

    /// Capture a snapshot of this runtime's state.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(crate) fn dump(&self) -> crate::runtime::Dump {
        use crate::runtime::dump;
        use task::trace::trace_current_thread;

        let mut traces = vec![];

        // todo: how to make this work outside of a runtime context?
        context::with_scheduler(|maybe_context| {
            // drain the local queue
            let context = if let Some(context) = maybe_context {
                context.expect_current_thread()
            } else {
                return;
            };
            let mut maybe_core = context.core.borrow_mut();
            let core = if let Some(core) = maybe_core.as_mut() {
                core
            } else {
                return;
            };
            let local = &mut core.tasks;

            if self.shared.inject.is_closed() {
                return;
            }

            traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
                .into_iter()
                .map(|(id, trace)| dump::Task::new(id, trace))
                .collect();

            // Avoid double borrow panic
            drop(maybe_core);

            // Taking a taskdump could wake every task, but we probably don't
            // want the `yield_now` vector to be that large under normal
            // circumstances. Therefore, we free its allocation.
            wake_deferred_tasks_and_free(context);
        });

        dump::Dump::new(traces)
    }

    fn next_remote_task(&self) -> Option<Notified> {
        self.shared.inject.pop()
    }

    fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
        // Set `woken` to true when entering `block_on`, ensuring the outer
        // future is polled for the first time when entering the loop.
        me.shared.woken.store(true, Release);
        waker_ref(me)
    }

    // Reset `woken` to false and return the original value
    pub(crate) fn reset_woken(&self) -> bool {
        self.shared.woken.swap(false, AcqRel)
    }

    pub(crate) fn num_alive_tasks(&self) -> usize {
        self.shared.owned.num_alive_tasks()
    }

    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }

    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
        assert_eq!(0, worker);
        &self.shared.worker_metrics
    }
}

cfg_unstable_metrics! {
    impl Handle {
        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
            &self.shared.scheduler_metrics
        }

        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }

        pub(crate) fn num_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_threads()
        }

        pub(crate) fn num_idle_blocking_threads(&self) -> usize {
            self.blocking_spawner.num_idle_threads()
        }

        pub(crate) fn blocking_queue_depth(&self) -> usize {
            self.blocking_spawner.queue_depth()
        }

        cfg_64bit_metrics! {
            pub(crate) fn spawned_tasks_count(&self) -> u64 {
                self.shared.owned.spawned_tasks_count()
            }
        }
    }
}

cfg_unstable! {
    use std::num::NonZeroU64;

    impl Handle {
        pub(crate) fn owned_id(&self) -> NonZeroU64 {
            self.shared.owned.id
        }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("current_thread::Handle { ... }").finish()
    }
}

// ===== impl Schedule =====

impl Schedule for Arc<Handle> {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.shared.owned.remove(task)
    }

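    /// Schedules a task for execution.
    ///
    /// When called from the thread that currently holds this scheduler's core,
    /// the task is pushed onto the local run queue. Otherwise it is pushed to
    /// the injection queue and the driver is unparked so the owning thread
    /// picks it up.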
    fn schedule(&self, task: task::Notified<Self>) {
        use scheduler::Context::CurrentThread;

        context::with_scheduler(|maybe_cx| match maybe_cx {
            Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                let mut core = cx.core.borrow_mut();

                // If `None`, the runtime is shutting down, so there is no need
                // to schedule the task.
                if let Some(core) = core.as_mut() {
                    core.push_task(self, task);
                }
            }
            _ => {
                // Track that a task was scheduled from **outside** of the runtime.
                self.shared.scheduler_metrics.inc_remote_schedule_count();

                // Schedule the task
                self.shared.inject.push(task);
                self.driver.unpark();
            }
        });
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    cfg_unstable! {
        fn unhandled_panic(&self) {
            use crate::runtime::UnhandledPanic;

            match self.shared.config.unhandled_panic {
                UnhandledPanic::Ignore => {
                    // Do nothing
                }
                UnhandledPanic::ShutdownRuntime => {
                    use scheduler::Context::CurrentThread;

                    // This hook is only called from within the runtime, so
                    // `context::with_scheduler` should match with `&self`, i.e.
                    // there is no opportunity for a nested scheduler to be
                    // called.
                    context::with_scheduler(|maybe_cx| match maybe_cx {
                        Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
                            let mut core = cx.core.borrow_mut();

                            // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                            if let Some(core) = core.as_mut() {
                                core.unhandled_panic = true;
                                self.shared.owned.close_and_shutdown_all(0);
                            }
                        }
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    })
                }
            }
        }
    }
}

impl Wake for Handle {
    fn wake(arc_self: Arc<Self>) {
        Wake::wake_by_ref(&arc_self);
    }

    /// Wake by reference
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.shared.woken.store(true, Release);
        arc_self.driver.unpark();
    }
}

// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
    context: scheduler::Context,
    scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
    #[track_caller]
    fn block_on<F: Future>(self, future: F) -> F::Output {
        let ret = self.enter(|mut core, context| {
            let waker = Handle::waker_ref(&context.handle);
            let mut cx = std::task::Context::from_waker(&waker);

            pin!(future);

            core.metrics.start_processing_scheduled_tasks();

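            // Alternate between polling the `block_on` future (whenever it has
            // been woken) and running up to `event_interval` scheduled tasks,
            // then yield to the driver to process I/O and timer events.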
            'outer: loop {
                let handle = &context.handle;

                if handle.reset_woken() {
                    let (c, res) = context.enter(core, || {
                        crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
                    });

                    core = c;

                    if let Ready(v) = res {
                        return (core, Some(v));
                    }
                }

                for _ in 0..handle.shared.config.event_interval {
                    // Make sure we didn't hit an unhandled_panic
                    if core.unhandled_panic {
                        return (core, None);
                    }

                    core.tick();

                    let entry = core.next_task(handle);

                    let task = match entry {
                        Some(entry) => entry,
                        None => {
                            core.metrics.end_processing_scheduled_tasks();

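                            // If tasks deferred their wakeups via `yield_now`,
                            // only do a non-blocking driver poll so they run
                            // again soon; otherwise park until woken.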
                            core = if !context.defer.is_empty() {
                                context.park_yield(core, handle)
                            } else {
                                context.park(core, handle)
                            };

                            core.metrics.start_processing_scheduled_tasks();

                            // Try polling the `block_on` future next
                            continue 'outer;
                        }
                    };

                    let task = context.handle.shared.owned.assert_owner(task);

                    #[cfg(tokio_unstable)]
                    let task_meta = task.task_meta();

                    let (c, ()) = context.run_task(core, || {
                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_start_callback(&task_meta);

                        task.run();

                        #[cfg(tokio_unstable)]
                        context.handle.task_hooks.poll_stop_callback(&task_meta);
                    });

                    core = c;
                }

                core.metrics.end_processing_scheduled_tasks();

                // Yield to the driver; this drives the timer and pulls any
                // pending I/O events.
                core = context.park_yield(core, handle);

                core.metrics.start_processing_scheduled_tasks();
            }
        });

        match ret {
            Some(ret) => ret,
            None => {
                // A spawned task panicked without being handled, so the
                // scheduler stopped running.
                panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
            }
        }
    }

    /// Enters the scheduler context. This sets the queue and other necessary
    /// scheduler state in the thread-local.
    fn enter<F, R>(self, f: F) -> R
    where
        F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
    {
        let context = self.context.expect_current_thread();

        // Remove `core` from `context` to pass into the closure.
        let core = context.core.borrow_mut().take().expect("core missing");

        // Call the closure and place `core` back
        let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

        *context.core.borrow_mut() = Some(core);

        ret
    }
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        let context = self.context.expect_current_thread();

        if let Some(core) = context.core.borrow_mut().take() {
            // Replace old scheduler back into the state to allow
            // other threads to pick it up and drive it.
            self.scheduler.core.set(core);

            // Wake up other possible threads that could steal the driver.
            self.scheduler.notify.notify_one();
        }
    }
}