tokio/sync/
broadcast.rs

1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//!     let (tx, mut rx1) = broadcast::channel(16);
79//!     let mut rx2 = tx.subscribe();
80//!
81//!     tokio::spawn(async move {
82//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84//!     });
85//!
86//!     tokio::spawn(async move {
87//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89//!     });
90//!
91//!     tx.send(10).unwrap();
92//!     tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//!     let (tx, mut rx) = broadcast::channel(2);
104//!
105//!     tx.send(10).unwrap();
106//!     tx.send(20).unwrap();
107//!     tx.send(30).unwrap();
108//!
109//!     // The receiver lagged behind
110//!     assert!(rx.recv().await.is_err());
111//!
112//!     // At this point, we can abort or continue with lost messages
113//!
114//!     assert_eq!(20, rx.recv().await.unwrap());
115//!     assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard};
122use crate::task::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146///     let (tx, mut rx1) = broadcast::channel(16);
147///     let mut rx2 = tx.subscribe();
148///
149///     tokio::spawn(async move {
150///         assert_eq!(rx1.recv().await.unwrap(), 10);
151///         assert_eq!(rx1.recv().await.unwrap(), 20);
152///     });
153///
154///     tokio::spawn(async move {
155///         assert_eq!(rx2.recv().await.unwrap(), 10);
156///         assert_eq!(rx2.recv().await.unwrap(), 20);
157///     });
158///
159///     tx.send(10).unwrap();
160///     tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// Channel state shared with every other handle of this channel
    /// (senders, weak senders and receivers).
    shared: Arc<Shared<T>>,
}
168
169/// A sender that does not prevent the channel from being closed.
170///
171/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
172/// instances remain, the channel is closed.
173///
174/// In order to send messages, the `WeakSender` needs to be upgraded using
175/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
176/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
177///
178/// [`Sender`]: Sender
179/// [`WeakSender::upgrade`]: WeakSender::upgrade
180///
181/// # Examples
182///
183/// ```
184/// use tokio::sync::broadcast::channel;
185///
186/// #[tokio::main]
187/// async fn main() {
188///     let (tx, _rx) = channel::<i32>(15);
189///     let tx_weak = tx.downgrade();
190///
191///     // Upgrading will succeed because `tx` still exists.
192///     assert!(tx_weak.upgrade().is_some());
193///
194///     // If we drop `tx`, then it will fail.
195///     drop(tx);
196///     assert!(tx_weak.clone().upgrade().is_none());
197/// }
198/// ```
pub struct WeakSender<T> {
    /// The same `Shared` allocation referenced by the strong `Sender`
    /// handles. Weak handles are counted in `Shared::num_weak_tx`.
    shared: Arc<Shared<T>>,
}
202
203/// Receiving-half of the [`broadcast`] channel.
204///
205/// Must not be used concurrently. Messages may be retrieved using
206/// [`recv`][Receiver::recv].
207///
208/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
209/// wrapper.
210///
211/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
212///
213/// # Examples
214///
215/// ```
216/// use tokio::sync::broadcast;
217///
218/// #[tokio::main]
219/// async fn main() {
220///     let (tx, mut rx1) = broadcast::channel(16);
221///     let mut rx2 = tx.subscribe();
222///
223///     tokio::spawn(async move {
224///         assert_eq!(rx1.recv().await.unwrap(), 10);
225///         assert_eq!(rx1.recv().await.unwrap(), 20);
226///     });
227///
228///     tokio::spawn(async move {
229///         assert_eq!(rx2.recv().await.unwrap(), 10);
230///         assert_eq!(rx2.recv().await.unwrap(), 20);
231///     });
232///
233///     tx.send(10).unwrap();
234///     tx.send(20).unwrap();
235/// }
236/// ```
237///
238/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Next position to read from.
    ///
    /// A receiver created by `new_receiver` starts at the tail position that
    /// was current when it was created; the receiver returned by `channel`
    /// starts at `0`.
    next: u64,
}
246
pub mod error {
    //! Error types returned by the broadcast channel.

    use std::fmt;

    /// Error returned by the [`send`] function on a [`Sender`].
    ///
    /// Sending fails only when the channel has no active receivers, which
    /// means nobody could ever observe the message. The rejected message is
    /// carried inside the error so the caller can recover it.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// An error returned from the [`recv`] function on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind and missed messages. Receiving
        /// again yields the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// An error returned from the [`try_recv`] function on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now. At least one [`Sender`] handle is
        /// still alive, so a value may show up later.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind and missed messages. Receiving
        /// again yields the oldest message the channel still retains.
        ///
        /// The payload is the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
335
336use self::error::{RecvError, SendError, TryRecvError};
337
338use super::Notify;
339
/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel. Each slot sits behind its own mutex and is
    /// addressed by `position & mask`.
    buffer: Box<[Mutex<Slot<T>>]>,

    /// Mask a position -> index. Set to `capacity - 1`, where the capacity
    /// has been rounded up to a power of two.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles. Starts at `1`.
    num_tx: AtomicUsize,

    /// Number of outstanding weak Sender handles. Starts at `0` and is
    /// incremented by `Sender::downgrade`.
    num_weak_tx: AtomicUsize,

    /// Notify when the last subscribed [`Receiver`] drops. Awaited by
    /// `Sender::closed`.
    notify_last_rx_drop: Notify,
}
360
/// Next position to write a value, plus the bookkeeping that is protected by
/// the same mutex (`Shared::tail`).
struct Tail {
    /// Next position to write to. `Sender::send` advances it by one
    /// (wrapping) for every value it stores.
    pos: u64,

    /// Number of active receivers. `Sender::send` fails while this is zero.
    rx_cnt: usize,

    /// True if the channel is closed. Set by `Sender::close_channel`; reset
    /// to `false` by `new_receiver` when it adds the first receiver
    /// (`rx_cnt == 0`).
    closed: bool,

