tower/util/call_all/
common.rs1use futures_core::{ready, Stream};
2use pin_project_lite::pin_project;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tower_service::Service;
9
10pin_project! {
11 #[derive(Debug)]
13 pub(crate) struct CallAll<Svc, S, Q> {
14 service: Option<Svc>,
15 #[pin]
16 stream: S,
17 queue: Q,
18 eof: bool,
19 }
20}
21
22pub(crate) trait Drive<F: Future> {
23 fn is_empty(&self) -> bool;
24
25 fn push(&mut self, future: F);
26
27 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
28}
29
30impl<Svc, S, Q> CallAll<Svc, S, Q>
31where
32 Svc: Service<S::Item>,
33 Svc::Error: Into<crate::BoxError>,
34 S: Stream,
35 Q: Drive<Svc::Future>,
36{
37 pub(crate) fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
38 CallAll {
39 service: Some(service),
40 stream,
41 queue,
42 eof: false,
43 }
44 }
45
46 pub(crate) fn into_inner(mut self) -> Svc {
48 self.service.take().expect("Service already taken")
49 }
50
51 pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
53 self.project()
54 .service
55 .take()
56 .expect("Service already taken")
57 }
58
59 pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
60 assert!(self.queue.is_empty() && !self.eof);
61
62 super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
63 }
64}
65
66impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
67where
68 Svc: Service<S::Item>,
69 Svc::Error: Into<crate::BoxError>,
70 S: Stream,
71 Q: Drive<Svc::Future>,
72{
73 type Item = Result<Svc::Response, crate::BoxError>;
74
75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 let mut this = self.project();
77
78 loop {
79 if let Poll::Ready(r) = this.queue.poll(cx) {
81 if let Some(rsp) = r.transpose().map_err(Into::into)? {
82 return Poll::Ready(Some(Ok(rsp)));
83 }
84 }
85
86 if *this.eof {
88 if this.queue.is_empty() {
89 return Poll::Ready(None);
90 } else {
91 return Poll::Pending;
92 }
93 }
94
95 let svc = this
97 .service
98 .as_mut()
99 .expect("Using CallAll after extracing inner Service");
100 ready!(svc.poll_ready(cx)).map_err(Into::into)?;
101
102 match ready!(this.stream.as_mut().poll_next(cx)) {
108 Some(req) => {
109 this.queue.push(svc.call(req));
110 }
111 None => {
112 *this.eof = true;
114 }
115 }
116 }
117 }
118}