tonic/transport/channel/
endpoint.rs

1use super::super::service;
2use super::Channel;
3#[cfg(feature = "tls")]
4use super::ClientTlsConfig;
5#[cfg(feature = "tls")]
6use crate::transport::service::TlsConnector;
7use crate::transport::Error;
8use bytes::Bytes;
9use http::{
10    uri::{InvalidUri, Uri},
11    HeaderValue,
12};
13use std::{
14    convert::{TryFrom, TryInto},
15    fmt,
16    str::FromStr,
17    time::Duration,
18};
19use tower::make::MakeConnection;
20
21/// Channel builder.
22///
23/// This struct is used to build and configure HTTP/2 channels.
24#[derive(Clone)]
25pub struct Endpoint {
26    pub(crate) uri: Uri,
27    pub(crate) user_agent: Option<HeaderValue>,
28    pub(crate) timeout: Option<Duration>,
29    pub(crate) concurrency_limit: Option<usize>,
30    pub(crate) rate_limit: Option<(u64, Duration)>,
31    #[cfg(feature = "tls")]
32    pub(crate) tls: Option<TlsConnector>,
33    pub(crate) buffer_size: Option<usize>,
34    pub(crate) init_stream_window_size: Option<u32>,
35    pub(crate) init_connection_window_size: Option<u32>,
36    pub(crate) tcp_keepalive: Option<Duration>,
37    pub(crate) tcp_nodelay: bool,
38    pub(crate) http2_keep_alive_interval: Option<Duration>,
39    pub(crate) http2_keep_alive_timeout: Option<Duration>,
40    pub(crate) http2_keep_alive_while_idle: Option<bool>,
41    pub(crate) connect_timeout: Option<Duration>,
42    pub(crate) http2_adaptive_window: Option<bool>,
43}
44
45impl Endpoint {
46    // FIXME: determine if we want to expose this or not. This is really
47    // just used in codegen for a shortcut.
48    #[doc(hidden)]
49    pub fn new<D>(dst: D) -> Result<Self, Error>
50    where
51        D: TryInto<Self>,
52        D::Error: Into<crate::Error>,
53    {
54        let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
55        Ok(me)
56    }
57
58    /// Convert an `Endpoint` from a static string.
59    ///
60    /// # Panics
61    ///
62    /// This function panics if the argument is an invalid URI.
63    ///
64    /// ```
65    /// # use tonic::transport::Endpoint;
66    /// Endpoint::from_static("https://example.com");
67    /// ```
68    pub fn from_static(s: &'static str) -> Self {
69        let uri = Uri::from_static(s);
70        Self::from(uri)
71    }
72
73    /// Convert an `Endpoint` from shared bytes.
74    ///
75    /// ```
76    /// # use tonic::transport::Endpoint;
77    /// Endpoint::from_shared("https://example.com".to_string());
78    /// ```
79    pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, InvalidUri> {
80        let uri = Uri::from_maybe_shared(s.into())?;
81        Ok(Self::from(uri))
82    }
83
84    /// Set a custom user-agent header.
85    ///
86    /// `user_agent` will be prepended to Tonic's default user-agent string (`tonic/x.x.x`).
87    /// It must be a value that can be converted into a valid  `http::HeaderValue` or building
88    /// the endpoint will fail.
89    /// ```
90    /// # use tonic::transport::Endpoint;
91    /// # let mut builder = Endpoint::from_static("https://example.com");
92    /// builder.user_agent("Greeter").expect("Greeter should be a valid header value");
93    /// // user-agent: "Greeter tonic/x.x.x"
94    /// ```
95    pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
96    where
97        T: TryInto<HeaderValue>,
98    {
99        user_agent
100            .try_into()
101            .map(|ua| Endpoint {
102                user_agent: Some(ua),
103                ..self
104            })
105            .map_err(|_| Error::new_invalid_user_agent())
106    }
107
108    /// Apply a timeout to each request.
109    ///
110    /// ```
111    /// # use tonic::transport::Endpoint;
112    /// # use std::time::Duration;
113    /// # let mut builder = Endpoint::from_static("https://example.com");
114    /// builder.timeout(Duration::from_secs(5));
115    /// ```
116    ///
117    /// # Notes
118    ///
119    /// This does **not** set the timeout metadata (`grpc-timeout` header) on
120    /// the request, meaning the server will not be informed of this timeout,
121    /// for that use [`Request::set_timeout`].
122    ///
123    /// [`Request::set_timeout`]: crate::Request::set_timeout
124    pub fn timeout(self, dur: Duration) -> Self {
125        Endpoint {
126            timeout: Some(dur),
127            ..self
128        }
129    }
130
131    /// Apply a timeout to connecting to the uri.
132    ///
133    /// Defaults to no timeout.
134    ///
135    /// ```
136    /// # use tonic::transport::Endpoint;
137    /// # use std::time::Duration;
138    /// # let mut builder = Endpoint::from_static("https://example.com");
139    /// builder.connect_timeout(Duration::from_secs(5));
140    /// ```
141    pub fn connect_timeout(self, dur: Duration) -> Self {
142        Endpoint {
143            connect_timeout: Some(dur),
144            ..self
145        }
146    }
147
148    /// Set whether TCP keepalive messages are enabled on accepted connections.
149    ///
150    /// If `None` is specified, keepalive is disabled, otherwise the duration
151    /// specified will be the time to remain idle before sending TCP keepalive
152    /// probes.
153    ///
154    /// Default is no keepalive (`None`)
155    ///
156    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
157        Endpoint {
158            tcp_keepalive,
159            ..self
160        }
161    }
162
163    /// Apply a concurrency limit to each request.
164    ///
165    /// ```
166    /// # use tonic::transport::Endpoint;
167    /// # let mut builder = Endpoint::from_static("https://example.com");
168    /// builder.concurrency_limit(256);
169    /// ```
170    pub fn concurrency_limit(self, limit: usize) -> Self {
171        Endpoint {
172            concurrency_limit: Some(limit),
173            ..self
174        }
175    }
176
177    /// Apply a rate limit to each request.
178    ///
179    /// ```
180    /// # use tonic::transport::Endpoint;
181    /// # use std::time::Duration;
182    /// # let mut builder = Endpoint::from_static("https://example.com");
183    /// builder.rate_limit(32, Duration::from_secs(1));
184    /// ```
185    pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
186        Endpoint {
187            rate_limit: Some((limit, duration)),
188            ..self
189        }
190    }
191
192    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
193    /// stream-level flow control.
194    ///
195    /// Default is 65,535
196    ///
197    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
198    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
199        Endpoint {
200            init_stream_window_size: sz.into(),
201            ..self
202        }
203    }
204
205    /// Sets the max connection-level flow control for HTTP2
206    ///
207    /// Default is 65,535
208    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
209        Endpoint {
210            init_connection_window_size: sz.into(),
211            ..self
212        }
213    }
214
215    /// Configures TLS for the endpoint.
216    #[cfg(feature = "tls")]
217    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
218    pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
219        Ok(Endpoint {
220            tls: Some(
221                tls_config
222                    .tls_connector(self.uri.clone())
223                    .map_err(Error::from_source)?,
224            ),
225            ..self
226        })
227    }
228
229    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
230    pub fn tcp_nodelay(self, enabled: bool) -> Self {
231        Endpoint {
232            tcp_nodelay: enabled,
233            ..self
234        }
235    }
236
237    /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise.
238    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
239        Endpoint {
240            http2_keep_alive_interval: Some(interval),
241            ..self
242        }
243    }
244
245    /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise.
246    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
247        Endpoint {
248            http2_keep_alive_timeout: Some(duration),
249            ..self
250        }
251    }
252
253    /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise.
254    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
255        Endpoint {
256            http2_keep_alive_while_idle: Some(enabled),
257            ..self
258        }
259    }
260
261    /// Sets whether to use an adaptive flow control. Uses `hyper`'s default otherwise.
262    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
263        Endpoint {
264            http2_adaptive_window: Some(enabled),
265            ..self
266        }
267    }
268
269    /// Create a channel from this config.
270    pub async fn connect(&self) -> Result<Channel, Error> {
271        let mut http = hyper::client::connect::HttpConnector::new();
272        http.enforce_http(false);
273        http.set_nodelay(self.tcp_nodelay);
274        http.set_keepalive(self.tcp_keepalive);
275
276        #[cfg(feature = "tls")]
277        let connector = service::connector(http, self.tls.clone());
278
279        #[cfg(not(feature = "tls"))]
280        let connector = service::connector(http);
281
282        if let Some(connect_timeout) = self.connect_timeout {
283            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
284            connector.set_connect_timeout(Some(connect_timeout));
285            Channel::connect(connector, self.clone()).await
286        } else {
287            Channel::connect(connector, self.clone()).await
288        }
289    }
290
291    /// Create a channel from this config.
292    ///
293    /// The channel returned by this method does not attempt to connect to the endpoint until first
294    /// use.
295    pub fn connect_lazy(&self) -> Channel {
296        let mut http = hyper::client::connect::HttpConnector::new();
297        http.enforce_http(false);
298        http.set_nodelay(self.tcp_nodelay);
299        http.set_keepalive(self.tcp_keepalive);
300
301        #[cfg(feature = "tls")]
302        let connector = service::connector(http, self.tls.clone());
303
304        #[cfg(not(feature = "tls"))]
305        let connector = service::connector(http);
306
307        if let Some(connect_timeout) = self.connect_timeout {
308            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
309            connector.set_connect_timeout(Some(connect_timeout));
310            Channel::new(connector, self.clone())
311        } else {
312            Channel::new(connector, self.clone())
313        }
314    }
315
316    /// Connect with a custom connector.
317    ///
318    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport.
319    /// See the `uds` example for an example on how to use this function to build channel that
320    /// uses a Unix socket transport.
321    ///
322    /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
323    pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
324    where
325        C: MakeConnection<Uri> + Send + 'static,
326        C::Connection: Unpin + Send + 'static,
327        C::Future: Send + 'static,
328        crate::Error: From<C::Error> + Send + 'static,
329    {
330        #[cfg(feature = "tls")]
331        let connector = service::connector(connector, self.tls.clone());
332
333        #[cfg(not(feature = "tls"))]
334        let connector = service::connector(connector);
335
336        if let Some(connect_timeout) = self.connect_timeout {
337            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
338            connector.set_connect_timeout(Some(connect_timeout));
339            Channel::connect(connector, self.clone()).await
340        } else {
341            Channel::connect(connector, self.clone()).await
342        }
343    }
344
345    /// Connect with a custom connector lazily.
346    ///
347    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport
348    /// connec to it lazily.
349    ///
350    /// See the `uds` example for an example on how to use this function to build channel that
351    /// uses a Unix socket transport.
352    pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Result<Channel, Error>
353    where
354        C: MakeConnection<Uri> + Send + 'static,
355        C::Connection: Unpin + Send + 'static,
356        C::Future: Send + 'static,
357        crate::Error: From<C::Error> + Send + 'static,
358    {
359        #[cfg(feature = "tls")]
360        let connector = service::connector(connector, self.tls.clone());
361
362        #[cfg(not(feature = "tls"))]
363        let connector = service::connector(connector);
364
365        Ok(Channel::new(connector, self.clone()))
366    }
367
368    /// Get the endpoint uri.
369    ///
370    /// ```
371    /// # use tonic::transport::Endpoint;
372    /// # use http::Uri;
373    /// let endpoint = Endpoint::from_static("https://example.com");
374    ///
375    /// assert_eq!(endpoint.uri(), &Uri::from_static("https://example.com"));
376    /// ```
377    pub fn uri(&self) -> &Uri {
378        &self.uri
379    }
380}
381
382impl From<Uri> for Endpoint {
383    fn from(uri: Uri) -> Self {
384        Self {
385            uri,
386            user_agent: None,
387            concurrency_limit: None,
388            rate_limit: None,
389            timeout: None,
390            #[cfg(feature = "tls")]
391            tls: None,
392            buffer_size: None,
393            init_stream_window_size: None,
394            init_connection_window_size: None,
395            tcp_keepalive: None,
396            tcp_nodelay: true,
397            http2_keep_alive_interval: None,
398            http2_keep_alive_timeout: None,
399            http2_keep_alive_while_idle: None,
400            connect_timeout: None,
401            http2_adaptive_window: None,
402        }
403    }
404}
405
406impl TryFrom<Bytes> for Endpoint {
407    type Error = InvalidUri;
408
409    fn try_from(t: Bytes) -> Result<Self, Self::Error> {
410        Self::from_shared(t)
411    }
412}
413
414impl TryFrom<String> for Endpoint {
415    type Error = InvalidUri;
416
417    fn try_from(t: String) -> Result<Self, Self::Error> {
418        Self::from_shared(t.into_bytes())
419    }
420}
421
422impl TryFrom<&'static str> for Endpoint {
423    type Error = InvalidUri;
424
425    fn try_from(t: &'static str) -> Result<Self, Self::Error> {
426        Self::from_shared(t.as_bytes())
427    }
428}
429
430impl fmt::Debug for Endpoint {
431    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432        f.debug_struct("Endpoint").finish()
433    }
434}
435
436impl FromStr for Endpoint {
437    type Err = InvalidUri;
438
439    fn from_str(s: &str) -> Result<Self, Self::Err> {
440        Self::try_from(s.to_string())
441    }
442}