h2/proto/streams/
streams.rs

1use super::recv::RecvHeaderBlockError;
2use super::store::{self, Entry, Resolve, Store};
3use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4use crate::codec::{Codec, SendError, UserError};
5use crate::ext::Protocol;
6use crate::frame::{self, Frame, Reason};
7use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8use crate::{client, proto, server};
9
10use bytes::{Buf, Bytes};
11use http::{HeaderMap, Request, Response};
12use std::task::{Context, Poll, Waker};
13use tokio::io::AsyncWrite;
14
15use std::sync::{Arc, Mutex};
16use std::{fmt, io};
17
/// Handle to the per-connection stream state, generic over the outgoing data
/// chunk type `B` and the static peer role `P` (client or server).
#[derive(Debug)]
pub(crate) struct Streams<B, P>
where
    P: Peer,
{
    /// Holds most of the connection and stream related state for processing
    /// HTTP/2 frames associated with streams.
    inner: Arc<Mutex<Inner>>,

    /// This is the queue of frames to be written to the wire. This is split out
    /// to avoid requiring a `B` generic on all public API types even if `B` is
    /// not technically required.
    ///
    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
    /// However, it should be possible to avoid this duplication with a little
    /// bit of unsafe code. This optimization has been postponed until it has
    /// been shown to be necessary.
    send_buffer: Arc<SendBuffer<B>>,

    /// Marker for the peer role `P`; no value of `P` is actually stored.
    _p: ::std::marker::PhantomData<P>,
}
39
// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
// Ensures that the methods only get one instantiation, instead of two (client and server)
#[derive(Debug)]
pub(crate) struct DynStreams<'a, B> {
    /// Shared connection/stream state (the same state a `Streams` handle owns).
    inner: &'a Mutex<Inner>,

    /// Queue of frames waiting to be written to the wire.
    send_buffer: &'a SendBuffer<B>,

    /// Runtime (dynamic) client/server role, replacing the `P: Peer` parameter.
    peer: peer::Dyn,
}
50
/// Reference to the stream state
#[derive(Debug)]
pub(crate) struct StreamRef<B> {
    /// Type-erased handle: shared connection state plus the stream's store key.
    opaque: OpaqueStreamRef,
    /// Kept so this reference can enqueue `Frame<B>` values for the stream.
    send_buffer: Arc<SendBuffer<B>>,
}
57
/// Reference to the stream state that hides the send data chunk generic
pub(crate) struct OpaqueStreamRef {
    /// Shared connection state; `Inner::refs` counts handles like this one.
    inner: Arc<Mutex<Inner>>,
    /// Slot of the referenced stream inside `Inner::store`.
    key: store::Key,
}
63
/// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy.
///
/// TODO: better name
#[derive(Debug)]
struct Inner {
    /// Tracks send & recv stream concurrency.
    counts: Counts,

    /// Connection level state and performs actions on streams
    actions: Actions,

    /// Stores stream state
    store: Store,

    /// The number of stream refs to this shared state.
    ///
    /// Starts at 1 (the owning handle; see `Inner::new`) and is incremented
    /// whenever a new `StreamRef` is created while the lock is held.
    refs: usize,
}
82
/// Connection-level state machines, split by direction (`recv` / `send`),
/// plus the waker for the connection task and any sticky connection error.
#[derive(Debug)]
struct Actions {
    /// Manages state transitions initiated by receiving frames
    recv: Recv,

    /// Manages state transitions initiated by sending frames
    send: Send,

    /// Task that calls `poll_complete`.
    task: Option<Waker>,

