tokio/sync/notify.rs
1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::sync::Arc;
21use std::task::{Context, Poll, Waker};
22
23type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
25
26/// Notifies a single task to wake up.
27///
28/// `Notify` provides a basic mechanism to notify a single task of an event.
29/// `Notify` itself does not carry any data. Instead, it is to be used to signal
30/// another task to perform an operation.
31///
32/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
33/// [`notified().await`] method waits for a permit to become available, and
34/// [`notify_one()`] sets a permit **if there currently are no available
35/// permits**.
36///
37/// The synchronization details of `Notify` are similar to
38/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
39/// value contains a single permit. [`notified().await`] waits for the permit to
40/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
41/// the permit, waking a pending task if there is one.
42///
43/// If `notify_one()` is called **before** `notified().await`, then the next
44/// call to `notified().await` will complete immediately, consuming the permit.
45/// Any subsequent calls to `notified().await` will wait for a new permit.
46///
47/// If `notify_one()` is called **multiple** times before `notified().await`,
48/// only a **single** permit is stored. The next call to `notified().await` will
49/// complete immediately, but the one after will wait for a new permit.
50///
51/// # Examples
52///
53/// Basic usage.
54///
55/// ```
56/// use tokio::sync::Notify;
57/// use std::sync::Arc;
58///
59/// #[tokio::main]
60/// async fn main() {
61/// let notify = Arc::new(Notify::new());
62/// let notify2 = notify.clone();
63///
64/// let handle = tokio::spawn(async move {
65/// notify2.notified().await;
66/// println!("received notification");
67/// });
68///
69/// println!("sending notification");
70/// notify.notify_one();
71///
72/// // Wait for task to receive notification.
73/// handle.await.unwrap();
74/// }
75/// ```
76///
77/// Unbound multi-producer single-consumer (mpsc) channel.
78///
79/// No wakeups can be lost when using this channel because the call to
80/// `notify_one()` will store a permit in the `Notify`, which the following call
81/// to `notified()` will consume.
82///
83/// ```
84/// use tokio::sync::Notify;
85///
86/// use std::collections::VecDeque;
87/// use std::sync::Mutex;
88///
89/// struct Channel<T> {
90/// values: Mutex<VecDeque<T>>,
91/// notify: Notify,
92/// }
93///
94/// impl<T> Channel<T> {
95/// pub fn send(&self, value: T) {
96/// self.values.lock().unwrap()
97/// .push_back(value);
98///
99/// // Notify the consumer a value is available
100/// self.notify.notify_one();
101/// }
102///
103/// // This is a single-consumer channel, so several concurrent calls to
104/// // `recv` are not allowed.
105/// pub async fn recv(&self) -> T {
106/// loop {
107/// // Drain values
108/// if let Some(value) = self.values.lock().unwrap().pop_front() {
109/// return value;
110/// }
111///
112/// // Wait for values to be available
113/// self.notify.notified().await;
114/// }
115/// }
116/// }
117/// ```
118///
119/// Unbound multi-producer multi-consumer (mpmc) channel.
120///
121/// The call to [`enable`] is important because otherwise if you have two
122/// calls to `recv` and two calls to `send` in parallel, the following could
123/// happen:
124///
125/// 1. Both calls to `try_recv` return `None`.
126/// 2. Both new elements are added to the vector.
127/// 3. The `notify_one` method is called twice, adding only a single
128/// permit to the `Notify`.
129/// 4. Both calls to `recv` reach the `Notified` future. One of them
130/// consumes the permit, and the other sleeps forever.
131///
132/// By adding the `Notified` futures to the list by calling `enable` before
133/// `try_recv`, the `notify_one` calls in step three would remove the
134/// futures from the list and mark them notified instead of adding a permit
135/// to the `Notify`. This ensures that both futures are woken.
136///
137/// Notice that this failure can only happen if there are two concurrent calls
138/// to `recv`. This is why the mpsc example above does not require a call to
139/// `enable`.
140///
141/// ```
142/// use tokio::sync::Notify;
143///
144/// use std::collections::VecDeque;
145/// use std::sync::Mutex;
146///
147/// struct Channel<T> {
148/// messages: Mutex<VecDeque<T>>,
149/// notify_on_sent: Notify,
150/// }
151///
152/// impl<T> Channel<T> {
153/// pub fn send(&self, msg: T) {
154/// let mut locked_queue = self.messages.lock().unwrap();
155/// locked_queue.push_back(msg);
156/// drop(locked_queue);
157///
158/// // Send a notification to one of the calls currently
159/// // waiting in a call to `recv`.
160/// self.notify_on_sent.notify_one();
161/// }
162///
163/// pub fn try_recv(&self) -> Option<T> {
164/// let mut locked_queue = self.messages.lock().unwrap();
165/// locked_queue.pop_front()
166/// }
167///
168/// pub async fn recv(&self) -> T {
169/// let future = self.notify_on_sent.notified();
170/// tokio::pin!(future);
171///
172/// loop {
173/// // Make sure that no wakeup is lost if we get
174/// // `None` from `try_recv`.
175/// future.as_mut().enable();
176///
177/// if let Some(msg) = self.try_recv() {
178/// return msg;
179/// }
180///
181/// // Wait for a call to `notify_one`.
182/// //
183/// // This uses `.as_mut()` to avoid consuming the future,
184/// // which lets us call `Pin::set` below.
185/// future.as_mut().await;
186///
187/// // Reset the future in case another call to
188/// // `try_recv` got the message before us.
189/// future.set(self.notify_on_sent.notified());
190/// }
191/// }
192/// }
193/// ```
194///
195/// [park]: std::thread::park
196/// [unpark]: std::thread::Thread::unpark
197/// [`notified().await`]: Notify::notified()
198/// [`notify_one()`]: Notify::notify_one()
199/// [`enable`]: Notified::enable()
200/// [`Semaphore`]: crate::sync::Semaphore
201#[derive(Debug)]
202pub struct Notify {
203 // `state` uses 2 bits to store one of `EMPTY`,
204 // `WAITING` or `NOTIFIED`. The rest of the bits
205 // are used to store the number of times `notify_waiters`
206 // was called.
207 //
208 // Throughout the code there are two assumptions:
209 // - state can be transitioned *from* `WAITING` only if
210 // `waiters` lock is held
211 // - number of times `notify_waiters` was called can
212 // be modified only if `waiters` lock is held
213 state: AtomicUsize,
214 waiters: Mutex<WaitList>,
215}
216
217#[derive(Debug)]
218struct Waiter {
219 /// Intrusive linked-list pointers.
220 pointers: linked_list::Pointers<Waiter>,
221
222 /// Waiting task's waker. Depending on the value of `notification`,
223 /// this field is either protected by the `waiters` lock in
224 /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
225 waker: UnsafeCell<Option<Waker>>,
226
227 /// Notification for this waiter. Uses 2 bits to store if and how was
228 /// notified, 1 bit for storing if it was woken up using FIFO or LIFO, and
229 /// the rest of it is unused.
230 /// * if it's `None`, then `waker` is protected by the `waiters` lock.
231 /// * if it's `Some`, then `waker` is exclusively owned by the
232 /// enclosing `Waiter` and can be accessed without locking.
233 notification: AtomicNotification,
234
235 /// Should not be `Unpin`.
236 _p: PhantomPinned,
237}
238
239impl Waiter {
240 fn new() -> Waiter {
241 Waiter {
242 pointers: linked_list::Pointers::new(),
243 waker: UnsafeCell::new(None),
244 notification: AtomicNotification::none(),
245 _p: PhantomPinned,
246 }
247 }
248}
249
250generate_addr_of_methods! {
251 impl<> Waiter {
252 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
253 &self.pointers
254 }
255 }
256}
257
258// No notification.
259const NOTIFICATION_NONE: usize = 0b000;
260
261// Notification type used by `notify_one`.
262const NOTIFICATION_ONE: usize = 0b001;
263
264// Notification type used by `notify_last`.
265const NOTIFICATION_LAST: usize = 0b101;
266
267// Notification type used by `notify_waiters`.
268const NOTIFICATION_ALL: usize = 0b010;
269
270/// Notification for a `Waiter`.
271/// This struct is equivalent to `Option<Notification>`, but uses
272/// `AtomicUsize` inside for atomic operations.
273#[derive(Debug)]
274struct AtomicNotification(AtomicUsize);
275
276impl AtomicNotification {
277 fn none() -> Self {
278 AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
279 }
280
281 /// Store-release a notification.
282 /// This method should be called exactly once.
283 fn store_release(&self, notification: Notification) {
284 let data: usize = match notification {
285 Notification::All => NOTIFICATION_ALL,
286 Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
287 Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
288 };
289 self.0.store(data, Release);
290 }
291
292 fn load(&self, ordering: Ordering) -> Option<Notification> {
293 let data = self.0.load(ordering);
294 match data {
295 NOTIFICATION_NONE => None,
296 NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
297 NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
298 NOTIFICATION_ALL => Some(Notification::All),
299 _ => unreachable!(),
300 }
301 }
302
303 /// Clears the notification.
304 /// This method is used by a `Notified` future to consume the
305 /// notification. It uses relaxed ordering and should be only
306 /// used once the atomic notification is no longer shared.
307 fn clear(&self) {
308 self.0.store(NOTIFICATION_NONE, Relaxed);
309 }
310}
311
312#[derive(Debug, PartialEq, Eq)]
313#[repr(usize)]
314enum NotifyOneStrategy {
315 Fifo,
316 Lifo,
317}
318
319#[derive(Debug, PartialEq, Eq)]
320#[repr(usize)]
321enum Notification {
322 One(NotifyOneStrategy),
323 All,
324}
325
326/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
327/// and gates the access to it on `notify.waiters` mutex. It also empties
328/// the list on drop.
329struct NotifyWaitersList<'a> {
330 list: GuardedWaitList,
331 is_empty: bool,
332 notify: &'a Notify,
333}
334
335impl<'a> NotifyWaitersList<'a> {
336 fn new(
337 unguarded_list: WaitList,
338 guard: Pin<&'a Waiter>,
339 notify: &'a Notify,
340 ) -> NotifyWaitersList<'a> {
341 let guard_ptr = NonNull::from(guard.get_ref());
342 let list = unguarded_list.into_guarded(guard_ptr);
343 NotifyWaitersList {
344 list,
345 is_empty: false,
346 notify,
347 }
348 }
349
350 /// Removes the last element from the guarded list. Modifying this list
351 /// requires an exclusive access to the main list in `Notify`.
352 fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
353 let result = self.list.pop_back();
354 if result.is_none() {
355 // Save information about emptiness to avoid waiting for lock
356 // in the destructor.
357 self.is_empty = true;
358 }
359 result
360 }
361}
362
363impl Drop for NotifyWaitersList<'_> {
364 fn drop(&mut self) {
365 // If the list is not empty, we unlink all waiters from it.
366 // We do not wake the waiters to avoid double panics.
367 if !self.is_empty {
368 let _lock_guard = self.notify.waiters.lock();
369 while let Some(waiter) = self.list.pop_back() {
370 // Safety: we never make mutable references to waiters.
371 let waiter = unsafe { waiter.as_ref() };
372 waiter.notification.store_release(Notification::All);
373 }
374 }
375 }
376}
377
378/// Future returned from [`Notify::notified()`].
379///
380/// This future is fused, so once it has completed, any future calls to poll
381/// will immediately return `Poll::Ready`.
382#[derive(Debug)]
383#[must_use = "futures do nothing unless you `.await` or poll them"]
384pub struct Notified<'a> {
385 /// The `Notify` being received on.
386 notify: &'a Notify,
387
388 /// The current state of the receiving process.
389 state: State,
390
391 /// Number of calls to `notify_waiters` at the time of creation.
392 notify_waiters_calls: usize,
393
394 /// Entry in the waiter `LinkedList`.
395 waiter: Waiter,
396}
397
398unsafe impl<'a> Send for Notified<'a> {}
399unsafe impl<'a> Sync for Notified<'a> {}
400
401/// Future returned from [`Notify::notified_owned()`].
402///
403/// This future is fused, so once it has completed, any future calls to poll
404/// will immediately return `Poll::Ready`.
405#[derive(Debug)]
406#[must_use = "futures do nothing unless you `.await` or poll them"]
407pub struct OwnedNotified {
408 /// The `Notify` being received on.
409 notify: Arc<Notify>,
410
411 /// The current state of the receiving process.
412 state: State,
413
414 /// Number of calls to `notify_waiters` at the time of creation.
415 notify_waiters_calls: usize,
416
417 /// Entry in the waiter `LinkedList`.
418 waiter: Waiter,
419}
420
421unsafe impl Sync for OwnedNotified {}
422
423/// A custom `project` implementation is used in place of `pin-project-lite`
424/// as a custom drop for [`Notified`] and [`OwnedNotified`] implementation
425/// is needed.
426struct NotifiedProject<'a> {
427 notify: &'a Notify,
428 state: &'a mut State,
429 notify_waiters_calls: &'a usize,
430 waiter: &'a Waiter,
431}
432
433#[derive(Debug)]
434enum State {
435 Init,
436 Waiting,
437 Done,
438}
439
440const NOTIFY_WAITERS_SHIFT: usize = 2;
441const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
442const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;
443
444/// Initial "idle" state.
445const EMPTY: usize = 0;
446
447/// One or more threads are currently waiting to be notified.
448const WAITING: usize = 1;
449
450/// Pending notification.
451const NOTIFIED: usize = 2;
452
453fn set_state(data: usize, state: usize) -> usize {
454 (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
455}
456
457fn get_state(data: usize) -> usize {
458 data & STATE_MASK
459}
460
461fn get_num_notify_waiters_calls(data: usize) -> usize {
462 (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
463}
464
465fn inc_num_notify_waiters_calls(data: usize) -> usize {
466 data + (1 << NOTIFY_WAITERS_SHIFT)
467}
468
469fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
470 data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
471}
472
473impl Notify {
474 /// Create a new `Notify`, initialized without a permit.
475 ///
476 /// # Examples
477 ///
478 /// ```
479 /// use tokio::sync::Notify;
480 ///
481 /// let notify = Notify::new();
482 /// ```
483 pub fn new() -> Notify {
484 Notify {
485 state: AtomicUsize::new(0),
486 waiters: Mutex::new(LinkedList::new()),
487 }
488 }
489
490 /// Create a new `Notify`, initialized without a permit.
491 ///
492 /// When using the `tracing` [unstable feature], a `Notify` created with
493 /// `const_new` will not be instrumented. As such, it will not be visible
494 /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
495 /// an instrumented object if that is needed.
496 ///
497 /// # Examples
498 ///
499 /// ```
500 /// use tokio::sync::Notify;
501 ///
502 /// static NOTIFY: Notify = Notify::const_new();
503 /// ```
504 ///
505 /// [`tokio-console`]: https://github.com/tokio-rs/console
506 /// [unstable feature]: crate#unstable-features
507 #[cfg(not(all(loom, test)))]
508 pub const fn const_new() -> Notify {
509 Notify {
510 state: AtomicUsize::new(0),
511 waiters: Mutex::const_new(LinkedList::new()),
512 }
513 }
514
515 /// Wait for a notification.
516 ///
517 /// Equivalent to:
518 ///
519 /// ```ignore
520 /// async fn notified(&self);
521 /// ```
522 ///
523 /// Each `Notify` value holds a single permit. If a permit is available from
524 /// an earlier call to [`notify_one()`], then `notified().await` will complete
525 /// immediately, consuming that permit. Otherwise, `notified().await` waits
526 /// for a permit to be made available by the next call to `notify_one()`.
527 ///
528 /// The `Notified` future is not guaranteed to receive wakeups from calls to
529 /// `notify_one()` if it has not yet been polled. See the documentation for
530 /// [`Notified::enable()`] for more details.
531 ///
532 /// The `Notified` future is guaranteed to receive wakeups from
533 /// `notify_waiters()` as soon as it has been created, even if it has not
534 /// yet been polled.
535 ///
536 /// [`notify_one()`]: Notify::notify_one
537 /// [`Notified::enable()`]: Notified::enable
538 ///
539 /// # Cancel safety
540 ///
541 /// This method uses a queue to fairly distribute notifications in the order
542 /// they were requested. Cancelling a call to `notified` makes you lose your
543 /// place in the queue.
544 ///
545 /// # Examples
546 ///
547 /// ```
548 /// use tokio::sync::Notify;
549 /// use std::sync::Arc;
550 ///
551 /// #[tokio::main]
552 /// async fn main() {
553 /// let notify = Arc::new(Notify::new());
554 /// let notify2 = notify.clone();
555 ///
556 /// tokio::spawn(async move {
557 /// notify2.notified().await;
558 /// println!("received notification");
559 /// });
560 ///
561 /// println!("sending notification");
562 /// notify.notify_one();
563 /// }
564 /// ```
565 pub fn notified(&self) -> Notified<'_> {
566 // we load the number of times notify_waiters
567 // was called and store that in the future.
568 let state = self.state.load(SeqCst);
569 Notified {
570 notify: self,
571 state: State::Init,
572 notify_waiters_calls: get_num_notify_waiters_calls(state),
573 waiter: Waiter::new(),
574 }
575 }
576
577 /// Wait for a notification with an owned `Future`.
578 ///
579 /// Unlike [`Self::notified`] which returns a future tied to the `Notify`'s
580 /// lifetime, `notified_owned` creates a self-contained future that owns its
581 /// notification state, making it safe to move between threads.
582 ///
583 /// See [`Self::notified`] for more details.
584 ///
585 /// # Cancel safety
586 ///
587 /// This method uses a queue to fairly distribute notifications in the order
588 /// they were requested. Cancelling a call to `notified_owned` makes you lose your
589 /// place in the queue.
590 ///
591 /// # Examples
592 ///
593 /// ```
594 /// use std::sync::Arc;
595 /// use tokio::sync::Notify;
596 ///
597 /// #[tokio::main]
598 /// async fn main() {
599 /// let notify = Arc::new(Notify::new());
600 ///
601 /// for _ in 0..10 {
602 /// let notified = notify.clone().notified_owned();
603 /// tokio::spawn(async move {
604 /// notified.await;
605 /// println!("received notification");
606 /// });
607 /// }
608 ///
609 /// println!("sending notification");
610 /// notify.notify_waiters();
611 /// }
612 /// ```
613 pub fn notified_owned(self: Arc<Self>) -> OwnedNotified {
614 // we load the number of times notify_waiters
615 // was called and store that in the future.
616 let state = self.state.load(SeqCst);
617 OwnedNotified {
618 notify: self,
619 state: State::Init,
620 notify_waiters_calls: get_num_notify_waiters_calls(state),
621 waiter: Waiter::new(),
622 }
623 }
624 /// Notifies the first waiting task.
625 ///
626 /// If a task is currently waiting, that task is notified. Otherwise, a
627 /// permit is stored in this `Notify` value and the **next** call to
628 /// [`notified().await`] will complete immediately consuming the permit made
629 /// available by this call to `notify_one()`.
630 ///
631 /// At most one permit may be stored by `Notify`. Many sequential calls to
632 /// `notify_one` will result in a single permit being stored. The next call to
633 /// `notified().await` will complete immediately, but the one after that
634 /// will wait.
635 ///
636 /// [`notified().await`]: Notify::notified()
637 ///
638 /// # Examples
639 ///
640 /// ```
641 /// use tokio::sync::Notify;
642 /// use std::sync::Arc;
643 ///
644 /// #[tokio::main]
645 /// async fn main() {
646 /// let notify = Arc::new(Notify::new());
647 /// let notify2 = notify.clone();
648 ///
649 /// tokio::spawn(async move {
650 /// notify2.notified().await;
651 /// println!("received notification");
652 /// });
653 ///
654 /// println!("sending notification");
655 /// notify.notify_one();
656 /// }
657 /// ```
658 // Alias for old name in 0.x
659 #[cfg_attr(docsrs, doc(alias = "notify"))]
660 pub fn notify_one(&self) {
661 self.notify_with_strategy(NotifyOneStrategy::Fifo);
662 }
663
664 /// Notifies the last waiting task.
665 ///
666 /// This function behaves similar to `notify_one`. The only difference is that it wakes
667 /// the most recently added waiter instead of the oldest waiter.
668 ///
669 /// Check the [`notify_one()`] documentation for more info and
670 /// examples.
671 ///
672 /// [`notify_one()`]: Notify::notify_one
673 pub fn notify_last(&self) {
674 self.notify_with_strategy(NotifyOneStrategy::Lifo);
675 }
676
677 fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
678 // Load the current state
679 let mut curr = self.state.load(SeqCst);
680
681 // If the state is `EMPTY`, transition to `NOTIFIED` and return.
682 while let EMPTY | NOTIFIED = get_state(curr) {
683 // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
684 // happens-before synchronization must happen between this atomic
685 // operation and a task calling `notified().await`.
686 let new = set_state(curr, NOTIFIED);
687 let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
688
689 match res {
690 // No waiters, no further work to do
691 Ok(_) => return,
692 Err(actual) => {
693 curr = actual;
694 }
695 }
696 }
697
698 // There are waiters, the lock must be acquired to notify.
699 let mut waiters = self.waiters.lock();
700
701 // The state must be reloaded while the lock is held. The state may only
702 // transition out of WAITING while the lock is held.
703 curr = self.state.load(SeqCst);
704
705 if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
706 drop(waiters);
707 waker.wake();
708 }
709 }
710
711 /// Notifies all waiting tasks.
712 ///
713 /// If a task is currently waiting, that task is notified. Unlike with
714 /// `notify_one()`, no permit is stored to be used by the next call to
715 /// `notified().await`. The purpose of this method is to notify all
716 /// already registered waiters. Registering for notification is done by
717 /// acquiring an instance of the `Notified` future via calling `notified()`.
718 ///
719 /// # Examples
720 ///
721 /// ```
722 /// use tokio::sync::Notify;
723 /// use std::sync::Arc;
724 ///
725 /// #[tokio::main]
726 /// async fn main() {
727 /// let notify = Arc::new(Notify::new());
728 /// let notify2 = notify.clone();
729 ///
730 /// let notified1 = notify.notified();
731 /// let notified2 = notify.notified();
732 ///
733 /// let handle = tokio::spawn(async move {
734 /// println!("sending notifications");
735 /// notify2.notify_waiters();
736 /// });
737 ///
738 /// notified1.await;
739 /// notified2.await;
740 /// println!("received notifications");
741 /// }
742 /// ```
743 pub fn notify_waiters(&self) {
744 let mut waiters = self.waiters.lock();
745
746 // The state must be loaded while the lock is held. The state may only
747 // transition out of WAITING while the lock is held.
748 let curr = self.state.load(SeqCst);
749
750 if matches!(get_state(curr), EMPTY | NOTIFIED) {
751 // There are no waiting tasks. All we need to do is increment the
752 // number of times this method was called.
753 atomic_inc_num_notify_waiters_calls(&self.state);
754 return;
755 }
756
757 // Increment the number of times this method was called
758 // and transition to empty.
759 let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
760 self.state.store(new_state, SeqCst);
761
762 // It is critical for `GuardedLinkedList` safety that the guard node is
763 // pinned in memory and is not dropped until the guarded list is dropped.
764 let guard = Waiter::new();
765 pin!(guard);
766
767 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
768 // underneath to allow every waiter to safely remove itself from it.
769 //
770 // * This list will be still guarded by the `waiters` lock.
771 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
772 // * This wrapper will empty the list on drop. It is critical for safety
773 // that we will not leave any list entry with a pointer to the local
774 // guard node after this function returns / panics.
775 let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
776
777 let mut wakers = WakeList::new();
778 'outer: loop {
779 while wakers.can_push() {
780 match list.pop_back_locked(&mut waiters) {
781 Some(waiter) => {
782 // Safety: we never make mutable references to waiters.
783 let waiter = unsafe { waiter.as_ref() };
784
785 // Safety: we hold the lock, so we can access the waker.
786 if let Some(waker) =
787 unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
788 {
789 wakers.push(waker);
790 }
791
792 // This waiter is unlinked and will not be shared ever again, release it.
793 waiter.notification.store_release(Notification::All);
794 }
795 None => {
796 break 'outer;
797 }
798 }
799 }
800
801 // Release the lock before notifying.
802 drop(waiters);
803
804 // One of the wakers may panic, but the remaining waiters will still
805 // be unlinked from the list in `NotifyWaitersList` destructor.
806 wakers.wake_all();
807
808 // Acquire the lock again.
809 waiters = self.waiters.lock();
810 }
811
812 // Release the lock before notifying
813 drop(waiters);
814
815 wakers.wake_all();
816 }
817}
818
819impl Default for Notify {
820 fn default() -> Notify {
821 Notify::new()
822 }
823}
824
825impl UnwindSafe for Notify {}
826impl RefUnwindSafe for Notify {}
827
828fn notify_locked(
829 waiters: &mut WaitList,
830 state: &AtomicUsize,
831 curr: usize,
832 strategy: NotifyOneStrategy,
833) -> Option<Waker> {
834 match get_state(curr) {
835 EMPTY | NOTIFIED => {
836 let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
837
838 match res {
839 Ok(_) => None,
840 Err(actual) => {
841 let actual_state = get_state(actual);
842 assert!(actual_state == EMPTY || actual_state == NOTIFIED);
843 state.store(set_state(actual, NOTIFIED), SeqCst);
844 None
845 }
846 }
847 }
848 WAITING => {
849 // At this point, it is guaranteed that the state will not
850 // concurrently change as holding the lock is required to
851 // transition **out** of `WAITING`.
852 //
853 // Get a pending waiter using one of the available dequeue strategies.
854 let waiter = match strategy {
855 NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
856 NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
857 };
858
859 // Safety: we never make mutable references to waiters.
860 let waiter = unsafe { waiter.as_ref() };
861
862 // Safety: we hold the lock, so we can access the waker.
863 let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
864
865 // This waiter is unlinked and will not be shared ever again, release it.
866 waiter
867 .notification
868 .store_release(Notification::One(strategy));
869
870 if waiters.is_empty() {
871 // As this the **final** waiter in the list, the state
872 // must be transitioned to `EMPTY`. As transitioning
873 // **from** `WAITING` requires the lock to be held, a
874 // `store` is sufficient.
875 state.store(set_state(curr, EMPTY), SeqCst);
876 }
877 waker
878 }
879 _ => unreachable!(),
880 }
881}
882
883// ===== impl Notified =====
884
885impl Notified<'_> {
886 /// Adds this future to the list of futures that are ready to receive
887 /// wakeups from calls to [`notify_one`].
888 ///
889 /// Polling the future also adds it to the list, so this method should only
890 /// be used if you want to add the future to the list before the first call
891 /// to `poll`. (In fact, this method is equivalent to calling `poll` except
892 /// that no `Waker` is registered.)
893 ///
894 /// This has no effect on notifications sent using [`notify_waiters`], which
895 /// are received as long as they happen after the creation of the `Notified`
896 /// regardless of whether `enable` or `poll` has been called.
897 ///
898 /// This method returns true if the `Notified` is ready. This happens in the
899 /// following situations:
900 ///
901 /// 1. The `notify_waiters` method was called between the creation of the
902 /// `Notified` and the call to this method.
903 /// 2. This is the first call to `enable` or `poll` on this future, and the
904 /// `Notify` was holding a permit from a previous call to `notify_one`.
905 /// The call consumes the permit in that case.
906 /// 3. The future has previously been enabled or polled, and it has since
907 /// then been marked ready by either consuming a permit from the
908 /// `Notify`, or by a call to `notify_one` or `notify_waiters` that
909 /// removed it from the list of futures ready to receive wakeups.
910 ///
911 /// If this method returns true, any future calls to poll on the same future
912 /// will immediately return `Poll::Ready`.
913 ///
914 /// # Examples
915 ///
916 /// Unbound multi-producer multi-consumer (mpmc) channel.
917 ///
918 /// The call to `enable` is important because otherwise if you have two
919 /// calls to `recv` and two calls to `send` in parallel, the following could
920 /// happen:
921 ///
922 /// 1. Both calls to `try_recv` return `None`.
923 /// 2. Both new elements are added to the vector.
924 /// 3. The `notify_one` method is called twice, adding only a single
925 /// permit to the `Notify`.
926 /// 4. Both calls to `recv` reach the `Notified` future. One of them
927 /// consumes the permit, and the other sleeps forever.
928 ///
929 /// By adding the `Notified` futures to the list by calling `enable` before
930 /// `try_recv`, the `notify_one` calls in step three would remove the
931 /// futures from the list and mark them notified instead of adding a permit
932 /// to the `Notify`. This ensures that both futures are woken.
933 ///
934 /// ```
935 /// use tokio::sync::Notify;
936 ///
937 /// use std::collections::VecDeque;
938 /// use std::sync::Mutex;
939 ///
940 /// struct Channel<T> {
941 /// messages: Mutex<VecDeque<T>>,
942 /// notify_on_sent: Notify,
943 /// }
944 ///
945 /// impl<T> Channel<T> {
946 /// pub fn send(&self, msg: T) {
947 /// let mut locked_queue = self.messages.lock().unwrap();
948 /// locked_queue.push_back(msg);
949 /// drop(locked_queue);
950 ///
951 /// // Send a notification to one of the calls currently
952 /// // waiting in a call to `recv`.
953 /// self.notify_on_sent.notify_one();
954 /// }
955 ///
956 /// pub fn try_recv(&self) -> Option<T> {
957 /// let mut locked_queue = self.messages.lock().unwrap();
958 /// locked_queue.pop_front()
959 /// }
960 ///
961 /// pub async fn recv(&self) -> T {
962 /// let future = self.notify_on_sent.notified();
963 /// tokio::pin!(future);
964 ///
965 /// loop {
966 /// // Make sure that no wakeup is lost if we get
967 /// // `None` from `try_recv`.
968 /// future.as_mut().enable();
969 ///
970 /// if let Some(msg) = self.try_recv() {
971 /// return msg;
972 /// }
973 ///
974 /// // Wait for a call to `notify_one`.
975 /// //
976 /// // This uses `.as_mut()` to avoid consuming the future,
977 /// // which lets us call `Pin::set` below.
978 /// future.as_mut().await;
979 ///
980 /// // Reset the future in case another call to
981 /// // `try_recv` got the message before us.
982 /// future.set(self.notify_on_sent.notified());
983 /// }
984 /// }
985 /// }
986 /// ```
987 ///
988 /// [`notify_one`]: Notify::notify_one()
989 /// [`notify_waiters`]: Notify::notify_waiters()
990 pub fn enable(self: Pin<&mut Self>) -> bool {
991 self.poll_notified(None).is_ready()
992 }
993
994 fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
995 unsafe {
996 // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
997
998 is_unpin::<&Notify>();
999 is_unpin::<State>();
1000 is_unpin::<usize>();
1001
1002 let me = self.get_unchecked_mut();
1003 NotifiedProject {
1004 notify: me.notify,
1005 state: &mut me.state,
1006 notify_waiters_calls: &me.notify_waiters_calls,
1007 waiter: &me.waiter,
1008 }
1009 }
1010 }
1011
1012 fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1013 self.project().poll_notified(waker)
1014 }
1015}
1016
1017impl Future for Notified<'_> {
1018 type Output = ();
1019
1020 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1021 self.poll_notified(Some(cx.waker()))
1022 }
1023}
1024
1025impl Drop for Notified<'_> {
1026 fn drop(&mut self) {
1027 // Safety: The type only transitions to a "Waiting" state when pinned.
1028 unsafe { Pin::new_unchecked(self) }
1029 .project()
1030 .drop_notified();
1031 }
1032}
1033
1034// ===== impl OwnedNotified =====
1035
1036impl OwnedNotified {
1037 /// Adds this future to the list of futures that are ready to receive
1038 /// wakeups from calls to [`notify_one`].
1039 ///
1040 /// See [`Notified::enable`] for more details.
1041 ///
1042 /// [`notify_one`]: Notify::notify_one()
1043 pub fn enable(self: Pin<&mut Self>) -> bool {
1044 self.poll_notified(None).is_ready()
1045 }
1046
1047 /// A custom `project` implementation is used in place of `pin-project-lite`
1048 /// as a custom drop implementation is needed.
1049 fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
1050 unsafe {
1051 // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
1052
1053 is_unpin::<&Notify>();
1054 is_unpin::<State>();
1055 is_unpin::<usize>();
1056
1057 let me = self.get_unchecked_mut();
1058 NotifiedProject {
1059 notify: &me.notify,
1060 state: &mut me.state,
1061 notify_waiters_calls: &me.notify_waiters_calls,
1062 waiter: &me.waiter,
1063 }
1064 }
1065 }
1066
1067 fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1068 self.project().poll_notified(waker)
1069 }
1070}
1071
1072impl Future for OwnedNotified {
1073 type Output = ();
1074
1075 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1076 self.poll_notified(Some(cx.waker()))
1077 }
1078}
1079
1080impl Drop for OwnedNotified {
1081 fn drop(&mut self) {
1082 // Safety: The type only transitions to a "Waiting" state when pinned.
1083 unsafe { Pin::new_unchecked(self) }
1084 .project()
1085 .drop_notified();
1086 }
1087}
1088
1089// ===== impl NotifiedProject =====
1090
1091impl NotifiedProject<'_> {
1092 fn poll_notified(self, waker: Option<&Waker>) -> Poll<()> {
1093 let NotifiedProject {
1094 notify,
1095 state,
1096 notify_waiters_calls,
1097 waiter,
1098 } = self;
1099
1100 'outer_loop: loop {
1101 match *state {
1102 State::Init => {
1103 let curr = notify.state.load(SeqCst);
1104
1105 // Optimistically try acquiring a pending notification
1106 let res = notify.state.compare_exchange(
1107 set_state(curr, NOTIFIED),
1108 set_state(curr, EMPTY),
1109 SeqCst,
1110 SeqCst,
1111 );
1112
1113 if res.is_ok() {
1114 // Acquired the notification
1115 *state = State::Done;
1116 continue 'outer_loop;
1117 }
1118
1119 // Clone the waker before locking, a waker clone can be
1120 // triggering arbitrary code.
1121 let waker = waker.cloned();
1122
1123 // Acquire the lock and attempt to transition to the waiting
1124 // state.
1125 let mut waiters = notify.waiters.lock();
1126
1127 // Reload the state with the lock held
1128 let mut curr = notify.state.load(SeqCst);
1129
1130 // if notify_waiters has been called after the future
1131 // was created, then we are done
1132 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1133 *state = State::Done;
1134 continue 'outer_loop;
1135 }
1136
1137 // Transition the state to WAITING.
1138 loop {
1139 match get_state(curr) {
1140 EMPTY => {
1141 // Transition to WAITING
1142 let res = notify.state.compare_exchange(
1143 set_state(curr, EMPTY),
1144 set_state(curr, WAITING),
1145 SeqCst,
1146 SeqCst,
1147 );
1148
1149 if let Err(actual) = res {
1150 assert_eq!(get_state(actual), NOTIFIED);
1151 curr = actual;
1152 } else {
1153 break;
1154 }
1155 }
1156 WAITING => break,
1157 NOTIFIED => {
1158 // Try consuming the notification
1159 let res = notify.state.compare_exchange(
1160 set_state(curr, NOTIFIED),
1161 set_state(curr, EMPTY),
1162 SeqCst,
1163 SeqCst,
1164 );
1165
1166 match res {
1167 Ok(_) => {
1168 // Acquired the notification
1169 *state = State::Done;
1170 continue 'outer_loop;
1171 }
1172 Err(actual) => {
1173 assert_eq!(get_state(actual), EMPTY);
1174 curr = actual;
1175 }
1176 }
1177 }
1178 _ => unreachable!(),
1179 }
1180 }
1181
1182 let mut old_waker = None;
1183 if waker.is_some() {
1184 // Safety: called while locked.
1185 //
1186 // The use of `old_waiter` here is not necessary, as the field is always
1187 // None when we reach this line.
1188 unsafe {
1189 old_waker =
1190 waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
1191 }
1192 }
1193
1194 // Insert the waiter into the linked list
1195 waiters.push_front(NonNull::from(waiter));
1196
1197 *state = State::Waiting;
1198
1199 drop(waiters);
1200 drop(old_waker);
1201
1202 return Poll::Pending;
1203 }
1204 State::Waiting => {
1205 #[cfg(tokio_taskdump)]
1206 if let Some(waker) = waker {
1207 let mut ctx = Context::from_waker(waker);
1208 std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1209 }
1210
1211 if waiter.notification.load(Acquire).is_some() {
1212 // Safety: waiter is already unlinked and will not be shared again,
1213 // so we have an exclusive access to `waker`.
1214 drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1215
1216 waiter.notification.clear();
1217 *state = State::Done;
1218 return Poll::Ready(());
1219 }
1220
1221 // Our waiter was not notified, implying it is still stored in a waiter
1222 // list (guarded by `notify.waiters`). In order to access the waker
1223 // fields, we must acquire the lock.
1224
1225 let mut old_waker = None;
1226 let mut waiters = notify.waiters.lock();
1227
1228 // We hold the lock and notifications are set only with the lock held,
1229 // so this can be relaxed, because the happens-before relationship is
1230 // established through the mutex.
1231 if waiter.notification.load(Relaxed).is_some() {
1232 // Safety: waiter is already unlinked and will not be shared again,
1233 // so we have an exclusive access to `waker`.
1234 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1235
1236 waiter.notification.clear();
1237
1238 // Drop the old waker after releasing the lock.
1239 drop(waiters);
1240 drop(old_waker);
1241
1242 *state = State::Done;
1243 return Poll::Ready(());
1244 }
1245
1246 // Load the state with the lock held.
1247 let curr = notify.state.load(SeqCst);
1248
1249 if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1250 // Before we add a waiter to the list we check if these numbers are
1251 // different while holding the lock. If these numbers are different now,
1252 // it means that there is a call to `notify_waiters` in progress and this
1253 // waiter must be contained by a guarded list used in `notify_waiters`.
1254 // We can treat the waiter as notified and remove it from the list, as
1255 // it would have been notified in the `notify_waiters` call anyways.
1256
1257 // Safety: we hold the lock, so we can modify the waker.
1258 old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1259
1260 // Safety: we hold the lock, so we have an exclusive access to the list.
1261 // The list is used in `notify_waiters`, so it must be guarded.
1262 unsafe { waiters.remove(NonNull::from(waiter)) };
1263
1264 *state = State::Done;
1265 } else {
1266 // Safety: we hold the lock, so we can modify the waker.
1267 unsafe {
1268 waiter.waker.with_mut(|v| {
1269 if let Some(waker) = waker {
1270 let should_update = match &*v {
1271 Some(current_waker) => !current_waker.will_wake(waker),
1272 None => true,
1273 };
1274 if should_update {
1275 old_waker = (*v).replace(waker.clone());
1276 }
1277 }
1278 });
1279 }
1280
1281 // Drop the old waker after releasing the lock.
1282 drop(waiters);
1283 drop(old_waker);
1284
1285 return Poll::Pending;
1286 }
1287
1288 // Explicit drop of the lock to indicate the scope that the
1289 // lock is held. Because holding the lock is required to
1290 // ensure safe access to fields not held within the lock, it
1291 // is helpful to visualize the scope of the critical
1292 // section.
1293 drop(waiters);
1294
1295 // Drop the old waker after releasing the lock.
1296 drop(old_waker);
1297 }
1298 State::Done => {
1299 #[cfg(tokio_taskdump)]
1300 if let Some(waker) = waker {
1301 let mut ctx = Context::from_waker(waker);
1302 std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1303 }
1304 return Poll::Ready(());
1305 }
1306 }
1307 }
1308 }
1309
1310 fn drop_notified(self) {
1311 let NotifiedProject {
1312 notify,
1313 state,
1314 waiter,
1315 ..
1316 } = self;
1317
1318 // This is where we ensure safety. The `Notified` value is being
1319 // dropped, which means we must ensure that the waiter entry is no
1320 // longer stored in the linked list.
1321 if matches!(*state, State::Waiting) {
1322 let mut waiters = notify.waiters.lock();
1323 let mut notify_state = notify.state.load(SeqCst);
1324
1325 // We hold the lock, so this field is not concurrently accessed by
1326 // `notify_*` functions and we can use the relaxed ordering.
1327 let notification = waiter.notification.load(Relaxed);
1328
1329 // remove the entry from the list (if not already removed)
1330 //
1331 // Safety: we hold the lock, so we have an exclusive access to every list the
1332 // waiter may be contained in. If the node is not contained in the `waiters`
1333 // list, then it is contained by a guarded list used by `notify_waiters`.
1334 unsafe { waiters.remove(NonNull::from(waiter)) };
1335
1336 if waiters.is_empty() && get_state(notify_state) == WAITING {
1337 notify_state = set_state(notify_state, EMPTY);
1338 notify.state.store(notify_state, SeqCst);
1339 }
1340
1341 // See if the node was notified but not received. In this case, if
1342 // the notification was triggered via `notify_one`, it must be sent
1343 // to the next waiter.
1344 if let Some(Notification::One(strategy)) = notification {
1345 if let Some(waker) =
1346 notify_locked(&mut waiters, ¬ify.state, notify_state, strategy)
1347 {
1348 drop(waiters);
1349 waker.wake();
1350 }
1351 }
1352 }
1353 }
1354}
1355
1356/// # Safety
1357///
1358/// `Waiter` is forced to be !Unpin.
1359unsafe impl linked_list::Link for Waiter {
1360 type Handle = NonNull<Waiter>;
1361 type Target = Waiter;
1362
1363 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1364 *handle
1365 }
1366
1367 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1368 ptr
1369 }
1370
1371 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1372 Waiter::addr_of_pointers(target)
1373 }
1374}
1375
1376fn is_unpin<T: Unpin>() {}