tonic/transport/channel/
mod.rs1mod endpoint;
4#[cfg(feature = "tls")]
5#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
6mod tls;
7
8pub use endpoint::Endpoint;
9#[cfg(feature = "tls")]
10pub use tls::ClientTlsConfig;
11
12use super::service::{Connection, DynamicServiceStream};
13use crate::body::BoxBody;
14use bytes::Bytes;
15use http::{
16 uri::{InvalidUri, Uri},
17 Request, Response,
18};
19use hyper::client::connect::Connection as HyperConnection;
20use std::{
21 fmt,
22 future::Future,
23 hash::Hash,
24 pin::Pin,
25 task::{Context, Poll},
26};
27use tokio::{
28 io::{AsyncRead, AsyncWrite},
29 sync::mpsc::{channel, Sender},
30};
31
32use tower::balance::p2c::Balance;
33use tower::{
34 buffer::{self, Buffer},
35 discover::{Change, Discover},
36 util::{BoxService, Either},
37 Service,
38};
39
40type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>;
41
42const DEFAULT_BUFFER_SIZE: usize = 1024;
43
44#[derive(Clone)]
68pub struct Channel {
69 svc: Buffer<Svc, Request<BoxBody>>,
70}
71
72pub struct ResponseFuture {
76 inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
77}
78
79impl Channel {
80 pub fn builder(uri: Uri) -> Endpoint {
82 Endpoint::from(uri)
83 }
84
85 pub fn from_static(s: &'static str) -> Endpoint {
92 let uri = Uri::from_static(s);
93 Self::builder(uri)
94 }
95
96 pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
103 let uri = Uri::from_maybe_shared(s.into())?;
104 Ok(Self::builder(uri))
105 }
106
107 pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
112 let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
113 list.for_each(|endpoint| {
114 tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
115 .unwrap();
116 });
117
118 channel
119 }
120
121 pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
125 where
126 K: Hash + Eq + Send + Clone + 'static,
127 {
128 let (tx, rx) = channel(capacity);
129 let list = DynamicServiceStream::new(rx);
130 (Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
131 }
132
133 pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
134 where
135 C: Service<Uri> + Send + 'static,
136 C::Error: Into<crate::Error> + Send,
137 C::Future: Unpin + Send,
138 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
139 {
140 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
141
142 let svc = Connection::lazy(connector, endpoint);
143 let svc = Buffer::new(Either::A(svc), buffer_size);
144
145 Channel { svc }
146 }
147
148 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
149 where
150 C: Service<Uri> + Send + 'static,
151 C::Error: Into<crate::Error> + Send,
152 C::Future: Unpin + Send,
153 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
154 {
155 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
156
157 let svc = Connection::connect(connector, endpoint)
158 .await
159 .map_err(super::Error::from_source)?;
160 let svc = Buffer::new(Either::A(svc), buffer_size);
161
162 Ok(Channel { svc })
163 }
164
165 pub(crate) fn balance<D>(discover: D, buffer_size: usize) -> Self
166 where
167 D: Discover<Service = Connection> + Unpin + Send + 'static,
168 D::Error: Into<crate::Error>,
169 D::Key: Hash + Send + Clone,
170 {
171 let svc = Balance::new(discover);
172
173 let svc = BoxService::new(svc);
174 let svc = Buffer::new(Either::B(svc), buffer_size);
175
176 Channel { svc }
177 }
178}
179
180impl Service<http::Request<BoxBody>> for Channel {
181 type Response = http::Response<super::Body>;
182 type Error = super::Error;
183 type Future = ResponseFuture;
184
185 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186 Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
187 }
188
189 fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
190 let inner = Service::call(&mut self.svc, request);
191
192 ResponseFuture { inner }
193 }
194}
195
196impl Future for ResponseFuture {
197 type Output = Result<Response<hyper::Body>, super::Error>;
198
199 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200 let val = futures_util::ready!(Pin::new(&mut self.inner).poll(cx))
201 .map_err(super::Error::from_source)?;
202 Ok(val).into()
203 }
204}
205
206impl fmt::Debug for Channel {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 f.debug_struct("Channel").finish()
209 }
210}
211
212impl fmt::Debug for ResponseFuture {
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 f.debug_struct("ResponseFuture").finish()
215 }
216}