    /// If the connection errors, a copy is kept for any StreamRefs.
    conn_error: Option<proto::Error>,
}
97
/// Contains the buffer of frames to be written to the wire.
#[derive(Debug)]
struct SendBuffer<B> {
    // Guarded by its own lock, separate from `Inner`. Throughout this file,
    // when both locks are needed, `Inner` is locked before this one.
    inner: Mutex<Buffer<Frame<B>>>,
}
103
104// ===== impl Streams =====
105
impl<B, P> Streams<B, P>
where
    B: Buf,
    P: Peer,
{
    /// Creates the shared connection state and an empty outbound frame buffer
    /// for a new connection.
    pub fn new(config: Config) -> Self {
        // Capture the client/server role as a runtime value so that `Inner`'s
        // methods are instantiated only once (see `DynStreams`).
        let peer = P::r#dyn();

        Streams {
            inner: Inner::new(peer, config),
            send_buffer: Arc::new(SendBuffer::new()),
            _p: ::std::marker::PhantomData,
        }
    }

    /// Sets the desired connection-level receive window, waking the
    /// connection task if frames need to be sent to apply it.
    pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
        let mut me = self.inner.lock().unwrap();
        // Reborrow so disjoint fields of `Inner` can be borrowed separately.
        let me = &mut *me;

        me.actions
            .recv
            .set_target_connection_window(size, &mut me.actions.task)
    }

    /// Accepts the next remotely-initiated stream waiting to be handed to the
    /// user, returning a new `StreamRef` for it (or `None` if none pending).
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions.recv.next_incoming(&mut me.store).map(|key| {
            let stream = &mut me.store.resolve(key);
            tracing::trace!(
                "next_incoming; id={:?}, state={:?}",
                stream.id,
                stream.state
            );
            // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
            // the lock, so it can't.
            me.refs += 1;

            // Pending-accepted remotely-reset streams are counted.
            if stream.state.is_remote_reset() {
                me.counts.dec_num_remote_reset_streams();
            }

            StreamRef {
                opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
                send_buffer: self.send_buffer.clone(),
            }
        })
    }

    /// Flushes any queued stream refusal (RST_STREAM) to the codec.
    pub fn send_pending_refusal<T>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
    {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions.recv.send_pending_refusal(cx, dst)
    }

    /// Drops state for reset streams whose linger period has elapsed.
    pub fn clear_expired_reset_streams(&mut self) {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions
            .recv
            .clear_expired_reset_streams(&mut me.store, &mut me.counts);
    }

    /// Drives all pending outbound work to the codec; see `Inner::poll_complete`.
    pub fn poll_complete<T>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
    {
        let mut me = self.inner.lock().unwrap();
        me.poll_complete(&self.send_buffer, cx, dst)
    }

    /// Applies a SETTINGS frame received from the peer to the send half.
    pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        // Lock order: `inner` first, then `send_buffer` (consistent file-wide).
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.apply_remote_settings(frame);

        me.actions.send.apply_remote_settings(
            frame,
            send_buffer,
            &mut me.store,
            &mut me.counts,
            &mut me.actions.task,
        )
    }

    /// Applies our own (acknowledged) SETTINGS frame to the receive half.
    pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        me.actions.recv.apply_local_settings(frame, &mut me.store)
    }

    /// Client-only: opens a new stream and sends the request HEADERS frame.
    ///
    /// Returns the new `StreamRef` plus a flag that is `true` when the *next*
    /// `send_request` would reach the concurrency capacity. Fails if the
    /// connection has errored, stream IDs are exhausted, a previously issued
    /// pending stream has not opened yet, or this peer is a server.
    pub fn send_request(
        &mut self,
        mut request: Request<()>,
        end_of_stream: bool,
        pending: Option<&OpaqueStreamRef>,
    ) -> Result<(StreamRef<B>, bool), SendError> {
        use super::stream::ContentLength;
        use http::Method;

        let protocol = request.extensions_mut().remove::<Protocol>();

        // Clear before taking lock, incase extensions contain a StreamRef.
        request.extensions_mut().clear();

        // TODO: There is a hazard with assigning a stream ID before the
        // prioritize layer. If prioritization reorders new streams, this
        // implicitly closes the earlier stream IDs.
        //
        // See: hyperium/h2#11
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.actions.ensure_no_conn_error()?;
        me.actions.send.ensure_next_stream_id()?;

        // The `pending` argument is provided by the `Client`, and holds
        // a store `Key` of a `Stream` that may have been not been opened
        // yet.
        //
        // If that stream is still pending, the Client isn't allowed to
        // queue up another pending stream. They should use `poll_ready`.
        if let Some(stream) = pending {
            if me.store.resolve(stream.key).is_pending_open {
                return Err(UserError::Rejected.into());
            }
        }

        if me.counts.peer().is_server() {
            // Servers cannot open streams. PushPromise must first be reserved.
            return Err(UserError::UnexpectedFrameType.into());
        }

        let stream_id = me.actions.send.open()?;

        let mut stream = Stream::new(
            stream_id,
            me.actions.send.init_window_sz(),
            me.actions.recv.init_window_sz(),
        );

        if *request.method() == Method::HEAD {
            stream.content_length = ContentLength::Head;
        }

        // Convert the message
        let headers =
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;

        let mut stream = me.store.insert(stream.id, stream);

        let sent = me.actions.send.send_headers(
            headers,
            send_buffer,
            &mut stream,
            &mut me.counts,
            &mut me.actions.task,
        );

        // send_headers can return a UserError, if it does,
        // we should forget about this stream.
        if let Err(err) = sent {
            stream.unlink();
            stream.remove();
            return Err(err.into());
        }

        // Given that the stream has been initialized, it should not be in the
        // closed state.
        debug_assert!(!stream.state.is_closed());

        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
        // the lock, so it can't.
        me.refs += 1;

        let is_full = me.counts.next_send_stream_will_reach_capacity();
        Ok((
            StreamRef {
                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
                send_buffer: self.send_buffer.clone(),
            },
            is_full,
        ))
    }

    /// Returns whether the peer's settings enabled the extended CONNECT
    /// protocol (RFC 8441).
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.inner
            .lock()
            .unwrap()
            .actions
            .send
            .is_extended_connect_protocol_enabled()
    }
}
320
321impl<B> DynStreams<'_, B> {
322    pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
323        let mut me = self.inner.lock().unwrap();
324
325        me.recv_headers(self.peer, self.send_buffer, frame)
326    }
327
328    pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
329        let mut me = self.inner.lock().unwrap();
330        me.recv_data(self.peer, self.send_buffer, frame)
331    }
332
333    pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
334        let mut me = self.inner.lock().unwrap();
335
336        me.recv_reset(self.send_buffer, frame)
337    }
338
339    /// Notify all streams that a connection-level error happened.
340    pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
341        let mut me = self.inner.lock().unwrap();
342        me.handle_error(self.send_buffer, err)
343    }
344
345    pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
346        let mut me = self.inner.lock().unwrap();
347        me.recv_go_away(self.send_buffer, frame)
348    }
349
350    pub fn last_processed_id(&self) -> StreamId {
351        self.inner.lock().unwrap().actions.recv.last_processed_id()
352    }
353
354    pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
355        let mut me = self.inner.lock().unwrap();
356        me.recv_window_update(self.send_buffer, frame)
357    }
358
359    pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
360        let mut me = self.inner.lock().unwrap();
361        me.recv_push_promise(self.send_buffer, frame)
362    }
363
364    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
365        let mut me = self.inner.lock().map_err(|_| ())?;
366        me.recv_eof(self.send_buffer, clear_pending_accept)
367    }
368
369    pub fn send_reset(
370        &mut self,
371        id: StreamId,
372        reason: Reason,
373    ) -> Result<(), crate::proto::error::GoAway> {
374        let mut me = self.inner.lock().unwrap();
375        me.send_reset(self.send_buffer, id, reason)
376    }
377
378    pub fn send_go_away(&mut self, last_processed_id: StreamId) {
379        let mut me = self.inner.lock().unwrap();
380        me.actions.recv.go_away(last_processed_id);
381    }
382}
383
384impl Inner {
385    fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
386        Arc::new(Mutex::new(Inner {
387            counts: Counts::new(peer, &config),
388            actions: Actions {
389                recv: Recv::new(peer, &config),
390                send: Send::new(&config),
391                task: None,
392                conn_error: None,
393            },
394            store: Store::new(),
395            refs: 1,
396        }))
397    }
398
    /// Processes a received HEADERS frame: either the initial header block
    /// that opens (server) / completes (client) a stream, or trailers that
    /// end it. Caller holds the `Inner` lock; the send buffer is locked here.
    fn recv_headers<B>(
        &mut self,
        peer: peer::Dyn,
        send_buffer: &SendBuffer<B>,
        frame: frame::Headers,
    ) -> Result<(), Error> {
        let id = frame.stream_id();

        // The GOAWAY process has begun. All streams with a greater ID than
        // specified as part of GOAWAY should be ignored.
        if id > self.actions.recv.max_stream_id() {
            tracing::trace!(
                "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
                id,
                self.actions.recv.max_stream_id()
            );
            return Ok(());
        }

        let key = match self.store.find_entry(id) {
            Entry::Occupied(e) => e.key(),
            Entry::Vacant(e) => {
                // Client: it's possible to send a request, and then send
                // a RST_STREAM while the response HEADERS were in transit.
                //
                // Server: we can't reset a stream before having received
                // the request headers, so don't allow.
                if !peer.is_server() {
                    // This may be response headers for a stream we've already
                    // forgotten about...
                    if self.actions.may_have_forgotten_stream(peer, id) {
                        tracing::debug!(
                            "recv_headers for old stream={:?}, sending STREAM_CLOSED",
                            id,
                        );
                        return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
                    }
                }

                // Unknown stream: try to open it as a new remotely-initiated
                // stream. `None` means the open was refused; silently ignore.
                match self
                    .actions
                    .recv
                    .open(id, Open::Headers, &mut self.counts)?
                {
                    Some(stream_id) => {
                        let stream = Stream::new(
                            stream_id,
                            self.actions.send.init_window_sz(),
                            self.actions.recv.init_window_sz(),
                        );

                        e.insert(stream)
                    }
                    None => return Ok(()),
                }
            }
        };

        let stream = self.store.resolve(key);

        if stream.state.is_local_error() {
            // Locally reset streams must ignore frames "for some time".
            // This is because the remote may have sent trailers before
            // receiving the RST_STREAM frame.
            tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
            return Ok(());
        }

        let actions = &mut self.actions;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        self.counts.transition(stream, |counts, stream| {
            tracing::trace!(
                "recv_headers; stream={:?}; state={:?}",
                stream.id,
                stream.state
            );

            // Distinguish the first header block from trailers.
            let res = if stream.state.is_recv_headers() {
                match actions.recv.recv_headers(frame, stream, counts) {
                    Ok(()) => Ok(()),
                    Err(RecvHeaderBlockError::Oversize(resp)) => {
                        // Header block exceeded the configured limit. If a
                        // refusal response was produced, send it and reset
                        // the stream implicitly; otherwise reset outright.
                        if let Some(resp) = resp {
                            let sent = actions.send.send_headers(
                                resp, send_buffer, stream, counts, &mut actions.task);
                            debug_assert!(sent.is_ok(), "oversize response should not fail");

                            actions.send.schedule_implicit_reset(
                                stream,
                                Reason::REFUSED_STREAM,
                                counts,
                                &mut actions.task);

                            actions.recv.enqueue_reset_expiration(stream, counts);

                            Ok(())
                        } else {
                            Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
                        }
                    },
                    Err(RecvHeaderBlockError::State(err)) => Err(err),
                }
            } else {
                if !frame.is_end_stream() {
                    // Receiving trailers that don't set EOS is a "malformed"
                    // message. Malformed messages are a stream error.
                    proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
                }

                actions.recv.recv_trailers(frame, stream)
            };

            // Stream-level errors trigger an outgoing RST_STREAM here.
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
        })
    }
