h2/proto/
connection.rs

1use crate::codec::UserError;
2use crate::frame::{Reason, StreamId};
3use crate::{client, server};
4
5use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6use crate::proto::*;
7
8use bytes::Bytes;
9use futures_core::Stream;
10use std::io;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::io::AsyncRead;
16
17/// An H2 connection
18#[derive(Debug)]
19pub(crate) struct Connection<T, P, B: Buf = Bytes>
20where
21    P: Peer,
22{
23    /// Read / write frame values
24    codec: Codec<T, Prioritized<B>>,
25
26    inner: ConnectionInner<P, B>,
27}
28
29// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30// method instantiations.
31#[derive(Debug)]
32struct ConnectionInner<P, B: Buf = Bytes>
33where
34    P: Peer,
35{
36    /// Tracks the connection level state transitions.
37    state: State,
38
39    /// An error to report back once complete.
40    ///
41    /// This exists separately from State in order to support
42    /// graceful shutdown.
43    error: Option<frame::GoAway>,
44
45    /// Pending GOAWAY frames to write.
46    go_away: GoAway,
47
48    /// Ping/pong handler
49    ping_pong: PingPong,
50
51    /// Connection settings
52    settings: Settings,
53
54    /// Stream state handler
55    streams: Streams<B, P>,
56
57    /// A `tracing` span tracking the lifetime of the connection.
58    span: tracing::Span,
59
60    /// Client or server
61    _phantom: PhantomData<P>,
62}
63
64struct DynConnection<'a, B: Buf = Bytes> {
65    state: &'a mut State,
66
67    go_away: &'a mut GoAway,
68
69    streams: DynStreams<'a, B>,
70
71    error: &'a mut Option<frame::GoAway>,
72
73    ping_pong: &'a mut PingPong,
74}
75
76#[derive(Debug, Clone)]
77pub(crate) struct Config {
78    pub next_stream_id: StreamId,
79    pub initial_max_send_streams: usize,
80    pub max_send_buffer_size: usize,
81    pub reset_stream_duration: Duration,
82    pub reset_stream_max: usize,
83    pub remote_reset_stream_max: usize,
84    pub local_error_reset_streams_max: Option<usize>,
85    pub settings: frame::Settings,
86}
87
88#[derive(Debug)]
89enum State {
90    /// Currently open in a sane state
91    Open,
92
93    /// The codec must be flushed
94    Closing(Reason, Initiator),
95
96    /// In a closed state
97    Closed(Reason, Initiator),
98}
99
100impl<T, P, B> Connection<T, P, B>
101where
102    T: AsyncRead + AsyncWrite + Unpin,
103    P: Peer,
104    B: Buf,
105{
106    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107        fn streams_config(config: &Config) -> streams::Config {
108            streams::Config {
109                local_init_window_sz: config
110                    .settings
111                    .initial_window_size()
112                    .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
113                initial_max_send_streams: config.initial_max_send_streams,
114                local_max_buffer_size: config.max_send_buffer_size,
115                local_next_stream_id: config.next_stream_id,
116                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
117                extended_connect_protocol_enabled: config
118                    .settings
119                    .is_extended_connect_protocol_enabled()
120                    .unwrap_or(false),
121                local_reset_duration: config.reset_stream_duration,
122                local_reset_max: config.reset_stream_max,
123                remote_reset_max: config.remote_reset_stream_max,
124                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
125                remote_max_initiated: config
126                    .settings
127                    .max_concurrent_streams()
128                    .map(|max| max as usize),
129                local_max_error_reset_streams: config.local_error_reset_streams_max,
130            }
131        }
132        let streams = Streams::new(streams_config(&config));
133        Connection {
134            codec,
135            inner: ConnectionInner {
136                state: State::Open,
137                error: None,
138                go_away: GoAway::new(),
139                ping_pong: PingPong::new(),
140                settings: Settings::new(config.settings),
141                streams,
142                span: tracing::debug_span!("Connection", peer = %P::NAME),
143                _phantom: PhantomData,
144            },
145        }
146    }
147
148    /// connection flow control
149    pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
150        let _res = self.inner.streams.set_target_connection_window_size(size);
151        // TODO: proper error handling
152        debug_assert!(_res.is_ok());
153    }
154
155    /// Send a new SETTINGS frame with an updated initial window size.
156    pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
157        let mut settings = frame::Settings::default();
158        settings.set_initial_window_size(Some(size));
159        self.inner.settings.send_settings(settings)
160    }
161
162    /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
163    pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
164        let mut settings = frame::Settings::default();
165        settings.set_enable_connect_protocol(Some(1));
166        self.inner.settings.send_settings(settings)
167    }
168
169    /// Returns the maximum number of concurrent streams that may be initiated
170    /// by this peer.
171    pub(crate) fn max_send_streams(&self) -> usize {
172        self.inner.streams.max_send_streams()
173    }
174
175    /// Returns the maximum number of concurrent streams that may be initiated
176    /// by the remote peer.
177    pub(crate) fn max_recv_streams(&self) -> usize {
178        self.inner.streams.max_recv_streams()
179    }
180
181    #[cfg(feature = "unstable")]
182    pub fn num_wired_streams(&self) -> usize {
183        self.inner.streams.num_wired_streams()
184    }
185
186    /// Returns `Ready` when the connection is ready to receive a frame.
187    ///
188    /// Returns `Error` as this may raise errors that are caused by delayed
189    /// processing of received frames.
190    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
191        let _e = self.inner.span.enter();
192        let span = tracing::trace_span!("poll_ready");
193        let _e = span.enter();
194        // The order of these calls don't really matter too much
195        ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
196        ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
197        ready!(self
198            .inner
199            .settings
200            .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
201        ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
202
203        Poll::Ready(Ok(()))
204    }
205
206    /// Send any pending GOAWAY frames.
207    ///
208    /// This will return `Some(reason)` if the connection should be closed
209    /// afterwards. If this is a graceful shutdown, this returns `None`.
210    fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
211        self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
212    }
213
214    pub fn go_away_from_user(&mut self, e: Reason) {
215        self.inner.as_dyn().go_away_from_user(e)
216    }
217
218    fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
219        let (debug_data, theirs) = self
220            .inner
221            .error
222            .take()
223            .as_ref()
224            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
225                (frame.debug_data().clone(), frame.reason())
226            });
227
228        match (ours, theirs) {
229            (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
230            (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
231            // If both sides reported an error, give their
232            // error back to th user. We assume our error
233            // was a consequence of their error, and less
234            // important.
235            (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
236        }
237    }
238
239    /// Closes the connection by transitioning to a GOAWAY state
240    /// iff there are no streams or references
241    pub fn maybe_close_connection_if_no_streams(&mut self) {
242        // If we poll() and realize that there are no streams or references
243        // then we can close the connection by transitioning to GOAWAY
244        if !self.inner.streams.has_streams_or_other_references() {
245            self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
246        }
247    }
248
249    pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
250        self.inner.ping_pong.take_user_pings()
251    }
252
253    /// Advances the internal state of the connection.
254    pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
255        // XXX(eliza): cloning the span is unfortunately necessary here in
256        // order to placate the borrow checker — `self` is mutably borrowed by
257        // `poll2`, which means that we can't borrow `self.span` to enter it.
258        // The clone is just an atomic ref bump.
259        let span = self.inner.span.clone();
260        let _e = span.enter();
261        let span = tracing::trace_span!("poll");
262        let _e = span.enter();
263
264        loop {
265            tracing::trace!(connection.state = ?self.inner.state);
266            // TODO: probably clean up this glob of code
267            match self.inner.state {
268                // When open, continue to poll a frame
269                State::Open => {
270                    let result = match self.poll2(cx) {
271                        Poll::Ready(result) => result,
272                        // The connection is not ready to make progress
273                        Poll::Pending => {
274                            // Ensure all window updates have been sent.
275                            //
276                            // This will also handle flushing `self.codec`
277                            ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
278
279                            if (self.inner.error.is_some()
280                                || self.inner.go_away.should_close_on_idle())
281                                && !self.inner.streams.has_streams()
282                            {
283                                self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
284                                continue;
285                            }
286
287                            return Poll::Pending;
288                        }
289                    };
290
291                    self.inner.as_dyn().handle_poll2_result(result)?
292                }
293                State::Closing(reason, initiator) => {
294                    tracing::trace!("connection closing after flush");
295                    // Flush/shutdown the codec
296                    ready!(self.codec.shutdown(cx))?;
297
298                    // Transition the state to error
299                    self.inner.state = State::Closed(reason, initiator);
300                }
301                State::Closed(reason, initiator) => {
302                    return Poll::Ready(self.take_error(reason, initiator));
303                }
304            }
305        }
306    }
307
308    fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
309        // This happens outside of the loop to prevent needing to do a clock
310        // check and then comparison of the queue possibly multiple times a
311        // second (and thus, the clock wouldn't have changed enough to matter).
312        self.clear_expired_reset_streams();
313
314        loop {
315            // First, ensure that the `Connection` is able to receive a frame
316            //
317            // The order here matters:
318            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
319            // - If it has, we've also added a PING to be sent in poll_ready
320            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
321                if self.inner.go_away.should_close_now() {
322                    if self.inner.go_away.is_user_initiated() {
323                        // A user initiated abrupt shutdown shouldn't return
324                        // the same error back to the user.
325                        return Poll::Ready(Ok(()));
326                    } else {
327                        return Poll::Ready(Err(Error::library_go_away(reason)));
328                    }
329                }
330                // Only NO_ERROR should be waiting for idle
331                debug_assert_eq!(
332                    reason,
333                    Reason::NO_ERROR,
334                    "graceful GOAWAY should be NO_ERROR"
335                );
336            }
337            ready!(self.poll_ready(cx))?;
338
339            match self
340                .inner
341                .as_dyn()
342                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
343            {
344                ReceivedFrame::Settings(frame) => {
345                    self.inner.settings.recv_settings(
346                        frame,
347                        &mut self.codec,
348                        &mut self.inner.streams,
349                    )?;
350                }
351                ReceivedFrame::Continue => (),
352                ReceivedFrame::Done => {
353                    return Poll::Ready(Ok(()));
354                }
355            }
356        }
357    }
358
359    fn clear_expired_reset_streams(&mut self) {
360        self.inner.streams.clear_expired_reset_streams();
361    }
362}
363
364impl<P, B> ConnectionInner<P, B>
365where
366    P: Peer,
367    B: Buf,
368{
369    fn as_dyn(&mut self) -> DynConnection<'_, B> {
370        let ConnectionInner {
371            state,
372            go_away,
373            streams,
374            error,
375            ping_pong,
376            ..
377        } = self;
378        let streams = streams.as_dyn();
379        DynConnection {
380            state,
381            go_away,
382            streams,
383            error,
384            ping_pong,
385        }
386    }
387}
388
389impl<B> DynConnection<'_, B>
390where
391    B: Buf,
392{
393    fn go_away(&mut self, id: StreamId, e: Reason) {
394        let frame = frame::GoAway::new(id, e);
395        self.streams.send_go_away(id);
396        self.go_away.go_away(frame);
397    }
398
399    fn go_away_now(&mut self, e: Reason) {
400        let last_processed_id = self.streams.last_processed_id();
401        let frame = frame::GoAway::new(last_processed_id, e);
402        self.go_away.go_away_now(frame);
403    }
404
405    fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
406        let last_processed_id = self.streams.last_processed_id();
407        let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
408        self.go_away.go_away_now(frame);
409    }
410
411    fn go_away_from_user(&mut self, e: Reason) {
412        let last_processed_id = self.streams.last_processed_id();
413        let frame = frame::GoAway::new(last_processed_id, e);
414        self.go_away.go_away_from_user(frame);
415
416        // Notify all streams of reason we're abruptly closing.
417        self.streams.handle_error(Error::user_go_away(e));
418    }
419
420    fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
421        match result {
422            // The connection has shutdown normally
423            Ok(()) => {
424                *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
425                Ok(())
426            }
427            // Attempting to read a frame resulted in a connection level
428            // error. This is handled by setting a GOAWAY frame followed by
429            // terminating the connection.
430            Err(Error::GoAway(debug_data, reason, initiator)) => {
431                self.handle_go_away(reason, debug_data, initiator);
432                Ok(())
433            }
434            // Attempting to read a frame resulted in a stream level error.
435            // This is handled by resetting the frame then trying to read
436            // another frame.
437            Err(Error::Reset(id, reason, initiator)) => {
438                debug_assert_eq!(initiator, Initiator::Library);
439                tracing::trace!(?id, ?reason, "stream error");
440                match self.streams.send_reset(id, reason) {
441                    Ok(()) => (),
442                    Err(crate::proto::error::GoAway { debug_data, reason }) => {
443                        self.handle_go_away(reason, debug_data, Initiator::Library);
444                    }
445                }
446                Ok(())
447            }
448            // Attempting to read a frame resulted in an I/O error. All
449            // active streams must be reset.
450            //
451            // TODO: Are I/O errors recoverable?
452            Err(Error::Io(e, inner)) => {
453                tracing::debug!(error = ?e, "Connection::poll; IO error");
454                let e = Error::Io(e, inner);
455
456                // Reset all active streams
457                self.streams.handle_error(e.clone());
458
459                // Return the error
460                Err(e)
461            }
462        }
463    }
464
465    fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
466        let e = Error::GoAway(debug_data.clone(), reason, initiator);
467        tracing::debug!(error = ?e, "Connection::poll; connection error");
468
469        // We may have already sent a GOAWAY for this error,
470        // if so, don't send another, just flush and close up.
471        if self
472            .go_away
473            .going_away()
474            .map_or(false, |frame| frame.reason() == reason)
475        {
476            tracing::trace!("    -> already going away");
477            *self.state = State::Closing(reason, initiator);
478            return;
479        }
480
481        // Reset all active streams
482        self.streams.handle_error(e);
483        self.go_away_now_data(reason, debug_data);
484    }
485
486    fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
487        use crate::frame::Frame::*;
488        match frame {
489            Some(Headers(frame)) => {
490                tracing::trace!(?frame, "recv HEADERS");
491                self.streams.recv_headers(frame)?;
492            }
493            Some(Data(frame)) => {
494                tracing::trace!(?frame, "recv DATA");
495                self.streams.recv_data(frame)?;
496            }
497            Some(Reset(frame)) => {
498                tracing::trace!(?frame, "recv RST_STREAM");
499                self.streams.recv_reset(frame)?;
500            }
501            Some(PushPromise(frame)) => {
502                tracing::trace!(?frame, "recv PUSH_PROMISE");
503                self.streams.recv_push_promise(frame)?;
504            }
505            Some(Settings(frame)) => {
506                tracing::trace!(?frame, "recv SETTINGS");
507                return Ok(ReceivedFrame::Settings(frame));
508            }
509            Some(GoAway(frame)) => {
510                tracing::trace!(?frame, "recv GOAWAY");
511                // This should prevent starting new streams,
512                // but should allow continuing to process current streams
513                // until they are all EOS. Once they are, State should
514                // transition to GoAway.
515                self.streams.recv_go_away(&frame)?;
516                *self.error = Some(frame);
517            }
518            Some(Ping(frame)) => {
519                tracing::trace!(?frame, "recv PING");
520                let status = self.ping_pong.recv_ping(frame);
521                if status.is_shutdown() {
522                    assert!(
523                        self.go_away.is_going_away(),
524                        "received unexpected shutdown ping"
525                    );
526
527                    let last_processed_id = self.streams.last_processed_id();
528                    self.go_away(last_processed_id, Reason::NO_ERROR);
529                }
530            }
531            Some(WindowUpdate(frame)) => {
532                tracing::trace!(?frame, "recv WINDOW_UPDATE");
533                self.streams.recv_window_update(frame)?;
534            }
535            Some(Priority(frame)) => {
536                tracing::trace!(?frame, "recv PRIORITY");
537                // TODO: handle
538            }
539            None => {
540                tracing::trace!("codec closed");
541                self.streams.recv_eof(false).expect("mutex poisoned");
542                return Ok(ReceivedFrame::Done);
543            }
544        }
545        Ok(ReceivedFrame::Continue)
546    }
547}
548
549enum ReceivedFrame {
550    Settings(frame::Settings),
551    Continue,
552    Done,
553}
554
555impl<T, B> Connection<T, client::Peer, B>
556where
557    T: AsyncRead + AsyncWrite,
558    B: Buf,
559{
560    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
561        &self.inner.streams
562    }
563}
564
565impl<T, B> Connection<T, server::Peer, B>
566where
567    T: AsyncRead + AsyncWrite + Unpin,
568    B: Buf,
569{
570    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
571        self.inner.streams.next_incoming()
572    }
573
574    // Graceful shutdown only makes sense for server peers.
575    pub fn go_away_gracefully(&mut self) {
576        if self.inner.go_away.is_going_away() {
577            // No reason to start a new one.
578            return;
579        }
580
581        // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
582        //
583        // > A server that is attempting to gracefully shut down a connection
584        // > SHOULD send an initial GOAWAY frame with the last stream
585        // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
586        // > client that a shutdown is imminent and that initiating further
587        // > requests is prohibited. After allowing time for any in-flight
588        // > stream creation (at least one round-trip time), the server can
589        // > send another GOAWAY frame with an updated last stream identifier.
590        // > This ensures that a connection can be cleanly shut down without
591        // > losing requests.
592        self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
593
594        // We take the advice of waiting 1 RTT literally, and wait
595        // for a pong before proceeding.
596        self.inner.ping_pong.ping_shutdown();
597    }
598}
599
600impl<T, P, B> Drop for Connection<T, P, B>
601where
602    P: Peer,
603    B: Buf,
604{
605    fn drop(&mut self) {
606        // Ignore errors as this indicates that the mutex is poisoned.
607        let _ = self.inner.streams.recv_eof(true);
608    }
609}