1mod conn;
4mod incoming;
5mod recover_error;
6#[cfg(feature = "tls")]
7#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
8mod tls;
9
10pub use conn::{Connected, TcpConnectInfo};
11#[cfg(feature = "tls")]
12pub use tls::ServerTlsConfig;
13
14#[cfg(feature = "tls")]
15pub use conn::TlsConnectInfo;
16
17#[cfg(feature = "tls")]
18use super::service::TlsAcceptor;
19
20use incoming::TcpIncoming;
21
22#[cfg(feature = "tls")]
23pub(crate) use tokio_rustls::server::TlsStream;
24
25#[cfg(feature = "tls")]
26use crate::transport::Error;
27
28use self::recover_error::RecoverError;
29use super::service::{GrpcTimeout, Or, Routes, ServerIo};
30use crate::body::BoxBody;
31use bytes::Bytes;
32use futures_core::Stream;
33use futures_util::{
34 future::{self, MapErr},
35 ready, TryFutureExt,
36};
37use http::{Request, Response};
38use http_body::Body as _;
39use hyper::{server::accept, Body};
40use pin_project::pin_project;
41use std::{
42 fmt,
43 future::Future,
44 marker::PhantomData,
45 net::SocketAddr,
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49 time::Duration,
50};
51use tokio::io::{AsyncRead, AsyncWrite};
52use tower::{
53 layer::util::Identity, layer::Layer, limit::concurrency::ConcurrencyLimitLayer, util::Either,
54 Service, ServiceBuilder,
55};
56
/// Boxed, non-`Sync` HTTP body of `Bytes` chunks whose errors are `crate::Error`;
/// this is the response body type produced by `Svc`.
type BoxHttpBody = http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>;
/// Type-erased per-connection service produced by `MakeSvc`.
type BoxService = tower::util::BoxService<Request<Body>, Response<BoxHttpBody>, crate::Error>;
/// Shared callback that builds a `tracing::Span` from a body-less view of a request.
type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;

