tonic/transport/channel/
mod.rs

1//! Client implementation and builder.
2
3mod endpoint;
4#[cfg(feature = "tls")]
5#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
6mod tls;
7
8pub use endpoint::Endpoint;
9#[cfg(feature = "tls")]
10pub use tls::ClientTlsConfig;
11
12use super::service::{Connection, DynamicServiceStream};
13use crate::body::BoxBody;
14use bytes::Bytes;
15use http::{
16    uri::{InvalidUri, Uri},
17    Request, Response,
18};
19use hyper::client::connect::Connection as HyperConnection;
20use std::{
21    fmt,
22    future::Future,
23    hash::Hash,
24    pin::Pin,
25    task::{Context, Poll},
26};
27use tokio::{
28    io::{AsyncRead, AsyncWrite},
29    sync::mpsc::{channel, Sender},
30};
31
32use tower::balance::p2c::Balance;
33use tower::{
34    buffer::{self, Buffer},
35    discover::{Change, Discover},
36    util::{BoxService, Either},
37    Service,
38};
39
40type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>;
41
42const DEFAULT_BUFFER_SIZE: usize = 1024;
43
44/// A default batteries included `transport` channel.
45///
46/// This provides a fully featured http2 gRPC client based on [`hyper::Client`]
47/// and `tower` services.
48///
49/// # Multiplexing requests
50///
51/// Sending a request on a channel requires a `&mut self` and thus can only send
52/// one request in flight. This is intentional and is required to follow the `Service`
53/// contract from the `tower` library which this channel implementation is built on
54/// top of.
55///
56/// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply
57/// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready`
58/// we know the `Service` is able to accept only one request before we must `poll_ready`
59/// again. Due to this fact any `async fn` that wants to poll for readiness and submit
60/// the request must have a `&mut self` reference.
61///
62/// To work around this and to ease the use of the channel, `Channel` provides a
63/// `Clone` implementation that is _cheap_. This is because at the very top level
64/// the channel is backed by a `tower_buffer::Buffer` which runs the connection
65/// in a background task and provides a `mpsc` channel interface. Due to this
66/// cloning the `Channel` type is cheap and encouraged.
67#[derive(Clone)]
68pub struct Channel {
69    svc: Buffer<Svc, Request<BoxBody>>,
70}
71
72/// A future that resolves to an HTTP response.
73///
74/// This is returned by the `Service::call` on [`Channel`].
75pub struct ResponseFuture {
76    inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
77}
78
79impl Channel {
80    /// Create an [`Endpoint`] builder that can create [`Channel`]s.
81    pub fn builder(uri: Uri) -> Endpoint {
82        Endpoint::from(uri)
83    }
84
85    /// Create an `Endpoint` from a static string.
86    ///
87    /// ```
88    /// # use tonic::transport::Channel;
89    /// Channel::from_static("https://example.com");
90    /// ```
91    pub fn from_static(s: &'static str) -> Endpoint {
92        let uri = Uri::from_static(s);
93        Self::builder(uri)
94    }
95
96    /// Create an `Endpoint` from shared bytes.
97    ///
98    /// ```
99    /// # use tonic::transport::Channel;
100    /// Channel::from_shared("https://example.com");
101    /// ```
102    pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
103        let uri = Uri::from_maybe_shared(s.into())?;
104        Ok(Self::builder(uri))
105    }
106
107    /// Balance a list of [`Endpoint`]'s.
108    ///
109    /// This creates a [`Channel`] that will load balance accross all the
110    /// provided endpoints.
111    pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
112        let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
113        list.for_each(|endpoint| {
114            tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
115                .unwrap();
116        });
117
118        channel
119    }
120
121    /// Balance a list of [`Endpoint`]'s.
122    ///
123    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
124    pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
125    where
126        K: Hash + Eq + Send + Clone + 'static,
127    {
128        let (tx, rx) = channel(capacity);
129        let list = DynamicServiceStream::new(rx);
130        (Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
131    }
132
133    pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
134    where
135        C: Service<Uri> + Send + 'static,
136        C::Error: Into<crate::Error> + Send,
137        C::Future: Unpin + Send,
138        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
139    {
140        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
141
142        let svc = Connection::lazy(connector, endpoint);
143        let svc = Buffer::new(Either::A(svc), buffer_size);
144
145        Channel { svc }
146    }
147
148    pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
149    where
150        C: Service<Uri> + Send + 'static,
151        C::Error: Into<crate::Error> + Send,
152        C::Future: Unpin + Send,
153        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
154    {
155        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
156
157        let svc = Connection::connect(connector, endpoint)
158            .await
159            .map_err(super::Error::from_source)?;
160        let svc = Buffer::new(Either::A(svc), buffer_size);
161
162        Ok(Channel { svc })
163    }
164
165    pub(crate) fn balance<D>(discover: D, buffer_size: usize) -> Self
166    where
167        D: Discover<Service = Connection> + Unpin + Send + 'static,
168        D::Error: Into<crate::Error>,
169        D::Key: Hash + Send + Clone,
170    {
171        let svc = Balance::new(discover);
172
173        let svc = BoxService::new(svc);
174        let svc = Buffer::new(Either::B(svc), buffer_size);
175
176        Channel { svc }
177    }
178}
179
180impl Service<http::Request<BoxBody>> for Channel {
181    type Response = http::Response<super::Body>;
182    type Error = super::Error;
183    type Future = ResponseFuture;
184
185    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186        Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
187    }
188
189    fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
190        let inner = Service::call(&mut self.svc, request);
191
192        ResponseFuture { inner }
193    }
194}
195
196impl Future for ResponseFuture {
197    type Output = Result<Response<hyper::Body>, super::Error>;
198
199    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200        let val = futures_util::ready!(Pin::new(&mut self.inner).poll(cx))
201            .map_err(super::Error::from_source)?;
202        Ok(val).into()
203    }
204}
205
206impl fmt::Debug for Channel {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        f.debug_struct("Channel").finish()
209    }
210}
211
212impl fmt::Debug for ResponseFuture {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        f.debug_struct("ResponseFuture").finish()
215    }
216}