1use crate::session::Session;
2use std::io::{IoSlice, Read, Result, Write};
3
4pub struct Stream<'a, S: 'a + Session + ?Sized, T: 'a + Read + Write + ?Sized> {
9 pub sess: &'a mut S,
11
12 pub sock: &'a mut T,
14}
15
16impl<'a, S, T> Stream<'a, S, T>
17where
18 S: 'a + Session,
19 T: 'a + Read + Write,
20{
21 pub fn new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T> {
24 Stream { sess, sock }
25 }
26
27 fn complete_prior_io(&mut self) -> Result<()> {
30 if self.sess.is_handshaking() {
31 self.sess.complete_io(self.sock)?;
32 }
33
34 if self.sess.wants_write() {
35 self.sess.complete_io(self.sock)?;
36 }
37
38 Ok(())
39 }
40}
41
42impl<'a, S, T> Read for Stream<'a, S, T>
43where
44 S: 'a + Session,
45 T: 'a + Read + Write,
46{
47 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
48 self.complete_prior_io()?;
49
50 while self.sess.wants_read() && self.sess.complete_io(self.sock)?.0 != 0 {}
57
58 self.sess.read(buf)
59 }
60}
61
62impl<'a, S, T> Write for Stream<'a, S, T>
63where
64 S: 'a + Session,
65 T: 'a + Read + Write,
66{
67 fn write(&mut self, buf: &[u8]) -> Result<usize> {
68 self.complete_prior_io()?;
69
70 let len = self.sess.write(buf)?;
71
72 let _ = self.sess.complete_io(self.sock);
76
77 Ok(len)
78 }
79
80 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
81 self.complete_prior_io()?;
82
83 let len = self.sess.write_vectored(bufs)?;
84
85 let _ = self.sess.complete_io(self.sock);
89
90 Ok(len)
91 }
92
93 fn flush(&mut self) -> Result<()> {
94 self.complete_prior_io()?;
95
96 self.sess.flush()?;
97 if self.sess.wants_write() {
98 self.sess.complete_io(self.sock)?;
99 }
100 Ok(())
101 }
102}
103
104pub struct StreamOwned<S: Session + Sized, T: Read + Write + Sized> {
110 pub sess: S,
112
113 pub sock: T,
115}
116
117impl<S, T> StreamOwned<S, T>
118where
119 S: Session,
120 T: Read + Write,
121{
122 pub fn new(sess: S, sock: T) -> StreamOwned<S, T> {
128 StreamOwned { sess, sock }
129 }
130
131 pub fn get_ref(&self) -> &T {
133 &self.sock
134 }
135
136 pub fn get_mut(&mut self) -> &mut T {
138 &mut self.sock
139 }
140}
141
142impl<'a, S, T> StreamOwned<S, T>
143where
144 S: Session,
145 T: Read + Write,
146{
147 fn as_stream(&'a mut self) -> Stream<'a, S, T> {
148 Stream {
149 sess: &mut self.sess,
150 sock: &mut self.sock,
151 }
152 }
153}
154
155impl<S, T> Read for StreamOwned<S, T>
156where
157 S: Session,
158 T: Read + Write,
159{
160 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
161 self.as_stream().read(buf)
162 }
163}
164
165impl<S, T> Write for StreamOwned<S, T>
166where
167 S: Session,
168 T: Read + Write,
169{
170 fn write(&mut self, buf: &[u8]) -> Result<usize> {
171 self.as_stream().write(buf)
172 }
173
174 fn flush(&mut self) -> Result<()> {
175 self.as_stream().flush()
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::{Stream, StreamOwned};
    use crate::{client::ClientSession, server::ServerSession, session::Session};
    use std::net::TcpStream;

    // Each test below only declares a type alias: it checks that the named
    // instantiation can be written down, and performs no I/O at run time.

    /// `Stream` can be named with an unsized `dyn Session` and a `TcpStream`.
    #[test]
    fn stream_can_be_created_for_session_and_tcpstream() {
        type _Borrowed<'a> = Stream<'a, dyn Session, TcpStream>;
    }

    /// `StreamOwned` can be named with a `ClientSession` and a `TcpStream`.
    #[test]
    fn streamowned_can_be_created_for_client_and_tcpstream() {
        type _OwnedClient = StreamOwned<ClientSession, TcpStream>;
    }

    /// `StreamOwned` can be named with a `ServerSession` and a `TcpStream`.
    #[test]
    fn streamowned_can_be_created_for_server_and_tcpstream() {
        type _OwnedServer = StreamOwned<ServerSession, TcpStream>;
    }
}