// tonic/transport/service/connection.rs
use super::super::BoxFuture;
use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent};
use crate::{body::BoxBody, transport::Endpoint};
use http::Uri;
use hyper::client::conn::Builder;
use hyper::client::connect::Connection as HyperConnection;
use hyper::client::service::Connect as HyperConnect;
use std::{
    fmt,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tower::load::Load;
use tower::{
    layer::Layer,
    limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
    util::BoxService,
    ServiceBuilder, ServiceExt,
};
use tower_service::Service;
21
/// Request type flowing through a [`Connection`]: an HTTP request carrying
/// tonic's type-erased gRPC body.
pub(crate) type Request = http::Request<BoxBody>;
/// Response type produced by a [`Connection`]: an HTTP response with hyper's body.
pub(crate) type Response = http::Response<hyper::Body>;
24
/// A fully configured client connection: a type-erased tower service stack
/// wrapping the underlying HTTP/2 transport.
pub(crate) struct Connection {
    // Boxed middleware stack (origin / user-agent / timeout / optional
    // limits) layered over a reconnecting hyper connection.
    inner: BoxService<Request, Response, crate::Error>,
}
28
29impl Connection {
30 fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
31 where
32 C: Service<Uri> + Send + 'static,
33 C::Error: Into<crate::Error> + Send,
34 C::Future: Unpin + Send,
35 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
36 {
37 let mut settings = Builder::new()
38 .http2_initial_stream_window_size(endpoint.init_stream_window_size)
39 .http2_initial_connection_window_size(endpoint.init_connection_window_size)
40 .http2_only(true)
41 .http2_keep_alive_interval(endpoint.http2_keep_alive_interval)
42 .clone();
43
44 if let Some(val) = endpoint.http2_keep_alive_timeout {
45 settings.http2_keep_alive_timeout(val);
46 }
47
48 if let Some(val) = endpoint.http2_keep_alive_while_idle {
49 settings.http2_keep_alive_while_idle(val);
50 }
51
52 if let Some(val) = endpoint.http2_adaptive_window {
53 settings.http2_adaptive_window(val);
54 }
55
56 let stack = ServiceBuilder::new()
57 .layer_fn(|s| AddOrigin::new(s, endpoint.uri.clone()))
58 .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
59 .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
60 .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
61 .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
62 .into_inner();
63
64 let connector = HyperConnect::new(connector, settings);
65 let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy);
66
67 let inner = stack.layer(conn);
68
69 Self {
70 inner: BoxService::new(inner),
71 }
72 }
73
74 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
75 where
76 C: Service<Uri> + Send + 'static,
77 C::Error: Into<crate::Error> + Send,
78 C::Future: Unpin + Send,
79 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
80 {
81 Self::new(connector, endpoint, false).ready_oneshot().await
82 }
83
84 pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
85 where
86 C: Service<Uri> + Send + 'static,
87 C::Error: Into<crate::Error> + Send,
88 C::Future: Unpin + Send,
89 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
90 {
91 Self::new(connector, endpoint, true)
92 }
93}
94
95impl Service<Request> for Connection {
96 type Response = Response;
97 type Error = crate::Error;
98 type Future = BoxFuture<Self::Response, Self::Error>;
99
100 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
101 Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
102 }
103
104 fn call(&mut self, req: Request) -> Self::Future {
105 self.inner.call(req)
106 }
107}
108
109impl Load for Connection {
110 type Metric = usize;
111
112 fn load(&self) -> Self::Metric {
113 0
114 }
115}
116
117impl fmt::Debug for Connection {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("Connection").finish()
120 }
121}