516
    /// Processes a received DATA frame, routing it to the stream's receive
    /// state. On a stream error the data is never handed to the user, so the
    /// connection-level flow-control capacity is released here automatically.
    fn recv_data<B>(
        &mut self,
        peer: peer::Dyn,
        send_buffer: &SendBuffer<B>,
        frame: frame::Data,
    ) -> Result<(), Error> {
        let id = frame.stream_id();

        let stream = match self.store.find_mut(&id) {
            Some(stream) => stream,
            None => {
                // The GOAWAY process has begun. All streams with a greater ID
                // than specified as part of GOAWAY should be ignored.
                if id > self.actions.recv.max_stream_id() {
                    tracing::trace!(
                        "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
                        id,
                        self.actions.recv.max_stream_id()
                    );
                    return Ok(());
                }

                if self.actions.may_have_forgotten_stream(peer, id) {
                    tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);

                    let sz = frame.payload().len();
                    // This should have been enforced at the codec::FramedRead layer, so
                    // this is just a sanity check.
                    assert!(sz <= super::MAX_WINDOW_SIZE as usize);
                    let sz = sz as WindowSize;

                    // Still account for the bytes against the connection
                    // window even though the stream is gone.
                    self.actions.recv.ignore_data(sz)?;
                    return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
                }

                proto_err!(conn: "recv_data: stream not found; id={:?}", id);
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        };

        let actions = &mut self.actions;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        self.counts.transition(stream, |counts, stream| {
            // Capture the payload size before `recv_data` consumes the frame.
            let sz = frame.payload().len();
            let res = actions.recv.recv_data(frame, stream);

            // Any stream error after receiving a DATA frame means
            // we won't give the data to the user, and so they can't
            // release the capacity. We do it automatically.
            if let Err(Error::Reset(..)) = res {
                actions
                    .recv
                    .release_connection_capacity(sz as WindowSize, &mut None);
            }
            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
        })
    }
