tor_proto/tunnel/
streammap.rs

1//! Types and code for mapping StreamIDs to streams on a circuit.
2
3use crate::congestion::sendme;
4use crate::stream::queue::StreamQueueSender;
5use crate::stream::{AnyCmdChecker, StreamFlowControl};
6use crate::tunnel::circuit::StreamMpscReceiver;
7use crate::tunnel::halfstream::HalfStream;
8use crate::tunnel::reactor::circuit::RECV_WINDOW_INIT;
9use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
10use crate::{Error, Result};
11use pin_project::pin_project;
12use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
13use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
14use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
15use tor_cell::relaycell::{msg::AnyRelayMsg, StreamId};
16use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
17
18use std::collections::hash_map;
19use std::collections::HashMap;
20use std::num::NonZeroU16;
21use std::pin::Pin;
22use std::task::{Poll, Waker};
23use tor_error::{bad_api_usage, internal};
24
25use rand::Rng;
26
27use tracing::debug;
28
/// Entry for an open stream
///
/// (For the purposes of this module, an open stream is one where we have not
/// sent or received any message indicating that the stream is ended.)
#[derive(Debug)]
#[pin_project]
pub(super) struct OpenStreamEnt {
    /// Sink to send relay cells tagged for this stream into.
    pub(super) sink: StreamQueueSender,
    /// Number of cells dropped due to the stream disappearing before we can
    /// transform this into an `EndSent`.
    ///
    /// When the stream is terminated, this count is subtracted from the
    /// fresh receive window that is handed to the resulting half-stream.
    pub(super) dropped: u16,
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
    pub(super) cmd_checker: AnyCmdChecker,
    /// Flow control for this stream.
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
    // `flow_ctrl_waker` is woken.
    flow_ctrl: StreamFlowControl,
    /// Stream for cells that should be sent down this stream.
    // Not directly exposed. This should only be polled via
    // `OpenStreamEntStream`s implementation of `Stream`, which in turn should
    // only be used through `StreamPollSet`.
    #[pin]
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
    /// Waker to be woken when more sending capacity becomes available (e.g.
    /// receiving a SENDME).
    // Stored by `OpenStreamEntStream::poll_peek_mut` when the message at the
    // head of `rx` is blocked by flow control; taken and woken by
    // `put_for_incoming_sendme`.
    flow_ctrl_waker: Option<Waker>,
}
57
58impl OpenStreamEnt {
59    /// Whether this stream is ready to send `msg`.
60    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
61        self.flow_ctrl.can_send(msg)
62    }
63
64    /// Handle an incoming sendme.
65    ///
66    /// On failure, return an error: the caller should close the stream or
67    /// circuit with a protocol error.
68    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
69        self.flow_ctrl.put_for_incoming_sendme(msg)?;
70        // Wake the stream if it was blocked on flow control.
71        if let Some(waker) = self.flow_ctrl_waker.take() {
72            waker.wake();
73        }
74        Ok(())
75    }
76
77    /// The approximate number of stream inbound data bytes buffered.
78    fn approx_stream_bytes_buffered(&self) -> usize {
79        // NOTE: Here we want to know the total number of buffered incoming stream data bytes. We
80        // have access to the `StreamQueueSender` and can get how many bytes are buffered in that
81        // queue.
82        // But this isn't always the total number of buffered bytes since some bytes might be
83        // buffered outside of this queue.
84        // For example `DataReaderImpl` stores some stream bytes in its `pending` buffer, and we
85        // have no way to access that from here in the reactor. So it's impossible to know the total
86        // number of incoming stream data bytes that are buffered.
87        //
88        // This isn't really an issue in practice since *most* of the bytes will be queued in the
89        // `StreamQueueSender`, the XOFF threshold is very large, and we don't need to be exact.
90        self.sink.approx_stream_bytes()
91    }
92
93    /// Check if we should send an XON message.
94    ///
95    /// If we should, then returns the XON message that should be sent.
96    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
97    pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
98        self.flow_ctrl
99            .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
100    }
101
102    /// Check if we should send an XOFF message.
103    ///
104    /// If we should, then returns the XOFF message that should be sent.
105    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
106    pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
107        self.flow_ctrl
108            .maybe_send_xoff(self.approx_stream_bytes_buffered())
109    }
110
111    /// Handle an incoming XON message.
112    ///
113    /// On failure, return an error: the caller should close the stream or
114    /// circuit with a protocol error.
115    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
116        self.flow_ctrl.handle_incoming_xon(msg)
117    }
118
119    /// Handle an incoming XOFF message.
120    ///
121    /// On failure, return an error: the caller should close the stream or
122    /// circuit with a protocol error.
123    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
124        self.flow_ctrl.handle_incoming_xoff(msg)
125    }
126
127    /// Take capacity to send `msg`. If there's insufficient capacity, returns
128    /// an error. Should be called at the point we've fully committed to
129    /// sending the message.
130    //
131    // TODO: Consider not exposing this, and instead taking the capacity in
132    // `StreamMap::take_ready_msg`.
133    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
134        self.flow_ctrl.take_capacity_to_send(msg)
135    }
136}
137
/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
/// can't directly access the stream.
///
/// This is the stream type stored in `StreamMap::open_streams`.
#[derive(Debug)]
#[pin_project]
struct OpenStreamEntStream {
    /// Inner value.
    // Pinned because `OpenStreamEnt` itself has a pinned field (`rx`).
    #[pin]
    inner: OpenStreamEnt,
}
148
149impl futures::Stream for OpenStreamEntStream {
150    type Item = AnyRelayMsg;
151
152    fn poll_next(
153        mut self: std::pin::Pin<&mut Self>,
154        cx: &mut std::task::Context<'_>,
155    ) -> Poll<Option<Self::Item>> {
156        if !self.as_mut().poll_peek_mut(cx).is_ready() {
157            return Poll::Pending;
158        };
159        let res = self.project().inner.project().rx.poll_next(cx);
160        debug_assert!(res.is_ready());
161        // TODO: consider calling `inner.flow_ctrl.take_capacity_to_send` here;
162        // particularly if we change it to return a wrapper type that proves
163        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
164        // to be sure we've correctly taken the capacity, since messages can originate
165        // in other parts of the code (currently none of those should be of types that
166        // count towards flow control, but that may change).
167        res
168    }
169}
170
171impl PeekableStream for OpenStreamEntStream {
172    fn poll_peek_mut(
173        self: Pin<&mut Self>,
174        cx: &mut std::task::Context<'_>,
175    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
176        let s = self.project();
177        let inner = s.inner.project();
178        let m = match inner.rx.poll_peek_mut(cx) {
179            Poll::Ready(Some(m)) => m,
180            Poll::Ready(None) => return Poll::Ready(None),
181            Poll::Pending => return Poll::Pending,
182        };
183        if !inner.flow_ctrl.can_send(m) {
184            inner.flow_ctrl_waker.replace(cx.waker().clone());
185            return Poll::Pending;
186        }
187        Poll::Ready(Some(m))
188    }
189}
190
191impl UnobtrusivePeekableStream for OpenStreamEntStream {
192    fn unobtrusive_peek_mut(
193        self: std::pin::Pin<&mut Self>,
194    ) -> Option<&mut <Self as futures::Stream>::Item> {
195        let s = self.project();
196        let inner = s.inner.project();
197        let m = inner.rx.unobtrusive_peek_mut()?;
198        if inner.flow_ctrl.can_send(m) {
199            Some(m)
200        } else {
201            None
202        }
203    }
204}
205
/// Entry for a stream where we have sent an END, or other message
/// indicating that the stream is terminated.
#[derive(Debug)]
pub(super) struct EndSentStreamEnt {
    /// A "half-stream" that we use to check the validity of incoming
    /// messages on this stream.
    pub(super) half_stream: HalfStream,
    /// True if the sender on this stream has been explicitly dropped;
    /// false if we got an explicit close from `close_pending`
    // Initialised in `StreamMap::terminate` from the `TerminateReason`:
    // true for `StreamTargetClosed`, false for `ExplicitEnd`.
    explicitly_dropped: bool,
}
217
/// The entry for a closed stream, as stored in `StreamMap::closed_streams`.
#[derive(Debug)]
enum ClosedStreamEnt {
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    ///
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
    /// these?
    EndSent(EndSentStreamEnt),
}
231
/// Mutable reference to a stream entry.
///
/// This is the view handed out by `StreamMap::get_mut`: it covers both open
/// streams and the two closed states.
pub(super) enum StreamEntMut<'a> {
    /// An open stream.
    Open(&'a mut OpenStreamEnt),
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    EndSent(&'a mut EndSentStreamEnt),
}
243
244impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
245    fn from(value: &'a mut ClosedStreamEnt) -> Self {
246        match value {
247            ClosedStreamEnt::EndReceived => Self::EndReceived,
248            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
249        }
250    }
251}
252
253impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
254    fn from(value: &'a mut OpenStreamEntStream) -> Self {
255        Self::Open(&mut value.inner)
256    }
257}
258
/// Return value to indicate whether or not we send an END cell upon
/// terminating a given stream.
///
/// Returned by `StreamMap::terminate`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(super) enum ShouldSendEnd {
    /// An END cell should be sent.
    Send,
    /// An END cell should not be sent.
    DontSend,
}
268
/// A priority for use with [`StreamPollSet`].
///
/// `StreamMap` hands these out from a monotonically increasing counter
/// (see `StreamMap::take_next_priority`).
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);
272
/// A map from stream IDs to stream entries. Each circuit has one for each
/// hop.
pub(super) struct StreamMap {
    /// Open streams.
    // Invariants:
    // * Keys are disjoint with `closed_streams`.
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
    /// Closed streams.
    // Invariants:
    // * Keys are disjoint with `open_streams`.
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
    /// The next StreamId that we should try when allocating an ID for a
    /// newly added stream.
    next_stream_id: StreamId,
    /// Next priority to use in `open_streams`. We implement round-robin
    /// scheduling of handling outgoing messages from streams by assigning a
    /// stream the next priority whenever an outgoing message is processed
    /// from that stream, putting it last in line.
    next_priority: Priority,
}
293
294impl StreamMap {
295    /// Make a new empty StreamMap.
296    pub(super) fn new() -> Self {
297        let mut rng = rand::rng();
298        let next_stream_id: NonZeroU16 = rng.random();
299        StreamMap {
300            open_streams: StreamPollSet::new(),
301            closed_streams: HashMap::new(),
302            next_stream_id: next_stream_id.into(),
303            next_priority: Priority(0),
304        }
305    }
306
    /// Return the number of open streams in this map.
    ///
    /// Entries in `closed_streams` are not counted.
    pub(super) fn n_open_streams(&self) -> usize {
        self.open_streams.len()
    }
311
312    /// Return the next available priority.
313    fn take_next_priority(&mut self) -> Priority {
314        let rv = self.next_priority;
315        self.next_priority = Priority(rv.0 + 1);
316        rv
317    }
318
319    /// Add an entry to this map; return the newly allocated StreamId.
320    pub(super) fn add_ent(
321        &mut self,
322        sink: StreamQueueSender,
323        rx: StreamMpscReceiver<AnyRelayMsg>,
324        flow_ctrl: StreamFlowControl,
325        cmd_checker: AnyCmdChecker,
326    ) -> Result<StreamId> {
327        let mut stream_ent = OpenStreamEntStream {
328            inner: OpenStreamEnt {
329                sink,
330                flow_ctrl,
331                dropped: 0,
332                cmd_checker,
333                rx: StreamUnobtrusivePeeker::new(rx),
334                flow_ctrl_waker: None,
335            },
336        };
337        let priority = self.take_next_priority();
338        // This "65536" seems too aggressive, but it's what tor does.
339        //
340        // Also, going around in a loop here is (sadly) needed in order
341        // to look like Tor clients.
342        for _ in 1..=65536 {
343            let id: StreamId = self.next_stream_id;
344            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
345            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
346                Ok(_) => return Ok(id),
347                Err(KeyAlreadyInsertedError {
348                    key: _,
349                    priority: _,
350                    stream,
351                }) => stream,
352            };
353        }
354
355        Err(Error::IdRangeFull)
356    }
357
358    /// Add an entry to this map using the specified StreamId.
359    #[cfg(feature = "hs-service")]
360    pub(super) fn add_ent_with_id(
361        &mut self,
362        sink: StreamQueueSender,
363        rx: StreamMpscReceiver<AnyRelayMsg>,
364        flow_ctrl: StreamFlowControl,
365        id: StreamId,
366        cmd_checker: AnyCmdChecker,
367    ) -> Result<()> {
368        let stream_ent = OpenStreamEntStream {
369            inner: OpenStreamEnt {
370                sink,
371                flow_ctrl,
372                dropped: 0,
373                cmd_checker,
374                rx: StreamUnobtrusivePeeker::new(rx),
375                flow_ctrl_waker: None,
376            },
377        };
378        let priority = self.take_next_priority();
379        self.open_streams
380            .try_insert(id, priority, stream_ent)
381            .map_err(|_| Error::IdUnavailable(id))
382    }
383
384    /// Return the entry for `id` in this map, if any.
385    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
386        if let Some(e) = self.open_streams.stream_mut(&id) {
387            return Some(e.into());
388        }
389        if let Some(e) = self.closed_streams.get_mut(&id) {
390            return Some(e.into());
391        }
392        None
393    }
394
395    /// Note that we received an END message (or other message indicating the end of
396    /// the stream) on the stream with `id`.
397    ///
398    /// Returns true if there was really a stream there.
399    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
400        if self.open_streams.remove(&id).is_some() {
401            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
402            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
403            return Ok(());
404        }
405        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
406            return Err(Error::CircProto(
407                "Received END cell on nonexistent stream".into(),
408            ));
409        };
410        // Progress the stream's state machine accordingly
411        match closed_entry.get() {
412            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
413                "Received two END cells on same stream".into(),
414            )),
415            ClosedStreamEnt::EndSent { .. } => {
416                debug!("Actually got an end cell on a half-closed stream!");
417                // We got an END, and we already sent an END. Great!
418                // we can forget about this stream.
419                closed_entry.remove_entry();
420                Ok(())
421            }
422        }
423    }
424
425    /// Handle a termination of the stream with `id` from this side of
426    /// the circuit. Return true if the stream was open and an END
427    /// ought to be sent.
428    pub(super) fn terminate(
429        &mut self,
430        id: StreamId,
431        why: TerminateReason,
432    ) -> Result<ShouldSendEnd> {
433        use TerminateReason as TR;
434
435        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
436            let OpenStreamEntStream {
437                inner:
438                    OpenStreamEnt {
439                        flow_ctrl,
440                        dropped,
441                        cmd_checker,
442                        // notably absent: the channels for sink and stream, which will get dropped and
443                        // closed (meaning reads/writes from/to this stream will now fail)
444                        ..
445                    },
446            } = ent;
447            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
448            //             so a malicious peer can send us slightly more data than they should
449            //             be able to; see arti#230.
450            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
451            recv_window.decrement_n(dropped)?;
452            // TODO: would be nice to avoid new_ref.
453            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
454            let explicitly_dropped = why == TR::StreamTargetClosed;
455            let prev = self.closed_streams.insert(
456                id,
457                ClosedStreamEnt::EndSent(EndSentStreamEnt {
458                    half_stream,
459                    explicitly_dropped,
460                }),
461            );
462            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
463            return Ok(ShouldSendEnd::Send);
464        }
465
466        // Progress the stream's state machine accordingly
467        match self
468            .closed_streams
469            .remove(&id)
470            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
471        {
472            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
473            ClosedStreamEnt::EndSent(EndSentStreamEnt {
474                ref mut explicitly_dropped,
475                ..
476            }) => match (*explicitly_dropped, why) {
477                (false, TR::StreamTargetClosed) => {
478                    *explicitly_dropped = true;
479                    Ok(ShouldSendEnd::DontSend)
480                }
481                (true, TR::StreamTargetClosed) => {
482                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
483                }
484                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
485                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
486                    *explicitly_dropped
487                )
488                .into()),
489            },
490        }
491    }
492
493    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
494    /// indicates that the local sender has been dropped.
495    ///
496    /// Conceptually all streams are in a queue; new streams are added to the
497    /// back of the queue, and a stream is sent to the back of the queue
498    /// whenever a ready message is taken from it (via
499    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
500    /// this queue, showing the subset of streams that have a message ready to
501    /// send, or whose sender has been dropped.
502    pub(super) fn poll_ready_streams_iter<'a>(
503        &'a mut self,
504        cx: &mut std::task::Context,
505    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a {
506        self.open_streams
507            .poll_ready_iter_mut(cx)
508            .map(|(sid, _priority, ent)| {
509                let ent = Pin::new(ent);
510                let msg = ent.unobtrusive_peek();
511                (*sid, msg)
512            })
513    }
514
515    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
516    /// to the "back of the line" with respect to
517    /// [`Self::poll_ready_streams_iter`].
518    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
519        let new_priority = self.take_next_priority();
520        let (_prev_priority, val) = self
521            .open_streams
522            .take_ready_value_and_reprioritize(&sid, new_priority)?;
523        Some(val)
524    }
525
526    // TODO: Eventually if we want relay support, we'll need to support
527    // stream IDs chosen by somebody else. But for now, we don't need those.
528}
529
/// A reason for terminating a stream.
///
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
///
/// `terminate` uses the reason to initialise, and later to check, the
/// `explicitly_dropped` flag of an "END sent" entry.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum TerminateReason {
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
    /// corresponding senders were all dropped.
    StreamTargetClosed,
    /// Closing a stream because we were explicitly told to end it via
    /// [`StreamTarget::close_pending`](crate::tunnel::StreamTarget::close_pending).
    ExplicitEnd,
}
542
543/// Convenience function for doing a wrapping increment of a `StreamId`.
544fn wrapping_next_stream_id(id: StreamId) -> StreamId {
545    let next_val = NonZeroU16::from(id)
546        .checked_add(1)
547        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
548    next_val.into()
549}
550
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::stream::queue::fake_stream_queue;
    use crate::tunnel::circuit::test::fake_mpsc;
    use crate::{congestion::sendme::StreamSendWindow, stream::DataCmdChecker};

    // `wrapping_next_stream_id` increments by one, and wraps from 0xffff
    // back to 1 rather than to 0.
    #[test]
    fn test_wrapping_next_stream_id() {
        let one = StreamId::new(1).unwrap();
        let two = StreamId::new(2).unwrap();
        let max = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(one), two);
        assert_eq!(wrapping_next_stream_id(max), one);
    }

    // Walk a `StreamMap` through adding streams, looking them up, receiving
    // END messages, and terminating streams, checking the open-stream count
    // and the per-stream state after each step.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent: IDs must be handed out sequentially from the map's
        // starting ID.
        for n in 1..=128 {
            let (sink, _) = fake_stream_queue(128);
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamFlowControl::new_window_based(StreamSendWindow::new(500)),
                DataCmdChecker::new_any(),
            )?;
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), n);
        }

        // Test get_mut.
        // (`next_id` is the first ID that has not been allocated.)
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test ending_msg_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        // A second END on the same stream is an error.
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}
654}