hyper_timeout/
stream.rs

1use std::io;
2use std::io::IoSlice;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use hyper::client::connect::{Connected, Connection};
8use pin_project_lite::pin_project;
9use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
10use tokio_io_timeout::TimeoutStream;
11
12pin_project! {
13    /// A timeout stream that implements required traits to be a Connector
14    #[derive(Debug)]
15    pub struct TimeoutConnectorStream<S> {
16        #[pin]
17        stream: TimeoutStream<S>
18    }
19}
20
21impl<S> TimeoutConnectorStream<S>
22where
23    S: AsyncRead + AsyncWrite + Unpin,
24{
25    /// Returns a new `TimeoutConnectorStream` wrapping the specified stream.
26    ///
27    /// There is initially no read or write timeout.
28    pub fn new(stream: TimeoutStream<S>) -> TimeoutConnectorStream<S> {
29        TimeoutConnectorStream { stream }
30    }
31
32    /// Returns the current read timeout.
33    pub fn read_timeout(&self) -> Option<Duration> {
34        self.stream.read_timeout()
35    }
36
37    /// Sets the read timeout.
38    ///
39    /// This can only be used before the stream is pinned; use
40    /// [`set_read_timeout_pinned`](Self::set_read_timeout_pinned) otherwise.
41    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
42        self.stream.set_read_timeout(timeout)
43    }
44
45    /// Sets the read timeout.
46    ///
47    /// This will reset any pending read timeout. Use
48    /// [`set_read_timeout`](Self::set_read_timeout) instead if the stream has not yet been pinned.
49    pub fn set_read_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
50        self.project()
51            .stream
52            .as_mut()
53            .set_read_timeout_pinned(timeout)
54    }
55
56    /// Returns the current write timeout.
57    pub fn write_timeout(&self) -> Option<Duration> {
58        self.stream.write_timeout()
59    }
60
61    /// Sets the write timeout.
62    ///
63    /// This can only be used before the stream is pinned; use
64    /// [`set_write_timeout_pinned`](Self::set_write_timeout_pinned) otherwise.
65    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
66        self.stream.set_write_timeout(timeout)
67    }
68
69    /// Sets the write timeout.
70    ///
71    /// This will reset any pending write timeout. Use
72    /// [`set_write_timeout`](Self::set_write_timeout) instead if the stream has not yet been
73    /// pinned.
74    pub fn set_write_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) {
75        self.project()
76            .stream
77            .as_mut()
78            .set_write_timeout_pinned(timeout)
79    }
80
81    /// Returns a shared reference to the inner stream.
82    pub fn get_ref(&self) -> &S {
83        self.stream.get_ref()
84    }
85
86    /// Returns a mutable reference to the inner stream.
87    pub fn get_mut(&mut self) -> &mut S {
88        self.stream.get_mut()
89    }
90
91    /// Returns a pinned mutable reference to the inner stream.
92    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
93        self.project().stream.get_pin_mut()
94    }
95
96    /// Consumes the stream, returning the inner stream.
97    pub fn into_inner(self) -> S {
98        self.stream.into_inner()
99    }
100}
101
102impl<S> AsyncRead for TimeoutConnectorStream<S>
103where
104    S: AsyncRead + AsyncWrite + Unpin,
105{
106    fn poll_read(
107        self: Pin<&mut Self>,
108        cx: &mut Context,
109        buf: &mut ReadBuf,
110    ) -> Poll<Result<(), io::Error>> {
111        self.project().stream.poll_read(cx, buf)
112    }
113}
114
115impl<S> AsyncWrite for TimeoutConnectorStream<S>
116where
117    S: AsyncRead + AsyncWrite + Unpin,
118{
119    fn poll_write(
120        self: Pin<&mut Self>,
121        cx: &mut Context<'_>,
122        buf: &[u8],
123    ) -> Poll<Result<usize, io::Error>> {
124        self.project().stream.poll_write(cx, buf)
125    }
126
127    fn poll_write_vectored(
128        self: Pin<&mut Self>,
129        cx: &mut Context<'_>,
130        bufs: &[IoSlice<'_>],
131    ) -> Poll<Result<usize, io::Error>> {
132        self.project().stream.poll_write_vectored(cx, bufs)
133    }
134
135    fn is_write_vectored(&self) -> bool {
136        self.stream.is_write_vectored()
137    }
138
139    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
140        self.project().stream.poll_flush(cx)
141    }
142
143    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
144        self.project().stream.poll_shutdown(cx)
145    }
146}
147
148impl<S> Connection for TimeoutConnectorStream<S>
149where
150    S: AsyncRead + AsyncWrite + Connection + Unpin,
151{
152    fn connected(&self) -> Connected {
153        self.stream.get_ref().connected()
154    }
155}
156
157impl<S> Connection for Pin<Box<TimeoutConnectorStream<S>>>
158where
159    S: AsyncRead + AsyncWrite + Connection + Unpin,
160{
161    fn connected(&self) -> Connected {
162        self.stream.get_ref().connected()
163    }
164}