576
    /// Processes a received RST_STREAM frame, transitioning the stream to a
    /// closed state and tearing down its send-side state.
    fn recv_reset<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        frame: frame::Reset,
    ) -> Result<(), Error> {
        let id = frame.stream_id();

        // RST_STREAM on the connection control stream is a protocol error.
        if id.is_zero() {
            proto_err!(conn: "recv_reset: invalid stream ID 0");
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        // The GOAWAY process has begun. All streams with a greater ID than
        // specified as part of GOAWAY should be ignored.
        if id > self.actions.recv.max_stream_id() {
            tracing::trace!(
                "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
                id,
                self.actions.recv.max_stream_id()
            );
            return Ok(());
        }

        let stream = match self.store.find_mut(&id) {
            Some(stream) => stream,
            None => {
                // TODO: Are there other error cases?
                // A reset for an idle (never-opened) stream is a connection
                // error; otherwise the stream is simply already gone.
                self.actions
                    .ensure_not_idle(self.counts.peer(), id)
                    .map_err(Error::library_go_away)?;

                return Ok(());
            }
        };

        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        let actions = &mut self.actions;

        self.counts.transition(stream, |counts, stream| {
            actions.recv.recv_reset(frame, stream, counts)?;
            actions.send.handle_error(send_buffer, stream, counts);
            // After a remote reset plus send teardown the stream must be closed.
            assert!(stream.state.is_closed());
            Ok(())
        })
    }
624
    /// Processes a received WINDOW_UPDATE frame — either connection-level
    /// (stream ID 0) or for an individual stream. Updates for streams the
    /// local end has already closed are silently accepted.
    fn recv_window_update<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        frame: frame::WindowUpdate,
    ) -> Result<(), Error> {
        let id = frame.stream_id();

        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        if id.is_zero() {
            // Connection-level update; a bad increment is a connection error.
            self.actions
                .send
                .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
                .map_err(Error::library_go_away)?;
        } else {
            // The remote may send window updates for streams that the local now
            // considers closed. It's ok...
            if let Some(mut stream) = self.store.find_mut(&id) {
                let res = self
                    .actions
                    .send
                    .recv_stream_window_update(
                        frame.size_increment(),
                        send_buffer,
                        &mut stream,
                        &mut self.counts,
                        &mut self.actions.task,
                    )
                    .map_err(|reason| Error::library_reset(id, reason));

                // A stream-level failure triggers an outgoing RST_STREAM.
                return self.actions.reset_on_recv_stream_err(
                    send_buffer,
                    &mut stream,
                    &mut self.counts,
                    res,
                );
            } else {
                // Unknown stream: only an error if it was never opened (idle).
                self.actions
                    .ensure_not_idle(self.counts.peer(), id)
                    .map_err(Error::library_go_away)?;
            }
        }

        Ok(())
    }
671
    /// Propagates a connection-level error to every stream, records the error
    /// for future `StreamRef` operations, and returns the last stream ID the
    /// receive half processed (used for the outgoing GOAWAY).
    fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
        let actions = &mut self.actions;
        let counts = &mut self.counts;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        let last_processed_id = actions.recv.last_processed_id();

        // Fail both halves of every stream in the store.
        self.store.for_each(|stream| {
            counts.transition(stream, |counts, stream| {
                actions.recv.handle_error(&err, &mut *stream);
                actions.send.handle_error(send_buffer, stream, counts);
            })
        });

        actions.conn_error = Some(err);

        last_processed_id
    }
691
    /// Processes a received GOAWAY frame: forbids opening new streams above
    /// `last_stream_id` and errors out every existing stream above it, while
    /// streams at or below that ID continue to completion.
    fn recv_go_away<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        frame: &frame::GoAway,
    ) -> Result<(), Error> {
        let actions = &mut self.actions;
        let counts = &mut self.counts;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        let last_stream_id = frame.last_stream_id();

        actions.send.recv_go_away(last_stream_id)?;

        let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());

        // Only streams the remote claims not to have processed are failed.
        self.store.for_each(|stream| {
            if stream.id > last_stream_id {
                counts.transition(stream, |counts, stream| {
                    actions.recv.handle_error(&err, &mut *stream);
                    actions.send.handle_error(send_buffer, stream, counts);
                })
            }
        });

        actions.conn_error = Some(err);

        Ok(())
    }
