tower/util/
mod.rs

1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod call_all;
7mod either;
8
9mod future_service;
10mod map_err;
11mod map_request;
12mod map_response;
13mod map_result;
14
15mod map_future;
16mod oneshot;
17mod optional;
18mod ready;
19mod service_fn;
20mod then;
21
22#[allow(deprecated)]
23pub use self::{
24    and_then::{AndThen, AndThenLayer},
25    boxed::{BoxLayer, BoxService, UnsyncBoxService},
26    boxed_clone::BoxCloneService,
27    either::Either,
28    future_service::{future_service, FutureService},
29    map_err::{MapErr, MapErrLayer},
30    map_future::{MapFuture, MapFutureLayer},
31    map_request::{MapRequest, MapRequestLayer},
32    map_response::{MapResponse, MapResponseLayer},
33    map_result::{MapResult, MapResultLayer},
34    oneshot::Oneshot,
35    optional::Optional,
36    ready::{Ready, ReadyAnd, ReadyOneshot},
37    service_fn::{service_fn, ServiceFn},
38    then::{Then, ThenLayer},
39};
40
41pub use self::call_all::{CallAll, CallAllUnordered};
42use std::future::Future;
43
44use crate::layer::util::Identity;
45
46pub mod error {
47    //! Error types
48
49    pub use super::optional::error as optional;
50}
51
52pub mod future {
53    //! Future types
54
55    pub use super::and_then::AndThenFuture;
56    pub use super::map_err::MapErrFuture;
57    pub use super::map_response::MapResponseFuture;
58    pub use super::map_result::MapResultFuture;
59    pub use super::optional::future as optional;
60    pub use super::then::ThenFuture;
61}
62
63/// An extension trait for `Service`s that provides a variety of convenient
64/// adapters
65pub trait ServiceExt<Request>: tower_service::Service<Request> {
66    /// Yields a mutable reference to the service when it is ready to accept a request.
67    fn ready(&mut self) -> Ready<'_, Self, Request>
68    where
69        Self: Sized,
70    {
71        Ready::new(self)
72    }
73
74    /// Yields a mutable reference to the service when it is ready to accept a request.
75    #[deprecated(
76        since = "0.4.6",
77        note = "please use the `ServiceExt::ready` method instead"
78    )]
79    #[allow(deprecated)]
80    fn ready_and(&mut self) -> ReadyAnd<'_, Self, Request>
81    where
82        Self: Sized,
83    {
84        ReadyAnd::new(self)
85    }
86
87    /// Yields the service when it is ready to accept a request.
88    fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
89    where
90        Self: Sized,
91    {
92        ReadyOneshot::new(self)
93    }
94
95    /// Consume this `Service`, calling with the providing request once it is ready.
96    fn oneshot(self, req: Request) -> Oneshot<Self, Request>
97    where
98        Self: Sized,
99    {
100        Oneshot::new(self, req)
101    }
102
103    /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
104    ///
105    /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
106    /// Response>`][stream]. See the documentation for [`CallAll`] for
107    /// details.
108    ///
109    /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110    /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
111    fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
112    where
113        Self: Sized,
114        Self::Error: Into<crate::BoxError>,
115        S: futures_core::Stream<Item = Request>,
116    {
117        CallAll::new(self, reqs)
118    }
119
120    /// Executes a new future after this service's future resolves. This does
121    /// not alter the behaviour of the [`poll_ready`] method.
122    ///
123    /// This method can be used to change the [`Response`] type of the service
124    /// into a different type. You can use this method to chain along a computation once the
125    /// service's response has been resolved.
126    ///
127    /// [`Response`]: crate::Service::Response
128    /// [`poll_ready`]: crate::Service::poll_ready
129    ///
130    /// # Example
131    /// ```
132    /// # use std::task::{Poll, Context};
133    /// # use tower::{Service, ServiceExt};
134    /// #
135    /// # struct DatabaseService;
136    /// # impl DatabaseService {
137    /// #   fn new(address: &str) -> Self {
138    /// #       DatabaseService
139    /// #   }
140    /// # }
141    /// #
142    /// # struct Record {
143    /// #   pub name: String,
144    /// #   pub age: u16
145    /// # }
146    /// #
147    /// # impl Service<u32> for DatabaseService {
148    /// #   type Response = Record;
149    /// #   type Error = u8;
150    /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
151    /// #
152    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153    /// #       Poll::Ready(Ok(()))
154    /// #   }
155    /// #
156    /// #   fn call(&mut self, request: u32) -> Self::Future {
157    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
158    /// #   }
159    /// # }
160    /// #
161    /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
162    /// #
163    /// # fn main() {
164    /// #    async {
165    /// // A service returning Result<Record, _>
166    /// let service = DatabaseService::new("127.0.0.1:8080");
167    ///
168    /// // Map the response into a new response
169    /// let mut new_service = service.and_then(|record: Record| async move {
170    ///     let name = record.name;
171    ///     avatar_lookup(name).await
172    /// });
173    ///
174    /// // Call the new service
175    /// let id = 13;
176    /// let avatar = new_service.call(id).await.unwrap();
177    /// #    };
178    /// # }
179    /// ```
180    fn and_then<F>(self, f: F) -> AndThen<Self, F>
181    where
182        Self: Sized,
183        F: Clone,
184    {
185        AndThen::new(self, f)
186    }
187
188    /// Maps this service's response value to a different value. This does not
189    /// alter the behaviour of the [`poll_ready`] method.
190    ///
191    /// This method can be used to change the [`Response`] type of the service
192    /// into a different type. It is similar to the [`Result::map`]
193    /// method. You can use this method to chain along a computation once the
194    /// service's response has been resolved.
195    ///
196    /// [`Response`]: crate::Service::Response
197    /// [`poll_ready`]: crate::Service::poll_ready
198    ///
199    /// # Example
200    /// ```
201    /// # use std::task::{Poll, Context};
202    /// # use tower::{Service, ServiceExt};
203    /// #
204    /// # struct DatabaseService;
205    /// # impl DatabaseService {
206    /// #   fn new(address: &str) -> Self {
207    /// #       DatabaseService
208    /// #   }
209    /// # }
210    /// #
211    /// # struct Record {
212    /// #   pub name: String,
213    /// #   pub age: u16
214    /// # }
215    /// #
216    /// # impl Service<u32> for DatabaseService {
217    /// #   type Response = Record;
218    /// #   type Error = u8;
219    /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
220    /// #
221    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222    /// #       Poll::Ready(Ok(()))
223    /// #   }
224    /// #
225    /// #   fn call(&mut self, request: u32) -> Self::Future {
226    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
227    /// #   }
228    /// # }
229    /// #
230    /// # fn main() {
231    /// #    async {
232    /// // A service returning Result<Record, _>
233    /// let service = DatabaseService::new("127.0.0.1:8080");
234    ///
235    /// // Map the response into a new response
236    /// let mut new_service = service.map_response(|record| record.name);
237    ///
238    /// // Call the new service
239    /// let id = 13;
240    /// let name = new_service
241    ///     .ready()
242    ///     .await?
243    ///     .call(id)
244    ///     .await?;
245    /// # Ok::<(), u8>(())
246    /// #    };
247    /// # }
248    /// ```
249    fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
250    where
251        Self: Sized,
252        F: FnOnce(Self::Response) -> Response + Clone,
253    {
254        MapResponse::new(self, f)
255    }
256
257    /// Maps this service's error value to a different value. This does not
258    /// alter the behaviour of the [`poll_ready`] method.
259    ///
260    /// This method can be used to change the [`Error`] type of the service
261    /// into a different type. It is similar to the [`Result::map_err`] method.
262    ///
263    /// [`Error`]: crate::Service::Error
264    /// [`poll_ready`]: crate::Service::poll_ready
265    ///
266    /// # Example
267    /// ```
268    /// # use std::task::{Poll, Context};
269    /// # use tower::{Service, ServiceExt};
270    /// #
271    /// # struct DatabaseService;
272    /// # impl DatabaseService {
273    /// #   fn new(address: &str) -> Self {
274    /// #       DatabaseService
275    /// #   }
276    /// # }
277    /// #
278    /// # struct Error {
279    /// #   pub code: u32,
280    /// #   pub message: String
281    /// # }
282    /// #
283    /// # impl Service<u32> for DatabaseService {
284    /// #   type Response = String;
285    /// #   type Error = Error;
286    /// #   type Future = futures_util::future::Ready<Result<String, Error>>;
287    /// #
288    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
289    /// #       Poll::Ready(Ok(()))
290    /// #   }
291    /// #
292    /// #   fn call(&mut self, request: u32) -> Self::Future {
293    /// #       futures_util::future::ready(Ok(String::new()))
294    /// #   }
295    /// # }
296    /// #
297    /// # fn main() {
298    /// #   async {
299    /// // A service returning Result<_, Error>
300    /// let service = DatabaseService::new("127.0.0.1:8080");
301    ///
302    /// // Map the error to a new error
303    /// let mut new_service = service.map_err(|err| err.code);
304    ///
305    /// // Call the new service
306    /// let id = 13;
307    /// let code = new_service
308    ///     .ready()
309    ///     .await?
310    ///     .call(id)
311    ///     .await
312    ///     .unwrap_err();
313    /// # Ok::<(), u32>(())
314    /// #   };
315    /// # }
316    /// ```
317    fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
318    where
319        Self: Sized,
320        F: FnOnce(Self::Error) -> Error + Clone,
321    {
322        MapErr::new(self, f)
323    }
324
325    /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
326    /// to a different value, regardless of whether the future succeeds or
327    /// fails.
328    ///
329    /// This is similar to the [`map_response`] and [`map_err`] combinators,
330    /// except that the *same* function is invoked when the service's future
331    /// completes, whether it completes successfully or fails. This function
332    /// takes the [`Result`] returned by the service's future, and returns a
333    /// [`Result`].
334    ///
335    /// Like the standard library's [`Result::and_then`], this method can be
336    /// used to implement control flow based on `Result` values. For example, it
337    /// may be used to implement error recovery, by turning some [`Err`]
338    /// responses from the service into [`Ok`] responses. Similarly, some
339    /// successful responses from the service could be rejected, by returning an
340    /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
341    /// this method can also be used to implement behaviors that must run when a
342    /// service's future completes, regardless of whether it succeeded or failed.
343    ///
344    /// This method can be used to change the [`Response`] type of the service
345    /// into a different type. It can also be used to change the [`Error`] type
346    /// of the service. However, because the [`map_result`] function is not applied
347    /// to the errors returned by the service's [`poll_ready`] method, it must
348    /// be possible to convert the service's [`Error`] type into the error type
349    /// returned by the [`map_result`] function. This is trivial when the function
350    /// returns the same error type as the service, but in other cases, it can
351    /// be useful to use [`BoxError`] to erase differing error types.
352    ///
353    /// # Examples
354    ///
355    /// Recovering from certain errors:
356    ///
357    /// ```
358    /// # use std::task::{Poll, Context};
359    /// # use tower::{Service, ServiceExt};
360    /// #
361    /// # struct DatabaseService;
362    /// # impl DatabaseService {
363    /// #   fn new(address: &str) -> Self {
364    /// #       DatabaseService
365    /// #   }
366    /// # }
367    /// #
368    /// # struct Record {
369    /// #   pub name: String,
370    /// #   pub age: u16
371    /// # }
372    /// # #[derive(Debug)]
373    /// # enum DbError {
374    /// #   Parse(std::num::ParseIntError),
375    /// #   NoRecordsFound,
376    /// # }
377    /// #
378    /// # impl Service<u32> for DatabaseService {
379    /// #   type Response = Vec<Record>;
380    /// #   type Error = DbError;
381    /// #   type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
382    /// #
383    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
384    /// #       Poll::Ready(Ok(()))
385    /// #   }
386    /// #
387    /// #   fn call(&mut self, request: u32) -> Self::Future {
388    /// #       futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
389    /// #   }
390    /// # }
391    /// #
392    /// # fn main() {
393    /// #    async {
394    /// // A service returning Result<Vec<Record>, DbError>
395    /// let service = DatabaseService::new("127.0.0.1:8080");
396    ///
397    /// // If the database returns no records for the query, we just want an empty `Vec`.
398    /// let mut new_service = service.map_result(|result| match result {
399    ///     // If the error indicates that no records matched the query, return an empty
400    ///     // `Vec` instead.
401    ///     Err(DbError::NoRecordsFound) => Ok(Vec::new()),
402    ///     // Propagate all other responses (`Ok` and `Err`) unchanged
403    ///     x => x,
404    /// });
405    ///
406    /// // Call the new service
407    /// let id = 13;
408    /// let name = new_service
409    ///     .ready()
410    ///     .await?
411    ///     .call(id)
412    ///     .await?;
413    /// # Ok::<(), DbError>(())
414    /// #    };
415    /// # }
416    /// ```
417    ///
418    /// Rejecting some `Ok` responses:
419    ///
420    /// ```
421    /// # use std::task::{Poll, Context};
422    /// # use tower::{Service, ServiceExt};
423    /// #
424    /// # struct DatabaseService;
425    /// # impl DatabaseService {
426    /// #   fn new(address: &str) -> Self {
427    /// #       DatabaseService
428    /// #   }
429    /// # }
430    /// #
431    /// # struct Record {
432    /// #   pub name: String,
433    /// #   pub age: u16
434    /// # }
435    /// # type DbError = String;
436    /// # type AppError = String;
437    /// #
438    /// # impl Service<u32> for DatabaseService {
439    /// #   type Response = Record;
440    /// #   type Error = DbError;
441    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
442    /// #
443    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
444    /// #       Poll::Ready(Ok(()))
445    /// #   }
446    /// #
447    /// #   fn call(&mut self, request: u32) -> Self::Future {
448    /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
449    /// #   }
450    /// # }
451    /// #
452    /// # fn main() {
453    /// #    async {
454    /// use tower::BoxError;
455    ///
456    /// // A service returning Result<Record, DbError>
457    /// let service = DatabaseService::new("127.0.0.1:8080");
458    ///
459    /// // If the user is zero years old, return an error.
460    /// let mut new_service = service.map_result(|result| {
461    ///    let record = result?;
462    ///
463    ///    if record.age == 0 {
464    ///         // Users must have been born to use our app!
465    ///         let app_error = AppError::from("users cannot be 0 years old!");
466    ///
467    ///         // Box the error to erase its type (as it can be an `AppError`
468    ///         // *or* the inner service's `DbError`).
469    ///         return Err(BoxError::from(app_error));
470    ///     }
471    ///
472    ///     // Otherwise, return the record.
473    ///     Ok(record)
474    /// });
475    ///
476    /// // Call the new service
477    /// let id = 13;
478    /// let record = new_service
479    ///     .ready()
480    ///     .await?
481    ///     .call(id)
482    ///     .await?;
483    /// # Ok::<(), BoxError>(())
484    /// #    };
485    /// # }
486    /// ```
487    ///
488    /// Performing an action that must be run for both successes and failures:
489    ///
490    /// ```
491    /// # use std::convert::TryFrom;
492    /// # use std::task::{Poll, Context};
493    /// # use tower::{Service, ServiceExt};
494    /// #
495    /// # struct DatabaseService;
496    /// # impl DatabaseService {
497    /// #   fn new(address: &str) -> Self {
498    /// #       DatabaseService
499    /// #   }
500    /// # }
501    /// #
502    /// # impl Service<u32> for DatabaseService {
503    /// #   type Response = String;
504    /// #   type Error = u8;
505    /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
506    /// #
507    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
508    /// #       Poll::Ready(Ok(()))
509    /// #   }
510    /// #
511    /// #   fn call(&mut self, request: u32) -> Self::Future {
512    /// #       futures_util::future::ready(Ok(String::new()))
513    /// #   }
514    /// # }
515    /// #
516    /// # fn main() {
517    /// #   async {
518    /// // A service returning Result<Record, DbError>
519    /// let service = DatabaseService::new("127.0.0.1:8080");
520    ///
521    /// // Print a message whenever a query completes.
522    /// let mut new_service = service.map_result(|result| {
523    ///     println!("query completed; success={}", result.is_ok());
524    ///     result
525    /// });
526    ///
527    /// // Call the new service
528    /// let id = 13;
529    /// let response = new_service
530    ///     .ready()
531    ///     .await?
532    ///     .call(id)
533    ///     .await;
534    /// # response
535    /// #    };
536    /// # }
537    /// ```
538    ///
539    /// [`map_response`]: ServiceExt::map_response
540    /// [`map_err`]: ServiceExt::map_err
541    /// [`map_result`]: ServiceExt::map_result
542    /// [`Error`]: crate::Service::Error
543    /// [`Response`]: crate::Service::Response
544    /// [`poll_ready`]: crate::Service::poll_ready
545    /// [`BoxError`]: crate::BoxError
546    fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
547    where
548        Self: Sized,
549        Error: From<Self::Error>,
550        F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
551    {
552        MapResult::new(self, f)
553    }
554
555    /// Composes a function *in front of* the service.
556    ///
557    /// This adapter produces a new service that passes each value through the
558    /// given function `f` before sending it to `self`.
559    ///
560    /// # Example
561    /// ```
562    /// # use std::convert::TryFrom;
563    /// # use std::task::{Poll, Context};
564    /// # use tower::{Service, ServiceExt};
565    /// #
566    /// # struct DatabaseService;
567    /// # impl DatabaseService {
568    /// #   fn new(address: &str) -> Self {
569    /// #       DatabaseService
570    /// #   }
571    /// # }
572    /// #
573    /// # impl Service<String> for DatabaseService {
574    /// #   type Response = String;
575    /// #   type Error = u8;
576    /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
577    /// #
578    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
579    /// #       Poll::Ready(Ok(()))
580    /// #   }
581    /// #
582    /// #   fn call(&mut self, request: String) -> Self::Future {
583    /// #       futures_util::future::ready(Ok(String::new()))
584    /// #   }
585    /// # }
586    /// #
587    /// # fn main() {
588    /// #   async {
589    /// // A service taking a String as a request
590    /// let service = DatabaseService::new("127.0.0.1:8080");
591    ///
592    /// // Map the request to a new request
593    /// let mut new_service = service.map_request(|id: u32| id.to_string());
594    ///
595    /// // Call the new service
596    /// let id = 13;
597    /// let response = new_service
598    ///     .ready()
599    ///     .await?
600    ///     .call(id)
601    ///     .await;
602    /// # response
603    /// #    };
604    /// # }
605    /// ```
606    fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
607    where
608        Self: Sized,
609        F: FnMut(NewRequest) -> Request,
610    {
611        MapRequest::new(self, f)
612    }
613
614    /// Composes this service with a [`Filter`] that conditionally accepts or
615    /// rejects requests based on a [predicate].
616    ///
617    /// This adapter produces a new service that passes each value through the
618    /// given function `predicate` before sending it to `self`.
619    ///
620    /// # Example
621    /// ```
622    /// # use std::convert::TryFrom;
623    /// # use std::task::{Poll, Context};
624    /// # use tower::{Service, ServiceExt};
625    /// #
626    /// # struct DatabaseService;
627    /// # impl DatabaseService {
628    /// #   fn new(address: &str) -> Self {
629    /// #       DatabaseService
630    /// #   }
631    /// # }
632    /// #
633    /// # #[derive(Debug)] enum DbError {
634    /// #   Parse(std::num::ParseIntError)
635    /// # }
636    /// #
637    /// # impl std::fmt::Display for DbError {
638    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
639    /// # }
640    /// # impl std::error::Error for DbError {}
641    /// # impl Service<u32> for DatabaseService {
642    /// #   type Response = String;
643    /// #   type Error = DbError;
644    /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
645    /// #
646    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
647    /// #       Poll::Ready(Ok(()))
648    /// #   }
649    /// #
650    /// #   fn call(&mut self, request: u32) -> Self::Future {
651    /// #       futures_util::future::ready(Ok(String::new()))
652    /// #   }
653    /// # }
654    /// #
655    /// # fn main() {
656    /// #    async {
657    /// // A service taking a u32 as a request and returning Result<_, DbError>
658    /// let service = DatabaseService::new("127.0.0.1:8080");
659    ///
660    /// // Fallibly map the request to a new request
661    /// let mut new_service = service
662    ///     .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
663    ///
664    /// // Call the new service
665    /// let id = "13";
666    /// let response = new_service
667    ///     .ready()
668    ///     .await?
669    ///     .call(id)
670    ///     .await;
671    /// # response
672    /// #    };
673    /// # }
674    /// ```
675    ///
676    /// [`Filter`]: crate::filter::Filter
677    /// [predicate]: crate::filter::Predicate
678    #[cfg(feature = "filter")]
679    #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
680    fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
681    where
682        Self: Sized,
683        F: crate::filter::Predicate<NewRequest>,
684    {
685        crate::filter::Filter::new(self, filter)
686    }
687
688    /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
689    /// rejects requests based on an [async predicate].
690    ///
691    /// This adapter produces a new service that passes each value through the
692    /// given function `predicate` before sending it to `self`.
693    ///
694    /// # Example
695    /// ```
696    /// # use std::convert::TryFrom;
697    /// # use std::task::{Poll, Context};
698    /// # use tower::{Service, ServiceExt};
699    /// #
700    /// # #[derive(Clone)] struct DatabaseService;
701    /// # impl DatabaseService {
702    /// #   fn new(address: &str) -> Self {
703    /// #       DatabaseService
704    /// #   }
705    /// # }
706    /// # #[derive(Debug)]
707    /// # enum DbError {
708    /// #   Rejected
709    /// # }
710    /// # impl std::fmt::Display for DbError {
711    /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
712    /// # }
713    /// # impl std::error::Error for DbError {}
714    /// #
715    /// # impl Service<u32> for DatabaseService {
716    /// #   type Response = String;
717    /// #   type Error = DbError;
718    /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
719    /// #
720    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
721    /// #       Poll::Ready(Ok(()))
722    /// #   }
723    /// #
724    /// #   fn call(&mut self, request: u32) -> Self::Future {
725    /// #       futures_util::future::ready(Ok(String::new()))
726    /// #   }
727    /// # }
728    /// #
729    /// # fn main() {
730    /// #    async {
731    /// // A service taking a u32 as a request and returning Result<_, DbError>
732    /// let service = DatabaseService::new("127.0.0.1:8080");
733    ///
734    /// /// Returns `true` if we should query the database for an ID.
735    /// async fn should_query(id: u32) -> bool {
736    ///     // ...
737    ///     # true
738    /// }
739    ///
740    /// // Filter requests based on `should_query`.
741    /// let mut new_service = service
742    ///     .filter_async(|id: u32| async move {
743    ///         if should_query(id).await {
744    ///             return Ok(id);
745    ///         }
746    ///
747    ///         Err(DbError::Rejected)
748    ///     });
749    ///
750    /// // Call the new service
751    /// let id = 13;
752    /// # let id: u32 = id;
753    /// let response = new_service
754    ///     .ready()
755    ///     .await?
756    ///     .call(id)
757    ///     .await;
758    /// # response
759    /// #    };
760    /// # }
761    /// ```
762    ///
763    /// [`AsyncFilter`]: crate::filter::AsyncFilter
764    /// [asynchronous predicate]: crate::filter::AsyncPredicate
765    #[cfg(feature = "filter")]
766    #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
767    fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
768    where
769        Self: Sized,
770        F: crate::filter::AsyncPredicate<NewRequest>,
771    {
772        crate::filter::AsyncFilter::new(self, filter)
773    }
774
775    /// Composes an asynchronous function *after* this service.
776    ///
777    /// This takes a function or closure returning a future, and returns a new
778    /// `Service` that chains that function after this service's [`Future`]. The
779    /// new `Service`'s future will consist of this service's future, followed
780    /// by the future returned by calling the chained function with the future's
781    /// [`Output`] type. The chained function is called regardless of whether
782    /// this service's future completes with a successful response or with an
783    /// error.
784    ///
785    /// This method can be thought of as an equivalent to the [`futures`
786    /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
787    /// _return_ futures, rather than on an individual future. Similarly to that
788    /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
789    /// error recovery, by calling some asynchronous function with errors
790    /// returned by this service. Alternatively, it may also be used to call a
791    /// fallible async function with the successful response of this service.
792    ///
793    /// This method can be used to change the [`Response`] type of the service
794    /// into a different type. It can also be used to change the [`Error`] type
795    /// of the service. However, because the `then` function is not applied
796    /// to the errors returned by the service's [`poll_ready`] method, it must
797    /// be possible to convert the service's [`Error`] type into the error type
798    /// returned by the `then` future. This is trivial when the function
799    /// returns the same error type as the service, but in other cases, it can
800    /// be useful to use [`BoxError`] to erase differing error types.
801    ///
802    /// # Examples
803    ///
804    /// ```
805    /// # use std::task::{Poll, Context};
806    /// # use tower::{Service, ServiceExt};
807    /// #
808    /// # struct DatabaseService;
809    /// # impl DatabaseService {
810    /// #   fn new(address: &str) -> Self {
811    /// #       DatabaseService
812    /// #   }
813    /// # }
814    /// #
815    /// # type Record = ();
816    /// # type DbError = ();
817    /// #
818    /// # impl Service<u32> for DatabaseService {
819    /// #   type Response = Record;
820    /// #   type Error = DbError;
821    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
822    /// #
823    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
824    /// #       Poll::Ready(Ok(()))
825    /// #   }
826    /// #
827    /// #   fn call(&mut self, request: u32) -> Self::Future {
828    /// #       futures_util::future::ready(Ok(()))
829    /// #   }
830    /// # }
831    /// #
832    /// # fn main() {
833    /// // A service returning Result<Record, DbError>
834    /// let service = DatabaseService::new("127.0.0.1:8080");
835    ///
836    /// // An async function that attempts to recover from errors returned by the
837    /// // database.
838    /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
839    ///     // ...
840    ///     # Ok(())
841    /// }
842    /// #    async {
843    ///
844    /// // If the database service returns an error, attempt to recover by
845    /// // calling `recover_from_error`. Otherwise, return the successful response.
846    /// let mut new_service = service.then(|result| async move {
847    ///     match result {
848    ///         Ok(record) => Ok(record),
849    ///         Err(e) => recover_from_error(e).await,
850    ///     }
851    /// });
852    ///
853    /// // Call the new service
854    /// let id = 13;
855    /// let record = new_service
856    ///     .ready()
857    ///     .await?
858    ///     .call(id)
859    ///     .await?;
860    /// # Ok::<(), DbError>(())
861    /// #    };
862    /// # }
863    /// ```
864    ///
865    /// [`Future`]: crate::Service::Future
866    /// [`Output`]: std::future::Future::Output
867    /// [`futures` crate]: https://docs.rs/futures
868    /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
869    /// [`Error`]: crate::Service::Error
870    /// [`Response`]: crate::Service::Response
871    /// [`poll_ready`]: crate::Service::poll_ready
872    /// [`BoxError`]: crate::BoxError
873    fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
874    where
875        Self: Sized,
876        Error: From<Self::Error>,
877        F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
878        Fut: Future<Output = Result<Response, Error>>,
879    {
880        Then::new(self, f)
881    }
882
883    /// Composes a function that transforms futures produced by the service.
884    ///
885    /// This takes a function or closure returning a future computed from the future returned by
886    /// the service's [`call`] method, as opposed to the responses produced by the future.
887    ///
888    /// # Examples
889    ///
890    /// ```
891    /// # use std::task::{Poll, Context};
892    /// # use tower::{Service, ServiceExt, BoxError};
893    /// #
894    /// # struct DatabaseService;
895    /// # impl DatabaseService {
896    /// #   fn new(address: &str) -> Self {
897    /// #       DatabaseService
898    /// #   }
899    /// # }
900    /// #
901    /// # type Record = ();
902    /// # type DbError = crate::BoxError;
903    /// #
904    /// # impl Service<u32> for DatabaseService {
905    /// #   type Response = Record;
906    /// #   type Error = DbError;
907    /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
908    /// #
909    /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
910    /// #       Poll::Ready(Ok(()))
911    /// #   }
912    /// #
913    /// #   fn call(&mut self, request: u32) -> Self::Future {
914    /// #       futures_util::future::ready(Ok(()))
915    /// #   }
916    /// # }
917    /// #
918    /// # fn main() {
919    /// use std::time::Duration;
920    /// use tokio::time::timeout;
921    ///
922    /// // A service returning Result<Record, DbError>
923    /// let service = DatabaseService::new("127.0.0.1:8080");
924    /// #    async {
925    ///
926    /// let mut new_service = service.map_future(|future| async move {
927    ///     let res = timeout(Duration::from_secs(1), future).await?;
928    ///     Ok::<_, BoxError>(res)
929    /// });
930    ///
931    /// // Call the new service
932    /// let id = 13;
933    /// let record = new_service
934    ///     .ready()
935    ///     .await?
936    ///     .call(id)
937    ///     .await?;
938    /// # Ok::<(), BoxError>(())
939    /// #    };
940    /// # }
941    /// ```
942    ///
943    /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
944    ///
945    /// [`call`]: crate::Service::call
946    /// [`Timeout`]: crate::timeout::Timeout
947    fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
948    where
949        Self: Sized,
950        F: FnMut(Self::Future) -> Fut,
951        Error: From<Self::Error>,
952        Fut: Future<Output = Result<Response, Error>>,
953    {
954        MapFuture::new(self, f)
955    }
956
957    /// Convert the service into a [`Service`] + [`Send`] trait object.
958    ///
959    /// See [`BoxService`] for more details.
960    ///
961    /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
962    /// can be used instead, to produce a boxed service which will also
963    /// implement [`Clone`].
964    ///
965    /// # Example
966    ///
967    /// ```
968    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
969    /// #
970    /// # struct Request;
971    /// # struct Response;
972    /// # impl Response {
973    /// #     fn new() -> Self { Self }
974    /// # }
975    ///
976    /// let service = service_fn(|req: Request| async {
977    ///     Ok::<_, BoxError>(Response::new())
978    /// });
979    ///
980    /// let service: BoxService<Request, Response, BoxError> = service
981    ///     .map_request(|req| {
982    ///         println!("received request");
983    ///         req
984    ///     })
985    ///     .map_response(|res| {
986    ///         println!("response produced");
987    ///         res
988    ///     })
989    ///     .boxed();
990    /// # let service = assert_service(service);
991    /// # fn assert_service<S, R>(svc: S) -> S
992    /// # where S: Service<R> { svc }
993    /// ```
994    ///
995    /// [`Service`]: crate::Service
996    /// [`boxed_clone`]: Self::boxed_clone
997    fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
998    where
999        Self: Sized + Send + 'static,
1000        Self::Future: Send + 'static,
1001    {
1002        BoxService::new(self)
1003    }
1004
1005    /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
1006    ///
1007    /// This is similar to the [`boxed`] method, but it requires that `Self` implement
1008    /// [`Clone`], and the returned boxed service implements [`Clone`].
1009    /// See [`BoxCloneService`] for more details.
1010    ///
1011    /// # Example
1012    ///
1013    /// ```
1014    /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1015    /// #
1016    /// # struct Request;
1017    /// # struct Response;
1018    /// # impl Response {
1019    /// #     fn new() -> Self { Self }
1020    /// # }
1021    ///
1022    /// let service = service_fn(|req: Request| async {
1023    ///     Ok::<_, BoxError>(Response::new())
1024    /// });
1025    ///
1026    /// let service: BoxCloneService<Request, Response, BoxError> = service
1027    ///     .map_request(|req| {
1028    ///         println!("received request");
1029    ///         req
1030    ///     })
1031    ///     .map_response(|res| {
1032    ///         println!("response produced");
1033    ///         res
1034    ///     })
1035    ///     .boxed_clone();
1036    ///
1037    /// // The boxed service can still be cloned.
1038    /// service.clone();
1039    /// # let service = assert_service(service);
1040    /// # fn assert_service<S, R>(svc: S) -> S
1041    /// # where S: Service<R> { svc }
1042    /// ```
1043    ///
1044    /// [`Service`]: crate::Service
1045    /// [`boxed`]: Self::boxed
1046    fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1047    where
1048        Self: Clone + Sized + Send + 'static,
1049        Self::Future: Send + 'static,
1050    {
1051        BoxCloneService::new(self)
1052    }
1053}
1054
1055impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1056
1057/// Convert an `Option<Layer>` into a [`Layer`].
1058///
1059/// ```
1060/// # use std::time::Duration;
1061/// # use tower::Service;
1062/// # use tower::builder::ServiceBuilder;
1063/// use tower::util::option_layer;
1064/// # use tower::timeout::TimeoutLayer;
1065/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1066/// # let timeout = Some(Duration::new(10, 0));
1067/// // Layer to apply a timeout if configured
1068/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1069///
1070/// ServiceBuilder::new()
1071///     .layer(maybe_timeout)
1072///     .service(svc);
1073/// # }
1074/// ```
1075///
1076/// [`Layer`]: crate::layer::Layer
1077pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1078    if let Some(layer) = layer {
1079        Either::A(layer)
1080    } else {
1081        Either::B(Identity::new())
1082    }
1083}