tokio/sync/
set_once.rs

1use super::Notify;
2
3use crate::loom::cell::UnsafeCell;
4use crate::loom::sync::{atomic::AtomicBool, Mutex};
5
6use std::error::Error;
7use std::fmt;
8use std::mem::MaybeUninit;
9use std::ops::Drop;
10use std::ptr;
11use std::sync::atomic::Ordering;
12
// This file contains an implementation of a SetOnce. The value of a SetOnce
// can only be set once, during initialization.
//
//  1. When `value_set` is false, the `value` is not initialized and the
//      wait() future will keep on waiting.
//  2. When `value_set` is true, the wait() future completes and get() will
//      return Some(&T).
//
// The value cannot be changed after set() is called. Subsequent calls to set()
// will return a `SetOnceError`.
23
/// A thread-safe cell that can be written to only once.
///
/// A `SetOnce` is inspired by Python's [`asyncio.Event`] type. It can be
/// used to wait until the value of the `SetOnce` is set, much like an "event"
/// mechanism.
///
/// # Example
///
/// ```
/// use tokio::sync::{SetOnce, SetOnceError};
///
/// static ONCE: SetOnce<u32> = SetOnce::const_new();
///
/// #[tokio::main]
/// async fn main() -> Result<(), SetOnceError<u32>> {
///
///     // set the value inside a task somewhere...
///     tokio::spawn(async move { ONCE.set(20) });
///
///     // checking with .get returns immediately, whether or not a value is set
///     println!("{:?}", ONCE.get());
///
///     // wait until the value is set; this suspends the task while waiting
///     println!("{:?}", ONCE.wait().await);
///
///     Ok(())
/// }
/// ```
///
/// A `SetOnce` is typically used for values that are initialized exactly once
/// and need no further changes, such as global variables. The `SetOnce` in
/// Tokio additionally lets tasks wait asynchronously for that initialization.
///
/// # Example
///
/// ```
/// use tokio::sync::{SetOnce, SetOnceError};
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), SetOnceError<u32>> {
///     let once = SetOnce::new();
///
///     let arc = Arc::new(once);
///     let first_cl = Arc::clone(&arc);
///     let second_cl = Arc::clone(&arc);
///
///     // set the value inside a task
///     tokio::spawn(async move { first_cl.set(20) }).await.unwrap()?;
///
///     // wait inside task to not block the main thread
///     tokio::spawn(async move {
///         // wait inside async context for the value to be set
///         assert_eq!(*second_cl.wait().await, 20);
///     }).await.unwrap();
///
///     // subsequent set calls will fail
///     assert!(arc.set(30).is_err());
///
///     println!("{:?}", arc.get());
///
///     Ok(())
/// }
/// ```
///
/// [`asyncio.Event`]: https://docs.python.org/3/library/asyncio-event.html
pub struct SetOnce<T> {
    // `true` once `value` has been written. `set` stores it with `Release`
    // and `initialized` loads it with `Acquire`.
    value_set: AtomicBool,
    // Storage for the value; only initialized while `value_set` is `true`.
    value: UnsafeCell<MaybeUninit<T>>,
    // Used by `set` to wake the futures returned by `wait`.
    notify: Notify,
    // we lock the mutex inside set to ensure
    // only one caller of set can run at a time; `wait` also takes it briefly
    // when it re-checks `value_set` before going to sleep
    lock: Mutex<()>,
}
97
98impl<T> Default for SetOnce<T> {
99    fn default() -> SetOnce<T> {
100        SetOnce::new()
101    }
102}
103
104impl<T: fmt::Debug> fmt::Debug for SetOnce<T> {
105    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
106        fmt.debug_struct("SetOnce")
107            .field("value", &self.get())
108            .finish()
109    }
110}
111
112impl<T: Clone> Clone for SetOnce<T> {
113    fn clone(&self) -> SetOnce<T> {
114        SetOnce::new_with(self.get().cloned())
115    }
116}
117
118impl<T: PartialEq> PartialEq for SetOnce<T> {
119    fn eq(&self, other: &SetOnce<T>) -> bool {
120        self.get() == other.get()
121    }
122}
123
// Marker impl: `SetOnce` equality compares the results of `get()`, so it is a
// full equivalence relation whenever `T: Eq`.
impl<T: Eq> Eq for SetOnce<T> {}
125
126impl<T> Drop for SetOnce<T> {
127    fn drop(&mut self) {
128        // TODO: Use get_mut()
129        if self.value_set.load(Ordering::Relaxed) {
130            // SAFETY: If the value_set is true, then the value is initialized
131            // then there is a value to be dropped and this is safe
132            unsafe { self.value.with_mut(|ptr| ptr::drop_in_place(ptr as *mut T)) }
133        }
134    }
135}
136
137impl<T> From<T> for SetOnce<T> {
138    fn from(value: T) -> Self {
139        SetOnce {
140            value_set: AtomicBool::new(true),
141            value: UnsafeCell::new(MaybeUninit::new(value)),
142            notify: Notify::new(),
143            lock: Mutex::new(()),
144        }
145    }
146}
147
148impl<T> SetOnce<T> {
149    /// Creates a new empty `SetOnce` instance.
150    pub fn new() -> Self {
151        Self {
152            value_set: AtomicBool::new(false),
153            value: UnsafeCell::new(MaybeUninit::uninit()),
154            notify: Notify::new(),
155            lock: Mutex::new(()),
156        }
157    }
158
159    /// Creates a new empty `SetOnce` instance.
160    ///
161    /// Equivalent to `SetOnce::new`, except that it can be used in static
162    /// variables.
163    ///
164    /// When using the `tracing` [unstable feature], a `SetOnce` created with
165    /// `const_new` will not be instrumented. As such, it will not be visible
166    /// in [`tokio-console`]. Instead, [`SetOnce::new`] should be used to
167    /// create an instrumented object if that is needed.
168    ///
169    /// # Example
170    ///
171    /// ```
172    /// use tokio::sync::{SetOnce, SetOnceError};
173    ///
174    /// static ONCE: SetOnce<u32> = SetOnce::const_new();
175    ///
176    /// fn get_global_integer() -> Result<Option<&'static u32>, SetOnceError<u32>> {
177    ///     ONCE.set(2)?;
178    ///     Ok(ONCE.get())
179    /// }
180    ///
181    /// #[tokio::main]
182    /// async fn main() -> Result<(), SetOnceError<u32>> {
183    ///     let result = get_global_integer()?;
184    ///
185    ///     assert_eq!(result, Some(&2));
186    ///     Ok(())
187    /// }
188    /// ```
189    ///
190    /// [`tokio-console`]: https://github.com/tokio-rs/console
191    /// [unstable feature]: crate#unstable-features
192    #[cfg(not(all(loom, test)))]
193    pub const fn const_new() -> Self {
194        Self {
195            value_set: AtomicBool::new(false),
196            value: UnsafeCell::new(MaybeUninit::uninit()),
197            notify: Notify::const_new(),
198            lock: Mutex::const_new(()),
199        }
200    }
201
202    /// Creates a new `SetOnce` that contains the provided value, if any.
203    ///
204    /// If the `Option` is `None`, this is equivalent to `SetOnce::new`.
205    ///
206    /// [`SetOnce::new`]: crate::sync::SetOnce::new
207    pub fn new_with(value: Option<T>) -> Self {
208        if let Some(v) = value {
209            SetOnce::from(v)
210        } else {
211            SetOnce::new()
212        }
213    }
214
215    /// Creates a new `SetOnce` that contains the provided value.
216    ///
217    /// # Example
218    ///
219    /// When using the `tracing` [unstable feature], a `SetOnce` created with
220    /// `const_new_with` will not be instrumented. As such, it will not be
221    /// visible in [`tokio-console`]. Instead, [`SetOnce::new_with`] should be
222    /// used to create an instrumented object if that is needed.
223    ///
224    /// ```
225    /// use tokio::sync::SetOnce;
226    ///
227    /// static ONCE: SetOnce<u32> = SetOnce::const_new_with(1);
228    ///
229    /// fn get_global_integer() -> Option<&'static u32> {
230    ///     ONCE.get()
231    /// }
232    ///
233    /// #[tokio::main]
234    /// async fn main() {
235    ///     let result = get_global_integer();
236    ///
237    ///     assert_eq!(result, Some(&1));
238    /// }
239    /// ```
240    ///
241    /// [`tokio-console`]: https://github.com/tokio-rs/console
242    /// [unstable feature]: crate#unstable-features
243    #[cfg(not(all(loom, test)))]
244    pub const fn const_new_with(value: T) -> Self {
245        Self {
246            value_set: AtomicBool::new(true),
247            value: UnsafeCell::new(MaybeUninit::new(value)),
248            notify: Notify::const_new(),
249            lock: Mutex::const_new(()),
250        }
251    }
252
    /// Returns `true` if the `SetOnce` currently contains a value, and `false`
    /// otherwise.
    ///
    /// This only loads the `value_set` flag; it never waits.
    pub fn initialized(&self) -> bool {
        // Using acquire ordering so that, when this load observes `true`, the
        // write of the value that preceded the `Ordering::Release` store in
        // `set` is visible to the caller as well.
        self.value_set.load(Ordering::Acquire)
    }
260
261    // SAFETY: The SetOnce must not be empty.
262    unsafe fn get_unchecked(&self) -> &T {
263        &*self.value.with(|ptr| (*ptr).as_ptr())
264    }
265
266    /// Returns a reference to the value currently stored in the `SetOnce`, or
267    /// `None` if the `SetOnce` is empty.
268    pub fn get(&self) -> Option<&T> {
269        if self.initialized() {
270            // SAFETY: the SetOnce is initialized, so we can safely
271            // call get_unchecked and return the value
272            Some(unsafe { self.get_unchecked() })
273        } else {
274            None
275        }
276    }
277
278    /// Sets the value of the `SetOnce` to the given value if the `SetOnce` is
279    /// empty.
280    ///
281    /// If the `SetOnce` already has a value, this call will fail with an
282    /// [`SetOnceError`].
283    ///
284    /// [`SetOnceError`]: crate::sync::SetOnceError
285    pub fn set(&self, value: T) -> Result<(), SetOnceError<T>> {
286        if self.initialized() {
287            return Err(SetOnceError(value));
288        }
289
290        // SAFETY: lock the mutex to ensure only one caller of set
291        // can run at a time.
292        let guard = self.lock.lock();
293
294        if self.initialized() {
295            // If the value is already set, we return an error
296            drop(guard);
297
298            return Err(SetOnceError(value));
299        }
300
301        // SAFETY: We have locked the mutex and checked if the value is
302        // initalized or not, so we can safely write to the value
303        unsafe {
304            self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
305        }
306
307        // Using release ordering so any threads that read a true from this
308        // atomic is able to read the value we just stored.
309        self.value_set.store(true, Ordering::Release);
310
311        drop(guard);
312
313        // notify the waiting wakers that the value is set
314        self.notify.notify_waiters();
315
316        Ok(())
317    }
318
319    /// Takes the value from the cell, destroying the cell in the process.
320    /// Returns `None` if the cell is empty.
321    pub fn into_inner(self) -> Option<T> {
322        // TODO: Use get_mut()
323        let value_set = self.value_set.load(Ordering::Relaxed);
324
325        if value_set {
326            // Since we have taken ownership of self, its drop implementation
327            // will be called by the end of this function, to prevent a double
328            // free we will set the value_set to false so that the drop
329            // implementation does not try to drop the value again.
330            self.value_set.store(false, Ordering::Relaxed);
331
332            // SAFETY: The SetOnce is currently initialized, we can assume the
333            // value is initialized and return that, when we return the value
334            // we give the drop handler to the return scope.
335            Some(unsafe { self.value.with_mut(|ptr| ptr::read(ptr).assume_init()) })
336        } else {
337            None
338        }
339    }
340
    /// Waits until the `SetOnce` has been set and returns a reference to the
    /// value.
    ///
    /// The returned future stays pending until the `SetOnce` is initialized
    /// through a call to `set`.
    ///
    /// If the `SetOnce` is already initialized, it will return the value
    /// immediately.
    ///
    /// # Note
    ///
    /// This will keep waiting until the `SetOnce` is initialized, so it
    /// should be used with care to avoid suspending the current task
    /// indefinitely.
    pub async fn wait(&self) -> &T {
        loop {
            // Fast path: the value is already there.
            if let Some(val) = self.get() {
                return val;
            }

            // Create the notification future *before* re-checking the flag
            // under the lock below; the order of these steps matters.
            let notify_fut = self.notify.notified();
            {
                // Taking the lock here ensures that a concurrent call to `set`
                // will see the creation of `notify_fut` in case the check
                // fails.
                let _guard = self.lock.lock();

                // `set` stores the flag while holding this same lock, so a
                // relaxed load is enough here.
                if self.value_set.load(Ordering::Relaxed) {
                    // SAFETY: the state is initialized
                    return unsafe { self.get_unchecked() };
                }
            }

            // wait until the value is set; the loop then re-checks `get()`
            notify_fut.await;
        }
    }
375}
376
// Since `get` gives us access to immutable references to the value inside the
// SetOnce, SetOnce can only be Sync if T is Sync, otherwise SetOnce would allow
// sharing references to !Sync values across threads. We also need T to be Send
// in order for SetOnce to be Sync, because `set` can be called on a
// `&SetOnce<T>` to send values (of type T) across threads.
unsafe impl<T: Sync + Send> Sync for SetOnce<T> {}
383
// Access to SetOnce's value is guarded by the atomic boolean flag
// `value_set` and the atomic operations performed on it, so as long as T
// itself is Send it's safe to send the SetOnce to another thread.
unsafe impl<T: Send> Send for SetOnce<T> {}
388
/// Error that can be returned from [`SetOnce::set`].
///
/// This error means that the `SetOnce` was already initialized when
/// `set` was called. The value that could not be stored is handed back in the
/// public field `0`.
///
/// [`SetOnce::set`]: crate::sync::SetOnce::set
#[derive(Debug, PartialEq, Eq)]
pub struct SetOnceError<T>(pub T);

impl<T> fmt::Display for SetOnceError<T> {
    /// Writes the fixed text `SetOnceError`; the rejected value is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SetOnceError")
    }
}

impl<T: fmt::Debug> Error for SetOnceError<T> {}