tokio/runtime/io/
driver.rs

1// Signal handling
2cfg_signal_internal_and_unix! {
3    mod signal;
4}
5cfg_tokio_uring! {
6    mod uring;
7    use uring::UringContext;
8    use crate::loom::sync::atomic::AtomicUsize;
9}
10
11use crate::io::interest::Interest;
12use crate::io::ready::Ready;
13use crate::loom::sync::Mutex;
14use crate::runtime::driver;
15use crate::runtime::io::registration_set;
16use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
17
18use mio::event::Source;
19use std::fmt;
20use std::io;
21use std::sync::Arc;
22use std::time::Duration;
23
24/// I/O driver, backed by Mio.
25pub(crate) struct Driver {
26    /// True when an event with the signal token is received
27    signal_ready: bool,
28
29    /// Reuse the `mio::Events` value across calls to poll.
30    events: mio::Events,
31
32    /// The system event queue.
33    poll: mio::Poll,
34}
35
36/// A reference to an I/O driver.
37pub(crate) struct Handle {
38    /// Registers I/O resources.
39    registry: mio::Registry,
40
41    /// Tracks all registrations
42    registrations: RegistrationSet,
43
44    /// State that should be synchronized
45    synced: Mutex<registration_set::Synced>,
46
47    /// Used to wake up the reactor from a call to `turn`.
48    /// Not supported on `Wasi` due to lack of threading support.
49    #[cfg(not(target_os = "wasi"))]
50    waker: mio::Waker,
51
52    pub(crate) metrics: IoDriverMetrics,
53
54    #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
55    pub(crate) uring_context: Mutex<UringContext>,
56
57    #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
58    pub(crate) uring_state: AtomicUsize,
59}
60
61#[derive(Debug)]
62pub(crate) struct ReadyEvent {
63    pub(super) tick: u8,
64    pub(crate) ready: Ready,
65    pub(super) is_shutdown: bool,
66}
67
68cfg_net_unix!(
69    impl ReadyEvent {
70        pub(crate) fn with_ready(&self, ready: Ready) -> Self {
71            Self {
72                ready,
73                tick: self.tick,
74                is_shutdown: self.is_shutdown,
75            }
76        }
77    }
78);
79
80#[derive(Debug, Eq, PartialEq, Clone, Copy)]
81pub(super) enum Direction {
82    Read,
83    Write,
84}
85
86pub(super) enum Tick {
87    Set,
88    Clear(u8),
89}
90
91const TOKEN_WAKEUP: mio::Token = mio::Token(0);
92const TOKEN_SIGNAL: mio::Token = mio::Token(1);
93
94fn _assert_kinds() {
95    fn _assert<T: Send + Sync>() {}
96
97    _assert::<Handle>();
98}
99
100// ===== impl Driver =====
101
102impl Driver {
103    /// Creates a new event loop, returning any error that happened during the
104    /// creation.
105    pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
106        let poll = mio::Poll::new()?;
107        #[cfg(not(target_os = "wasi"))]
108        let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
109        let registry = poll.registry().try_clone()?;
110
111        let driver = Driver {
112            signal_ready: false,
113            events: mio::Events::with_capacity(nevents),
114            poll,
115        };
116
117        let (registrations, synced) = RegistrationSet::new();
118
119        let handle = Handle {
120            registry,
121            registrations,
122            synced: Mutex::new(synced),
123            #[cfg(not(target_os = "wasi"))]
124            waker,
125            metrics: IoDriverMetrics::default(),
126            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
127            uring_context: Mutex::new(UringContext::new()),
128            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
129            uring_state: AtomicUsize::new(0),
130        };
131
132        Ok((driver, handle))
133    }
134
135    pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
136        let handle = rt_handle.io();
137        self.turn(handle, None);
138    }
139
140    pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
141        let handle = rt_handle.io();
142        self.turn(handle, Some(duration));
143    }
144
145    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
146        let handle = rt_handle.io();
147        let ios = handle.registrations.shutdown(&mut handle.synced.lock());
148
149        // `shutdown()` must be called without holding the lock.
150        for io in ios {
151            io.shutdown();
152        }
153    }
154
155    fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
156        debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
157
158        handle.release_pending_registrations();
159
160        let events = &mut self.events;
161
162        // Block waiting for an event to happen, peeling out how many events
163        // happened.
164        match self.poll.poll(events, max_wait) {
165            Ok(()) => {}
166            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
167            #[cfg(target_os = "wasi")]
168            Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
169                // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
170                // just return from the park, as there would be nothing, which wakes us up.
171            }
172            Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
173        }
174
175        // Process all the events that came in, dispatching appropriately
176        let mut ready_count = 0;
177        for event in events.iter() {
178            let token = event.token();
179
180            if token == TOKEN_WAKEUP {
181                // Nothing to do, the event is used to unblock the I/O driver
182            } else if token == TOKEN_SIGNAL {
183                self.signal_ready = true;
184            } else {
185                let ready = Ready::from_mio(event);
186                let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
187
188                // Safety: we ensure that the pointers used as tokens are not freed
189                // until they are both deregistered from mio **and** we know the I/O
190                // driver is not concurrently polling. The I/O driver holds ownership of
191                // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
192                let io: &ScheduledIo = unsafe { &*ptr };
193
194                io.set_readiness(Tick::Set, |curr| curr | ready);
195                io.wake(ready);
196
197                ready_count += 1;
198            }
199        }
200
201        #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
202        {
203            let mut guard = handle.get_uring().lock();
204            let ctx = &mut *guard;
205            ctx.dispatch_completions();
206        }
207
208        handle.metrics.incr_ready_count_by(ready_count);
209    }
210}
211
212impl fmt::Debug for Driver {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        write!(f, "Driver")
215    }
216}
217
218impl Handle {
219    /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
220    /// makes the next call to `turn` return immediately.
221    ///
222    /// This method is intended to be used in situations where a notification
223    /// needs to otherwise be sent to the main reactor. If the reactor is
224    /// currently blocked inside of `turn` then it will wake up and soon return
225    /// after this method has been called. If the reactor is not currently
226    /// blocked in `turn`, then the next call to `turn` will not block and
227    /// return immediately.
228    pub(crate) fn unpark(&self) {
229        #[cfg(not(target_os = "wasi"))]
230        self.waker.wake().expect("failed to wake I/O driver");
231    }
232
233    /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
234    ///
235    /// The registration token is returned.
236    pub(super) fn add_source(
237        &self,
238        source: &mut impl mio::event::Source,
239        interest: Interest,
240    ) -> io::Result<Arc<ScheduledIo>> {
241        let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
242        let token = scheduled_io.token();
243
244        // we should remove the `scheduled_io` from the `registrations` set if registering
245        // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
246        if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
247            // safety: `scheduled_io` is part of the `registrations` set.
248            unsafe {
249                self.registrations
250                    .remove(&mut self.synced.lock(), &scheduled_io)
251            };
252
253            return Err(e);
254        }
255
256        // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
257        self.metrics.incr_fd_count();
258
259        Ok(scheduled_io)
260    }
261
262    /// Deregisters an I/O resource from the reactor.
263    pub(super) fn deregister_source(
264        &self,
265        registration: &Arc<ScheduledIo>,
266        source: &mut impl Source,
267    ) -> io::Result<()> {
268        // Deregister the source with the OS poller **first**
269        self.registry.deregister(source)?;
270
271        if self
272            .registrations
273            .deregister(&mut self.synced.lock(), registration)
274        {
275            self.unpark();
276        }
277
278        self.metrics.dec_fd_count();
279
280        Ok(())
281    }
282
283    fn release_pending_registrations(&self) {
284        if self.registrations.needs_release() {
285            self.registrations.release(&mut self.synced.lock());
286        }
287    }
288}
289
290impl fmt::Debug for Handle {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        write!(f, "Handle")
293    }
294}
295
296impl Direction {
297    pub(super) fn mask(self) -> Ready {
298        match self {
299            Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
300            Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
301        }
302    }
303}