tokio/runtime/task/harness.rs

use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};

#[cfg(tokio_unstable)]
use crate::runtime::TaskMeta;
use std::any::Any;
use std::mem;
use std::mem::ManuallyDrop;
use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

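// Note on reference counting: the methods in this file either consume a
// ref-count that the caller passes in or create new ref-counts through state
// transitions, and the comments on each method spell out which. Whenever a
// decrement observes the count reaching zero (`ref_dec` returning true), the
// caller is responsible for releasing the allocation via `dealloc`.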
/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: 'static,
{
    pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
        Harness {
            cell: ptr.cast::<Cell<T, S>>(),
        }
    }

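    // The pointer casts here and in `from_raw` above rely on the `Header`
    // being the first field of the `Cell` allocation, so a pointer to the cell
    // and a pointer to its header share the same address (see the safety note
    // in `get_new_task` below).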
    fn header_ptr(&self) -> NonNull<Header> {
        self.cell.cast()
    }

    fn header(&self) -> &Header {
        unsafe { &*self.header_ptr().as_ptr() }
    }

    fn state(&self) -> &State {
        &self.header().state
    }

    fn trailer(&self) -> &Trailer {
        unsafe { &self.cell.as_ref().trailer }
    }

    fn core(&self) -> &Core<T, S> {
        unsafe { &self.cell.as_ref().core }
    }
}

/// Task operations that can be implemented without being generic over the
/// scheduler or task. Only one version of these methods should exist in the
/// final binary.
impl RawTask {
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// This call consumes a ref-count and notifies the task. This will create a
    /// new Notified and submit it if necessary.
    ///
    /// The caller does not need to hold a ref-count besides the one that was
    /// passed to this call.
    pub(super) fn wake_by_val(&self) {
        use super::state::TransitionToNotifiedByVal;

        match self.state().transition_to_notified_by_val() {
            TransitionToNotifiedByVal::Submit => {
                // The caller has given us a ref-count, and the transition has
                // created a new ref-count, so we now hold two. We turn the new
                // ref-count into a Notified and pass it to the call to `schedule`.
                //
                // The old ref-count is retained for now to ensure that the task
                // is not dropped during the call to `schedule` if the call
                // drops the task it was given.
                self.schedule();

                // Now that we have completed the call to schedule, we can
                // release our ref-count.
                self.drop_reference();
            }
            TransitionToNotifiedByVal::Dealloc => {
                self.dealloc();
            }
            TransitionToNotifiedByVal::DoNothing => {}
        }
    }

    /// This call notifies the task. It will not consume any ref-counts, but the
    /// caller should hold a ref-count. This will create a new Notified and
    /// submit it if necessary.
    pub(super) fn wake_by_ref(&self) {
        use super::state::TransitionToNotifiedByRef;

        match self.state().transition_to_notified_by_ref() {
            TransitionToNotifiedByRef::Submit => {
                // The transition above incremented the ref-count for a new task
                // and the caller also holds a ref-count. The caller's ref-count
                // ensures that the task is not destroyed even if the new task
                // is dropped before `schedule` returns.
                self.schedule();
            }
            TransitionToNotifiedByRef::DoNothing => {}
        }
    }

    /// Remotely aborts the task.
    ///
    /// The caller should hold a ref-count, but we do not consume it.
    ///
    /// This is similar to `shutdown` except that it asks the runtime to perform
    /// the shutdown. This is necessary to avoid the shutdown happening in the
    /// wrong thread for non-Send tasks.
    pub(super) fn remote_abort(&self) {
        if self.state().transition_to_notified_and_cancel() {
            // The transition has created a new ref-count, which we turn into
            // a Notified and pass to the task.
            //
            // Since the caller holds a ref-count, the task cannot be destroyed
            // before the call to `schedule` returns even if the call drops the
            // `Notified` internally.
            self.schedule();
        }
    }

    /// Tries to set the waker that is notified when the task completes.
    /// Returns true if the task has already completed; in that case the waker
    /// is not stored and will not be notified. If this call returns false, a
    /// waker that wakes the same task has been registered and will be notified
    /// when the task completes.
    pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
        can_read_output(self.header(), self.trailer(), waker)
    }
}
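
// Note: `wake_by_val` and `wake_by_ref` above mirror the two `Waker` entry
// points: waking by value hands the waker's ref-count to the call, while
// waking by reference leaves the caller's ref-count untouched. The raw waker
// built in `waker.rs` is presumably what routes `Waker::wake` and
// `Waker::wake_by_ref` to these methods.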

impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // before `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task, cancelling it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is `Dealloc`, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempts to transition to `Running` in order to forcibly shut down the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    pub(super) fn dealloc(self) {
        // Observe that we expect to have mutable access to these objects
        // because we are going to drop them. This only matters when running
        // under loom.
        self.trailer().waker.with_mut(|_| ());
        self.core().stage.with_mut(|_| ());

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
        // case the task concurrently completed.
        let transition = self.state().transition_to_join_handle_dropped();

        if transition.drop_output {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`, i.e., if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        if transition.drop_waker {
            // If the JOIN_WAKER flag is unset at this point, the task is either
            // already terminal or not complete, so the `JoinHandle` is responsible
            // for dropping the waker.
            // Safety:
            // If the JOIN_WAKER bit is not set the join handle has exclusive
            // access to the waker as per rule 2 in task/mod.rs.
            // This can only be the case at this point in two scenarios:
            // 1. The task completed and the runtime unset the `JOIN_WAKER` flag
            //    after accessing the waker during task completion. So the
            //    `JoinHandle` is the only one to access the join waker here.
            // 2. The task is not completed, so the `JoinHandle` was able to unset
            //    the `JOIN_WAKER` bit itself to get mutable access to the waker.
            //    The runtime will not access the waker when this flag is unset.
            unsafe { self.trailer().set_waker(None) };
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.
        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output. The join waker was already dropped by the
                // `JoinHandle` before.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();

                // Inform the `JoinHandle` that we are done waking the waker by
                // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has
                // already been dropped and `JOIN_INTEREST` is unset, then we must
                // drop the waker ourselves.
                if !self
                    .state()
                    .unset_waker_after_complete()
                    .is_join_interested()
                {
                    // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so
                    // we have exclusive access to the waker.
                    unsafe { self.trailer().set_waker(None) };
                }
            }
        }));

        // We catch panics here in case invoking a hook panics.
        //
        // We call this in a separate block so that it runs after the task appears to have
        // completed and will still run if the destructor panics.
        #[cfg(tokio_unstable)]
        if let Some(f) = self.trailer().hooks.task_terminate_callback.as_ref() {
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                f(&TaskMeta {
                    id: self.core().task_id,
                    spawned_at: self.core().spawned_at.into(),
                    _phantom: Default::default(),
                })
            }));
        }

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            // The scheduler was also holding a reference to the task; together
            // with the ref-count held by `complete`, that makes two ref-counts
            // to decrement.
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}

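/// Returns `true` if the task has completed and its output may be read.
/// Otherwise, stores `waker` (unless an equivalent waker is already stored) so
/// that the `JoinHandle` is notified on completion, and returns `false`.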
fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
    // Load a snapshot of the current task state
    let snapshot = header.state.load();

    debug_assert!(snapshot.is_join_interested());

    if !snapshot.is_complete() {
        // If the task is not complete, try storing the provided waker in the
        // task's waker field.

        let res = if snapshot.is_join_waker_set() {
            // If JOIN_WAKER is set, then JoinHandle has previously stored a
            // waker in the waker field per step (iii) of rule 5 in task/mod.rs.

            // Optimization: if the stored waker and the provided waker wake the
            // same task, then return without touching the waker field. (Reading
            // the waker field below is safe per rule 3 in task/mod.rs.)
            if unsafe { trailer.will_wake(waker) } {
                return false;
            }

            // Otherwise swap the stored waker with the provided waker by
            // following rule 5 in task/mod.rs.
            header
                .state
                .unset_waker()
                .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
        } else {
            // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
            // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
            // of rule 5 and try to store the provided waker in the waker field.
            set_join_waker(header, trailer, waker.clone(), snapshot)
        };

        match res {
            Ok(_) => return false,
            Err(snapshot) => {
                assert!(snapshot.is_complete());
            }
        }
    }
    true
}

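/// Stores `waker` in the trailer and then tries to set the `JOIN_WAKER` bit.
/// If the bit cannot be set because the task completed concurrently, the
/// stored waker is cleared again and the error snapshot is returned.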
fn set_join_waker(
    header: &Header,
    trailer: &Trailer,
    waker: Waker,
    snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
    assert!(snapshot.is_join_interested());
    assert!(!snapshot.is_join_waker_set());

    // Safety: Only the `JoinHandle` may set the `waker` field. When
    // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
    unsafe {
        trailer.set_waker(Some(waker));
    }

    // Update the `JoinWaker` state accordingly
    let res = header.state.set_join_waker();

    // If the state could not be updated, then clear the join waker
    if res.is_err() {
        unsafe {
            trailer.set_waker(None);
        }
    }

    res
}

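/// Tells `poll` what to do with the ref-count(s) it holds once `poll_inner`
/// returns; see the doc comment on `poll_inner` for the ownership rules
/// attached to each variant.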
enum PollFuture {
    Complete,
    Notified,
    Done,
    Dealloc,
}

/// Cancels the task and stores the appropriate error in the stage field.
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
    // Drop the future from inside a panic guard.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.drop_future_or_output();
    }));

    core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
}

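/// Maps the result of dropping the future inside `catch_unwind` to a
/// `JoinError`: a clean drop means the task was cancelled, while a caught
/// panic payload is surfaced as a panic error.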
fn panic_result_to_join_error(
    task_id: Id,
    res: Result<(), Box<dyn Any + Send + 'static>>,
) -> JoinError {
    match res {
        Ok(()) => JoinError::cancelled(task_id),
        Err(panic) => JoinError::panic(task_id, panic),
    }
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
    // Poll the future.
    let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        struct Guard<'a, T: Future, S: Schedule> {
            core: &'a Core<T, S>,
        }
        impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
            fn drop(&mut self) {
                // If the future panics on poll, we drop it inside the panic
                // guard.
                self.core.drop_future_or_output();
            }
        }
        let guard = Guard { core };
        let res = guard.core.poll(cx);
        mem::forget(guard);
        res
    }));

    // Prepare output for being placed in the core stage.
    let output = match output {
        Ok(Poll::Pending) => return Poll::Pending,
        Ok(Poll::Ready(output)) => Ok(output),
        Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
    };

    // Catch and ignore panics if the future panics on drop.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.store_output(output);
    }));

    if res.is_err() {
        core.scheduler.unhandled_panic();
    }

    Poll::Ready(())
}

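/// Converts a caught panic payload into a `JoinError`, reporting it to the
/// scheduler as an unhandled panic.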
#[cold]
fn panic_to_error<S: Schedule>(
    scheduler: &S,
    task_id: Id,
    panic: Box<dyn Any + Send + 'static>,
) -> JoinError {
    scheduler.unhandled_panic();
    JoinError::panic(task_id, panic)
}