721
    /// Processes a received PUSH_PROMISE frame: validates the initiating
    /// stream, reserves the promised stream, and queues it on the parent's
    /// pending-push list for the user to accept.
    fn recv_push_promise<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        frame: frame::PushPromise,
    ) -> Result<(), Error> {
        let id = frame.stream_id();
        let promised_id = frame.promised_id();

        // First, ensure that the initiating stream is still in a valid state.
        let parent_key = match self.store.find_mut(&id) {
            Some(stream) => {
                // The GOAWAY process has begun. All streams with a greater ID
                // than specified as part of GOAWAY should be ignored.
                if id > self.actions.recv.max_stream_id() {
                    tracing::trace!(
                        "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
                        id,
                        self.actions.recv.max_stream_id()
                    );
                    return Ok(());
                }

                // The stream must be receive open
                if !stream.state.ensure_recv_open()? {
                    proto_err!(conn: "recv_push_promise: initiating stream is not opened");
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
                }

                stream.key()
            }
            None => {
                proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        };

        // TODO: Streams in the reserved states do not count towards the concurrency
        // limit. However, it seems like there should be a cap otherwise this
        // could grow in memory indefinitely.

        // Ensure that we can reserve streams
        self.actions.recv.ensure_can_reserve()?;

        // Next, open the stream.
        //
        // If `None` is returned, then the stream is being refused. There is no
        // further work to be done.
        if self
            .actions
            .recv
            .open(promised_id, Open::PushPromise, &mut self.counts)?
            .is_none()
        {
            return Ok(());
        }

        // Try to handle the frame and create a corresponding key for the pushed stream
        // this requires a bit of indirection to make the borrow checker happy.
        let child_key: Option<store::Key> = {
            // Create state for the stream
            let stream = self.store.insert(promised_id, {
                Stream::new(
                    promised_id,
                    self.actions.send.init_window_sz(),
                    self.actions.recv.init_window_sz(),
                )
            });

            let actions = &mut self.actions;

            self.counts.transition(stream, |counts, stream| {
                let stream_valid = actions.recv.recv_push_promise(frame, stream);

                match stream_valid {
                    Ok(()) => Ok(Some(stream.key())),
                    _ => {
                        // Invalid promise: reset the child stream; `None`
                        // signals there is nothing to link to the parent.
                        let mut send_buffer = send_buffer.inner.lock().unwrap();
                        actions
                            .reset_on_recv_stream_err(
                                &mut *send_buffer,
                                stream,
                                counts,
                                stream_valid,
                            )
                            .map(|()| None)
                    }
                }
            })?
        };
        // If we're successful, push the headers and stream...
        if let Some(child) = child_key {
            // Take the parent's pending-push queue, append the child, then
            // put the queue back and wake any task waiting on the parent.
            let mut ppp = self.store[parent_key].pending_push_promises.take();
            ppp.push(&mut self.store.resolve(child));

            let parent = &mut self.store.resolve(parent_key);
            parent.pending_push_promises = ppp;
            parent.notify_recv();
        };

        Ok(())
    }
823
    /// Handles transport EOF: records a broken-pipe connection error (unless
    /// one is already set), errors out every stream, and clears the pending
    /// queues (optionally including the pending-accept queue).
    fn recv_eof<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        clear_pending_accept: bool,
    ) -> Result<(), ()> {
        let actions = &mut self.actions;
        let counts = &mut self.counts;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        // Preserve an earlier, more specific connection error if present.
        if actions.conn_error.is_none() {
            actions.conn_error = Some(
                io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "connection closed because of a broken pipe",
                )
                .into(),
            );
        }

        tracing::trace!("Streams::recv_eof");

        self.store.for_each(|stream| {
            counts.transition(stream, |counts, stream| {
                actions.recv.recv_eof(stream);

                // This handles resetting send state associated with the
                // stream
                actions.send.handle_error(send_buffer, stream, counts);
            })
        });

        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
        Ok(())
    }
859
    /// Drives pending outbound frames to `dst`: connection/stream
    /// WINDOW_UPDATE frames first, then all other buffered frames. When
    /// everything is flushed, stores the task waker so later state changes
    /// can re-wake the connection task.
    fn poll_complete<T, B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        // Send WINDOW_UPDATE frames first
        //
        // TODO: It would probably be better to interleave updates w/ data
        // frames.
        ready!(self
            .actions
            .recv
            .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;

        // Send any other pending frames
        ready!(self.actions.send.poll_complete(
            cx,
            send_buffer,
            &mut self.store,
            &mut self.counts,
            dst
        ))?;

        // Nothing else to do, track the task
        self.actions.task = Some(cx.waker().clone());

        Poll::Ready(Ok(()))
    }
896
897    fn send_reset<B>(
898        &mut self,
899        send_buffer: &SendBuffer<B>,
900        id: StreamId,
901        reason: Reason,
902    ) -> Result<(), crate::proto::error::GoAway> {
903        let key = match self.store.find_entry(id) {
904            Entry::Occupied(e) => e.key(),
905            Entry::Vacant(e) => {
906                // Resetting a stream we don't know about? That could be OK...
907                //
908                // 1. As a server, we just received a request, but that request
909                //    was bad, so we're resetting before even accepting it.
910                //    This is totally fine.
911                //
912                // 2. The remote may have sent us a frame on new stream that
913                //    it's *not* supposed to have done, and thus, we don't know
914                //    the stream. In that case, sending a reset will "open" the
915                //    stream in our store. Maybe that should be a connection
916                //    error instead? At least for now, we need to update what
917                //    our vision of the next stream is.
918                if self.counts.peer().is_local_init(id) {
919                    // We normally would open this stream, so update our
920                    // next-send-id record.
921                    self.actions.send.maybe_reset_next_stream_id(id);
922                } else {
923                    // We normally would recv this stream, so update our
924                    // next-recv-id record.
925                    self.actions.recv.maybe_reset_next_stream_id(id);
926                }
927
928                let stream = Stream::new(id, 0, 0);
929
930                e.insert(stream)
931            }
932        };
933
934        let stream = self.store.resolve(key);
935        let mut send_buffer = send_buffer.inner.lock().unwrap();
936        let send_buffer = &mut *send_buffer;
937        self.actions.send_reset(
938            stream,
939            reason,
940            Initiator::Library,
941            &mut self.counts,
942            send_buffer,
943        )
944    }
945}
946
947impl<B> Streams<B, client::Peer>
948where
949    B: Buf,
950{
951    pub fn poll_pending_open(
952        &mut self,
953        cx: &Context,
954        pending: Option<&OpaqueStreamRef>,
955    ) -> Poll<Result<(), crate::Error>> {
956        let mut me = self.inner.lock().unwrap();
957        let me = &mut *me;
958
959        me.actions.ensure_no_conn_error()?;
960        me.actions.send.ensure_next_stream_id()?;
961
962        if let Some(pending) = pending {
963            let mut stream = me.store.resolve(pending.key);
964            tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
965            if stream.is_pending_open {
966                stream.wait_send(cx);
967                return Poll::Pending;
968            }
969        }
970        Poll::Ready(Ok(()))
971    }
972}
973
974impl<B, P> Streams<B, P>
975where
976    P: Peer,
977{
978    pub fn as_dyn(&self) -> DynStreams<'_, B> {
979        let Self {
980            inner,
981            send_buffer,
982            _p,
983        } = self;
984        DynStreams {
985            inner,
986            send_buffer,
987            peer: P::r#dyn(),
988        }
989    }
990
991    /// This function is safe to call multiple times.
992    ///
993    /// A `Result` is returned to avoid panicking if the mutex is poisoned.
994    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
995        self.as_dyn().recv_eof(clear_pending_accept)
996    }
997
998    pub(crate) fn max_send_streams(&self) -> usize {
999        self.inner.lock().unwrap().counts.max_send_streams()
1000    }
1001
1002    pub(crate) fn max_recv_streams(&self) -> usize {
1003        self.inner.lock().unwrap().counts.max_recv_streams()
1004    }
1005
1006    #[cfg(feature = "unstable")]
1007    pub fn num_active_streams(&self) -> usize {
1008        let me = self.inner.lock().unwrap();
1009        me.store.num_active_streams()
1010    }
1011
1012    pub fn has_streams(&self) -> bool {
1013        let me = self.inner.lock().unwrap();
1014        me.counts.has_streams()
1015    }
1016
1017    pub fn has_streams_or_other_references(&self) -> bool {
1018        let me = self.inner.lock().unwrap();
1019        me.counts.has_streams() || me.refs > 1
1020    }
1021
1022    #[cfg(feature = "unstable")]
1023    pub fn num_wired_streams(&self) -> usize {
1024        let me = self.inner.lock().unwrap();
1025        me.store.num_wired_streams()
1026    }
1027}
1028
1029// no derive because we don't need B and P to be Clone.
1030impl<B, P> Clone for Streams<B, P>
1031where
1032    P: Peer,
1033{
1034    fn clone(&self) -> Self {
1035        self.inner.lock().unwrap().refs += 1;
1036        Streams {
1037            inner: self.inner.clone(),
1038            send_buffer: self.send_buffer.clone(),
1039            _p: ::std::marker::PhantomData,
1040        }
1041    }
1042}
1043
1044impl<B, P> Drop for Streams<B, P>
1045where
1046    P: Peer,
1047{
1048    fn drop(&mut self) {
1049        if let Ok(mut inner) = self.inner.lock() {
1050            inner.refs -= 1;
1051            if inner.refs == 1 {
1052                if let Some(task) = inner.actions.task.take() {
1053                    task.wake();
1054                }
1055            }
1056        }
1057    }
1058}
1059
1060// ===== impl StreamRef =====
1061
impl<B> StreamRef<B> {
    // NOTE: throughout this impl the lock order is `opaque.inner` first,
    // then `send_buffer` — keep new methods consistent with that.

