//! Types and code for mapping StreamIDs to streams on a circuit.

use crate::congestion::sendme;
use crate::stream::{AnyCmdChecker, StreamSendFlowControl};
use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
use crate::tunnel::halfstream::HalfStream;
use crate::tunnel::reactor::circuit::RECV_WINDOW_INIT;
use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
use crate::{Error, Result};
use pin_project::pin_project;
use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
use tor_cell::relaycell::{msg::AnyRelayMsg, StreamId};
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};

use std::collections::hash_map;
use std::collections::HashMap;
use std::num::NonZeroU16;
use std::pin::Pin;
use std::task::{Poll, Waker};
use tor_error::{bad_api_usage, internal};

use rand::Rng;

use tracing::debug;

/// Entry for an open stream
///
/// (For the purposes of this module, an open stream is one where we have not
/// sent or received any message indicating that the stream is ended.)
#[derive(Debug)]
#[pin_project]
pub(super) struct OpenStreamEnt {
    /// Sink to send relay cells tagged for this stream into.
    pub(super) sink: StreamMpscSender<UnparsedRelayMsg>,
    /// Number of cells dropped due to the stream disappearing before we can
    /// transform this into an `EndSent`.
    pub(super) dropped: u16,
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
    pub(super) cmd_checker: AnyCmdChecker,
    /// Flow control for this stream.
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
    // `flow_ctrl_waker` is woken.
    flow_ctrl: StreamSendFlowControl,
    /// Stream for cells that should be sent down this stream.
    // Not directly exposed. This should only be polled via
    // `OpenStreamEntStream`'s implementation of `Stream`, which in turn should
    // only be used through `StreamPollSet`.
    #[pin]
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
    /// Waker to be woken when more sending capacity becomes available (e.g.
    /// receiving a SENDME).
    flow_ctrl_waker: Option<Waker>,
}

impl OpenStreamEnt {
    /// Whether this stream is ready to send `msg`.
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
        self.flow_ctrl.can_send(msg)
    }

    /// Handle an incoming sendme.
    ///
    /// On success, update the flow-control window and wake any task that was
    /// blocked waiting for capacity on this stream.
    ///
    /// On failure, return an error: the caller should close the stream or
    /// circuit with a protocol error.
    pub(crate) fn put_for_incoming_sendme(&mut self) -> Result<()> {
        self.flow_ctrl.put_for_incoming_sendme()?;
        // Wake the stream if it was blocked on flow control.
        if let Some(waker) = self.flow_ctrl_waker.take() {
            waker.wake();
        }
        Ok(())
    }

    /// Take capacity to send `msg`. If there's insufficient capacity, returns
    /// an error. Should be called at the point we've fully committed to
    /// sending the message.
    //
    // TODO: Consider not exposing this, and instead taking the capacity in
    // `StreamMap::take_ready_msg`.
    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
        self.flow_ctrl.take_capacity_to_send(msg)
    }
}
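
// A test-only sketch, not used elsewhere in this module, of the send-window
// discipline that the methods above assume: check `can_send` while scheduling,
// call `take_capacity_to_send` once a message is fully committed, and on an
// incoming SENDME widen the window and wake whichever task was blocked on
// capacity. `WindowSketch` and its numbers are illustrative only, not the
// crate's real `StreamSendFlowControl`.
#[cfg(test)]
mod flow_control_sketch {
    use std::task::Waker;

    /// How many cells we may still send before we must wait for a SENDME.
    struct WindowSketch {
        remaining: u16,
        /// Task to wake once capacity becomes available again.
        waker: Option<Waker>,
    }

    impl WindowSketch {
        /// Whether we may send one more cell right now.
        fn can_send(&self) -> bool {
            self.remaining > 0
        }

        /// Consume one cell of capacity; call only once a send is committed.
        fn take_capacity_to_send(&mut self) -> Result<(), &'static str> {
            self.remaining = self.remaining.checked_sub(1).ok_or("window exhausted")?;
            Ok(())
        }

        /// The peer acknowledged data with a SENDME: widen the window and wake
        /// any task blocked on flow control (cf. `put_for_incoming_sendme`).
        fn put_for_incoming_sendme(&mut self, increment: u16) {
            self.remaining += increment;
            if let Some(waker) = self.waker.take() {
                waker.wake();
            }
        }
    }

    #[test]
    fn window_discipline() {
        let mut w = WindowSketch {
            remaining: 2,
            waker: None,
        };
        assert!(w.can_send());
        w.take_capacity_to_send().unwrap();
        w.take_capacity_to_send().unwrap();
        assert!(!w.can_send());
        assert!(w.take_capacity_to_send().is_err());
        w.put_for_incoming_sendme(50);
        assert!(w.can_send());
    }
}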

/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
/// can't directly access the stream.
#[derive(Debug)]
#[pin_project]
struct OpenStreamEntStream {
    /// Inner value.
    #[pin]
    inner: OpenStreamEnt,
}

impl futures::Stream for OpenStreamEntStream {
    type Item = AnyRelayMsg;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if !self.as_mut().poll_peek_mut(cx).is_ready() {
            return Poll::Pending;
        };
        let res = self.project().inner.project().rx.poll_next(cx);
        debug_assert!(res.is_ready());
        // TODO: consider calling `inner.flow_ctrl.take_capacity_to_send` here;
        // particularly if we change it to return a wrapper type that proves
        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
        // to be sure we've correctly taken the capacity, since messages can originate
        // in other parts of the code (currently none of those should be of types that
        // count towards flow control, but that may change).
        res
    }
}

impl PeekableStream for OpenStreamEntStream {
    fn poll_peek_mut(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
        let s = self.project();
        let inner = s.inner.project();
        let m = match inner.rx.poll_peek_mut(cx) {
            Poll::Ready(Some(m)) => m,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        };
        if !inner.flow_ctrl.can_send(m) {
            inner.flow_ctrl_waker.replace(cx.waker().clone());
            return Poll::Pending;
        }
        Poll::Ready(Some(m))
    }
}
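
// A test-only sketch, independent of the types above, of the waker handshake
// that `poll_peek_mut` relies on: a poll that can't make progress stores the
// task's `Waker` before returning `Pending`, and whoever later removes the
// obstruction (here, `open`; above, `put_for_incoming_sendme`) takes and wakes
// it so the task gets polled again. `Gate` and `WaitForGate` are illustrative
// names, and the test assumes `futures::executor::block_on` is available
// (i.e. the `futures` crate's default `executor` feature is enabled).
#[cfg(test)]
mod waker_sketch {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};

    /// Shared flag plus the waker of whichever task is waiting on it.
    #[derive(Default)]
    struct Gate {
        open: bool,
        waker: Option<Waker>,
    }

    /// A future that resolves once the gate has been opened.
    struct WaitForGate(Arc<Mutex<Gate>>);

    impl Future for WaitForGate {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let mut gate = self.0.lock().expect("lock poisoned");
            if gate.open {
                Poll::Ready(())
            } else {
                // Blocked: remember who to wake, like `flow_ctrl_waker` above.
                gate.waker.replace(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Open the gate and wake the task that was blocked on it, if any.
    fn open(gate: &Mutex<Gate>) {
        let mut gate = gate.lock().expect("lock poisoned");
        gate.open = true;
        if let Some(waker) = gate.waker.take() {
            waker.wake();
        }
    }

    #[test]
    fn wakes_blocked_task() {
        let gate = Arc::new(Mutex::new(Gate::default()));
        let opener = {
            let gate = Arc::clone(&gate);
            std::thread::spawn(move || open(&gate))
        };
        // block_on parks this thread until `open` wakes the stored waker.
        futures::executor::block_on(WaitForGate(Arc::clone(&gate)));
        opener.join().expect("opener thread panicked");
    }
}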

impl UnobtrusivePeekableStream for OpenStreamEntStream {
    fn unobtrusive_peek_mut(
        self: std::pin::Pin<&mut Self>,
    ) -> Option<&mut <Self as futures::Stream>::Item> {
        let s = self.project();
        let inner = s.inner.project();
        let m = inner.rx.unobtrusive_peek_mut()?;
        if inner.flow_ctrl.can_send(m) {
            Some(m)
        } else {
            None
        }
    }
}

/// Entry for a stream where we have sent an END, or other message
/// indicating that the stream is terminated.
#[derive(Debug)]
pub(super) struct EndSentStreamEnt {
    /// A "half-stream" that we use to check the validity of incoming
    /// messages on this stream.
    pub(super) half_stream: HalfStream,
    /// True if the sender on this stream has been explicitly dropped;
    /// false if we got an explicit close from `close_pending`
    explicitly_dropped: bool,
}

/// The entry for a closed stream.
#[derive(Debug)]
enum ClosedStreamEnt {
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    ///
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
    /// these?
    EndSent(EndSentStreamEnt),
}

/// Mutable reference to a stream entry.
pub(super) enum StreamEntMut<'a> {
    /// An open stream.
    Open(&'a mut OpenStreamEnt),
    /// A stream for which we have received an END cell, but not yet
    /// had the stream object get dropped.
    EndReceived,
    /// A stream for which we have sent an END cell but not yet received an END
    /// cell.
    EndSent(&'a mut EndSentStreamEnt),
}

impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
    fn from(value: &'a mut ClosedStreamEnt) -> Self {
        match value {
            ClosedStreamEnt::EndReceived => Self::EndReceived,
            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
        }
    }
}

impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
    fn from(value: &'a mut OpenStreamEntStream) -> Self {
        Self::Open(&mut value.inner)
    }
}

/// Return value to indicate whether or not we send an END cell upon
/// terminating a given stream.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(super) enum ShouldSendEnd {
    /// An END cell should be sent.
    Send,
    /// An END cell should not be sent.
    DontSend,
}

/// A priority for use with [`StreamPollSet`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);

/// A map from stream IDs to stream entries. Each circuit has one for each
/// hop.
pub(super) struct StreamMap {
    /// Open streams.
    // Invariants:
    // * Keys are disjoint with `closed_streams`.
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
    /// Closed streams.
    // Invariants:
    // * Keys are disjoint with `open_streams`.
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
    /// The next StreamId that we should use for a newly allocated
    /// stream.
    next_stream_id: StreamId,
    /// Next priority to use in `open_streams`. We implement round-robin
    /// scheduling of outgoing messages from streams by assigning a stream the
    /// next priority whenever an outgoing message is processed from that
    /// stream, putting it last in line.
    next_priority: Priority,
}

impl StreamMap {
    /// Make a new empty StreamMap.
    pub(super) fn new() -> Self {
        let mut rng = rand::rng();
        let next_stream_id: NonZeroU16 = rng.random();
        StreamMap {
            open_streams: StreamPollSet::new(),
            closed_streams: HashMap::new(),
            next_stream_id: next_stream_id.into(),
            next_priority: Priority(0),
        }
    }

    /// Return the number of open streams in this map.
    pub(super) fn n_open_streams(&self) -> usize {
        self.open_streams.len()
    }

    /// Return the next available priority.
    fn take_next_priority(&mut self) -> Priority {
        let rv = self.next_priority;
        self.next_priority = Priority(rv.0 + 1);
        rv
    }

    /// Add an entry to this map; return the newly allocated StreamId.
    pub(super) fn add_ent(
        &mut self,
        sink: StreamMpscSender<UnparsedRelayMsg>,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        flow_ctrl: StreamSendFlowControl,
        cmd_checker: AnyCmdChecker,
    ) -> Result<StreamId> {
        let mut stream_ent = OpenStreamEntStream {
            inner: OpenStreamEnt {
                sink,
                flow_ctrl,
                dropped: 0,
                cmd_checker,
                rx: StreamUnobtrusivePeeker::new(rx),
                flow_ctrl_waker: None,
            },
        };
        let priority = self.take_next_priority();
        // This "65536" seems too aggressive, but it's what tor does.
        //
        // Also, going around in a loop here is (sadly) needed in order
        // to look like Tor clients.
        for _ in 1..=65536 {
            let id: StreamId = self.next_stream_id;
            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
                Ok(_) => return Ok(id),
                Err(KeyAlreadyInsertedError {
                    key: _,
                    priority: _,
                    stream,
                }) => stream,
            };
        }

        Err(Error::IdRangeFull)
    }

    /// Add an entry to this map using the specified StreamId.
    #[cfg(feature = "hs-service")]
    pub(super) fn add_ent_with_id(
        &mut self,
        sink: StreamMpscSender<UnparsedRelayMsg>,
        rx: StreamMpscReceiver<AnyRelayMsg>,
        flow_ctrl: StreamSendFlowControl,
        id: StreamId,
        cmd_checker: AnyCmdChecker,
    ) -> Result<()> {
        let stream_ent = OpenStreamEntStream {
            inner: OpenStreamEnt {
                sink,
                flow_ctrl,
                dropped: 0,
                cmd_checker,
                rx: StreamUnobtrusivePeeker::new(rx),
                flow_ctrl_waker: None,
            },
        };
        let priority = self.take_next_priority();
        self.open_streams
            .try_insert(id, priority, stream_ent)
            .map_err(|_| Error::IdUnavailable(id))
    }

    /// Return the entry for `id` in this map, if any.
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
        if let Some(e) = self.open_streams.stream_mut(&id) {
            return Some(e.into());
        }
        if let Some(e) = self.closed_streams.get_mut(&id) {
            return Some(e.into());
        }
        None
    }

    /// Note that we received an END message (or other message indicating the end of
    /// the stream) on the stream with `id`.
    ///
    /// Returns an error if there was no such stream, or if we had already
    /// received an END for it.
    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
        if self.open_streams.remove(&id).is_some() {
            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
            return Ok(());
        }
        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
            return Err(Error::CircProto(
                "Received END cell on nonexistent stream".into(),
            ));
        };
        // Progress the stream's state machine accordingly
        match closed_entry.get() {
            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
                "Received two END cells on same stream".into(),
            )),
            ClosedStreamEnt::EndSent { .. } => {
                debug!("Actually got an end cell on a half-closed stream!");
                // We got an END, and we already sent an END. Great!
                // We can forget about this stream.
                closed_entry.remove_entry();
                Ok(())
            }
        }
    }

    /// Handle a termination of the stream with `id` from this side of
    /// the circuit. Return `ShouldSendEnd::Send` if the stream was open and
    /// an END ought to be sent.
    pub(super) fn terminate(
        &mut self,
        id: StreamId,
        why: TerminateReason,
    ) -> Result<ShouldSendEnd> {
        use TerminateReason as TR;

        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
            let OpenStreamEntStream {
                inner:
                    OpenStreamEnt {
                        flow_ctrl,
                        dropped,
                        cmd_checker,
                        // notably absent: the channels for sink and stream, which will get dropped and
                        // closed (meaning reads/writes from/to this stream will now fail)
                        ..
                    },
            } = ent;
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
            //             so a malicious peer can send us slightly more data than they should
            //             be able to; see arti#230.
            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
            recv_window.decrement_n(dropped)?;
            // TODO: would be nice to avoid new_ref.
            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
            let explicitly_dropped = why == TR::StreamTargetClosed;
            let prev = self.closed_streams.insert(
                id,
                ClosedStreamEnt::EndSent(EndSentStreamEnt {
                    half_stream,
                    explicitly_dropped,
                }),
            );
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
            return Ok(ShouldSendEnd::Send);
        }

        // Progress the stream's state machine accordingly
        match self
            .closed_streams
            .remove(&id)
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
        {
            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
            ClosedStreamEnt::EndSent(EndSentStreamEnt {
                ref mut explicitly_dropped,
                ..
            }) => match (*explicitly_dropped, why) {
                (false, TR::StreamTargetClosed) => {
                    *explicitly_dropped = true;
                    Ok(ShouldSendEnd::DontSend)
                }
                (true, TR::StreamTargetClosed) => {
                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
                }
                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
                    *explicitly_dropped
                )
                .into()),
            },
        }
    }

    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
    /// indicates that the local sender has been dropped.
    ///
    /// Conceptually all streams are in a queue; new streams are added to the
    /// back of the queue, and a stream is sent to the back of the queue
    /// whenever a ready message is taken from it (via
    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
    /// this queue, showing the subset of streams that have a message ready to
    /// send, or whose sender has been dropped.
    pub(super) fn poll_ready_streams_iter<'a>(
        &'a mut self,
        cx: &mut std::task::Context,
    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a {
        self.open_streams
            .poll_ready_iter_mut(cx)
            .map(|(sid, _priority, ent)| {
                let ent = Pin::new(ent);
                let msg = ent.unobtrusive_peek();
                (*sid, msg)
            })
    }

    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
    /// to the "back of the line" with respect to
    /// [`Self::poll_ready_streams_iter`].
    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
        let new_priority = self.take_next_priority();
        let (_prev_priority, val) = self
            .open_streams
            .take_ready_value_and_reprioritize(&sid, new_priority)?;
        Some(val)
    }

    // TODO: Eventually if we want relay support, we'll need to support
    // stream IDs chosen by somebody else. But for now, we don't need those.
}
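
// A test-only sketch, independent of `StreamPollSet`, of the round-robin
// scheme that `poll_ready_streams_iter` and `take_ready_msg` implement above:
// each stream carries a monotonically increasing priority, ready streams are
// visited in priority order, and taking a message from a stream sends it to
// the back of the line by assigning it the next priority. The `BTreeMap` of
// priorities and the helper below are illustrative only.
#[cfg(test)]
mod round_robin_sketch {
    use std::collections::BTreeMap;

    /// Take the next ready message, visiting streams in priority order and
    /// reprioritizing the chosen stream to the back of the line.
    ///
    /// The map goes from priority to (stream id, queued messages).
    fn take_ready(
        queue: &mut BTreeMap<u64, (u16, Vec<&'static str>)>,
        next_priority: &mut u64,
    ) -> Option<(u16, &'static str)> {
        // Lowest-priority stream that has a message ready, if any.
        let pri = *queue
            .iter()
            .find(|(_, (_, msgs))| !msgs.is_empty())
            .map(|(pri, _)| pri)?;
        let (sid, mut msgs) = queue.remove(&pri).expect("priority vanished");
        let msg = msgs.remove(0);
        // Reprioritize: the stream we just serviced goes last in line.
        queue.insert(*next_priority, (sid, msgs));
        *next_priority += 1;
        Some((sid, msg))
    }

    #[test]
    fn fair_interleaving() {
        let mut queue = BTreeMap::new();
        queue.insert(0, (7, vec!["a1", "a2"]));
        queue.insert(1, (9, vec!["b1", "b2"]));
        let mut next_priority = 2;
        let order: Vec<_> =
            std::iter::from_fn(|| take_ready(&mut queue, &mut next_priority)).collect();
        // The two streams alternate instead of stream 7 draining first.
        assert_eq!(order, vec![(7, "a1"), (9, "b1"), (7, "a2"), (9, "b2")]);
    }
}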

/// A reason for terminating a stream.
///
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) enum TerminateReason {
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
    /// corresponding senders were all dropped.
    StreamTargetClosed,
    /// Closing a stream because we were explicitly told to end it via
    /// [`StreamTarget::close_pending`](crate::tunnel::StreamTarget::close_pending).
    ExplicitEnd,
}
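
// A test-only sketch of the half-closed lifecycle that `ending_msg_received`
// and `terminate` implement above: whichever side ends a stream first leaves a
// half-closed record behind, and the stream is forgotten once both sides have
// ended it. An END cell is only sent when the local side closes a still-open
// stream. This is simplified (it ignores the `explicitly_dropped` bookkeeping
// and `TerminateReason`); the types and helpers here are illustrative only.
#[cfg(test)]
mod half_closed_sketch {
    use std::collections::HashMap;

    /// Where a stream is in its lifecycle.
    enum State {
        Open,
        /// The peer ended the stream first.
        EndReceived,
        /// We ended the stream first.
        EndSent,
    }

    /// Locally close stream `id`; return whether an END cell should be sent.
    fn local_close(map: &mut HashMap<u16, State>, id: u16) -> Result<bool, &'static str> {
        match map.get(&id) {
            Some(State::Open) => {
                map.insert(id, State::EndSent);
                Ok(true)
            }
            // The peer already ended it; dropping our half sends nothing.
            Some(State::EndReceived) => {
                map.remove(&id);
                Ok(false)
            }
            Some(State::EndSent) => Err("stream already closed locally"),
            None => Err("no such stream"),
        }
    }

    /// The peer sent an END (or equivalent) for stream `id`.
    fn remote_end(map: &mut HashMap<u16, State>, id: u16) -> Result<(), &'static str> {
        match map.get(&id) {
            Some(State::Open) => {
                map.insert(id, State::EndReceived);
                Ok(())
            }
            // Both sides have now ended the stream; forget it entirely.
            Some(State::EndSent) => {
                map.remove(&id);
                Ok(())
            }
            Some(State::EndReceived) => Err("two END cells on the same stream"),
            None => Err("END on nonexistent stream"),
        }
    }

    #[test]
    fn lifecycle() {
        let mut map = HashMap::from([(1, State::Open), (2, State::Open)]);
        // We close stream 1 first: send an END, then the peer's END removes it.
        assert_eq!(local_close(&mut map, 1), Ok(true));
        assert_eq!(remote_end(&mut map, 1), Ok(()));
        assert!(!map.contains_key(&1));
        // The peer ends stream 2 first: our later close sends nothing.
        assert_eq!(remote_end(&mut map, 2), Ok(()));
        assert_eq!(local_close(&mut map, 2), Ok(false));
        assert!(!map.contains_key(&2));
    }
}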

/// Convenience function for doing a wrapping increment of a `StreamId`.
fn wrapping_next_stream_id(id: StreamId) -> StreamId {
    let next_val = NonZeroU16::from(id)
        .checked_add(1)
        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
    next_val.into()
}
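
// A test-only sketch of the ID-allocation strategy used in `StreamMap::add_ent`
// above: start from an arbitrary nonzero ID, step with the same wraparound as
// `wrapping_next_stream_id` (skipping 0, which is not a valid stream ID), and
// give up after 65536 attempts if no free ID turns up. Here `used` stands in
// for the keys of `open_streams`; the names are illustrative only.
#[cfg(test)]
mod id_allocation_sketch {
    use std::collections::HashSet;
    use std::num::NonZeroU16;

    /// Wrapping increment over the nonzero 16-bit IDs.
    fn wrapping_next(id: NonZeroU16) -> NonZeroU16 {
        id.checked_add(1)
            .unwrap_or_else(|| NonZeroU16::new(1).expect("1 is nonzero"))
    }

    /// Find a free ID starting at `candidate`, or `None` if the range is full.
    fn allocate(used: &HashSet<NonZeroU16>, mut candidate: NonZeroU16) -> Option<NonZeroU16> {
        for _ in 1..=65536 {
            if !used.contains(&candidate) {
                return Some(candidate);
            }
            candidate = wrapping_next(candidate);
        }
        // Every nonzero ID is already in use.
        None
    }

    #[test]
    fn skips_used_ids_and_wraps() {
        let max = NonZeroU16::new(u16::MAX).expect("nonzero");
        let used = HashSet::from([max]);
        // Starting at the one used ID, allocation wraps around to 1.
        assert_eq!(
            allocate(&used, max),
            Some(NonZeroU16::new(1).expect("nonzero"))
        );
    }
}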

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::tunnel::circuit::test::fake_mpsc;
    use crate::{congestion::sendme::StreamSendWindow, stream::DataCmdChecker};

    #[test]
    fn test_wrapping_next_stream_id() {
        let one = StreamId::new(1).unwrap();
        let two = StreamId::new(2).unwrap();
        let max = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(one), two);
        assert_eq!(wrapping_next_stream_id(max), one);
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent
        for n in 1..=128 {
            let (sink, _) = fake_mpsc(128);
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamSendFlowControl::new_window_based(StreamSendWindow::new(500)),
                DataCmdChecker::new_any(),
            )?;
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), n);
        }

        // Test get_mut.
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test ending_msg_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}