/// HTTP/2 keepalive timeout, in seconds, used when the `Server` has none configured.
const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
62
/// Builder-style configuration for a gRPC server.
///
/// `L` is a tower layer (defaulting to `Identity`) that is applied to the routed
/// service right before it is served or converted with `Router::into_service`.
#[derive(Default, Clone)]
pub struct Server<L = Identity> {
    // Optional callback that creates the tracing span entered while a request is polled.
    trace_interceptor: Option<TraceInterceptor>,
    // Optional limit turned into a `ConcurrencyLimitLayer` for each connection's service.
    concurrency_limit: Option<usize>,
    // Optional timeout handed to `GrpcTimeout` for each connection's service.
    timeout: Option<Duration>,
    // TLS acceptor built from a `ServerTlsConfig`; only present with the `tls` feature.
    #[cfg(feature = "tls")]
    tls: Option<TlsAcceptor>,
    // Forwarded to hyper's `http2_initial_stream_window_size`.
    init_stream_window_size: Option<u32>,
    // Forwarded to hyper's `http2_initial_connection_window_size`.
    init_connection_window_size: Option<u32>,
    // Forwarded to hyper's `http2_max_concurrent_streams`.
    max_concurrent_streams: Option<u32>,
    // Passed to `TcpIncoming::new` when the router binds its own listener.
    tcp_keepalive: Option<Duration>,
    // Passed to `TcpIncoming::new` when the router binds its own listener.
    tcp_nodelay: bool,
    // Forwarded to hyper's `http2_keep_alive_interval`.
    http2_keepalive_interval: Option<Duration>,
    // Forwarded to hyper's `http2_keep_alive_timeout`; falls back to
    // `DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS` when unset.
    http2_keepalive_timeout: Option<Duration>,
    // Forwarded to hyper's `http2_max_frame_size`.
    max_frame_size: Option<u32>,
    // When `false`, hyper is configured with `http2_only(true)`.
    accept_http1: bool,
    // Layer applied to the routed service.
    layer: L,
}
89
/// A `Server` configuration paired with the chain of services registered on it.
///
/// `A` is the most recently added service and `B` is the fallback that handles
/// requests `A`'s route predicate rejects.
#[derive(Debug)]
pub struct Router<A, B, L = Identity> {
    // Server settings (and layer) used when the router is served.
    server: Server<L>,
    // Predicate-based dispatch between `A` and `B`.
    routes: Routes<A, B, Request<Body>>,
}
96
/// Wrapper returned by `Router::into_service`; its `Service` implementation
/// converts the wrapped service's errors into `crate::Error`.
#[derive(Debug, Clone)]
pub struct RouterService<S> {
    // The layered routing service produced by `Router::into_service`.
    inner: S,
}
106
107impl<S> Service<Request<Body>> for RouterService<S>
108where
109 S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
110 S::Future: Send + 'static,
111 S::Error: Into<crate::Error> + Send,
112{
113 type Response = Response<BoxBody>;
114 type Error = crate::Error;
115
116 #[allow(clippy::type_complexity)]
117 type Future = MapErr<S::Future, fn(S::Error) -> crate::Error>;
118
119 #[inline]
120 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121 Poll::Ready(Ok(()))
122 }
123
124 fn call(&mut self, req: Request<Body>) -> Self::Future {
125 self.inner.call(req).map_err(Into::into)
126 }
127}
128
/// A service that exposes a compile-time name.
///
/// `Router` builds the route `/{NAME}` from this constant and dispatches
/// requests whose URI path matches it to the service.
pub trait NamedService {
    /// The service name used to build the route prefix `/{NAME}`.
    const NAME: &'static str;
}
137
/// An `Either` is named after its first (`A`-side) service; the second type
/// parameter does not need to be a `NamedService`.
impl<S: NamedService, T> NamedService for Either<S, T> {
    const NAME: &'static str = S::NAME;
}
141
142impl Server {
143 pub fn builder() -> Self {
145 Server {
146 tcp_nodelay: true,
147 accept_http1: false,
148 ..Default::default()
149 }
150 }
151}
152
153impl<L> Server<L> {
154 #[cfg(feature = "tls")]
156 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
157 pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
158 Ok(Server {
159 tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
160 ..self
161 })
162 }
163
164 pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
175 Server {
176 concurrency_limit: Some(limit),
177 ..self
178 }
179 }
180
    /// Sets the timeout handed to `GrpcTimeout` for every connection's service.
    ///
    /// Unlike the other setters on `Server`, this one mutates the builder in
    /// place and returns `&mut Self` rather than consuming and returning `Self`.
    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
        self.timeout = Some(timeout);
        self
    }