    /// Receivers waiting for a value.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
375
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot read lock
    /// acquired.
    ///
    /// NOTE(review): the sentence above talks about a "read lock", but
    /// `Shared::buffer` stores every slot in a `Mutex`. Confirm whether the
    /// wording (and the need for an atomic) is still accurate.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot. `Sender::send`
    /// writes the tail position it claimed into this field.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` when the write lock is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    ///
    /// NOTE(review): in the visible code `send` stores the value while holding
    /// the slot's `Mutex`; "write lock" presumably refers to that mutex.
    val: Option<T>,
}
395
/// An entry in the wait queue.
struct Waiter {
    /// True if queued. A freshly created waiter starts out as `false`.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel. `None` until a waker has been
    /// stored.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    ///
    /// NOTE(review): presumably because the node is linked into the intrusive
    /// list by address and therefore must not move — confirm.
    _p: PhantomPinned,
}
410
411impl Waiter {
412    fn new() -> Self {
413        Self {
414            queued: AtomicBool::new(false),
415            waker: None,
416            pointers: linked_list::Pointers::new(),
417            _p: PhantomPinned,
418        }
419    }
420}
421
// Defines `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field.
//
// NOTE(review): the body is expanded by the crate-local
// `generate_addr_of_methods!` macro, presumably so the field address is
// computed without creating an intermediate `&Waiter` — confirm against the
// macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
429
/// Wrapper around the mutex guard of a single buffer slot.
///
/// NOTE(review): presumably handed out while a receiver reads the slot's
/// value; its users are outside this view — confirm.
struct RecvGuard<'a, T> {
    /// Guard keeping the slot locked for as long as this value is alive.
    slot: MutexGuard<'a, Slot<T>>,
}
433
/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on. Borrowed mutably for the lifetime of the
    /// future.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`. Stored inline in the future, behind
    /// the `WaiterCell` wrapper.
    waiter: WaiterCell,
}
442
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
// from `Recv`.
struct WaiterCell(UnsafeCell<Waiter>);