    /// Sends a DATA frame carrying `data` on this stream.
    ///
    /// When `end_stream` is true, the frame's END_STREAM flag is set.
    pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
    where
        B: Buf,
    {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            // Create the data frame
            let mut frame = frame::Data::new(stream.id, data);
            frame.set_end_stream(end_stream);

            // Send the data frame
            actions
                .send
                .send_data(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Sends `trailers` as a trailing HEADERS frame on this stream.
    pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            // Create the trailers frame
            let frame = frame::Headers::trailers(stream.id, trailers);

            // Send the trailers frame
            actions
                .send
                .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Sends a user-initiated RST_STREAM with `reason` on this stream.
    pub fn send_reset(&mut self, reason: Reason) {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        match me
            .actions
            .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer)
        {
            Ok(()) => (),
            Err(crate::proto::error::GoAway { .. }) => {
                // this should never happen, because Initiator::User resets do
                // not count toward the local limit.
                // we could perhaps make this state impossible, if we made the
                // initiator argument a generic, and so this could return
                // Infallible instead of an impossible GoAway, but oh well.
                unreachable!("Initiator::User should not error sending reset");
            }
        }
    }

    /// Sends `response` as a HEADERS frame on this stream (server side),
    /// optionally ending the stream.
    pub fn send_response(
        &mut self,
        mut response: Response<()>,
        end_of_stream: bool,
    ) -> Result<(), UserError> {
        // Clear before taking lock, incase extensions contain a StreamRef.
        response.extensions_mut().clear();
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);

            actions
                .send
                .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Reserves a locally-initiated promised stream and sends a PUSH_PROMISE
    /// frame for `request`, returning a `StreamRef` to the promised stream.
    ///
    /// If sending the PUSH_PROMISE fails, the reserved child stream is
    /// removed again before the error is returned.
    pub fn send_push_promise(
        &mut self,
        mut request: Request<()>,
    ) -> Result<StreamRef<B>, UserError> {
        // Clear before taking lock, incase extensions contain a StreamRef.
        request.extensions_mut().clear();
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        let actions = &mut me.actions;
        let promised_id = actions.send.reserve_local()?;

        // Insert the reserved child stream into the store first, so its key
        // is available when wiring up the PUSH_PROMISE below.
        let child_key = {
            let mut child_stream = me.store.insert(
                promised_id,
                Stream::new(
                    promised_id,
                    actions.send.init_window_sz(),
                    actions.recv.init_window_sz(),
                ),
            );
            child_stream.state.reserve_local()?;
            child_stream.is_pending_push = true;
            child_stream.key()
        };

        let pushed = {
            let mut stream = me.store.resolve(self.opaque.key);

            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;

            actions
                .send
                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
        };

        // Push failed: roll back the reserved child stream.
        if let Err(err) = pushed {
            let mut child_stream = me.store.resolve(child_key);
            child_stream.unlink();
            child_stream.remove();
            return Err(err);
        }

        // A new handle is being created; bump the connection-wide handle count.
        me.refs += 1;
        let opaque =
            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));

