tower/util/mod.rs
1//! Various utility types and functions that are generally used with Tower.
2
3mod and_then;
4mod boxed;
5mod boxed_clone;
6mod call_all;
7mod either;
8
9mod future_service;
10mod map_err;
11mod map_request;
12mod map_response;
13mod map_result;
14
15mod map_future;
16mod oneshot;
17mod optional;
18mod ready;
19mod service_fn;
20mod then;
21
22#[allow(deprecated)]
23pub use self::{
24 and_then::{AndThen, AndThenLayer},
25 boxed::{BoxLayer, BoxService, UnsyncBoxService},
26 boxed_clone::BoxCloneService,
27 either::Either,
28 future_service::{future_service, FutureService},
29 map_err::{MapErr, MapErrLayer},
30 map_future::{MapFuture, MapFutureLayer},
31 map_request::{MapRequest, MapRequestLayer},
32 map_response::{MapResponse, MapResponseLayer},
33 map_result::{MapResult, MapResultLayer},
34 oneshot::Oneshot,
35 optional::Optional,
36 ready::{Ready, ReadyAnd, ReadyOneshot},
37 service_fn::{service_fn, ServiceFn},
38 then::{Then, ThenLayer},
39};
40
41pub use self::call_all::{CallAll, CallAllUnordered};
42use std::future::Future;
43
44use crate::layer::util::Identity;
45
46pub mod error {
47 //! Error types
48
49 pub use super::optional::error as optional;
50}
51
52pub mod future {
53 //! Future types
54
55 pub use super::and_then::AndThenFuture;
56 pub use super::map_err::MapErrFuture;
57 pub use super::map_response::MapResponseFuture;
58 pub use super::map_result::MapResultFuture;
59 pub use super::optional::future as optional;
60 pub use super::then::ThenFuture;
61}
62
63/// An extension trait for `Service`s that provides a variety of convenient
64/// adapters
65pub trait ServiceExt<Request>: tower_service::Service<Request> {
66 /// Yields a mutable reference to the service when it is ready to accept a request.
67 fn ready(&mut self) -> Ready<'_, Self, Request>
68 where
69 Self: Sized,
70 {
71 Ready::new(self)
72 }
73
74 /// Yields a mutable reference to the service when it is ready to accept a request.
75 #[deprecated(
76 since = "0.4.6",
77 note = "please use the `ServiceExt::ready` method instead"
78 )]
79 #[allow(deprecated)]
80 fn ready_and(&mut self) -> ReadyAnd<'_, Self, Request>
81 where
82 Self: Sized,
83 {
84 ReadyAnd::new(self)
85 }
86
87 /// Yields the service when it is ready to accept a request.
88 fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
89 where
90 Self: Sized,
91 {
92 ReadyOneshot::new(self)
93 }
94
95 /// Consume this `Service`, calling with the providing request once it is ready.
96 fn oneshot(self, req: Request) -> Oneshot<Self, Request>
97 where
98 Self: Sized,
99 {
100 Oneshot::new(self, req)
101 }
102
103 /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
104 ///
105 /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
106 /// Response>`][stream]. See the documentation for [`CallAll`] for
107 /// details.
108 ///
109 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110 /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
111 fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
112 where
113 Self: Sized,
114 Self::Error: Into<crate::BoxError>,
115 S: futures_core::Stream<Item = Request>,
116 {
117 CallAll::new(self, reqs)
118 }
119
120 /// Executes a new future after this service's future resolves. This does
121 /// not alter the behaviour of the [`poll_ready`] method.
122 ///
123 /// This method can be used to change the [`Response`] type of the service
124 /// into a different type. You can use this method to chain along a computation once the
125 /// service's response has been resolved.
126 ///
127 /// [`Response`]: crate::Service::Response
128 /// [`poll_ready`]: crate::Service::poll_ready
129 ///
130 /// # Example
131 /// ```
132 /// # use std::task::{Poll, Context};
133 /// # use tower::{Service, ServiceExt};
134 /// #
135 /// # struct DatabaseService;
136 /// # impl DatabaseService {
137 /// # fn new(address: &str) -> Self {
138 /// # DatabaseService
139 /// # }
140 /// # }
141 /// #
142 /// # struct Record {
143 /// # pub name: String,
144 /// # pub age: u16
145 /// # }
146 /// #
147 /// # impl Service<u32> for DatabaseService {
148 /// # type Response = Record;
149 /// # type Error = u8;
150 /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
151 /// #
152 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 /// # Poll::Ready(Ok(()))
154 /// # }
155 /// #
156 /// # fn call(&mut self, request: u32) -> Self::Future {
157 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
158 /// # }
159 /// # }
160 /// #
161 /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
162 /// #
163 /// # fn main() {
164 /// # async {
165 /// // A service returning Result<Record, _>
166 /// let service = DatabaseService::new("127.0.0.1:8080");
167 ///
168 /// // Map the response into a new response
169 /// let mut new_service = service.and_then(|record: Record| async move {
170 /// let name = record.name;
171 /// avatar_lookup(name).await
172 /// });
173 ///
174 /// // Call the new service
175 /// let id = 13;
176 /// let avatar = new_service.call(id).await.unwrap();
177 /// # };
178 /// # }
179 /// ```
180 fn and_then<F>(self, f: F) -> AndThen<Self, F>
181 where
182 Self: Sized,
183 F: Clone,
184 {
185 AndThen::new(self, f)
186 }
187
188 /// Maps this service's response value to a different value. This does not
189 /// alter the behaviour of the [`poll_ready`] method.
190 ///
191 /// This method can be used to change the [`Response`] type of the service
192 /// into a different type. It is similar to the [`Result::map`]
193 /// method. You can use this method to chain along a computation once the
194 /// service's response has been resolved.
195 ///
196 /// [`Response`]: crate::Service::Response
197 /// [`poll_ready`]: crate::Service::poll_ready
198 ///
199 /// # Example
200 /// ```
201 /// # use std::task::{Poll, Context};
202 /// # use tower::{Service, ServiceExt};
203 /// #
204 /// # struct DatabaseService;
205 /// # impl DatabaseService {
206 /// # fn new(address: &str) -> Self {
207 /// # DatabaseService
208 /// # }
209 /// # }
210 /// #
211 /// # struct Record {
212 /// # pub name: String,
213 /// # pub age: u16
214 /// # }
215 /// #
216 /// # impl Service<u32> for DatabaseService {
217 /// # type Response = Record;
218 /// # type Error = u8;
219 /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
220 /// #
221 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222 /// # Poll::Ready(Ok(()))
223 /// # }
224 /// #
225 /// # fn call(&mut self, request: u32) -> Self::Future {
226 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
227 /// # }
228 /// # }
229 /// #
230 /// # fn main() {
231 /// # async {
232 /// // A service returning Result<Record, _>
233 /// let service = DatabaseService::new("127.0.0.1:8080");
234 ///
235 /// // Map the response into a new response
236 /// let mut new_service = service.map_response(|record| record.name);
237 ///
238 /// // Call the new service
239 /// let id = 13;
240 /// let name = new_service
241 /// .ready()
242 /// .await?
243 /// .call(id)
244 /// .await?;
245 /// # Ok::<(), u8>(())
246 /// # };
247 /// # }
248 /// ```
249 fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
250 where
251 Self: Sized,
252 F: FnOnce(Self::Response) -> Response + Clone,
253 {
254 MapResponse::new(self, f)
255 }
256
257 /// Maps this service's error value to a different value. This does not
258 /// alter the behaviour of the [`poll_ready`] method.
259 ///
260 /// This method can be used to change the [`Error`] type of the service
261 /// into a different type. It is similar to the [`Result::map_err`] method.
262 ///
263 /// [`Error`]: crate::Service::Error
264 /// [`poll_ready`]: crate::Service::poll_ready
265 ///
266 /// # Example
267 /// ```
268 /// # use std::task::{Poll, Context};
269 /// # use tower::{Service, ServiceExt};
270 /// #
271 /// # struct DatabaseService;
272 /// # impl DatabaseService {
273 /// # fn new(address: &str) -> Self {
274 /// # DatabaseService
275 /// # }
276 /// # }
277 /// #
278 /// # struct Error {
279 /// # pub code: u32,
280 /// # pub message: String
281 /// # }
282 /// #
283 /// # impl Service<u32> for DatabaseService {
284 /// # type Response = String;
285 /// # type Error = Error;
286 /// # type Future = futures_util::future::Ready<Result<String, Error>>;
287 /// #
288 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
289 /// # Poll::Ready(Ok(()))
290 /// # }
291 /// #
292 /// # fn call(&mut self, request: u32) -> Self::Future {
293 /// # futures_util::future::ready(Ok(String::new()))
294 /// # }
295 /// # }
296 /// #
297 /// # fn main() {
298 /// # async {
299 /// // A service returning Result<_, Error>
300 /// let service = DatabaseService::new("127.0.0.1:8080");
301 ///
302 /// // Map the error to a new error
303 /// let mut new_service = service.map_err(|err| err.code);
304 ///
305 /// // Call the new service
306 /// let id = 13;
307 /// let code = new_service
308 /// .ready()
309 /// .await?
310 /// .call(id)
311 /// .await
312 /// .unwrap_err();
313 /// # Ok::<(), u32>(())
314 /// # };
315 /// # }
316 /// ```
317 fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
318 where
319 Self: Sized,
320 F: FnOnce(Self::Error) -> Error + Clone,
321 {
322 MapErr::new(self, f)
323 }
324
325 /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
326 /// to a different value, regardless of whether the future succeeds or
327 /// fails.
328 ///
329 /// This is similar to the [`map_response`] and [`map_err`] combinators,
330 /// except that the *same* function is invoked when the service's future
331 /// completes, whether it completes successfully or fails. This function
332 /// takes the [`Result`] returned by the service's future, and returns a
333 /// [`Result`].
334 ///
335 /// Like the standard library's [`Result::and_then`], this method can be
336 /// used to implement control flow based on `Result` values. For example, it
337 /// may be used to implement error recovery, by turning some [`Err`]
338 /// responses from the service into [`Ok`] responses. Similarly, some
339 /// successful responses from the service could be rejected, by returning an
340 /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
341 /// this method can also be used to implement behaviors that must run when a
342 /// service's future completes, regardless of whether it succeeded or failed.
343 ///
344 /// This method can be used to change the [`Response`] type of the service
345 /// into a different type. It can also be used to change the [`Error`] type
346 /// of the service. However, because the [`map_result`] function is not applied
347 /// to the errors returned by the service's [`poll_ready`] method, it must
348 /// be possible to convert the service's [`Error`] type into the error type
349 /// returned by the [`map_result`] function. This is trivial when the function
350 /// returns the same error type as the service, but in other cases, it can
351 /// be useful to use [`BoxError`] to erase differing error types.
352 ///
353 /// # Examples
354 ///
355 /// Recovering from certain errors:
356 ///
357 /// ```
358 /// # use std::task::{Poll, Context};
359 /// # use tower::{Service, ServiceExt};
360 /// #
361 /// # struct DatabaseService;
362 /// # impl DatabaseService {
363 /// # fn new(address: &str) -> Self {
364 /// # DatabaseService
365 /// # }
366 /// # }
367 /// #
368 /// # struct Record {
369 /// # pub name: String,
370 /// # pub age: u16
371 /// # }
372 /// # #[derive(Debug)]
373 /// # enum DbError {
374 /// # Parse(std::num::ParseIntError),
375 /// # NoRecordsFound,
376 /// # }
377 /// #
378 /// # impl Service<u32> for DatabaseService {
379 /// # type Response = Vec<Record>;
380 /// # type Error = DbError;
381 /// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
382 /// #
383 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
384 /// # Poll::Ready(Ok(()))
385 /// # }
386 /// #
387 /// # fn call(&mut self, request: u32) -> Self::Future {
388 /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
389 /// # }
390 /// # }
391 /// #
392 /// # fn main() {
393 /// # async {
394 /// // A service returning Result<Vec<Record>, DbError>
395 /// let service = DatabaseService::new("127.0.0.1:8080");
396 ///
397 /// // If the database returns no records for the query, we just want an empty `Vec`.
398 /// let mut new_service = service.map_result(|result| match result {
399 /// // If the error indicates that no records matched the query, return an empty
400 /// // `Vec` instead.
401 /// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
402 /// // Propagate all other responses (`Ok` and `Err`) unchanged
403 /// x => x,
404 /// });
405 ///
406 /// // Call the new service
407 /// let id = 13;
408 /// let name = new_service
409 /// .ready()
410 /// .await?
411 /// .call(id)
412 /// .await?;
413 /// # Ok::<(), DbError>(())
414 /// # };
415 /// # }
416 /// ```
417 ///
418 /// Rejecting some `Ok` responses:
419 ///
420 /// ```
421 /// # use std::task::{Poll, Context};
422 /// # use tower::{Service, ServiceExt};
423 /// #
424 /// # struct DatabaseService;
425 /// # impl DatabaseService {
426 /// # fn new(address: &str) -> Self {
427 /// # DatabaseService
428 /// # }
429 /// # }
430 /// #
431 /// # struct Record {
432 /// # pub name: String,
433 /// # pub age: u16
434 /// # }
435 /// # type DbError = String;
436 /// # type AppError = String;
437 /// #
438 /// # impl Service<u32> for DatabaseService {
439 /// # type Response = Record;
440 /// # type Error = DbError;
441 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
442 /// #
443 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
444 /// # Poll::Ready(Ok(()))
445 /// # }
446 /// #
447 /// # fn call(&mut self, request: u32) -> Self::Future {
448 /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
449 /// # }
450 /// # }
451 /// #
452 /// # fn main() {
453 /// # async {
454 /// use tower::BoxError;
455 ///
456 /// // A service returning Result<Record, DbError>
457 /// let service = DatabaseService::new("127.0.0.1:8080");
458 ///
459 /// // If the user is zero years old, return an error.
460 /// let mut new_service = service.map_result(|result| {
461 /// let record = result?;
462 ///
463 /// if record.age == 0 {
464 /// // Users must have been born to use our app!
465 /// let app_error = AppError::from("users cannot be 0 years old!");
466 ///
467 /// // Box the error to erase its type (as it can be an `AppError`
468 /// // *or* the inner service's `DbError`).
469 /// return Err(BoxError::from(app_error));
470 /// }
471 ///
472 /// // Otherwise, return the record.
473 /// Ok(record)
474 /// });
475 ///
476 /// // Call the new service
477 /// let id = 13;
478 /// let record = new_service
479 /// .ready()
480 /// .await?
481 /// .call(id)
482 /// .await?;
483 /// # Ok::<(), BoxError>(())
484 /// # };
485 /// # }
486 /// ```
487 ///
488 /// Performing an action that must be run for both successes and failures:
489 ///
490 /// ```
491 /// # use std::convert::TryFrom;
492 /// # use std::task::{Poll, Context};
493 /// # use tower::{Service, ServiceExt};
494 /// #
495 /// # struct DatabaseService;
496 /// # impl DatabaseService {
497 /// # fn new(address: &str) -> Self {
498 /// # DatabaseService
499 /// # }
500 /// # }
501 /// #
502 /// # impl Service<u32> for DatabaseService {
503 /// # type Response = String;
504 /// # type Error = u8;
505 /// # type Future = futures_util::future::Ready<Result<String, u8>>;
506 /// #
507 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
508 /// # Poll::Ready(Ok(()))
509 /// # }
510 /// #
511 /// # fn call(&mut self, request: u32) -> Self::Future {
512 /// # futures_util::future::ready(Ok(String::new()))
513 /// # }
514 /// # }
515 /// #
516 /// # fn main() {
517 /// # async {
518 /// // A service returning Result<Record, DbError>
519 /// let service = DatabaseService::new("127.0.0.1:8080");
520 ///
521 /// // Print a message whenever a query completes.
522 /// let mut new_service = service.map_result(|result| {
523 /// println!("query completed; success={}", result.is_ok());
524 /// result
525 /// });
526 ///
527 /// // Call the new service
528 /// let id = 13;
529 /// let response = new_service
530 /// .ready()
531 /// .await?
532 /// .call(id)
533 /// .await;
534 /// # response
535 /// # };
536 /// # }
537 /// ```
538 ///
539 /// [`map_response`]: ServiceExt::map_response
540 /// [`map_err`]: ServiceExt::map_err
541 /// [`map_result`]: ServiceExt::map_result
542 /// [`Error`]: crate::Service::Error
543 /// [`Response`]: crate::Service::Response
544 /// [`poll_ready`]: crate::Service::poll_ready
545 /// [`BoxError`]: crate::BoxError
546 fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
547 where
548 Self: Sized,
549 Error: From<Self::Error>,
550 F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
551 {
552 MapResult::new(self, f)
553 }
554
555 /// Composes a function *in front of* the service.
556 ///
557 /// This adapter produces a new service that passes each value through the
558 /// given function `f` before sending it to `self`.
559 ///
560 /// # Example
561 /// ```
562 /// # use std::convert::TryFrom;
563 /// # use std::task::{Poll, Context};
564 /// # use tower::{Service, ServiceExt};
565 /// #
566 /// # struct DatabaseService;
567 /// # impl DatabaseService {
568 /// # fn new(address: &str) -> Self {
569 /// # DatabaseService
570 /// # }
571 /// # }
572 /// #
573 /// # impl Service<String> for DatabaseService {
574 /// # type Response = String;
575 /// # type Error = u8;
576 /// # type Future = futures_util::future::Ready<Result<String, u8>>;
577 /// #
578 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
579 /// # Poll::Ready(Ok(()))
580 /// # }
581 /// #
582 /// # fn call(&mut self, request: String) -> Self::Future {
583 /// # futures_util::future::ready(Ok(String::new()))
584 /// # }
585 /// # }
586 /// #
587 /// # fn main() {
588 /// # async {
589 /// // A service taking a String as a request
590 /// let service = DatabaseService::new("127.0.0.1:8080");
591 ///
592 /// // Map the request to a new request
593 /// let mut new_service = service.map_request(|id: u32| id.to_string());
594 ///
595 /// // Call the new service
596 /// let id = 13;
597 /// let response = new_service
598 /// .ready()
599 /// .await?
600 /// .call(id)
601 /// .await;
602 /// # response
603 /// # };
604 /// # }
605 /// ```
606 fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
607 where
608 Self: Sized,
609 F: FnMut(NewRequest) -> Request,
610 {
611 MapRequest::new(self, f)
612 }
613
614 /// Composes this service with a [`Filter`] that conditionally accepts or
615 /// rejects requests based on a [predicate].
616 ///
617 /// This adapter produces a new service that passes each value through the
618 /// given function `predicate` before sending it to `self`.
619 ///
620 /// # Example
621 /// ```
622 /// # use std::convert::TryFrom;
623 /// # use std::task::{Poll, Context};
624 /// # use tower::{Service, ServiceExt};
625 /// #
626 /// # struct DatabaseService;
627 /// # impl DatabaseService {
628 /// # fn new(address: &str) -> Self {
629 /// # DatabaseService
630 /// # }
631 /// # }
632 /// #
633 /// # #[derive(Debug)] enum DbError {
634 /// # Parse(std::num::ParseIntError)
635 /// # }
636 /// #
637 /// # impl std::fmt::Display for DbError {
638 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
639 /// # }
640 /// # impl std::error::Error for DbError {}
641 /// # impl Service<u32> for DatabaseService {
642 /// # type Response = String;
643 /// # type Error = DbError;
644 /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
645 /// #
646 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
647 /// # Poll::Ready(Ok(()))
648 /// # }
649 /// #
650 /// # fn call(&mut self, request: u32) -> Self::Future {
651 /// # futures_util::future::ready(Ok(String::new()))
652 /// # }
653 /// # }
654 /// #
655 /// # fn main() {
656 /// # async {
657 /// // A service taking a u32 as a request and returning Result<_, DbError>
658 /// let service = DatabaseService::new("127.0.0.1:8080");
659 ///
660 /// // Fallibly map the request to a new request
661 /// let mut new_service = service
662 /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
663 ///
664 /// // Call the new service
665 /// let id = "13";
666 /// let response = new_service
667 /// .ready()
668 /// .await?
669 /// .call(id)
670 /// .await;
671 /// # response
672 /// # };
673 /// # }
674 /// ```
675 ///
676 /// [`Filter`]: crate::filter::Filter
677 /// [predicate]: crate::filter::Predicate
678 #[cfg(feature = "filter")]
679 #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
680 fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
681 where
682 Self: Sized,
683 F: crate::filter::Predicate<NewRequest>,
684 {
685 crate::filter::Filter::new(self, filter)
686 }
687
688 /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
689 /// rejects requests based on an [async predicate].
690 ///
691 /// This adapter produces a new service that passes each value through the
692 /// given function `predicate` before sending it to `self`.
693 ///
694 /// # Example
695 /// ```
696 /// # use std::convert::TryFrom;
697 /// # use std::task::{Poll, Context};
698 /// # use tower::{Service, ServiceExt};
699 /// #
700 /// # #[derive(Clone)] struct DatabaseService;
701 /// # impl DatabaseService {
702 /// # fn new(address: &str) -> Self {
703 /// # DatabaseService
704 /// # }
705 /// # }
706 /// # #[derive(Debug)]
707 /// # enum DbError {
708 /// # Rejected
709 /// # }
710 /// # impl std::fmt::Display for DbError {
711 /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
712 /// # }
713 /// # impl std::error::Error for DbError {}
714 /// #
715 /// # impl Service<u32> for DatabaseService {
716 /// # type Response = String;
717 /// # type Error = DbError;
718 /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
719 /// #
720 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
721 /// # Poll::Ready(Ok(()))
722 /// # }
723 /// #
724 /// # fn call(&mut self, request: u32) -> Self::Future {
725 /// # futures_util::future::ready(Ok(String::new()))
726 /// # }
727 /// # }
728 /// #
729 /// # fn main() {
730 /// # async {
731 /// // A service taking a u32 as a request and returning Result<_, DbError>
732 /// let service = DatabaseService::new("127.0.0.1:8080");
733 ///
734 /// /// Returns `true` if we should query the database for an ID.
735 /// async fn should_query(id: u32) -> bool {
736 /// // ...
737 /// # true
738 /// }
739 ///
740 /// // Filter requests based on `should_query`.
741 /// let mut new_service = service
742 /// .filter_async(|id: u32| async move {
743 /// if should_query(id).await {
744 /// return Ok(id);
745 /// }
746 ///
747 /// Err(DbError::Rejected)
748 /// });
749 ///
750 /// // Call the new service
751 /// let id = 13;
752 /// # let id: u32 = id;
753 /// let response = new_service
754 /// .ready()
755 /// .await?
756 /// .call(id)
757 /// .await;
758 /// # response
759 /// # };
760 /// # }
761 /// ```
762 ///
763 /// [`AsyncFilter`]: crate::filter::AsyncFilter
764 /// [asynchronous predicate]: crate::filter::AsyncPredicate
765 #[cfg(feature = "filter")]
766 #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
767 fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
768 where
769 Self: Sized,
770 F: crate::filter::AsyncPredicate<NewRequest>,
771 {
772 crate::filter::AsyncFilter::new(self, filter)
773 }
774
775 /// Composes an asynchronous function *after* this service.
776 ///
777 /// This takes a function or closure returning a future, and returns a new
778 /// `Service` that chains that function after this service's [`Future`]. The
779 /// new `Service`'s future will consist of this service's future, followed
780 /// by the future returned by calling the chained function with the future's
781 /// [`Output`] type. The chained function is called regardless of whether
782 /// this service's future completes with a successful response or with an
783 /// error.
784 ///
785 /// This method can be thought of as an equivalent to the [`futures`
786 /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
787 /// _return_ futures, rather than on an individual future. Similarly to that
788 /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
789 /// error recovery, by calling some asynchronous function with errors
790 /// returned by this service. Alternatively, it may also be used to call a
791 /// fallible async function with the successful response of this service.
792 ///
793 /// This method can be used to change the [`Response`] type of the service
794 /// into a different type. It can also be used to change the [`Error`] type
795 /// of the service. However, because the `then` function is not applied
796 /// to the errors returned by the service's [`poll_ready`] method, it must
797 /// be possible to convert the service's [`Error`] type into the error type
798 /// returned by the `then` future. This is trivial when the function
799 /// returns the same error type as the service, but in other cases, it can
800 /// be useful to use [`BoxError`] to erase differing error types.
801 ///
802 /// # Examples
803 ///
804 /// ```
805 /// # use std::task::{Poll, Context};
806 /// # use tower::{Service, ServiceExt};
807 /// #
808 /// # struct DatabaseService;
809 /// # impl DatabaseService {
810 /// # fn new(address: &str) -> Self {
811 /// # DatabaseService
812 /// # }
813 /// # }
814 /// #
815 /// # type Record = ();
816 /// # type DbError = ();
817 /// #
818 /// # impl Service<u32> for DatabaseService {
819 /// # type Response = Record;
820 /// # type Error = DbError;
821 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
822 /// #
823 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
824 /// # Poll::Ready(Ok(()))
825 /// # }
826 /// #
827 /// # fn call(&mut self, request: u32) -> Self::Future {
828 /// # futures_util::future::ready(Ok(()))
829 /// # }
830 /// # }
831 /// #
832 /// # fn main() {
833 /// // A service returning Result<Record, DbError>
834 /// let service = DatabaseService::new("127.0.0.1:8080");
835 ///
836 /// // An async function that attempts to recover from errors returned by the
837 /// // database.
838 /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
839 /// // ...
840 /// # Ok(())
841 /// }
842 /// # async {
843 ///
844 /// // If the database service returns an error, attempt to recover by
845 /// // calling `recover_from_error`. Otherwise, return the successful response.
846 /// let mut new_service = service.then(|result| async move {
847 /// match result {
848 /// Ok(record) => Ok(record),
849 /// Err(e) => recover_from_error(e).await,
850 /// }
851 /// });
852 ///
853 /// // Call the new service
854 /// let id = 13;
855 /// let record = new_service
856 /// .ready()
857 /// .await?
858 /// .call(id)
859 /// .await?;
860 /// # Ok::<(), DbError>(())
861 /// # };
862 /// # }
863 /// ```
864 ///
865 /// [`Future`]: crate::Service::Future
866 /// [`Output`]: std::future::Future::Output
867 /// [`futures` crate]: https://docs.rs/futures
868 /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
869 /// [`Error`]: crate::Service::Error
870 /// [`Response`]: crate::Service::Response
871 /// [`poll_ready`]: crate::Service::poll_ready
872 /// [`BoxError`]: crate::BoxError
873 fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
874 where
875 Self: Sized,
876 Error: From<Self::Error>,
877 F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
878 Fut: Future<Output = Result<Response, Error>>,
879 {
880 Then::new(self, f)
881 }
882
883 /// Composes a function that transforms futures produced by the service.
884 ///
885 /// This takes a function or closure returning a future computed from the future returned by
886 /// the service's [`call`] method, as opposed to the responses produced by the future.
887 ///
888 /// # Examples
889 ///
890 /// ```
891 /// # use std::task::{Poll, Context};
892 /// # use tower::{Service, ServiceExt, BoxError};
893 /// #
894 /// # struct DatabaseService;
895 /// # impl DatabaseService {
896 /// # fn new(address: &str) -> Self {
897 /// # DatabaseService
898 /// # }
899 /// # }
900 /// #
901 /// # type Record = ();
902 /// # type DbError = crate::BoxError;
903 /// #
904 /// # impl Service<u32> for DatabaseService {
905 /// # type Response = Record;
906 /// # type Error = DbError;
907 /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
908 /// #
909 /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
910 /// # Poll::Ready(Ok(()))
911 /// # }
912 /// #
913 /// # fn call(&mut self, request: u32) -> Self::Future {
914 /// # futures_util::future::ready(Ok(()))
915 /// # }
916 /// # }
917 /// #
918 /// # fn main() {
919 /// use std::time::Duration;
920 /// use tokio::time::timeout;
921 ///
922 /// // A service returning Result<Record, DbError>
923 /// let service = DatabaseService::new("127.0.0.1:8080");
924 /// # async {
925 ///
926 /// let mut new_service = service.map_future(|future| async move {
927 /// let res = timeout(Duration::from_secs(1), future).await?;
928 /// Ok::<_, BoxError>(res)
929 /// });
930 ///
931 /// // Call the new service
932 /// let id = 13;
933 /// let record = new_service
934 /// .ready()
935 /// .await?
936 /// .call(id)
937 /// .await?;
938 /// # Ok::<(), BoxError>(())
939 /// # };
940 /// # }
941 /// ```
942 ///
943 /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
944 ///
945 /// [`call`]: crate::Service::call
946 /// [`Timeout`]: crate::timeout::Timeout
947 fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
948 where
949 Self: Sized,
950 F: FnMut(Self::Future) -> Fut,
951 Error: From<Self::Error>,
952 Fut: Future<Output = Result<Response, Error>>,
953 {
954 MapFuture::new(self, f)
955 }
956
957 /// Convert the service into a [`Service`] + [`Send`] trait object.
958 ///
959 /// See [`BoxService`] for more details.
960 ///
961 /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
962 /// can be used instead, to produce a boxed service which will also
963 /// implement [`Clone`].
964 ///
965 /// # Example
966 ///
967 /// ```
968 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
969 /// #
970 /// # struct Request;
971 /// # struct Response;
972 /// # impl Response {
973 /// # fn new() -> Self { Self }
974 /// # }
975 ///
976 /// let service = service_fn(|req: Request| async {
977 /// Ok::<_, BoxError>(Response::new())
978 /// });
979 ///
980 /// let service: BoxService<Request, Response, BoxError> = service
981 /// .map_request(|req| {
982 /// println!("received request");
983 /// req
984 /// })
985 /// .map_response(|res| {
986 /// println!("response produced");
987 /// res
988 /// })
989 /// .boxed();
990 /// # let service = assert_service(service);
991 /// # fn assert_service<S, R>(svc: S) -> S
992 /// # where S: Service<R> { svc }
993 /// ```
994 ///
995 /// [`Service`]: crate::Service
996 /// [`boxed_clone`]: Self::boxed_clone
997 fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
998 where
999 Self: Sized + Send + 'static,
1000 Self::Future: Send + 'static,
1001 {
1002 BoxService::new(self)
1003 }
1004
1005 /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
1006 ///
1007 /// This is similar to the [`boxed`] method, but it requires that `Self` implement
1008 /// [`Clone`], and the returned boxed service implements [`Clone`].
1009 /// See [`BoxCloneService`] for more details.
1010 ///
1011 /// # Example
1012 ///
1013 /// ```
1014 /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1015 /// #
1016 /// # struct Request;
1017 /// # struct Response;
1018 /// # impl Response {
1019 /// # fn new() -> Self { Self }
1020 /// # }
1021 ///
1022 /// let service = service_fn(|req: Request| async {
1023 /// Ok::<_, BoxError>(Response::new())
1024 /// });
1025 ///
1026 /// let service: BoxCloneService<Request, Response, BoxError> = service
1027 /// .map_request(|req| {
1028 /// println!("received request");
1029 /// req
1030 /// })
1031 /// .map_response(|res| {
1032 /// println!("response produced");
1033 /// res
1034 /// })
1035 /// .boxed_clone();
1036 ///
1037 /// // The boxed service can still be cloned.
1038 /// service.clone();
1039 /// # let service = assert_service(service);
1040 /// # fn assert_service<S, R>(svc: S) -> S
1041 /// # where S: Service<R> { svc }
1042 /// ```
1043 ///
1044 /// [`Service`]: crate::Service
1045 /// [`boxed`]: Self::boxed
1046 fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1047 where
1048 Self: Clone + Sized + Send + 'static,
1049 Self::Future: Send + 'static,
1050 {
1051 BoxCloneService::new(self)
1052 }
1053}
1054
1055impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1056
1057/// Convert an `Option<Layer>` into a [`Layer`].
1058///
1059/// ```
1060/// # use std::time::Duration;
1061/// # use tower::Service;
1062/// # use tower::builder::ServiceBuilder;
1063/// use tower::util::option_layer;
1064/// # use tower::timeout::TimeoutLayer;
1065/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1066/// # let timeout = Some(Duration::new(10, 0));
1067/// // Layer to apply a timeout if configured
1068/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1069///
1070/// ServiceBuilder::new()
1071/// .layer(maybe_timeout)
1072/// .service(svc);
1073/// # }
1074/// ```
1075///
1076/// [`Layer`]: crate::layer::Layer
1077pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1078 if let Some(layer) = layer {
1079 Either::A(layer)
1080 } else {
1081 Either::B(Identity::new())
1082 }
1083}