tonic/transport/service/
connection.rs

1use super::super::BoxFuture;
2use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent};
3use crate::{body::BoxBody, transport::Endpoint};
4use http::Uri;
5use hyper::client::conn::Builder;
6use hyper::client::connect::Connection as HyperConnection;
7use hyper::client::service::Connect as HyperConnect;
8use std::{
9    fmt,
10    task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tower::load::Load;
14use tower::{
15    layer::Layer,
16    limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
17    util::BoxService,
18    ServiceBuilder, ServiceExt,
19};
20use tower_service::Service;
21
22pub(crate) type Request = http::Request<BoxBody>;
23pub(crate) type Response = http::Response<hyper::Body>;
24
25pub(crate) struct Connection {
26    inner: BoxService<Request, Response, crate::Error>,
27}
28
29impl Connection {
30    fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
31    where
32        C: Service<Uri> + Send + 'static,
33        C::Error: Into<crate::Error> + Send,
34        C::Future: Unpin + Send,
35        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
36    {
37        let mut settings = Builder::new()
38            .http2_initial_stream_window_size(endpoint.init_stream_window_size)
39            .http2_initial_connection_window_size(endpoint.init_connection_window_size)
40            .http2_only(true)
41            .http2_keep_alive_interval(endpoint.http2_keep_alive_interval)
42            .clone();
43
44        if let Some(val) = endpoint.http2_keep_alive_timeout {
45            settings.http2_keep_alive_timeout(val);
46        }
47
48        if let Some(val) = endpoint.http2_keep_alive_while_idle {
49            settings.http2_keep_alive_while_idle(val);
50        }
51
52        if let Some(val) = endpoint.http2_adaptive_window {
53            settings.http2_adaptive_window(val);
54        }
55
56        let stack = ServiceBuilder::new()
57            .layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone()))
58            .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
59            .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
60            .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
61            .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
62            .into_inner();
63
64        let connector = HyperConnect::new(connector, settings);
65        let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy);
66
67        let inner = stack.layer(conn);
68
69        Self {
70            inner: BoxService::new(inner),
71        }
72    }
73
74    pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
75    where
76        C: Service<Uri> + Send + 'static,
77        C::Error: Into<crate::Error> + Send,
78        C::Future: Unpin + Send,
79        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
80    {
81        Self::new(connector, endpoint, false).ready_oneshot().await
82    }
83
84    pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
85    where
86        C: Service<Uri> + Send + 'static,
87        C::Error: Into<crate::Error> + Send,
88        C::Future: Unpin + Send,
89        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
90    {
91        Self::new(connector, endpoint, true)
92    }
93}
94
95impl Service<Request> for Connection {
96    type Response = Response;
97    type Error = crate::Error;
98    type Future = BoxFuture<Self::Response, Self::Error>;
99
100    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101        Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
102    }
103
104    fn call(&mut self, req: Request) -> Self::Future {
105        self.inner.call(req)
106    }
107}
108
109impl Load for Connection {
110    type Metric = usize;
111
112    fn load(&self) -> Self::Metric {
113        0
114    }
115}
116
117impl fmt::Debug for Connection {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("Connection").finish()
120    }
121}