tonic/transport/server/
mod.rs

1//! Server implementation and builder.
2
3mod conn;
4mod incoming;
5mod recover_error;
6#[cfg(feature = "tls")]
7#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
8mod tls;
9
10pub use conn::{Connected, TcpConnectInfo};
11#[cfg(feature = "tls")]
12pub use tls::ServerTlsConfig;
13
14#[cfg(feature = "tls")]
15pub use conn::TlsConnectInfo;
16
17#[cfg(feature = "tls")]
18use super::service::TlsAcceptor;
19
20use incoming::TcpIncoming;
21
22#[cfg(feature = "tls")]
23pub(crate) use tokio_rustls::server::TlsStream;
24
25#[cfg(feature = "tls")]
26use crate::transport::Error;
27
28use self::recover_error::RecoverError;
29use super::service::{GrpcTimeout, Or, Routes, ServerIo};
30use crate::body::BoxBody;
31use bytes::Bytes;
32use futures_core::Stream;
33use futures_util::{
34    future::{self, MapErr},
35    ready, TryFutureExt,
36};
37use http::{Request, Response};
38use http_body::Body as _;
39use hyper::{server::accept, Body};
40use pin_project::pin_project;
41use std::{
42    fmt,
43    future::Future,
44    marker::PhantomData,
45    net::SocketAddr,
46    pin::Pin,
47    sync::Arc,
48    task::{Context, Poll},
49    time::Duration,
50};
51use tokio::io::{AsyncRead, AsyncWrite};
52use tower::{
53    layer::util::Identity, layer::Layer, limit::concurrency::ConcurrencyLimitLayer, util::Either,
54    Service, ServiceBuilder,
55};
56
/// Type-erased response body handed to hyper: an unsync boxed `http_body` body
/// yielding `Bytes` data and `crate::Error` errors.
type BoxHttpBody = http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>;
/// Boxed service mapping `Request<Body>` to `Response<BoxHttpBody>`; this is the
/// response type of `MakeSvc`.
type BoxService = tower::util::BoxService<Request<Body>, Response<BoxHttpBody>, crate::Error>;
/// Shared, thread-safe callback that derives a `tracing::Span` from a body-less request.
type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;

