tower/util/call_all/
ordered.rs

1//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
2//!
3//! [`Service<Request>`]: crate::Service
4//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
5
6use super::common;
7use futures_core::Stream;
8use futures_util::stream::FuturesOrdered;
9use pin_project_lite::pin_project;
10use std::{
11    future::Future,
12    pin::Pin,
13    task::{Context, Poll},
14};
15use tower_service::Service;
16
17pin_project! {
18    /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
19    /// request received on the wrapped [`Stream`].
20    ///
21    /// ```rust
22    /// # use std::task::{Poll, Context};
23    /// # use std::cell::Cell;
24    /// # use std::error::Error;
25    /// # use std::rc::Rc;
26    /// #
27    /// use futures::future::{ready, Ready};
28    /// use futures::StreamExt;
29    /// use futures::channel::mpsc;
30    /// use tower_service::Service;
31    /// use tower::util::ServiceExt;
32    ///
33    /// // First, we need to have a Service to process our requests.
34    /// #[derive(Debug, Eq, PartialEq)]
35    /// struct FirstLetter;
36    /// impl Service<&'static str> for FirstLetter {
37    ///      type Response = &'static str;
38    ///      type Error = Box<dyn Error + Send + Sync>;
39    ///      type Future = Ready<Result<Self::Response, Self::Error>>;
40    ///
41    ///      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42    ///          Poll::Ready(Ok(()))
43    ///      }
44    ///
45    ///      fn call(&mut self, req: &'static str) -> Self::Future {
46    ///          ready(Ok(&req[..1]))
47    ///      }
48    /// }
49    ///
50    /// #[tokio::main]
51    /// async fn main() {
52    ///     // Next, we need a Stream of requests.
53    // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
54    //              tokio::sync::mpsc again?
55    ///     let (mut reqs, rx) = mpsc::unbounded();
56    ///     // Note that we have to help Rust out here by telling it what error type to use.
57    ///     // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
58    ///     let mut rsps = FirstLetter.call_all(rx);
59    ///
60    ///     // Now, let's send a few requests and then check that we get the corresponding responses.
61    ///     reqs.unbounded_send("one").unwrap();
62    ///     reqs.unbounded_send("two").unwrap();
63    ///     reqs.unbounded_send("three").unwrap();
64    ///     drop(reqs);
65    ///
66    ///     // We then loop over the response Strem that we get back from call_all.
67    ///     let mut i = 0usize;
68    ///     while let Some(rsp) = rsps.next().await {
69    ///         // Each response is a Result (we could also have used TryStream::try_next)
70    ///         match (i + 1, rsp.unwrap()) {
71    ///             (1, "o") |
72    ///             (2, "t") |
73    ///             (3, "t") => {}
74    ///             (n, i) => {
75    ///                 unreachable!("{}. response was '{}'", n, i);
76    ///             }
77    ///         }
78    ///         i += 1;
79    ///     }
80    ///
81    ///     // And at the end, we can get the Service back when there are no more requests.
82    ///     assert_eq!(rsps.into_inner(), FirstLetter);
83    /// }
84    /// ```
85    ///
86    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
87    #[derive(Debug)]
88    pub struct CallAll<Svc, S>
89    where
90        Svc: Service<S::Item>,
91        S: Stream,
92    {
93        #[pin]
94        inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
95    }
96}
97
98impl<Svc, S> CallAll<Svc, S>
99where
100    Svc: Service<S::Item>,
101    Svc::Error: Into<crate::BoxError>,
102    S: Stream,
103{
104    /// Create new [`CallAll`] combinator.
105    ///
106    /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are
107    /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`].
108    ///
109    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110    pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
111        CallAll {
112            inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
113        }
114    }
115
116    /// Extract the wrapped [`Service`].
117    ///
118    /// # Panics
119    ///
120    /// Panics if [`take_service`] was already called.
121    ///
122    /// [`take_service`]: crate::util::CallAll::take_service
123    pub fn into_inner(self) -> Svc {
124        self.inner.into_inner()
125    }
126
127    /// Extract the wrapped [`Service`].
128    ///
129    /// This [`CallAll`] can no longer be used after this function has been called.
130    ///
131    /// # Panics
132    ///
133    /// Panics if [`take_service`] was already called.
134    ///
135    /// [`take_service`]: crate::util::CallAll::take_service
136    pub fn take_service(self: Pin<&mut Self>) -> Svc {
137        self.project().inner.take_service()
138    }
139
140    /// Return responses as they are ready, regardless of the initial order.
141    ///
142    /// This function must be called before the stream is polled.
143    ///
144    /// # Panics
145    ///
146    /// Panics if [`poll`] was called.
147    ///
148    /// [`poll`]: std::future::Future::poll
149    pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
150        self.inner.unordered()
151    }
152}
153
154impl<Svc, S> Stream for CallAll<Svc, S>
155where
156    Svc: Service<S::Item>,
157    Svc::Error: Into<crate::BoxError>,
158    S: Stream,
159{
160    type Item = Result<Svc::Response, crate::BoxError>;
161
162    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
163        self.project().inner.poll_next(cx)
164    }
165}
166
167impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
168    fn is_empty(&self) -> bool {
169        FuturesOrdered::is_empty(self)
170    }
171
172    fn push(&mut self, future: F) {
173        FuturesOrdered::push(self, future)
174    }
175
176    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
177        Stream::poll_next(Pin::new(self), cx)
178    }
179}