rustls/
stream.rs

1use crate::session::Session;
2use std::io::{IoSlice, Read, Result, Write};
3
4/// This type implements `io::Read` and `io::Write`, encapsulating
5/// a Session `S` and an underlying transport `T`, such as a socket.
6///
7/// This allows you to use a rustls Session like a normal stream.
8pub struct Stream<'a, S: 'a + Session + ?Sized, T: 'a + Read + Write + ?Sized> {
9    /// Our session
10    pub sess: &'a mut S,
11
12    /// The underlying transport, like a socket
13    pub sock: &'a mut T,
14}
15
16impl<'a, S, T> Stream<'a, S, T>
17where
18    S: 'a + Session,
19    T: 'a + Read + Write,
20{
21    /// Make a new Stream using the Session `sess` and socket-like object
22    /// `sock`.  This does not fail and does no IO.
23    pub fn new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T> {
24        Stream { sess, sock }
25    }
26
27    /// If we're handshaking, complete all the IO for that.
28    /// If we have data to write, write it all.
29    fn complete_prior_io(&mut self) -> Result<()> {
30        if self.sess.is_handshaking() {
31            self.sess.complete_io(self.sock)?;
32        }
33
34        if self.sess.wants_write() {
35            self.sess.complete_io(self.sock)?;
36        }
37
38        Ok(())
39    }
40}
41
42impl<'a, S, T> Read for Stream<'a, S, T>
43where
44    S: 'a + Session,
45    T: 'a + Read + Write,
46{
47    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
48        self.complete_prior_io()?;
49
50        // We call complete_io() in a loop since a single call may read only
51        // a partial packet from the underlying transport. A full packet is
52        // needed to get more plaintext, which we must do if EOF has not been
53        // hit. Otherwise, we will prematurely signal EOF by returning 0. We
54        // determine if EOF has actually been hit by checking if 0 bytes were
55        // read from the underlying transport.
56        while self.sess.wants_read() && self.sess.complete_io(self.sock)?.0 != 0 {}
57
58        self.sess.read(buf)
59    }
60}
61
62impl<'a, S, T> Write for Stream<'a, S, T>
63where
64    S: 'a + Session,
65    T: 'a + Read + Write,
66{
67    fn write(&mut self, buf: &[u8]) -> Result<usize> {
68        self.complete_prior_io()?;
69
70        let len = self.sess.write(buf)?;
71
72        // Try to write the underlying transport here, but don't let
73        // any errors mask the fact we've consumed `len` bytes.
74        // Callers will learn of permanent errors on the next call.
75        let _ = self.sess.complete_io(self.sock);
76
77        Ok(len)
78    }
79
80    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
81        self.complete_prior_io()?;
82
83        let len = self.sess.write_vectored(bufs)?;
84
85        // Try to write the underlying transport here, but don't let
86        // any errors mask the fact we've consumed `len` bytes.
87        // Callers will learn of permanent errors on the next call.
88        let _ = self.sess.complete_io(self.sock);
89
90        Ok(len)
91    }
92
93    fn flush(&mut self) -> Result<()> {
94        self.complete_prior_io()?;
95
96        self.sess.flush()?;
97        if self.sess.wants_write() {
98            self.sess.complete_io(self.sock)?;
99        }
100        Ok(())
101    }
102}
103
104/// This type implements `io::Read` and `io::Write`, encapsulating
105/// and owning a Session `S` and an underlying blocking transport
106/// `T`, such as a socket.
107///
108/// This allows you to use a rustls Session like a normal stream.
109pub struct StreamOwned<S: Session + Sized, T: Read + Write + Sized> {
110    /// Our session
111    pub sess: S,
112
113    /// The underlying transport, like a socket
114    pub sock: T,
115}
116
117impl<S, T> StreamOwned<S, T>
118where
119    S: Session,
120    T: Read + Write,
121{
122    /// Make a new StreamOwned taking the Session `sess` and socket-like
123    /// object `sock`.  This does not fail and does no IO.
124    ///
125    /// This is the same as `Stream::new` except `sess` and `sock` are
126    /// moved into the StreamOwned.
127    pub fn new(sess: S, sock: T) -> StreamOwned<S, T> {
128        StreamOwned { sess, sock }
129    }
130
131    /// Get a reference to the underlying socket
132    pub fn get_ref(&self) -> &T {
133        &self.sock
134    }
135
136    /// Get a mutable reference to the underlying socket
137    pub fn get_mut(&mut self) -> &mut T {
138        &mut self.sock
139    }
140}
141
142impl<'a, S, T> StreamOwned<S, T>
143where
144    S: Session,
145    T: Read + Write,
146{
147    fn as_stream(&'a mut self) -> Stream<'a, S, T> {
148        Stream {
149            sess: &mut self.sess,
150            sock: &mut self.sock,
151        }
152    }
153}
154
155impl<S, T> Read for StreamOwned<S, T>
156where
157    S: Session,
158    T: Read + Write,
159{
160    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
161        self.as_stream().read(buf)
162    }
163}
164
165impl<S, T> Write for StreamOwned<S, T>
166where
167    S: Session,
168    T: Read + Write,
169{
170    fn write(&mut self, buf: &[u8]) -> Result<usize> {
171        self.as_stream().write(buf)
172    }
173
174    fn flush(&mut self) -> Result<()> {
175        self.as_stream().flush()
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::{Stream, StreamOwned};
182    use crate::client::ClientSession;
183    use crate::server::ServerSession;
184    use crate::session::Session;
185    use std::net::TcpStream;
186
187    #[test]
188    fn stream_can_be_created_for_session_and_tcpstream() {
189        type _Test<'a> = Stream<'a, dyn Session, TcpStream>;
190    }
191
192    #[test]
193    fn streamowned_can_be_created_for_client_and_tcpstream() {
194        type _Test = StreamOwned<ClientSession, TcpStream>;
195    }
196
197    #[test]
198    fn streamowned_can_be_created_for_server_and_tcpstream() {
199        type _Test = StreamOwned<ServerSession, TcpStream>;
200    }
201}