// NOTE(review): these impls assert thread-safety that the compiler cannot
// check. The invariant that makes them sound is not visible here (presumably
// every access to the inner `Waiter` is serialized, e.g. by the `tail`
// mutex) — confirm and record it as a `SAFETY:` comment.
unsafe impl Send for WaiterCell {}
unsafe impl Sync for WaiterCell {}
449
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" when `Tail::rx_cnt` has already
/// reached this value.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
452
453/// Create a bounded, multi-producer, multi-consumer channel where each sent
454/// value is broadcasted to all active receivers.
455///
456/// **Note:** The actual capacity may be greater than the provided `capacity`.
457///
458/// All data sent on [`Sender`] will become available on every active
459/// [`Receiver`] in the same order as it was sent.
460///
461/// The `Sender` can be cloned to `send` to the same channel from multiple
462/// points in the process or it can be used concurrently from an `Arc`. New
463/// `Receiver` handles are created by calling [`Sender::subscribe`].
464///
465/// If all [`Receiver`] handles are dropped, the `send` method will return a
466/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
467/// method will return a [`RecvError`].
468///
469/// [`Sender`]: crate::sync::broadcast::Sender
470/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
471/// [`Receiver`]: crate::sync::broadcast::Receiver
472/// [`recv`]: crate::sync::broadcast::Receiver::recv
473/// [`SendError`]: crate::sync::broadcast::error::SendError
474/// [`RecvError`]: crate::sync::broadcast::error::RecvError
475///
476/// # Examples
477///
478/// ```
479/// use tokio::sync::broadcast;
480///
481/// #[tokio::main]
482/// async fn main() {
483///     let (tx, mut rx1) = broadcast::channel(16);
484///     let mut rx2 = tx.subscribe();
485///
486///     tokio::spawn(async move {
487///         assert_eq!(rx1.recv().await.unwrap(), 10);
488///         assert_eq!(rx1.recv().await.unwrap(), 20);
489///     });
490///
491///     tokio::spawn(async move {
492///         assert_eq!(rx2.recv().await.unwrap(), 10);
493///         assert_eq!(rx2.recv().await.unwrap(), 20);
494///     });
495///
496///     tx.send(10).unwrap();
497///     tx.send(20).unwrap();
498/// }
499/// ```
500///
501/// # Panics
502///
503/// This will panic if `capacity` is equal to `0`.
504///
505/// This pre-allocates space for `capacity` messages. Allocation failure may result in a panic or
506/// [an allocation failure](std::alloc::handle_alloc_error).
507#[track_caller]
508pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
509    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
510    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
511    let rx = Receiver {
512        shared: tx.shared.clone(),
513        next: 0,
514    };
515    (tx, rx)
516}
517
518impl<T> Sender<T> {
519    /// Creates the sending-half of the [`broadcast`] channel.
520    ///
521    /// See the documentation of [`broadcast::channel`] for more information on this method.
522    ///
523    /// [`broadcast`]: crate::sync::broadcast
524    /// [`broadcast::channel`]: crate::sync::broadcast::channel
525    #[track_caller]
526    pub fn new(capacity: usize) -> Self {
527        // SAFETY: We don't create extra receivers, so there are 0.
528        unsafe { Self::new_with_receiver_count(0, capacity) }
529    }
530
531    /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
532    /// count.
533    ///
534    /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
535    /// calling this function.
536    ///
537    /// # Safety:
538    ///
539    /// The caller must ensure that the amount of receivers for this Sender is correct before
540    /// the channel functionalities are used, the count is zero by default, as this function
541    /// does not create any receivers by itself.
542    #[track_caller]
543    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
544        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
545        assert!(
546            capacity <= usize::MAX >> 1,
547            "broadcast channel capacity exceeded `usize::MAX / 2`"
548        );
549
550        // Round to a power of two
551        capacity = capacity.next_power_of_two();
552
553        let mut buffer = Vec::with_capacity(capacity);
554
555        for i in 0..capacity {
556            buffer.push(Mutex::new(Slot {
557                rem: AtomicUsize::new(0),
558                pos: (i as u64).wrapping_sub(capacity as u64),
559                val: None,
560            }));
561        }
562
563        let shared = Arc::new(Shared {
564            buffer: buffer.into_boxed_slice(),
565            mask: capacity - 1,
566            tail: Mutex::new(Tail {
567                pos: 0,
568                rx_cnt: receiver_count,
569                closed: false,
570                waiters: LinkedList::new(),
571            }),
572            num_tx: AtomicUsize::new(1),
573            num_weak_tx: AtomicUsize::new(0),
574            notify_last_rx_drop: Notify::new(),
575        });
576
577        Sender { shared }
578    }
579
580    /// Attempts to send a value to all active [`Receiver`] handles, returning
581    /// it back if it could not be sent.
582    ///
583    /// A successful send occurs when there is at least one active [`Receiver`]
584    /// handle. An unsuccessful send would be one where all associated
585    /// [`Receiver`] handles have already been dropped.
586    ///
587    /// # Return
588    ///
589    /// On success, the number of subscribed [`Receiver`] handles is returned.
590    /// This does not mean that this number of receivers will see the message as
591    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
592    /// the message.
593    ///
594    /// # Note
595    ///
596    /// A return value of `Ok` **does not** mean that the sent value will be
597    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
598    /// handles may be dropped before receiving the sent message.
599    ///
600    /// A return value of `Err` **does not** mean that future calls to `send`
601    /// will fail. New [`Receiver`] handles may be created by calling
602    /// [`subscribe`].
603    ///
604    /// [`Receiver`]: crate::sync::broadcast::Receiver
605    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
606    ///
607    /// # Examples
608    ///
609    /// ```
610    /// use tokio::sync::broadcast;
611    ///
612    /// #[tokio::main]
613    /// async fn main() {
614    ///     let (tx, mut rx1) = broadcast::channel(16);
615    ///     let mut rx2 = tx.subscribe();
616    ///
617    ///     tokio::spawn(async move {
618    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
619    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
620    ///     });
621    ///
622    ///     tokio::spawn(async move {
623    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
624    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
625    ///     });
626    ///
627    ///     tx.send(10).unwrap();
628    ///     tx.send(20).unwrap();
629    /// }
630    /// ```
631    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
632        let mut tail = self.shared.tail.lock();
633
634        if tail.rx_cnt == 0 {
635            return Err(SendError(value));
636        }
637
638        // Position to write into
639        let pos = tail.pos;
640        let rem = tail.rx_cnt;
641        let idx = (pos & self.shared.mask as u64) as usize;
642
643        // Update the tail position
644        tail.pos = tail.pos.wrapping_add(1);
645
646        // Get the slot
647        let mut slot = self.shared.buffer[idx].lock();
648
649        // Track the position
650        slot.pos = pos;
651
652        // Set remaining receivers
653        slot.rem.with_mut(|v| *v = rem);
654
655        // Write the value
656        slot.val = Some(value);
657
658        // Release the slot lock before notifying the receivers.
659        drop(slot);
660
661        // Notify and release the mutex. This must happen after the slot lock is
662        // released, otherwise the writer lock bit could be cleared while another
663        // thread is in the critical section.
664        self.shared.notify_rx(tail);
665
666        Ok(rem)
667    }
668
669    /// Creates a new [`Receiver`] handle that will receive values sent **after**
670    /// this call to `subscribe`.
671    ///
672    /// # Examples
673    ///
674    /// ```
675    /// use tokio::sync::broadcast;
676    ///
677    /// #[tokio::main]
678    /// async fn main() {
679    ///     let (tx, _rx) = broadcast::channel(16);
680    ///
681    ///     // Will not be seen
682    ///     tx.send(10).unwrap();
683    ///
684    ///     let mut rx = tx.subscribe();
685    ///
686    ///     tx.send(20).unwrap();
687    ///
688    ///     let value = rx.recv().await.unwrap();
689    ///     assert_eq!(20, value);
690    /// }
691    /// ```
692    pub fn subscribe(&self) -> Receiver<T> {
693        let shared = self.shared.clone();
694        new_receiver(shared)
695    }
696
697    /// Converts the `Sender` to a [`WeakSender`] that does not count
698    /// towards RAII semantics, i.e. if all `Sender` instances of the
699    /// channel were dropped and only `WeakSender` instances remain,
700    /// the channel is closed.
701    #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
702    pub fn downgrade(&self) -> WeakSender<T> {
703        self.shared.num_weak_tx.fetch_add(1, Relaxed);
704        WeakSender {
705            shared: self.shared.clone(),
706        }
707    }
708
709    /// Returns the number of queued values.
710    ///
711    /// A value is queued until it has either been seen by all receivers that were alive at the time
712    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
713    /// queue's capacity.
714    ///
715    /// # Note
716    ///
717    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
718    /// have been evicted from the queue before being seen by all receivers.
719    ///
720    /// # Examples
721    ///
722    /// ```
723    /// use tokio::sync::broadcast;
724    ///
725    /// #[tokio::main]
726    /// async fn main() {
727    ///     let (tx, mut rx1) = broadcast::channel(16);
728    ///     let mut rx2 = tx.subscribe();
729    ///
730    ///     tx.send(10).unwrap();
731    ///     tx.send(20).unwrap();
732    ///     tx.send(30).unwrap();
733    ///
734    ///     assert_eq!(tx.len(), 3);
735    ///
736    ///     rx1.recv().await.unwrap();
737    ///
738    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
739    ///     assert_eq!(tx.len(), 3);
740    ///
741    ///     rx2.recv().await.unwrap();
742    ///
743    ///     assert_eq!(tx.len(), 2);
744    /// }
745    /// ```
746    pub fn len(&self) -> usize {
747        let tail = self.shared.tail.lock();
748
749        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
750        let mut low = 0;
751        let mut high = self.shared.buffer.len();
752        while low < high {
753            let mid = low + (high - low) / 2;
754            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
755            if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
756                low = mid + 1;
757            } else {
758                high = mid;
759            }
760        }
761
762        self.shared.buffer.len() - low
763    }
764
765    /// Returns true if there are no queued values.
766    ///
767    /// # Examples
768    ///
769    /// ```
770    /// use tokio::sync::broadcast;
771    ///
772    /// #[tokio::main]
773    /// async fn main() {
774    ///     let (tx, mut rx1) = broadcast::channel(16);
775    ///     let mut rx2 = tx.subscribe();
776    ///
777    ///     assert!(tx.is_empty());
778    ///
779    ///     tx.send(10).unwrap();
780    ///
781    ///     assert!(!tx.is_empty());
782    ///
783    ///     rx1.recv().await.unwrap();
784    ///
785    ///     // The queue is still not empty since rx2 hasn't seen the value.
786    ///     assert!(!tx.is_empty());
787    ///
788    ///     rx2.recv().await.unwrap();
789    ///
790    ///     assert!(tx.is_empty());
791    /// }
792    /// ```
793    pub fn is_empty(&self) -> bool {
794        let tail = self.shared.tail.lock();
795
796        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
797        self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
798    }
799
800    /// Returns the number of active receivers.
801    ///
802    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
803    /// [`subscribe`]. These are the handles that will receive values sent on
804    /// this [`Sender`].
805    ///
806    /// # Note
807    ///
808    /// It is not guaranteed that a sent message will reach this number of
809    /// receivers. Active receivers may never call [`recv`] again before
810    /// dropping.
811    ///
812    /// [`recv`]: crate::sync::broadcast::Receiver::recv
813    /// [`Receiver`]: crate::sync::broadcast::Receiver
814    /// [`Sender`]: crate::sync::broadcast::Sender
815    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
816    /// [`channel`]: crate::sync::broadcast::channel
817    ///
818    /// # Examples
819    ///
820    /// ```
821    /// use tokio::sync::broadcast;
822    ///
823    /// #[tokio::main]
824    /// async fn main() {
825    ///     let (tx, _rx1) = broadcast::channel(16);
826    ///
827    ///     assert_eq!(1, tx.receiver_count());
828    ///
829    ///     let mut _rx2 = tx.subscribe();
830    ///
831    ///     assert_eq!(2, tx.receiver_count());
832    ///
833    ///     tx.send(10).unwrap();
834    /// }
835    /// ```
836    pub fn receiver_count(&self) -> usize {
837        let tail = self.shared.tail.lock();
838        tail.rx_cnt
839    }
840
841    /// Returns `true` if senders belong to the same channel.
842    ///
843    /// # Examples
844    ///
845    /// ```
846    /// use tokio::sync::broadcast;
847    ///
848    /// #[tokio::main]
849    /// async fn main() {
850    ///     let (tx, _rx) = broadcast::channel::<()>(16);
851    ///     let tx2 = tx.clone();
852    ///
853    ///     assert!(tx.same_channel(&tx2));
854    ///
855    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
856    ///
857    ///     assert!(!tx3.same_channel(&tx2));
858    /// }
859    /// ```
860    pub fn same_channel(&self, other: &Self) -> bool {
861        Arc::ptr_eq(&self.shared, &other.shared)
862    }
863
864    /// A future which completes when the number of [Receiver]s subscribed to this `Sender` reaches
865    /// zero.
866    ///
867    /// # Examples
868    ///
869    /// ```
870    /// use futures::FutureExt;
871    /// use tokio::sync::broadcast;
872    ///
873    /// #[tokio::main]
874    /// async fn main() {
875    ///     let (tx, mut rx1) = broadcast::channel::<u32>(16);
876    ///     let mut rx2 = tx.subscribe();
877    ///
878    ///     let _ = tx.send(10);
879    ///
880    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
881    ///     drop(rx1);
882    ///     assert!(tx.closed().now_or_never().is_none());
883    ///
884    ///     assert_eq!(rx2.recv().await.unwrap(), 10);
885    ///     drop(rx2);
886    ///     assert!(tx.closed().now_or_never().is_some());
887    /// }
888    /// ```
889    pub async fn closed(&self) {
890        loop {
891            let notified = self.shared.notify_last_rx_drop.notified();
892
893            {
894                // Ensure the lock drops if the channel isn't closed
895                let tail = self.shared.tail.lock();
896                if tail.closed {
897                    return;
898                }
899            }
900
901            notified.await;
902        }
903    }
904
905    fn close_channel(&self) {
906        let mut tail = self.shared.tail.lock();
907        tail.closed = true;
908
909        self.shared.notify_rx(tail);
910    }
911
912    /// Returns the number of [`Sender`] handles.
913    pub fn strong_count(&self) -> usize {
914        self.shared.num_tx.load(Acquire)
915    }
916
917    /// Returns the number of [`WeakSender`] handles.
918    pub fn weak_count(&self) -> usize {
919        self.shared.num_weak_tx.load(Acquire)
920    }
921}
922
923/// Create a new `Receiver` which reads starting from the tail.
924fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
925    let mut tail = shared.tail.lock();
926
927    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
928
929    if tail.rx_cnt == 0 {
930        // Potentially need to re-open the channel, if a new receiver has been added between calls
931        // to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
932        // applies if the sender has been dropped
933        tail.closed = false;
934    }
935
936    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
937    let next = tail.pos;
938
939    drop(tail);
940
941    Receiver { shared, next }
942}
943
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
    /// The waiters taken out of `Tail::waiters`, linked through a guard node.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once `pop_back_locked` has observed the list to be empty, which
    /// lets `Drop` skip re-acquiring the tail lock.
    is_empty: bool,
    /// Channel state whose `tail` mutex protects `list`.
    shared: &'a Shared<T>,
}
952
953impl<'a, T> Drop for WaitersList<'a, T> {
954    fn drop(&mut self) {
955        // If the list is not empty, we unlink all waiters from it.
956        // We do not wake the waiters to avoid double panics.
957        if !self.is_empty {
958            let _lock_guard = self.shared.tail.lock();
959            while self.list.pop_back().is_some() {}
960        }
961    }
962}
963
964impl<'a, T> WaitersList<'a, T> {
965    fn new(
966        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
967        guard: Pin<&'a Waiter>,
968        shared: &'a Shared<T>,
969    ) -> Self {
970        let guard_ptr = NonNull::from(guard.get_ref());
971        let list = unguarded_list.into_guarded(guard_ptr);
972        WaitersList {
973            list,
974            is_empty: false,
975            shared,
976        }
977    }
978
979    /// Removes the last element from the guarded list. Modifying this list
980    /// requires an exclusive access to the main list in `Notify`.
981    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
982        let result = self.list.pop_back();
983        if result.is_none() {
984            // Save information about emptiness to avoid waiting for lock
985            // in the destructor.
986            self.is_empty = true;
987        }
988        result
989    }
990}
991
impl<T> Shared<T> {
    /// Wakes every receiver currently queued in `tail.waiters`.
    ///
    /// Takes ownership of the tail lock guard. The waiters are moved into a
    /// local guarded list and drained in batches: wakers are collected while
    /// the lock is held and invoked only after it has been released. The guard
    /// is dropped before this function returns.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `tail` lock.
        //   The `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Collect wakers until either the batch is full or the list is
            // exhausted (which ends the outer loop as well).
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        // Wake the final (possibly partial) batch.
        wakers.wake_all();
    }
}
1057
1058impl<T> Clone for Sender<T> {
1059    fn clone(&self) -> Sender<T> {
1060        let shared = self.shared.clone();
1061        shared.num_tx.fetch_add(1, Relaxed);
1062
1063        Sender { shared }
1064    }
1065}
1066
1067impl<T> Drop for Sender<T> {
1068    fn drop(&mut self) {
1069        if 1 == self.shared.num_tx.fetch_sub(1, AcqRel) {
1070            self.close_channel();
1071        }
1072    }
1073}
1074
1075impl<T> WeakSender<T> {
1076    /// Tries to convert a `WeakSender` into a [`Sender`].
1077    ///
1078    /// This will return `Some` if there are other `Sender` instances alive and
1079    /// the channel wasn't previously dropped, otherwise `None` is returned.
1080    #[must_use]
1081    pub fn upgrade(&self) -> Option<Sender<T>> {
1082        let mut tx_count = self.shared.num_tx.load(Acquire);
1083
1084        loop {
1085            if tx_count == 0 {
1086                // channel is closed so this WeakSender can not be upgraded
1087                return None;
1088            }
1089
1090            match self
1091                .shared
1092                .num_tx
1093                .compare_exchange_weak(tx_count, tx_count + 1, Relaxed, Acquire)
1094            {
1095                Ok(_) => {
1096                    return Some(Sender {
1097                        shared: self.shared.clone(),
1098                    })
1099                }
1100                Err(prev_count) => tx_count = prev_count,
1101            }
1102        }
1103    }
1104
1105    /// Returns the number of [`Sender`] handles.
1106    pub fn strong_count(&self) -> usize {
1107        self.shared.num_tx.load(Acquire)
1108    }
1109
1110    /// Returns the number of [`WeakSender`] handles.
1111    pub fn weak_count(&self) -> usize {
1112        self.shared.num_weak_tx.load(Acquire)
1113    }
1114}
1115
1116impl<T> Clone for WeakSender<T> {
1117    fn clone(&self) -> WeakSender<T> {
1118        let shared = self.shared.clone();
1119        shared.num_weak_tx.fetch_add(1, Relaxed);
1120
1121        Self { shared }
1122    }
1123}
1124
1125impl<T> Drop for WeakSender<T> {
1126    fn drop(&mut self) {
1127        self.shared.num_weak_tx.fetch_sub(1, AcqRel);
1128    }
1129}
1130
1131impl<T> Receiver<T> {
1132    /// Returns the number of messages that were sent into the channel and that
1133    /// this [`Receiver`] has yet to receive.
1134    ///
1135    /// If the returned value from `len` is larger than the next largest power of 2
1136    /// of the capacity of the channel any call to [`recv`] will return an
1137    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
1138    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
1139    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
1140    /// values larger than 16.
1141    ///
1142    /// [`Receiver`]: crate::sync::broadcast::Receiver
1143    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1144    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1145    ///
1146    /// # Examples
1147    ///
1148    /// ```
1149    /// use tokio::sync::broadcast;
1150    ///
1151    /// #[tokio::main]
1152    /// async fn main() {
1153    ///     let (tx, mut rx1) = broadcast::channel(16);
1154    ///
1155    ///     tx.send(10).unwrap();
1156    ///     tx.send(20).unwrap();
1157    ///
1158    ///     assert_eq!(rx1.len(), 2);
1159    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1160    ///     assert_eq!(rx1.len(), 1);
1161    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1162    ///     assert_eq!(rx1.len(), 0);
1163    /// }
1164    /// ```
1165    pub fn len(&self) -> usize {
1166        let next_send_pos = self.shared.tail.lock().pos;
1167        (next_send_pos - self.next) as usize
1168    }
1169
1170    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1171    /// has yet to receive.
1172    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1174    ///
1175    /// # Examples
1176    ///
1177    /// ```
1178    /// use tokio::sync::broadcast;
1179    ///
1180    /// #[tokio::main]
1181    /// async fn main() {
1182    ///     let (tx, mut rx1) = broadcast::channel(16);
1183    ///
1184    ///     assert!(rx1.is_empty());
1185    ///
1186    ///     tx.send(10).unwrap();
1187    ///     tx.send(20).unwrap();
1188    ///
1189    ///     assert!(!rx1.is_empty());
1190    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1191    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1192    ///     assert!(rx1.is_empty());
1193    /// }
1194    /// ```
1195    pub fn is_empty(&self) -> bool {
1196        self.len() == 0
1197    }
1198
1199    /// Returns `true` if receivers belong to the same channel.
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```
1204    /// use tokio::sync::broadcast;
1205    ///
1206    /// #[tokio::main]
1207    /// async fn main() {
1208    ///     let (tx, rx) = broadcast::channel::<()>(16);
1209    ///     let rx2 = tx.subscribe();
1210    ///
1211    ///     assert!(rx.same_channel(&rx2));
1212    ///
1213    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1214    ///
1215    ///     assert!(!rx3.same_channel(&rx2));
1216    /// }
1217    /// ```
1218    pub fn same_channel(&self, other: &Self) -> bool {
1219        Arc::ptr_eq(&self.shared, &other.shared)
1220    }
1221
1222    /// Locks the next value if there is one.
    fn recv_ref(
        &mut self,
        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
        // Outcomes:
        // * `Ok(guard)`: the slot at `self.next` holds the expected value; the
        //   cursor is advanced by one and the slot stays locked via `guard`.
        // * `Err(Closed)`: this receiver has caught up and `tail.closed` is set.
        // * `Err(Empty)`: this receiver has caught up and the channel is open.
        //   If `waiter` is `Some`, its waker is stored and the waiter is queued
        //   on `tail.waiters` (callers in this file: `Recv::poll` passes `Some`,
        //   `try_recv` and `Receiver::drop` pass `None`).
        // * `Err(Lagged(n))`: `n` values were overwritten before being read;
        //   the cursor is moved to the oldest position still retained.

        // Map the receiver's position onto a buffer index.
        let idx = (self.next & self.shared.mask as u64) as usize;

        // The slot holding the next value to read
        let mut slot = self.shared.buffer[idx].lock();

        if slot.pos != self.next {
            // Release the `slot` lock before attempting to acquire the `tail`
            // lock. This is required because `send2` acquires the tail lock
            // first followed by the slot lock. Acquiring the locks in reverse
            // order here would result in a potential deadlock: `recv_ref`
            // acquires the `slot` lock and attempts to acquire the `tail` lock
            // while `send2` acquired the `tail` lock and attempts to acquire
            // the slot lock.
            drop(slot);

            // Holds a replaced waker so that it can be dropped only after the
            // locks have been released.
            let mut old_waker = None;

            let mut tail = self.shared.tail.lock();

            // Acquire slot lock again
            slot = self.shared.buffer[idx].lock();

            // Make sure the position did not change. This could happen in the
            // unlikely event that the buffer is wrapped between dropping the
            // slot lock and acquiring the tail lock.
            if slot.pos != self.next {
                // Position this slot will hold after its next overwrite.
                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

                if next_pos == self.next {
                    // At this point the channel is empty for *this* receiver. If
                    // it's been closed, then that's what we return, otherwise we
                    // set a waker and return empty.
                    if tail.closed {
                        return Err(TryRecvError::Closed);
                    }

                    // Store the waker
                    if let Some((waiter, waker)) = waiter {
                        // Safety: called while locked.
                        unsafe {
                            // Only queue if not already queued
                            waiter.with_mut(|ptr| {
                                // If there is no waker **or** if the currently
                                // stored waker references a **different** task,
                                // track the tasks' waker to be notified on
                                // receipt of a new value.
                                match (*ptr).waker {
                                    Some(ref w) if w.will_wake(waker) => {}
                                    _ => {
                                        old_waker = (*ptr).waker.replace(waker.clone());
                                    }
                                }

                                // If the waiter is not already queued, enqueue it.
                                // `Relaxed` order suffices: we have synchronized with
                                // all writers through the tail lock that we hold.
                                if !(*ptr).queued.load(Relaxed) {
                                    // `Relaxed` order suffices: all the readers will
                                    // synchronize with this write through the tail lock.
                                    (*ptr).queued.store(true, Relaxed);
                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                }
                            });
                        }
                    }

                    // Drop the old waker after releasing the locks.
                    drop(slot);
                    drop(tail);
                    drop(old_waker);

                    return Err(TryRecvError::Empty);
                }

                // At this point, the receiver has lagged behind the sender by
                // more than the channel capacity. The receiver will attempt to
                // catch up by skipping dropped messages and setting the
                // internal cursor to the **oldest** message stored by the
                // channel.
                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);

                // Number of positions between the cursor and the oldest
                // retained value, i.e. how many values this receiver missed.
                let missed = next.wrapping_sub(self.next);

                drop(tail);

                // The receiver is slow but no values have been missed
                if missed == 0 {
                    self.next = self.next.wrapping_add(1);

                    return Ok(RecvGuard { slot });
                }

                self.next = next;

                return Err(TryRecvError::Lagged(missed));
            }
        }

        // The slot holds exactly the value this receiver expects: advance the
        // cursor and hand the locked slot to the caller.
        self.next = self.next.wrapping_add(1);

        Ok(RecvGuard { slot })
    }
