tower/util/call_all/
common.rs

1use futures_core::{ready, Stream};
2use pin_project_lite::pin_project;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tower_service::Service;
9
pin_project! {
    /// Shared state machine backing the [`ServiceExt::call_all`] combinator.
    ///
    /// It bundles a [`Service`], a stream of requests and a queue of in-flight response
    /// futures. Note that this type implements [`Stream`] (one item per response) rather
    /// than [`Future`].
    #[derive(Debug)]
    pub(crate) struct CallAll<Svc, S, Q> {
        // The wrapped service; becomes `None` once `take_service` has removed it.
        service: Option<Svc>,
        // Source of requests. Pinned because `poll_next` polls it in place.
        #[pin]
        stream: S,
        // Holds the futures returned by `Svc::call` until they produce a response.
        queue: Q,
        // Set to `true` once `stream` has yielded `None`; the stream is not polled again after that.
        eof: bool,
    }
}
21
/// Abstraction over the container that holds the in-flight futures of a [`CallAll`].
///
/// `F` is the type of future stored in the queue; [`CallAll`] requires `Q: Drive<Svc::Future>`.
pub(crate) trait Drive<F: Future> {
    /// Expected to return `true` when the queue currently holds no futures.
    ///
    /// [`CallAll`] consults this once the request stream has ended to decide whether the
    /// response stream is finished, and `CallAll::unordered` asserts it before converting.
    fn is_empty(&self) -> bool;

    /// Hands `future` over to the queue so that later calls to [`Drive::poll`] can drive it.
    fn push(&mut self, future: F);

    /// Polls the queued futures for the next available output.
    ///
    /// [`CallAll`] yields the value carried by `Poll::Ready(Some(_))` and treats both
    /// `Poll::Ready(None)` and `Poll::Pending` as "nothing to yield right now".
    ///
    /// NOTE(review): presumably `Poll::Ready(None)` signals an empty queue; confirm against
    /// the implementors, which are not visible here.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}
29
30impl<Svc, S, Q> CallAll<Svc, S, Q>
31where
32    Svc: Service<S::Item>,
33    Svc::Error: Into<crate::BoxError>,
34    S: Stream,
35    Q: Drive<Svc::Future>,
36{
37    pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
38        CallAll {
39            service: Some(service),
40            stream,
41            queue,
42            eof: false,
43        }
44    }
45
46    /// Extract the wrapped [`Service`].
47    pub(crate) fn into_inner(mut self) -> Svc {
48        self.service.take().expect("Service already taken")
49    }
50
51    /// Extract the wrapped [`Service`].
52    pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
53        self.project()
54            .service
55            .take()
56            .expect("Service already taken")
57    }
58
59    pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
60        assert!(self.queue.is_empty() && !self.eof);
61
62        super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
63    }
64}
65
66impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
67where
68    Svc: Service<S::Item>,
69    Svc::Error: Into<crate::BoxError>,
70    S: Stream,
71    Q: Drive<Svc::Future>,
72{
73    type Item = Result<Svc::Response, crate::BoxError>;
74
75    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        let mut this = self.project();
77
78        loop {
79            // First, see if we have any responses to yield
80            if let Poll::Ready(r) = this.queue.poll(cx) {
81                if let Some(rsp) = r.transpose().map_err(Into::into)? {
82                    return Poll::Ready(Some(Ok(rsp)));
83                }
84            }
85
86            // If there are no more requests coming, check if we're done
87            if *this.eof {
88                if this.queue.is_empty() {
89                    return Poll::Ready(None);
90                } else {
91                    return Poll::Pending;
92                }
93            }
94
95            // Then, see that the service is ready for another request
96            let svc = this
97                .service
98                .as_mut()
99                .expect("Using CallAll after extracing inner Service");
100            ready!(svc.poll_ready(cx)).map_err(Into::into)?;
101
102            // If it is, gather the next request (if there is one), or return `Pending` if the
103            // stream is not ready.
104            // TODO: We probably want to "release" the slot we reserved in Svc if the
105            // stream returns `Pending`. It may be a while until we get around to actually
106            // using it.
107            match ready!(this.stream.as_mut().poll_next(cx)) {
108                Some(req) => {
109                    this.queue.push(svc.call(req));
110                }
111                None => {
112                    // We're all done once any outstanding requests have completed
113                    *this.eof = true;
114                }
115            }
116        }
117    }
118}