        Ok(StreamRef {
            opaque,
            send_buffer: self.send_buffer.clone(),
        })
    }

    /// Called by the server after the stream is accepted. Given that clients
    /// initialize streams by sending HEADERS, the request will always be
    /// available.
    ///
    /// # Panics
    ///
    /// This function panics if the request isn't present.
    pub fn take_request(&self) -> Request<()> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);
        me.actions.recv.take_request(&mut stream)
    }

    /// Called by a client to see if the current stream is pending open
    pub fn is_pending_open(&self) -> bool {
        let mut me = self.opaque.inner.lock().unwrap();
        me.store.resolve(self.opaque.key).is_pending_open
    }

    /// Request capacity to send data
    pub fn reserve_capacity(&mut self, capacity: WindowSize) {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions
            .send
            .reserve_capacity(capacity, &mut stream, &mut me.counts)
    }

    /// Returns the stream's current send capacity.
    pub fn capacity(&self) -> WindowSize {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions.send.capacity(&mut stream)
    }

    /// Request to be notified when the stream's capacity increases
    pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions.send.poll_capacity(cx, &mut stream)
    }

    /// Request to be notified for if a `RST_STREAM` is received for this stream.
    pub(crate) fn poll_reset(
        &mut self,
        cx: &Context,
        mode: proto::PollReset,
    ) -> Poll<Result<Reason, crate::Error>> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions
            .send
            .poll_reset(cx, &mut stream, mode)
            .map_err(From::from)
    }

    /// Returns an `OpaqueStreamRef` for the same stream, dropping the `B`
    /// buffer generic.
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
        self.opaque.clone()
    }

    /// Returns this stream's identifier.
    pub fn stream_id(&self) -> StreamId {
        self.opaque.stream_id()
    }
}
1289
1290impl<B> Clone for StreamRef<B> {
1291    fn clone(&self) -> Self {
1292        StreamRef {
1293            opaque: self.opaque.clone(),
1294            send_buffer: self.send_buffer.clone(),
1295        }
1296    }
1297}
1298
1299// ===== impl OpaqueStreamRef =====
1300
impl OpaqueStreamRef {
    /// Creates a new reference to `stream`, incrementing its ref count.
    fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
        stream.ref_inc();
        OpaqueStreamRef {
            inner,
            key: stream.key(),
        }
    }
    /// Called by a client to check for a received response.
    pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_response(cx, &mut stream)
    }
    /// Called by a client to check for a pushed request.
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
    ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);
        me.actions
            .recv
            .poll_pushed(cx, &mut stream)
            .map_ok(|(h, key)| {
                // Returning a new handle to the pushed stream; account for it
                // in the connection-wide handle count.
                me.refs += 1;
                let opaque_ref =
                    OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
                (h, opaque_ref)
            })
    }

    /// Asks the recv state whether this stream has reached end-of-stream.
    pub fn is_end_stream(&self) -> bool {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.key);

        me.actions.recv.is_end_stream(&stream)
    }

    /// Polls for the next received DATA chunk on this stream.
    pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_data(cx, &mut stream)
    }

    /// Polls for received trailers on this stream.
    pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_trailers(cx, &mut stream)
    }

    /// Currently available stream-level receive flow-control window.
    pub(crate) fn available_recv_capacity(&self) -> isize {
        let me = self.inner.lock().unwrap();
        let me = &*me;

        let stream = &me.store[self.key];
        stream.recv_flow.available().into()
    }

    /// Amount of received data currently counted as in-flight (received but
    /// not yet released back to the peer).
    pub(crate) fn used_recv_capacity(&self) -> WindowSize {
        let me = self.inner.lock().unwrap();
        let me = &*me;

        let stream = &me.store[self.key];
        stream.in_flight_recv_data
    }

    /// Releases recv capacity back to the peer. This may result in sending
    /// WINDOW_UPDATE frames on both the stream and connection.
    pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions
            .recv
            .release_capacity(capacity, &mut stream, &mut me.actions.task)
    }

    /// Clear the receive queue and set the status to no longer receive data frames.
    pub(crate) fn clear_recv_buffer(&mut self) {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);
        stream.is_recv = false;
        me.actions.recv.clear_recv_buffer(&mut stream);
    }

    /// Returns this stream's identifier.
    pub fn stream_id(&self) -> StreamId {
        self.inner.lock().unwrap().store[self.key].id
    }
}
1408
1409impl fmt::Debug for OpaqueStreamRef {
1410    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1411        use std::sync::TryLockError::*;
1412
1413        match self.inner.try_lock() {
1414            Ok(me) => {
1415                let stream = &me.store[self.key];
1416                fmt.debug_struct("OpaqueStreamRef")
1417                    .field("stream_id", &stream.id)
1418                    .field("ref_count", &stream.ref_count)
1419                    .finish()
1420            }
1421            Err(Poisoned(_)) => fmt
1422                .debug_struct("OpaqueStreamRef")
1423                .field("inner", &"<Poisoned>")
1424                .finish(),
1425            Err(WouldBlock) => fmt
1426                .debug_struct("OpaqueStreamRef")
1427                .field("inner", &"<Locked>")
1428                .finish(),
1429        }
1430    }
1431}
1432
1433impl Clone for OpaqueStreamRef {
1434    fn clone(&self) -> Self {
1435        // Increment the ref count
1436        let mut inner = self.inner.lock().unwrap();
1437        inner.store.resolve(self.key).ref_inc();
1438        inner.refs += 1;
1439
1440        OpaqueStreamRef {
1441            inner: self.inner.clone(),
1442            key: self.key,
1443        }
1444    }
1445}
1446
impl Drop for OpaqueStreamRef {
    fn drop(&mut self) {
        // Shared drop logic; see `drop_stream_ref` below.
        drop_stream_ref(&self.inner, self.key);
    }
}
1452
// TODO: Move back in fn above
/// Shared drop logic for `OpaqueStreamRef`.
///
/// Decrements both the connection-wide handle count and the stream's own ref
/// count. If the stream is no longer referenced, its recv window is released
/// back to the connection and any push promises still attached to it are
/// cancelled (they can no longer be reached through this stream).
fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
    let mut me = match inner.lock() {
        Ok(inner) => inner,
        Err(_) => {
            // If we are already panicking, don't turn a poisoned mutex into a
            // double panic (which would abort); just skip the cleanup.
            if ::std::thread::panicking() {
                tracing::trace!("StreamRef::drop; mutex poisoned");
                return;
            } else {
                panic!("StreamRef::drop; mutex poisoned");
            }
        }
    };

    let me = &mut *me;
    me.refs -= 1;
    let mut stream = me.store.resolve(key);

    tracing::trace!("drop_stream_ref; stream={:?}", stream);

    // decrement the stream's ref count by 1.
    stream.ref_dec();

    let actions = &mut me.actions;

    // If the stream is not referenced and it is already
    // closed (does not have to go through logic below
    // of canceling the stream), we should notify the task
    // (connection) so that it can close properly
    if stream.ref_count == 0 && stream.is_closed() {
        if let Some(task) = actions.task.take() {
            task.wake();
        }
    }

    me.counts.transition(stream, |counts, stream| {
        maybe_cancel(stream, actions, counts);

        if stream.ref_count == 0 {
            // Release any recv window back to connection, no one can access
            // it anymore.
            actions
                .recv
                .release_closed_capacity(stream, &mut actions.task);

            // We won't be able to reach our push promises anymore
            let mut ppp = stream.pending_push_promises.take();
            while let Some(promise) = ppp.pop(stream.store_mut()) {
                counts.transition(promise, |counts, stream| {
                    maybe_cancel(stream, actions, counts);
                });
            }
        }
    });
}
1508
1509fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1510    if stream.is_canceled_interest() {
1511        // Server is allowed to early respond without fully consuming the client input stream
1512        // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
1513        // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376)
1514        let reason = if counts.peer().is_server()
1515            && stream.state.is_send_closed()
1516            && stream.state.is_recv_streaming()
1517        {
1518            Reason::NO_ERROR
1519        } else {
1520            Reason::CANCEL
1521        };
1522
1523        actions
1524            .send
1525            .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
1526        actions.recv.enqueue_reset_expiration(stream, counts);
1527    }
1528}
1529
1530// ===== impl SendBuffer =====
1531
1532impl<B> SendBuffer<B> {
1533    fn new() -> Self {
1534        let inner = Mutex::new(Buffer::new());
1535        SendBuffer { inner }
1536    }
1537}
1538
1539// ===== impl Actions =====
1540
impl Actions {
    /// Sends a RST_STREAM with `reason` on `stream`.
    ///
    /// Library-initiated resets are counted against the local error-reset
    /// limit; exceeding it converts the reset into a connection-level
    /// `GoAway(ENHANCE_YOUR_CALM)`. User-initiated resets are not counted.
    fn send_reset<B>(
        &mut self,
        stream: store::Ptr,
        reason: Reason,
        initiator: Initiator,
        counts: &mut Counts,
        send_buffer: &mut Buffer<Frame<B>>,
    ) -> Result<(), crate::proto::error::GoAway> {
        counts.transition(stream, |counts, stream| {
            if initiator.is_library() {
                if counts.can_inc_num_local_error_resets() {
                    counts.inc_num_local_error_resets();
                } else {
                    tracing::warn!(
                        "locally-reset streams reached limit ({:?})",
                        counts.max_local_error_resets().unwrap(),
                    );
                    return Err(crate::proto::error::GoAway {
                        reason: Reason::ENHANCE_YOUR_CALM,
                        debug_data: "too_many_internal_resets".into(),
                    });
                }
            }

            self.send.send_reset(
                reason,
                initiator,
                send_buffer,
                stream,
                counts,
                &mut self.task,
            );
            self.recv.enqueue_reset_expiration(stream, counts);
            // if a RecvStream is parked, ensure it's notified
            stream.notify_recv();

            Ok(())
        })
    }