1329
1330    /// Returns the number of [`Sender`] handles.
1331    pub fn sender_strong_count(&self) -> usize {
1332        self.shared.num_tx.load(Acquire)
1333    }
1334
1335    /// Returns the number of [`WeakSender`] handles.
1336    pub fn sender_weak_count(&self) -> usize {
1337        self.shared.num_weak_tx.load(Acquire)
1338    }
1339
1340    /// Checks if a channel is closed.
1341    ///
1342    /// This method returns `true` if the channel has been closed. The channel is closed
1343    /// when all [`Sender`] have been dropped.
1344    ///
1345    /// [`Sender`]: crate::sync::broadcast::Sender
1346    ///
1347    /// # Examples
1348    /// ```
1349    /// use tokio::sync::broadcast;
1350    ///
1351    /// #[tokio::main]
1352    /// async fn main() {
1353    ///     let (tx, rx) = broadcast::channel::<()>(10);
1354    ///     assert!(!rx.is_closed());
1355    ///
1356    ///     drop(tx);
1357    ///
1358    ///     assert!(rx.is_closed());
1359    /// }
1360    /// ```
1361    pub fn is_closed(&self) -> bool {
1362        // Channel is closed when there are no strong senders left active
1363        self.shared.num_tx.load(Acquire) == 0
1364    }
1365}
1366
1367impl<T: Clone> Receiver<T> {
1368    /// Re-subscribes to the channel starting from the current tail element.
1369    ///
1370    /// This [`Receiver`] handle will receive a clone of all values sent
1371    /// **after** it has resubscribed. This will not include elements that are
1372    /// in the queue of the current receiver. Consider the following example.
1373    ///
1374    /// # Examples
1375    ///
1376    /// ```
1377    /// use tokio::sync::broadcast;
1378    ///
1379    /// #[tokio::main]
1380    /// async fn main() {
1381    ///   let (tx, mut rx) = broadcast::channel(2);
1382    ///
1383    ///   tx.send(1).unwrap();
1384    ///   let mut rx2 = rx.resubscribe();
1385    ///   tx.send(2).unwrap();
1386    ///
1387    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1388    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1389    /// }
1390    /// ```
1391    pub fn resubscribe(&self) -> Self {
1392        let shared = self.shared.clone();
1393        new_receiver(shared)
1394    }
1395    /// Receives the next value for this receiver.
1396    ///
1397    /// Each [`Receiver`] handle will receive a clone of all values sent
1398    /// **after** it has subscribed.
1399    ///
1400    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1401    /// dropped, indicating that no further values can be sent on the channel.
1402    ///
1403    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1404    /// sent values will overwrite old values. At this point, a call to [`recv`]
1405    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1406    /// internal cursor is updated to point to the oldest value still held by
1407    /// the channel. A subsequent call to [`recv`] will return this value
1408    /// **unless** it has been since overwritten.
1409    ///
1410    /// # Cancel safety
1411    ///
1412    /// This method is cancel safe. If `recv` is used as the event in a
1413    /// [`tokio::select!`](crate::select) statement and some other branch
1414    /// completes first, it is guaranteed that no messages were received on this
1415    /// channel.
1416    ///
1417    /// [`Receiver`]: crate::sync::broadcast::Receiver
1418    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1419    ///
1420    /// # Examples
1421    ///
1422    /// ```
1423    /// use tokio::sync::broadcast;
1424    ///
1425    /// #[tokio::main]
1426    /// async fn main() {
1427    ///     let (tx, mut rx1) = broadcast::channel(16);
1428    ///     let mut rx2 = tx.subscribe();
1429    ///
1430    ///     tokio::spawn(async move {
1431    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1432    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1433    ///     });
1434    ///
1435    ///     tokio::spawn(async move {
1436    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1437    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1438    ///     });
1439    ///
1440    ///     tx.send(10).unwrap();
1441    ///     tx.send(20).unwrap();
1442    /// }
1443    /// ```
1444    ///
1445    /// Handling lag
1446    ///
1447    /// ```
1448    /// use tokio::sync::broadcast;
1449    ///
1450    /// #[tokio::main]
1451    /// async fn main() {
1452    ///     let (tx, mut rx) = broadcast::channel(2);
1453    ///
1454    ///     tx.send(10).unwrap();
1455    ///     tx.send(20).unwrap();
1456    ///     tx.send(30).unwrap();
1457    ///
1458    ///     // The receiver lagged behind
1459    ///     assert!(rx.recv().await.is_err());
1460    ///
1461    ///     // At this point, we can abort or continue with lost messages
1462    ///
1463    ///     assert_eq!(20, rx.recv().await.unwrap());
1464    ///     assert_eq!(30, rx.recv().await.unwrap());
1465    /// }
1466    /// ```
1467    pub async fn recv(&mut self) -> Result<T, RecvError> {
1468        cooperative(Recv::new(self)).await
1469    }
1470
1471    /// Attempts to return a pending value on this receiver without awaiting.
1472    ///
1473    /// This is useful for a flavor of "optimistic check" before deciding to
1474    /// await on a receiver.
1475    ///
1476    /// Compared with [`recv`], this function has three failure cases instead of two
1477    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1478    ///
1479    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1480    /// dropped, indicating that no further values can be sent on the channel.
1481    ///
1482    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
    /// sent values will overwrite old values. At this point, a call to [`try_recv`]
1484    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1485    /// internal cursor is updated to point to the oldest value still held by
1486    /// the channel. A subsequent call to [`try_recv`] will return this value
1487    /// **unless** it has been since overwritten. If there are no values to
1488    /// receive, `Err(TryRecvError::Empty)` is returned.
1489    ///
1490    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1491    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1492    /// [`Receiver`]: crate::sync::broadcast::Receiver
1493    ///
1494    /// # Examples
1495    ///
1496    /// ```
1497    /// use tokio::sync::broadcast;
1498    ///
1499    /// #[tokio::main]
1500    /// async fn main() {
1501    ///     let (tx, mut rx) = broadcast::channel(16);
1502    ///
1503    ///     assert!(rx.try_recv().is_err());
1504    ///
1505    ///     tx.send(10).unwrap();
1506    ///
1507    ///     let value = rx.try_recv().unwrap();
1508    ///     assert_eq!(10, value);
1509    /// }
1510    /// ```
1511    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1512        let guard = self.recv_ref(None)?;
1513        guard.clone_value().ok_or(TryRecvError::Closed)
1514    }
1515
1516    /// Blocking receive to call outside of asynchronous contexts.
1517    ///
1518    /// # Panics
1519    ///
1520    /// This function panics if called within an asynchronous execution
1521    /// context.
1522    ///
1523    /// # Examples
1524    /// ```
1525    /// use std::thread;
1526    /// use tokio::sync::broadcast;
1527    ///
1528    /// #[tokio::main]
1529    /// async fn main() {
1530    ///     let (tx, mut rx) = broadcast::channel(16);
1531    ///
1532    ///     let sync_code = thread::spawn(move || {
1533    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1534    ///     });
1535    ///
1536    ///     let _ = tx.send(10);
1537    ///     sync_code.join().unwrap();
1538    /// }
1539    /// ```
1540    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1541        crate::future::block_on(self.recv())
1542    }
1543}
1544
1545impl<T> Drop for Receiver<T> {
1546    fn drop(&mut self) {
1547        let mut tail = self.shared.tail.lock();
1548
1549        tail.rx_cnt -= 1;
1550        let until = tail.pos;
1551        let remaining_rx = tail.rx_cnt;
1552
1553        if remaining_rx == 0 {
1554            self.shared.notify_last_rx_drop.notify_waiters();
1555            tail.closed = true;
1556        }
1557
1558        drop(tail);
1559
1560        while self.next < until {
1561            match self.recv_ref(None) {
1562                Ok(_) => {}
1563                // The channel is closed
1564                Err(TryRecvError::Closed) => break,
1565                // Ignore lagging, we will catch up
1566                Err(TryRecvError::Lagged(..)) => {}
1567                // Can't be empty
1568                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1569            }
1570        }
1571    }
1572}
1573
1574impl<'a, T> Recv<'a, T> {
1575    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1576        Recv {
1577            receiver,
1578            waiter: WaiterCell(UnsafeCell::new(Waiter {
1579                queued: AtomicBool::new(false),
1580                waker: None,
1581                pointers: linked_list::Pointers::new(),
1582                _p: PhantomPinned,
1583            })),
1584        }
1585    }
1586
1587    /// A custom `project` implementation is used in place of `pin-project-lite`
1588    /// as a custom drop implementation is needed.
1589    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1590        unsafe {
1591            // Safety: Receiver is Unpin
1592            is_unpin::<&mut Receiver<T>>();
1593
1594            let me = self.get_unchecked_mut();
1595            (me.receiver, &me.waiter.0)
1596        }
1597    }
1598}
1599
1600impl<'a, T> Future for Recv<'a, T>
1601where
1602    T: Clone,
1603{
1604    type Output = Result<T, RecvError>;
1605
1606    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1607        ready!(crate::trace::trace_leaf(cx));
1608
1609        let (receiver, waiter) = self.project();
1610
1611        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1612            Ok(value) => value,
1613            Err(TryRecvError::Empty) => return Poll::Pending,
1614            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1615            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1616        };
1617
1618        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1619    }
1620}
1621
impl<'a, T> Drop for Recv<'a, T> {
    /// Unlinks this future's waiter from `tail.waiters` if it is still queued,
    /// so the list never keeps a pointer to a dropped waiter.
    fn drop(&mut self) {
        // Safety: `waiter.queued` is atomic.
        // Acquire ordering is required to synchronize with
        // `Shared::notify_rx` before we drop the object.
        let queued = self
            .waiter
            .0
            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });

        // If the waiter is queued, we need to unlink it from the waiters list.
        // If not, no further synchronization is required, since the waiter
        // is not in the list and, as such, is not shared with any other threads.
        if queued {
            // Acquire the tail lock. This is required for safety before accessing
            // the waiter node.
            let mut tail = self.receiver.shared.tail.lock();

            // Re-check under the lock: `Shared::notify_rx` may have dequeued
            // the waiter between the lock-free check above and this point.
            //
            // Safety: tail lock is held.
            // `Relaxed` order suffices because we hold the tail lock.
            let queued = self
                .waiter
                .0
                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });

            if queued {
                // Remove the node
                //
                // safety: tail lock is held and the wait node is verified to be in
                // the list.
                unsafe {
                    self.waiter.0.with_mut(|ptr| {
                        tail.waiters.remove((&mut *ptr).into());
                    });
                }
            }
        }
    }
}
1661
/// Lets `Waiter` nodes be stored in the intrusive `linked_list`.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    // Handles are plain `NonNull` pointers, so converting between a handle and
    // a raw pointer is the identity in both directions.
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// Returns the pointer stored in `handle`.
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    /// Returns `ptr` unchanged as the handle.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the list pointers of `target`, as computed by
    /// `Waiter::addr_of_pointers`.
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
1681
1682impl<T> fmt::Debug for Sender<T> {
1683    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1684        write!(fmt, "broadcast::Sender")
1685    }
1686}
1687
1688impl<T> fmt::Debug for WeakSender<T> {
1689    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1690        write!(fmt, "broadcast::WeakSender")
1691    }
1692}
1693
1694impl<T> fmt::Debug for Receiver<T> {
1695    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1696        write!(fmt, "broadcast::Receiver")
1697    }
1698}
1699
1700impl<'a, T> RecvGuard<'a, T> {
1701    fn clone_value(&self) -> Option<T>
1702    where
1703        T: Clone,
1704    {
1705        self.slot.val.clone()
1706    }
1707}
1708
1709impl<'a, T> Drop for RecvGuard<'a, T> {
1710    fn drop(&mut self) {
1711        // Decrement the remaining counter
1712        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1713            self.slot.val = None;
1714        }
1715    }
1716}
1717
/// Compile-time assertion that `T` implements `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1719
#[cfg(not(loom))]
#[cfg(test)]
mod tests {
    use super::*;

    /// A channel built with `Sender::new` starts without receivers, and the
    /// count follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        // Dropping two of the three receivers leaves exactly one registered.
        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// A channel built with `channel` starts with the one receiver it returns.
    #[cfg(not(loom))]
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, first) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}