1cfg_signal_internal_and_unix! {
3 mod signal;
4}
5cfg_tokio_uring! {
6 mod uring;
7 use uring::UringContext;
8 use crate::loom::sync::atomic::AtomicUsize;
9}
10
11use crate::io::interest::Interest;
12use crate::io::ready::Ready;
13use crate::loom::sync::Mutex;
14use crate::runtime::driver;
15use crate::runtime::io::registration_set;
16use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
17
18use mio::event::Source;
19use std::fmt;
20use std::io;
21use std::sync::Arc;
22use std::time::Duration;
23
24pub(crate) struct Driver {
26 signal_ready: bool,
28
29 events: mio::Events,
31
32 poll: mio::Poll,
34}
35
36pub(crate) struct Handle {
38 registry: mio::Registry,
40
41 registrations: RegistrationSet,
43
44 synced: Mutex<registration_set::Synced>,
46
47 #[cfg(not(target_os = "wasi"))]
50 waker: mio::Waker,
51
52 pub(crate) metrics: IoDriverMetrics,
53
54 #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
55 pub(crate) uring_context: Mutex<UringContext>,
56
57 #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
58 pub(crate) uring_state: AtomicUsize,
59}
60
61#[derive(Debug)]
62pub(crate) struct ReadyEvent {
63 pub(super) tick: u8,
64 pub(crate) ready: Ready,
65 pub(super) is_shutdown: bool,
66}
67
68cfg_net_unix!(
69 impl ReadyEvent {
70 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
71 Self {
72 ready,
73 tick: self.tick,
74 is_shutdown: self.is_shutdown,
75 }
76 }
77 }
78);
79
80#[derive(Debug, Eq, PartialEq, Clone, Copy)]
81pub(super) enum Direction {
82 Read,
83 Write,
84}
85
86pub(super) enum Tick {
87 Set,
88 Clear(u8),
89}
90
91const TOKEN_WAKEUP: mio::Token = mio::Token(0);
92const TOKEN_SIGNAL: mio::Token = mio::Token(1);
93
94fn _assert_kinds() {
95 fn _assert<T: Send + Sync>() {}
96
97 _assert::<Handle>();
98}
99
100impl Driver {
103 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
106 let poll = mio::Poll::new()?;
107 #[cfg(not(target_os = "wasi"))]
108 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
109 let registry = poll.registry().try_clone()?;
110
111 let driver = Driver {
112 signal_ready: false,
113 events: mio::Events::with_capacity(nevents),
114 poll,
115 };
116
117 let (registrations, synced) = RegistrationSet::new();
118
119 let handle = Handle {
120 registry,
121 registrations,
122 synced: Mutex::new(synced),
123 #[cfg(not(target_os = "wasi"))]
124 waker,
125 metrics: IoDriverMetrics::default(),
126 #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
127 uring_context: Mutex::new(UringContext::new()),
128 #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
129 uring_state: AtomicUsize::new(0),
130 };
131
132 Ok((driver, handle))
133 }
134
135 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
136 let handle = rt_handle.io();
137 self.turn(handle, None);
138 }
139
140 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
141 let handle = rt_handle.io();
142 self.turn(handle, Some(duration));
143 }
144
145 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
146 let handle = rt_handle.io();
147 let ios = handle.registrations.shutdown(&mut handle.synced.lock());
148
149 for io in ios {
151 io.shutdown();
152 }
153 }
154
155 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
156 debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
157
158 handle.release_pending_registrations();
159
160 let events = &mut self.events;
161
162 match self.poll.poll(events, max_wait) {
165 Ok(()) => {}
166 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
167 #[cfg(target_os = "wasi")]
168 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
169 }
172 Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
173 }
174
175 let mut ready_count = 0;
177 for event in events.iter() {
178 let token = event.token();
179
180 if token == TOKEN_WAKEUP {
181 } else if token == TOKEN_SIGNAL {
183 self.signal_ready = true;
184 } else {
185 let ready = Ready::from_mio(event);
186 let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
187
188 let io: &ScheduledIo = unsafe { &*ptr };
193
194 io.set_readiness(Tick::Set, |curr| curr | ready);
195 io.wake(ready);
196
197 ready_count += 1;
198 }
199 }
200
201 #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
202 {
203 let mut guard = handle.get_uring().lock();
204 let ctx = &mut *guard;
205 ctx.dispatch_completions();
206 }
207
208 handle.metrics.incr_ready_count_by(ready_count);
209 }
210}
211
212impl fmt::Debug for Driver {
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 write!(f, "Driver")
215 }
216}
217
218impl Handle {
219 pub(crate) fn unpark(&self) {
229 #[cfg(not(target_os = "wasi"))]
230 self.waker.wake().expect("failed to wake I/O driver");
231 }
232
233 pub(super) fn add_source(
237 &self,
238 source: &mut impl mio::event::Source,
239 interest: Interest,
240 ) -> io::Result<Arc<ScheduledIo>> {
241 let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
242 let token = scheduled_io.token();
243
244 if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
247 unsafe {
249 self.registrations
250 .remove(&mut self.synced.lock(), &scheduled_io)
251 };
252
253 return Err(e);
254 }
255
256 self.metrics.incr_fd_count();
258
259 Ok(scheduled_io)
260 }
261
262 pub(super) fn deregister_source(
264 &self,
265 registration: &Arc<ScheduledIo>,
266 source: &mut impl Source,
267 ) -> io::Result<()> {
268 self.registry.deregister(source)?;
270
271 if self
272 .registrations
273 .deregister(&mut self.synced.lock(), registration)
274 {
275 self.unpark();
276 }
277
278 self.metrics.dec_fd_count();
279
280 Ok(())
281 }
282
283 fn release_pending_registrations(&self) {
284 if self.registrations.needs_release() {
285 self.registrations.release(&mut self.synced.lock());
286 }
287 }
288}
289
290impl fmt::Debug for Handle {
291 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292 write!(f, "Handle")
293 }
294}
295
296impl Direction {
297 pub(super) fn mask(self) -> Ready {
298 match self {
299 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
300 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
301 }
302 }
303}