    /// If `res` is a stream-level reset error for `stream`, sends the
    /// corresponding RST_STREAM (subject to the local error-reset limit) and
    /// swallows the error; any other result is passed through unchanged.
    fn reset_on_recv_stream_err<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        res: Result<(), Error>,
    ) -> Result<(), Error> {
        if let Err(Error::Reset(stream_id, reason, initiator)) = res {
            debug_assert_eq!(stream_id, stream.id);

            if counts.can_inc_num_local_error_resets() {
                counts.inc_num_local_error_resets();

                // Reset the stream.
                self.send
                    .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
                self.recv.enqueue_reset_expiration(stream, counts);
                // if a RecvStream is parked, ensure it's notified
                stream.notify_recv();
                Ok(())
            } else {
                // Limit exceeded: escalate to a connection error instead.
                tracing::warn!(
                    "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})",
                    counts.max_local_error_resets().unwrap(),
                );
                Err(Error::library_go_away_data(
                    Reason::ENHANCE_YOUR_CALM,
                    "too_many_internal_resets",
                ))
            }
        } else {
            res
        }
    }

    /// Ensures stream `id` is not idle, dispatching to the send or recv side
    /// depending on which peer initiated the stream.
    fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
        if peer.is_local_init(id) {
            self.send.ensure_not_idle(id)
        } else {
            self.recv.ensure_not_idle(id)
        }
    }

    /// Returns the recorded connection error (cloned), if any.
    fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
        if let Some(ref err) = self.conn_error {
            Err(err.clone())
        } else {
            Ok(())
        }
    }

    /// Check if we possibly could have processed and since forgotten this stream.
    ///
    /// If we send a RST_STREAM for a stream, we will eventually "forget" about
    /// the stream to free up memory. It's possible that the remote peer had
    /// frames in-flight, and by the time we receive them, our own state is
    /// gone. We *could* tear everything down by sending a GOAWAY, but it
    /// is more likely to be latency/memory constraints that caused this,
    /// and not a bad actor. So be less catastrophic, the spec allows
    /// us to send another RST_STREAM of STREAM_CLOSED.
    fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
        if id.is_zero() {
            return false;
        }
        if peer.is_local_init(id) {
            self.send.may_have_created_stream(id)
        } else {
            self.recv.may_have_created_stream(id)
        }
    }

    /// Clears the pending queues on both the recv and send sides.
    fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
        self.recv.clear_queues(clear_pending_accept, store, counts);
        self.send.clear_queues(store, counts);
    }
}