196
197 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
204 Server {
205 init_stream_window_size: sz.into(),
206 ..self
207 }
208 }
209
210 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
214 Server {
215 init_connection_window_size: sz.into(),
216 ..self
217 }
218 }
219
220 pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
227 Server {
228 max_concurrent_streams: max.into(),
229 ..self
230 }
231 }
232
233 pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
243 Server {
244 http2_keepalive_interval,
245 ..self
246 }
247 }
248
249 pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
257 Server {
258 http2_keepalive_timeout,
259 ..self
260 }
261 }
262
263 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
272 Server {
273 tcp_keepalive,
274 ..self
275 }
276 }
277
278 pub fn tcp_nodelay(self, enabled: bool) -> Self {
280 Server {
281 tcp_nodelay: enabled,
282 ..self
283 }
284 }
285
286 pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
292 Server {
293 max_frame_size: frame_size.into(),
294 ..self
295 }
296 }
297
298 pub fn accept_http1(self, accept_http1: bool) -> Self {
307 Server {
308 accept_http1,
309 ..self
310 }
311 }
312
313 pub fn trace_fn<F>(self, f: F) -> Self
315 where
316 F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
317 {
318 Server {
319 trace_interceptor: Some(Arc::new(f)),
320 ..self
321 }
322 }
323
324 pub fn add_service<S>(&mut self, svc: S) -> Router<S, Unimplemented, L>
329 where
330 S: Service<Request<Body>, Response = Response<BoxBody>>
331 + NamedService
332 + Clone
333 + Send
334 + 'static,
335 S::Future: Send + 'static,
336 S::Error: Into<crate::Error> + Send,
337 L: Clone,
338 {
339 Router::new(self.clone(), svc)
340 }
341
342 pub fn add_optional_service<S>(
351 &mut self,
352 svc: Option<S>,
353 ) -> Router<Either<S, Unimplemented>, Unimplemented, L>
354 where
355 S: Service<Request<Body>, Response = Response<BoxBody>>
356 + NamedService
357 + Clone
358 + Send
359 + 'static,
360 S::Future: Send + 'static,
361 S::Error: Into<crate::Error> + Send,
362 L: Clone,
363 {
364 let svc = match svc {
365 Some(some) => Either::A(some),
366 None => Either::B(Unimplemented::default()),
367 };
368 Router::new(self.clone(), svc)
369 }
370
371 pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<NewLayer> {
433 Server {
434 layer: new_layer,
435 trace_interceptor: self.trace_interceptor,
436 concurrency_limit: self.concurrency_limit,
437 timeout: self.timeout,
438 #[cfg(feature = "tls")]
439 tls: self.tls,
440 init_stream_window_size: self.init_stream_window_size,
441 init_connection_window_size: self.init_connection_window_size,
442 max_concurrent_streams: self.max_concurrent_streams,
443 tcp_keepalive: self.tcp_keepalive,
444 tcp_nodelay: self.tcp_nodelay,
445 http2_keepalive_interval: self.http2_keepalive_interval,
446 http2_keepalive_timeout: self.http2_keepalive_timeout,
447 max_frame_size: self.max_frame_size,
448 accept_http1: self.accept_http1,
449 }
450 }
451
452 pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>(
453 self,
454 svc: S,
455 incoming: I,
456 signal: Option<F>,
457 ) -> Result<(), super::Error>
458 where
459 L: Layer<S>,
460 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
461 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static,
462 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
463 I: Stream<Item = Result<IO, IE>>,
464 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
465 IO::ConnectInfo: Clone + Send + Sync + 'static,
466 IE: Into<crate::Error>,
467 F: Future<Output = ()>,
468 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
469 ResBody::Error: Into<crate::Error>,
470 {
471 let trace_interceptor = self.trace_interceptor.clone();
472 let concurrency_limit = self.concurrency_limit;
473 let init_connection_window_size = self.init_connection_window_size;
474 let init_stream_window_size = self.init_stream_window_size;
475 let max_concurrent_streams = self.max_concurrent_streams;
476 let timeout = self.timeout;
477 let max_frame_size = self.max_frame_size;
478 let http2_only = !self.accept_http1;
479
480 let http2_keepalive_interval = self.http2_keepalive_interval;
481 let http2_keepalive_timeout = self
482 .http2_keepalive_timeout
483 .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
484
485 let svc = self.layer.layer(svc);
486
487 let tcp = incoming::tcp_incoming(incoming, self);
488 let incoming = accept::from_stream::<_, _, crate::Error>(tcp);
489
490 let svc = MakeSvc {
491 inner: svc,
492 concurrency_limit,
493 timeout,
494 trace_interceptor,
495 _io: PhantomData,
496 };
497
498 let server = hyper::Server::builder(incoming)
499 .http2_only(http2_only)
500 .http2_initial_connection_window_size(init_connection_window_size)
501 .http2_initial_stream_window_size(init_stream_window_size)
502 .http2_max_concurrent_streams(max_concurrent_streams)
503 .http2_keep_alive_interval(http2_keepalive_interval)
504 .http2_keep_alive_timeout(http2_keepalive_timeout)
505 .http2_max_frame_size(max_frame_size);
506
507 if let Some(signal) = signal {
508 server
509 .serve(svc)
510 .with_graceful_shutdown(signal)
511 .await
512 .map_err(super::Error::from_source)?
513 } else {
514 server.serve(svc).await.map_err(super::Error::from_source)?;
515 }
516
517 Ok(())
518 }
519}
520
521impl<S, L> Router<S, Unimplemented, L> {
522 pub(crate) fn new(server: Server<L>, svc: S) -> Self
523 where
524 S: Service<Request<Body>, Response = Response<BoxBody>>
525 + NamedService
526 + Clone
527 + Send
528 + 'static,
529 S::Future: Send + 'static,
530 S::Error: Into<crate::Error> + Send,
531 {
532 let svc_name = <S as NamedService>::NAME;
533 let svc_route = format!("/{}", svc_name);
534 let pred = move |req: &Request<Body>| {
535 let path = req.uri().path();
536
537 path.starts_with(&svc_route)
538 };
539 Self {
540 server,
541 routes: Routes::new(pred, svc, Unimplemented::default()),
542 }
543 }
544}
545
546impl<A, B, L> Router<A, B, L>
547where
548 A: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
549 A::Future: Send + 'static,
550 A::Error: Into<crate::Error> + Send,
551 B: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
552 B::Future: Send + 'static,
553 B::Error: Into<crate::Error> + Send,
554{
555 pub fn add_service<S>(self, svc: S) -> Router<S, Or<A, B, Request<Body>>, L>
557 where
558 S: Service<Request<Body>, Response = Response<BoxBody>>
559 + NamedService
560 + Clone
561 + Send
562 + 'static,
563 S::Future: Send + 'static,
564 S::Error: Into<crate::Error> + Send,
565 {
566 let Self { routes, server } = self;
567
568 let svc_name = <S as NamedService>::NAME;
569 let svc_route = format!("/{}", svc_name);
570 let pred = move |req: &Request<Body>| {
571 let path = req.uri().path();
572
573 path.starts_with(&svc_route)
574 };
575 let routes = routes.push(pred, svc);
576
577 Router { server, routes }
578 }
579
580 #[allow(clippy::type_complexity)]
586 pub fn add_optional_service<S>(
587 self,
588 svc: Option<S>,
589 ) -> Router<Either<S, Unimplemented>, Or<A, B, Request<Body>>, L>
590 where
591 S: Service<Request<Body>, Response = Response<BoxBody>>
592 + NamedService
593 + Clone
594 + Send
595 + 'static,
596 S::Future: Send + 'static,
597 S::Error: Into<crate::Error> + Send,
598 {
599 let Self { routes, server } = self;
600
601 let svc_name = <S as NamedService>::NAME;
602 let svc_route = format!("/{}", svc_name);
603 let pred = move |req: &Request<Body>| {
604 let path = req.uri().path();
605
606 path.starts_with(&svc_route)
607 };
608 let svc = match svc {
609 Some(some) => Either::A(some),
610 None => Either::B(Unimplemented::default()),
611 };
612 let routes = routes.push(pred, svc);
613
614 Router { server, routes }
615 }
616
617 pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
623 where
624 L: Layer<Routes<A, B, Request<Body>>>,
625 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
626 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
627 Send + 'static,
628 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
629 Into<crate::Error> + Send,
630 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
631 ResBody::Error: Into<crate::Error>,
632 {
633 let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
634 .map_err(super::Error::from_source)?;
635 self.server
636 .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
637 self.routes,
638 incoming,
639 None,
640 )
641 .await
642 }
643
644 pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
651 self,
652 addr: SocketAddr,
653 signal: F,
654 ) -> Result<(), super::Error>
655 where
656 L: Layer<Routes<A, B, Request<Body>>>,
657 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
658 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
659 Send + 'static,
660 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
661 Into<crate::Error> + Send,
662 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
663 ResBody::Error: Into<crate::Error>,
664 {
665 let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
666 .map_err(super::Error::from_source)?;
667 self.server
668 .serve_with_shutdown(self.routes, incoming, Some(signal))
669 .await
670 }
671
672 pub async fn serve_with_incoming<I, IO, IE, ResBody>(
677 self,
678 incoming: I,
679 ) -> Result<(), super::Error>
680 where
681 I: Stream<Item = Result<IO, IE>>,
682 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
683 IO::ConnectInfo: Clone + Send + Sync + 'static,
684 IE: Into<crate::Error>,
685 L: Layer<Routes<A, B, Request<Body>>>,
686 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
687 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
688 Send + 'static,
689 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
690 Into<crate::Error> + Send,
691 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
692 ResBody::Error: Into<crate::Error>,
693 {
694 self.server
695 .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
696 self.routes,
697 incoming,
698 None,
699 )
700 .await
701 }
702
703 pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
710 self,
711 incoming: I,
712 signal: F,
713 ) -> Result<(), super::Error>
714 where
715 I: Stream<Item = Result<IO, IE>>,
716 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
717 IO::ConnectInfo: Clone + Send + Sync + 'static,
718 IE: Into<crate::Error>,
719 F: Future<Output = ()>,
720 L: Layer<Routes<A, B, Request<Body>>>,
721 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
722 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
723 Send + 'static,
724 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
725 Into<crate::Error> + Send,
726 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
727 ResBody::Error: Into<crate::Error>,
728 {
729 self.server
730 .serve_with_shutdown(self.routes, incoming, Some(signal))
731 .await
732 }
733
734 pub fn into_service<ResBody>(self) -> RouterService<L::Service>
736 where
737 L: Layer<Routes<A, B, Request<Body>>>,
738 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
739 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Future:
740 Send + 'static,
741 <<L as Layer<Routes<A, B, Request<Body>>>>::Service as Service<Request<Body>>>::Error:
742 Into<crate::Error> + Send,
743 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
744 ResBody::Error: Into<crate::Error>,
745 {
746 let inner = self.server.layer.layer(self.routes);
747 RouterService { inner }
748 }
749}
750
751impl<L> fmt::Debug for Server<L> {
752 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
753 f.debug_struct("Builder").finish()
754 }
755}
756
/// Per-connection request service: wraps `inner`, boxes its response body and
/// runs each response future inside a tracing span.
struct Svc<S> {
    // The wrapped request handler.
    inner: S,
    // Optional callback that creates the span for each request.
    trace_interceptor: Option<TraceInterceptor>,
}
761
762impl<S, ResBody> Service<Request<Body>> for Svc<S>
763where
764 S: Service<Request<Body>, Response = Response<ResBody>>,
765 S::Error: Into<crate::Error>,
766 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
767 ResBody::Error: Into<crate::Error>,
768{
769 type Response = Response<BoxHttpBody>;
770 type Error = crate::Error;
771 type Future = SvcFuture<S::Future>;
772
773 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
774 self.inner.poll_ready(cx).map_err(Into::into)
775 }
776
777 fn call(&mut self, mut req: Request<Body>) -> Self::Future {
778 let span = if let Some(trace_interceptor) = &self.trace_interceptor {
779 let (parts, body) = req.into_parts();
780 let bodyless_request = Request::from_parts(parts, ());
781
782 let span = trace_interceptor(&bodyless_request);
783
784 let (parts, _) = bodyless_request.into_parts();
785 req = Request::from_parts(parts, body);
786
787 span
788 } else {
789 tracing::Span::none()
790 };
791
792 SvcFuture {
793 inner: self.inner.call(req),
794 span,
795 }
796 }
797}
798
/// Response future returned by `Svc`: polls `inner` while `span` is entered.
#[pin_project]
struct SvcFuture<F> {
    // The wrapped service's response future; structurally pinned.
    #[pin]
    inner: F,
    // Span entered for the duration of every poll.
    span: tracing::Span,
}
805
806impl<F, E, ResBody> Future for SvcFuture<F>
807where
808 F: Future<Output = Result<Response<ResBody>, E>>,
809 E: Into<crate::Error>,
810 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
811 ResBody::Error: Into<crate::Error>,
812{
813 type Output = Result<Response<BoxHttpBody>, crate::Error>;
814
815 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
816 let this = self.project();
817 let _guard = this.span.enter();
818
819 let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
820 let response = response.map(|body| body.map_err(Into::into).boxed_unsync());
821 Poll::Ready(Ok(response))
822 }
823}
824
825impl<S> fmt::Debug for Svc<S> {
826 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
827 f.debug_struct("Svc").finish()
828 }
829}
830
/// Factory that builds one boxed request service per accepted connection.
struct MakeSvc<S, IO> {
    // Optional per-connection concurrency limit.
    concurrency_limit: Option<usize>,
    // Optional timeout handed to `GrpcTimeout`.
    timeout: Option<Duration>,
    // The service cloned for every connection.
    inner: S,
    // Optional span-creating callback handed to each `Svc`.
    trace_interceptor: Option<TraceInterceptor>,
    // Ties the factory to the connection IO type without storing a value of it.
    _io: PhantomData<fn() -> IO>,
}
838
839impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
840where
841 IO: Connected,
842 S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
843 S::Future: Send + 'static,
844 S::Error: Into<crate::Error> + Send,
845 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
846 ResBody::Error: Into<crate::Error>,
847{
848 type Response = BoxService;
849 type Error = crate::Error;
850 type Future = future::Ready<Result<Self::Response, Self::Error>>;
851
852 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
853 Ok(()).into()
854 }
855
856 fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
857 let conn_info = io.connect_info();
858
859 let svc = self.inner.clone();
860 let concurrency_limit = self.concurrency_limit;
861 let timeout = self.timeout;
862 let trace_interceptor = self.trace_interceptor.clone();
863
864 let svc = ServiceBuilder::new()
865 .layer_fn(RecoverError::new)
866 .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
867 .layer_fn(|s| GrpcTimeout::new(s, timeout))
868 .service(svc);
869
870 let svc = ServiceBuilder::new()
871 .layer(BoxService::layer())
872 .map_request(move |mut request: Request<Body>| {
873 match &conn_info {
874 tower::util::Either::A(inner) => {
875 request.extensions_mut().insert(inner.clone());
876 }
877 tower::util::Either::B(inner) => {
878 #[cfg(feature = "tls")]
879 {
880 request.extensions_mut().insert(inner.clone());
881 request.extensions_mut().insert(inner.get_ref().clone());
882 }
883
884 #[cfg(not(feature = "tls"))]
885 {
886 let _: &() = inner;
889 }
890 }
891 }
892
893 request
894 })
895 .service(Svc {
896 inner: svc,
897 trace_interceptor,
898 });
899
900 future::ready(Ok(svc))
901 }
902}
903
/// Fallback service that answers every request with an empty-bodied response
/// carrying `grpc-status: 12`.
#[derive(Default, Clone, Debug)]
#[doc(hidden)]
pub struct Unimplemented {
    // Private unit field: prevents construction by struct literal outside this module.
    _p: (),
}
909
910impl Service<Request<Body>> for Unimplemented {
911 type Response = Response<BoxBody>;
912 type Error = crate::Error;
913 type Future = future::Ready<Result<Self::Response, Self::Error>>;
914
915 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
916 Ok(()).into()
917 }
918
919 fn call(&mut self, _req: Request<Body>) -> Self::Future {
920 future::ok(
921 http::Response::builder()
922 .status(200)
923 .header("grpc-status", "12")
924 .header("content-type", "application/grpc")
925 .body(crate::body::empty_body())
926 .unwrap(),
927 )
928 }
929}