tokio/runtime/io/scheduled_io.rs
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

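// Generates `Waiter::addr_of_pointers`, which produces a pointer to the
// `pointers` field directly from a `NonNull<Waiter>` (avoiding an
// intermediate `&Waiter` reference); the `linked_list::Link` impl below uses
// it when linking waiters into the intrusive list.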
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

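// State of the `Readiness` future: it starts in `Init`, moves to `Waiting`
// once its waiter has been pushed onto the `ScheduledIo` waiter list, and
// ends in `Done` once readiness (or shutdown) has been observed.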
enum State {
    Init,
    Waiting,
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |
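//
// For illustration only (not part of the implementation): a stored value of
// `(1 << 31) | (2 << 16) | 0b0001` unpacks as `SHUTDOWN.unpack(v) == 1`,
// `TICK.unpack(v) == 2`, and `READINESS.unpack(v) == 1` (i.e. readable),
// matching the constants defined below.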

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        mio::Token(super::EXPOSE_IO.expose_provenance(self))
    }

    /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a
    /// permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
    /// the current value.
    ///
    /// # Arguments
    /// - `tick_op`: whether to set the tick or to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
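    ///
    /// For example (illustrative only): `clear_readiness` below calls
    /// `set_readiness(Tick::Clear(event.tick), |curr| curr - mask)`, so the
    /// clear only takes effect if the stored tick still matches the event's
    /// tick, while the I/O driver uses `Tick::Set` when delivering new
    /// readiness.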
    pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
        let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
            // If the io driver is shut down, then you are only allowed to clear readiness.
            debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

            const MAX_TICK: usize = TICK.max_value() + 1;
            let tick = TICK.unpack(curr);

            let new_tick = match tick_op {
                // Trying to clear readiness with an old event!
                Tick::Clear(t) if tick as u8 != t => return None,
                Tick::Clear(t) => t as usize,
                Tick::Set => tick.wrapping_add(1) % MAX_TICK,
            };
            let ready = Ready::from_usize(READINESS.unpack(curr));
            Some(TICK.pack(new_tick, f(ready).as_usize()))
        });
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must** be
    /// done from outside of the lock, otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let waker = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match waker {
                Some(waker) => waker.clone_from(cx.waker()),
                None => *waker = Some(cx.waker().clone()),
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
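    ///
    /// A hypothetical call site (for illustration only) might look like
    /// `let event = scheduled_io.readiness(Interest::READABLE).await;`,
    /// with the caller invoking `clear_readiness(event)` once the subsequent
    /// I/O operation returns `WouldBlock`.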
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = {
            // Safety: `Self` is `!Unpin`
            //
            // While we could use `pin_project!` to remove
            // this unsafe block, there are already unsafe blocks here,
            // so it wouldn't significantly ease the mental burden
            // and would actually complicate the code.
            // That's why we didn't use it.
            let me = unsafe { self.get_unchecked_mut() };
            (me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: Since the `waiter` is not in the intrusive list yet,
                    // we have exclusive access to it. The Mutex ensures
                    // that this modification is visible to other threads that
                    // acquire the same Mutex.
                    let waker = unsafe { &mut (*waiter.get()).waker };
                    let old = waker.replace(cx.waker().clone());
                    debug_assert!(old.is_none(), "waker should be None at the first poll");

                    // Insert the waiter into the linked list
                    //
                    // safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: With the lock held, we have exclusive access to
                    // the waiter. In other words, `ScheduledIo::wake()`
                    // cannot access the waiter concurrently.
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        w.waker.as_mut().unwrap().clone_from(cx.waker());
                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope in which
                    // the lock is held. Because holding the lock is required
                    // for safe access to fields that are not themselves stored
                    // inside the lock, it is helpful to visualize the scope of
                    // the critical section.
                    drop(waiters);
                }
                State::Done => {
                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // has not yet returned `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // Safety: We don't need to acquire the lock here because
                    // 1. `State::Done` means `waiter` is no longer shared, so
                    //    no concurrent access to `waiter` can happen at this
                    //    point.
                    // 2. `waiter.interest` is never changed, so no side
                    //    effects need to be synchronized by the lock.
                    let interest = unsafe { (*waiter.get()).interest };
                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}