/// HTTP2 keepalive acknowledgement timeout, in seconds, used when none is configured.
const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
62
/// A default batteries included `transport` server.
///
/// This is a wrapper around [`hyper::Server`] that exposes a builder-style API.
/// The builder offers easy configuration parameters for providing a fully
/// featured http2 based gRPC server. This should provide
/// a very good out of the box http2 server for use with tonic but is also a
/// reference implementation that should be a good starting point for anyone
/// wanting to create a more complex and/or specific implementation.
#[derive(Default, Clone)]
pub struct Server<L = Identity> {
    /// Optional callback producing the span each response future runs in (see `trace_fn`).
    trace_interceptor: Option<TraceInterceptor>,
    /// Optional per-connection limit on concurrently handled requests.
    concurrency_limit: Option<usize>,
    /// Optional timeout handed to `GrpcTimeout` for every request handler.
    timeout: Option<Duration>,
    /// TLS acceptor built from a `ServerTlsConfig`, if TLS was configured.
    #[cfg(feature = "tls")]
    tls: Option<TlsAcceptor>,
    /// HTTP2 initial stream-level flow control window.
    init_stream_window_size: Option<u32>,
    /// HTTP2 initial connection-level flow control window.
    init_connection_window_size: Option<u32>,
    /// HTTP2 `SETTINGS_MAX_CONCURRENT_STREAMS` value.
    max_concurrent_streams: Option<u32>,
    /// TCP keepalive setting passed to `TcpIncoming`; `None` disables it.
    tcp_keepalive: Option<Duration>,
    /// `TCP_NODELAY` setting passed to `TcpIncoming`; `Server::builder` sets it to `true`.
    tcp_nodelay: bool,
    /// Interval between HTTP2 keepalive pings; `None` disables them.
    http2_keepalive_interval: Option<Duration>,
    /// Time to wait for a keepalive ping acknowledgement; falls back to
    /// `DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS` when `None`.
    http2_keepalive_timeout: Option<Duration>,
    /// HTTP2 maximum frame size.
    max_frame_size: Option<u32>,
    /// Whether http1 requests are accepted; the server is http2-only when `false`.
    accept_http1: bool,
    /// Tower layer wrapped around the routed services before serving.
    layer: L,
}
89
/// A stack based `Service` router.
///
/// `A` is the most recently added service and `B` is the fallback that handles
/// every request `A`'s route predicate rejects.
#[derive(Debug)]
pub struct Router<A, B, L = Identity> {
    /// Builder configuration this router was created from.
    server: Server<L>,
    /// Predicate-based routing stack over the registered services.
    routes: Routes<A, B, Request<Body>>,
}
96
/// A service that is produced from a Tonic `Router`.
///
/// This service implementation will route between multiple Tonic
/// gRPC endpoints and can be consumed with the rest of the `tower`
/// ecosystem.
#[derive(Debug, Clone)]
pub struct RouterService<S> {
    /// The router's routes after the server's layer has been applied
    /// (see `Router::into_service`).
    inner: S,
}
106
107impl<S> Service<Request<Body>> for RouterService<S>
108where
109    S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
110    S::Future: Send + 'static,
111    S::Error: Into<crate::Error> + Send,
112{
113    type Response = Response<BoxBody>;
114    type Error = crate::Error;
115
116    #[allow(clippy::type_complexity)]
117    type Future = MapErr<S::Future, fn(S::Error) -> crate::Error>;
118
119    #[inline]
120    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121        Poll::Ready(Ok(()))
122    }
123
124    fn call(&mut self, req: Request<Body>) -> Self::Future {
125        self.inner.call(req).map_err(Into::into)
126    }
127}
128
/// A trait to provide a static reference to the service's
/// name. This is used for routing services within the router.
pub trait NamedService {
    /// The `Service-Name` as described [here].
    ///
    /// The router prefixes this name with `/` to build the route for the service.
    ///
    /// [here]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
    const NAME: &'static str;
}
137
138impl<S: NamedService, T> NamedService for Either<S, T> {
139    const NAME: &'static str = S::NAME;
140}
141
142impl Server {
143    /// Create a new server builder that can configure a [`Server`].
144    pub fn builder() -> Self {
145        Server {
146            tcp_nodelay: true,
147            accept_http1: false,
148            ..Default::default()
149        }
150    }
151}
152
153impl<L> Server<L> {
154    /// Configure TLS for this server.
155    #[cfg(feature = "tls")]
156    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
157    pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
158        Ok(Server {
159            tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
160            ..self
161        })
162    }
163
164    /// Set the concurrency limit applied to on requests inbound per connection.
165    ///
166    /// # Example
167    ///
168    /// ```
169    /// # use tonic::transport::Server;
170    /// # use tower_service::Service;
171    /// # let builder = Server::builder();
172    /// builder.concurrency_limit_per_connection(32);
173    /// ```
174    pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
175        Server {
176            concurrency_limit: Some(limit),
177            ..self
178        }
179    }
180
181    /// Set a timeout on for all request handlers.
182    ///
183    /// # Example
184    ///
185    /// ```
186    /// # use tonic::transport::Server;
187    /// # use tower_service::Service;
188    /// # use std::time::Duration;
189    /// # let mut builder = Server::builder();
190    /// builder.timeout(Duration::from_secs(30));
191    /// ```
192    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
193        self.timeout = Some(timeout);
194        self
195    }
196
197    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
198    /// stream-level flow control.
199    ///
200    /// Default is 65,535
201    ///
202    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
203    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
204        Server {
205            init_stream_window_size: sz.into(),
206            ..self
207        }
208    }
209
210    /// Sets the max connection-level flow control for HTTP2
211    ///
212    /// Default is 65,535
213    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
214        Server {
215            init_connection_window_size: sz.into(),
216            ..self
217        }
218    }
219
220    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
221    /// connections.
222    ///
223    /// Default is no limit (`None`).
224    ///
225    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
226    pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
227        Server {
228            max_concurrent_streams: max.into(),
229            ..self
230        }
231    }
232
233    /// Set whether HTTP2 Ping frames are enabled on accepted connections.
234    ///
235    /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
236    /// specified will be the time interval between HTTP2 Ping frames.
237    /// The timeout for receiving an acknowledgement of the keepalive ping
238    /// can be set with [`Server::http2_keepalive_timeout`].
239    ///
240    /// Default is no HTTP2 keepalive (`None`)
241    ///
242    pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
243        Server {
244            http2_keepalive_interval,
245            ..self
246        }
247    }
248
249    /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
250    ///
251    /// If the ping is not acknowledged within the timeout, the connection will be closed.
252    /// Does nothing if http2_keep_alive_interval is disabled.
253    ///
254    /// Default is 20 seconds.
255    ///
256    pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
257        Server {
258            http2_keepalive_timeout,
259            ..self
260        }
261    }
262
263    /// Set whether TCP keepalive messages are enabled on accepted connections.
264    ///
265    /// If `None` is specified, keepalive is disabled, otherwise the duration
266    /// specified will be the time to remain idle before sending TCP keepalive
267    /// probes.
268    ///
269    /// Default is no keepalive (`None`)
270    ///
271    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
272        Server {
273            tcp_keepalive,
274            ..self
275        }
276    }
277
278    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
279    pub fn tcp_nodelay(self, enabled: bool) -> Self {
280        Server {
281            tcp_nodelay: enabled,
282            ..self
283        }
284    }
285
286    /// Sets the maximum frame size to use for HTTP2.
287    ///
288    /// Passing `None` will do nothing.
289    ///
290    /// If not set, will default from underlying transport.
291    pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
292        Server {
293            max_frame_size: frame_size.into(),
294            ..self
295        }
296    }
297
298    /// Allow this server to accept http1 requests.
299    ///
300    /// Accepting http1 requests is only useful when developing `grpc-web`
301    /// enabled services. If this setting is set to `true` but services are
302    /// not correctly configured to handle grpc-web requests, your server may
303    /// return confusing (but correct) protocol errors.
304    ///
305    /// Default is `false`.
306    pub fn accept_http1(self, accept_http1: bool) -> Self {
307        Server {
308            accept_http1,
309            ..self
310        }
311    }
312
313    /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
314    pub fn trace_fn<F>(self, f: F) -> Self
315    where
316        F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
317    {
318        Server {
319            trace_interceptor: Some(Arc::new(f)),
320            ..self
321        }
322    }
323
324    /// Create a router with the `S` typed service as the first service.
325    ///
326    /// This will clone the `Server` builder and create a router that will
327    /// route around different services.
328    pub fn add_service<S>(&mut self, svc: S) -> Router<S, Unimplemented, L>
329    where
330        S: Service<Request<Body>, Response = Response<BoxBody>>
331            + NamedService
332            + Clone
333            + Send
334            + 'static,
335        S::Future: Send + 'static,
336        S::Error: Into<crate::Error> + Send,
337        L: Clone,
338    {
339        Router::new(self.clone(), svc)
340    }
341
342    /// Create a router with the optional `S` typed service as the first service.
343    ///
344    /// This will clone the `Server` builder and create a router that will
345    /// route around different services.
346    ///
347    /// # Note
348    /// Even when the argument given is `None` this will capture *all* requests to this service name.
349    /// As a result, one cannot use this to toggle between two identically named implementations.
350    pub fn add_optional_service<S>(
351        &mut self,
352        svc: Option<S>,
353    ) -> Router<Either<S, Unimplemented>, Unimplemented, L>
354    where
355        S: Service<Request<Body>, Response = Response<BoxBody>>
356            + NamedService
357            + Clone
358            + Send
359            + 'static,
360        S::Future: Send + 'static,
361        S::Error: Into<crate::Error> + Send,
362        L: Clone,
363    {
364        let svc = match svc {
365            Some(some) => Either::A(some),
366            None => Either::B(Unimplemented::default()),
367        };
368        Router::new(self.clone(), svc)
369    }
370
371    /// Set the [Tower] [`Layer`] all services will be wrapped in.
372    ///
373    /// This enables using middleware from the [Tower ecosystem][eco].
374    ///
375    /// # Example
376    ///
377    /// ```
378    /// # use tonic::transport::Server;
379    /// # use tower_service::Service;
380    /// use tower::timeout::TimeoutLayer;
381    /// use std::time::Duration;
382    ///
383    /// # let mut builder = Server::builder();
384    /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
385    /// ```
386    ///
387    /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
388    /// here as an example.
389    ///
390    /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
391    /// [interceptors]:
392    ///
393    /// ```
394    /// # use tonic::transport::Server;
395    /// # use tower_service::Service;
396    /// use tower::ServiceBuilder;
397    /// use std::time::Duration;
398    /// use tonic::{Request, Status, service::interceptor};
399    ///
400    /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
401    ///     if valid_credentials(&request) {
402    ///         Ok(request)
403    ///     } else {
404    ///         Err(Status::unauthenticated("invalid credentials"))
405    ///     }
406    /// }
407    ///
408    /// fn valid_credentials(request: &Request<()>) -> bool {
409    ///     // ...
410    ///     # true
411    /// }
412    ///
413    /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
414    ///     Ok(request)
415    /// }
416    ///
417    /// let layer = ServiceBuilder::new()
418    ///     .load_shed()
419    ///     .timeout(Duration::from_secs(30))
420    ///     .layer(interceptor(auth_interceptor))
421    ///     .layer(interceptor(some_other_interceptor))
422    ///     .into_inner();
423    ///
424    /// Server::builder().layer(layer);
425    /// ```
426    ///
427    /// [Tower]: https://github.com/tower-rs/tower
428    /// [`Layer`]: tower::layer::Layer
429    /// [eco]: https://github.com/tower-rs
430    /// [`ServiceBuilder`]: tower::ServiceBuilder
431    /// [interceptors]: crate::service::Interceptor
432    pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<NewLayer> {
433        Server {
434            layer: new_layer,
435            trace_interceptor: self.trace_interceptor,
436            concurrency_limit: self.concurrency_limit,
437            timeout: self.timeout,
438            #[cfg(feature = "tls")]
439            tls: self.tls,
440            init_stream_window_size: self.init_stream_window_size,
441            init_connection_window_size: self.init_connection_window_size,
442            max_concurrent_streams: self.max_concurrent_streams,
443            tcp_keepalive: self.tcp_keepalive,
444            tcp_nodelay: self.tcp_nodelay,
445            http2_keepalive_interval: self.http2_keepalive_interval,
446            http2_keepalive_timeout: self.http2_keepalive_timeout,
447            max_frame_size: self.max_frame_size,
448            accept_http1: self.accept_http1,
449        }
450    }
451
452    pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>(
453        self,
454        svc: S,
455        incoming: I,
456        signal: Option<F>,
457    ) -> Result<(), super::Error>
458    where
459        L: Layer<S>,
460        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
461        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static,
462        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
463        I: Stream<Item = Result<IO, IE>>,
464        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
465        IO::ConnectInfo: Clone + Send + Sync + 'static,
466        IE: Into<crate::Error>,
467        F: Future<Output = ()>,
468        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
469        ResBody::Error: Into<crate::Error>,
470    {
471        let trace_interceptor = self.trace_interceptor.clone();
472        let concurrency_limit = self.concurrency_limit;
473        let init_connection_window_size = self.init_connection_window_size;
474        let init_stream_window_size = self.init_stream_window_size;
475        let max_concurrent_streams = self.max_concurrent_streams;
476        let timeout = self.timeout;
477        let max_frame_size = self.max_frame_size;
478        let http2_only = !self.accept_http1;
479
480        let http2_keepalive_interval = self.http2_keepalive_interval;
481        let http2_keepalive_timeout = self
482            .http2_keepalive_timeout
483            .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
484
485        let svc = self.layer.layer(svc);
486
487        let tcp = incoming::tcp_incoming(incoming, self);
488        let incoming = accept::from_stream::<_, _, crate::Error>(tcp);
489
490        let svc = MakeSvc {
491            inner: svc,
492            concurrency_limit,
493            timeout,
494            trace_interceptor,
495            _io: PhantomData,
496        };
497
498        let server = hyper::Server::builder(incoming)
499            .http2_only(http2_only)
500            .http2_initial_connection_window_size(init_connection_window_size)
501            .http2_initial_stream_window_size(init_stream_window_size)
502            .http2_max_concurrent_streams(max_concurrent_streams)
503            .http2_keep_alive_interval(http2_keepalive_interval)
504            .http2_keep_alive_timeout(http2_keepalive_timeout)
505            .http2_max_frame_size(max_frame_size);
506
507        if let Some(signal) = signal {
508            server
509                .serve(svc)
510                .with_graceful_shutdown(signal)
511                .await
512                .map_err(super::Error::from_source)?
513        } else {
514            server.serve(svc).await.map_err(super::Error::from_source)?;
515        }
516
517        Ok(())
518    }
519}
520
521impl<S, L> Router<S, Unimplemented, L> {
522    pub(crate) fn new(server: Server<L>, svc: S) -> Self
523    where
524        S: Service<Request<Body>, Response = Response<BoxBody>>
525            + NamedService
526            + Clone
527            + Send
528            + 'static,
529        S::Future: Send + 'static,
530        S::Error: Into<crate::Error> + Send,
531    {
532        let svc_name = <S as NamedService>::NAME;
533        let svc_route = format!("/{}", svc_name);
534        let pred = move |req: &Request<Body>| {
535            let path = req.uri().path();
536
537            path.starts_with(&svc_route)
538        };
539        Self {
540            server,
541            routes: Routes::new(pred, svc, Unimplemented::default()),
542        }
543    }
544}
545
546impl<A, B, L> Router<A, B, L>
547where
548    A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
549    A::Future: Send + 'static,
550    A::Error: Into<crate::Error> + Send,
551    B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
552    B::Future: Send + 'static,
553    B::Error: Into<crate::Error> + Send,
554{
555    /// Add a new service to this router.
556    pub fn add_service<S>(self, svc: S) -> Router<S, Or<A, B, Request<Body>>, L>
557    where
558        S: Service<Request<Body>, Response = Response<BoxBody>>
559            + NamedService
560            + Clone
561            + Send
562            + 'static,
563        S::Future: Send + 'static,
564        S::Error: Into<crate::Error> + Send,
565    {
566        let Self { routes, server } = self;
567
568        let svc_name = <S as NamedService>::NAME;
569        let svc_route = format!("/{}", svc_name);
570        let pred = move |req: &Request<Body>| {
571            let path = req.uri().path();
572
573            path.starts_with(&svc_route)
574        };
575        let routes = routes.push(pred, svc);
576
577        Router { server, routes }
578    }
579
580    /// Add a new optional service to this router.
581    ///
582    /// # Note
583    /// Even when the argument given is `None` this will capture *all* requests to this service name.
584    /// As a result, one cannot use this to toggle between two identically named implementations.
585    #[allow(clippy::type_complexity)]
586    pub fn add_optional_service<S>(
587        self,
588        svc: Option<S>,
589    ) -> Router<Either<S, Unimplemented>, Or<A, B, Request<Body>>, L>
590    where
591        S: Service<Request<Body>, Response = Response<BoxBody>>
592            + NamedService
593            + Clone
594            + Send
595            + 'static,
596        S::Future: Send + 'static,
597        S::Error: Into<crate::Error> + Send,
598    {
599        let Self { routes, server } = self;
600
601        let svc_name = <S as NamedService>::NAME;
602        let svc_route = format!("/{}", svc_name);
603        let pred = move |req: &Request<Body>| {
604            let path = req.uri().path();
605
606            path.starts_with(&svc_route)
607        };
608        let svc = match svc {
609            Some(some) => Either::A(some),
610            None => Either::B(Unimplemented::default()),
611        };
612        let routes = routes.push(pred, svc);
613
614        Router { server, routes }
615    }
616
617    /// Consume this [`Server`] creating a future that will execute the server
618    /// on [tokio]'s default executor.
619    ///
620    /// [`Server`]: struct.Server.html
621    /// [tokio]: https://docs.rs/tokio
622    pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
623    where
624        L: Layer<Routes<A, B, Request<Body>>>,
625        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
626        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
627            Send + 'static,
628        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
629            Into<crate::Error> + Send,
630        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
631        ResBody::Error: Into<crate::Error>,
632    {
633        let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
634            .map_err(super::Error::from_source)?;
635        self.server
636            .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
637                self.routes,
638                incoming,
639                None,
640            )
641            .await
642    }
643
644    /// Consume this [`Server`] creating a future that will execute the server
645    /// on [tokio]'s default executor. And shutdown when the provided signal
646    /// is received.
647    ///
648    /// [`Server`]: struct.Server.html
649    /// [tokio]: https://docs.rs/tokio
650    pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
651        self,
652        addr: SocketAddr,
653        signal: F,
654    ) -> Result<(), super::Error>
655    where
656        L: Layer<Routes<A, B, Request<Body>>>,
657        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
658        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
659            Send + 'static,
660        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
661            Into<crate::Error> + Send,
662        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
663        ResBody::Error: Into<crate::Error>,
664    {
665        let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
666            .map_err(super::Error::from_source)?;
667        self.server
668            .serve_with_shutdown(self.routes, incoming, Some(signal))
669            .await
670    }
671
672    /// Consume this [`Server`] creating a future that will execute the server on
673    /// the provided incoming stream of `AsyncRead + AsyncWrite`.
674    ///
675    /// [`Server`]: struct.Server.html
676    pub async fn serve_with_incoming<I, IO, IE, ResBody>(
677        self,
678        incoming: I,
679    ) -> Result<(), super::Error>
680    where
681        I: Stream<Item = Result<IO, IE>>,
682        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
683        IO::ConnectInfo: Clone + Send + Sync + 'static,
684        IE: Into<crate::Error>,
685        L: Layer<Routes<A, B, Request<Body>>>,
686        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
687        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
688            Send + 'static,
689        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
690            Into<crate::Error> + Send,
691        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
692        ResBody::Error: Into<crate::Error>,
693    {
694        self.server
695            .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
696                self.routes,
697                incoming,
698                None,
699            )
700            .await
701    }
702
703    /// Consume this [`Server`] creating a future that will execute the server on
704    /// the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
705    /// `serve_with_shutdown` this method will also take a signal future to
706    /// gracefully shutdown the server.
707    ///
708    /// [`Server`]: struct.Server.html
709    pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
710        self,
711        incoming: I,
712        signal: F,
713    ) -> Result<(), super::Error>
714    where
715        I: Stream<Item = Result<IO, IE>>,
716        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
717        IO::ConnectInfo: Clone + Send + Sync + 'static,
718        IE: Into<crate::Error>,
719        F: Future<Output = ()>,
720        L: Layer<Routes<A, B, Request<Body>>>,
721        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
722        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
723            Send + 'static,
724        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
725            Into<crate::Error> + Send,
726        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
727        ResBody::Error: Into<crate::Error>,
728    {
729        self.server
730            .serve_with_shutdown(self.routes, incoming, Some(signal))
731            .await
732    }
733
734    /// Create a tower service out of a router.
735    pub fn into_service<ResBody>(self) -> RouterService<L::Service>
736    where
737        L: Layer<Routes<A, B, Request<Body>>>,
738        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
739        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
740            Send + 'static,
741        <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
742            Into<crate::Error> + Send,
743        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
744        ResBody::Error: Into<crate::Error>,
745    {
746        let inner = self.server.layer.layer(self.routes);
747        RouterService { inner }
748    }
749}
750
751impl<L> fmt::Debug for Server<L> {
752    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
753        f.debug_struct("Builder").finish()
754    }
755}
756
/// Per-connection service wrapper that runs each response future inside a tracing
/// span and boxes the response body.
struct Svc<S> {
    /// The wrapped service that handles the request.
    inner: S,
    /// Optional callback that builds the span from the body-less request.
    trace_interceptor: Option<TraceInterceptor>,
}
761
762impl<S, ResBody> Service<Request<Body>> for Svc<S>
763where
764    S: Service<Request<Body>, Response = Response<ResBody>>,
765    S::Error: Into<crate::Error>,
766    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
767    ResBody::Error: Into<crate::Error>,
768{
769    type Response = Response<BoxHttpBody>;
770    type Error = crate::Error;
771    type Future = SvcFuture<S::Future>;
772
773    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
774        self.inner.poll_ready(cx).map_err(Into::into)
775    }
776
777    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
778        let span = if let Some(trace_interceptor) = &self.trace_interceptor {
779            let (parts, body) = req.into_parts();
780            let bodyless_request = Request::from_parts(parts, ());
781
782            let span = trace_interceptor(&bodyless_request);
783
784            let (parts, _) = bodyless_request.into_parts();
785            req = Request::from_parts(parts, body);
786
787            span
788        } else {
789            tracing::Span::none()
790        };
791
792        SvcFuture {
793            inner: self.inner.call(req),
794            span,
795        }
796    }
797}
798
/// Response future returned by `Svc`: polls `inner` while `span` is entered.
#[pin_project]
struct SvcFuture<F> {
    /// The wrapped service's response future.
    #[pin]
    inner: F,
    /// Span entered for the duration of every poll of `inner`.
    span: tracing::Span,
}
805
806impl<F, E, ResBody> Future for SvcFuture<F>
807where
808    F: Future<Output = Result<Response<ResBody>, E>>,
809    E: Into<crate::Error>,
810    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
811    ResBody::Error: Into<crate::Error>,
812{
813    type Output = Result<Response<BoxHttpBody>, crate::Error>;
814
815    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
816        let this = self.project();
817        let _guard = this.span.enter();
818
819        let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
820        let response = response.map(|body| body.map_err(Into::into).boxed_unsync());
821        Poll::Ready(Ok(response))
822    }
823}
824
825impl<S> fmt::Debug for Svc<S> {
826    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
827        f.debug_struct("Svc").finish()
828    }
829}
830
/// Factory that builds the boxed service used for each accepted connection.
struct MakeSvc<S, IO> {
    /// Optional per-connection concurrency limit.
    concurrency_limit: Option<usize>,
    /// Optional timeout handed to `GrpcTimeout`.
    timeout: Option<Duration>,
    /// The layered service, cloned for every connection.
    inner: S,
    /// Optional span-producing callback, cloned into every `Svc`.
    trace_interceptor: Option<TraceInterceptor>,
    /// Ties the factory to the connection IO type without storing a value of it.
    _io: PhantomData<fn() -> IO>,
}
838
839impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
840where
841    IO: Connected,
842    S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
843    S::Future: Send + 'static,
844    S::Error: Into<crate::Error> + Send,
845    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
846    ResBody::Error: Into<crate::Error>,
847{
848    type Response = BoxService;
849    type Error = crate::Error;
850    type Future = future::Ready<Result<Self::Response, Self::Error>>;
851
852    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
853        Ok(()).into()
854    }
855
856    fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
857        let conn_info = io.connect_info();
858
859        let svc = self.inner.clone();
860        let concurrency_limit = self.concurrency_limit;
861        let timeout = self.timeout;
862        let trace_interceptor = self.trace_interceptor.clone();
863
864        let svc = ServiceBuilder::new()
865            .layer_fn(RecoverError::new)
866            .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
867            .layer_fn(|s| GrpcTimeout::new(s, timeout))
868            .service(svc);
869
870        let svc = ServiceBuilder::new()
871            .layer(BoxService::layer())
872            .map_request(move |mut request: Request<Body>| {
873                match &conn_info {
874                    tower::util::Either::A(inner) => {
875                        request.extensions_mut().insert(inner.clone());
876                    }
877                    tower::util::Either::B(inner) => {
878                        #[cfg(feature = "tls")]
879                        {
880                            request.extensions_mut().insert(inner.clone());
881                            request.extensions_mut().insert(inner.get_ref().clone());
882                        }
883
884                        #[cfg(not(feature = "tls"))]
885                        {
886                            // just a type check to make sure we didn't forget to
887                            // insert this into the extensions
888                            let _: &() = inner;
889                        }
890                    }
891                }
892
893                request
894            })
895            .service(Svc {
896                inner: svc,
897                trace_interceptor,
898            });
899
900        future::ready(Ok(svc))
901    }
902}
903
/// Fallback service used by the router for requests that match no registered
/// service; it answers every request with `grpc-status: 12`.
#[derive(Default, Clone, Debug)]
#[doc(hidden)]
pub struct Unimplemented {
    /// Private unit field so the type cannot be constructed outside this module
    /// other than through `Default`.
    _p: (),
}
909
910impl Service<Request<Body>> for Unimplemented {
911    type Response = Response<BoxBody>;
912    type Error = crate::Error;
913    type Future = future::Ready<Result<Self::Response, Self::Error>>;
914
915    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
916        Ok(()).into()
917    }
918
919    fn call(&mut self, _req: Request<Body>) -> Self::Future {
920        future::ok(
921            http::Response::builder()
922                .status(200)
923                .header("grpc-status", "12")
924                .header("content-type", "application/grpc")
925                .body(crate::body::empty_body())
926                .unwrap(),
927        )
928    }
929}