1
//! Types and code for mapping StreamIDs to streams on a circuit.
2

            
3
use crate::congestion::sendme;
4
use crate::stream::queue::StreamQueueSender;
5
use crate::stream::{AnyCmdChecker, StreamFlowControl};
6
use crate::tunnel::circuit::StreamMpscReceiver;
7
use crate::tunnel::halfstream::HalfStream;
8
use crate::tunnel::reactor::circuit::RECV_WINDOW_INIT;
9
use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
10
use crate::{Error, Result};
11
use pin_project::pin_project;
12
use tor_async_utils::peekable_stream::{PeekableStream, UnobtrusivePeekableStream};
13
use tor_async_utils::stream_peek::StreamUnobtrusivePeeker;
14
use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
15
use tor_cell::relaycell::{msg::AnyRelayMsg, StreamId};
16
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
17

            
18
use std::collections::hash_map;
19
use std::collections::HashMap;
20
use std::num::NonZeroU16;
21
use std::pin::Pin;
22
use std::task::{Poll, Waker};
23
use tor_error::{bad_api_usage, internal};
24

            
25
use rand::Rng;
26

            
27
use tracing::debug;
28

            
29
/// Entry for an open stream
30
///
31
/// (For the purposes of this module, an open stream is one where we have not
32
/// sent or received any message indicating that the stream is ended.)
33
#[derive(Debug)]
34
20350
#[pin_project]
35
pub(super) struct OpenStreamEnt {
36
    /// Sink to send relay cells tagged for this stream into.
37
    pub(super) sink: StreamQueueSender,
38
    /// Number of cells dropped due to the stream disappearing before we can
39
    /// transform this into an `EndSent`.
40
    pub(super) dropped: u16,
41
    /// A `CmdChecker` used to tell whether cells on this stream are valid.
42
    pub(super) cmd_checker: AnyCmdChecker,
43
    /// Flow control for this stream.
44
    // Non-pub because we need to proxy `put_for_incoming_sendme` to ensure
45
    // `flow_ctrl_waker` is woken.
46
    flow_ctrl: StreamFlowControl,
47
    /// Stream for cells that should be sent down this stream.
48
    // Not directly exposed. This should only be polled via
49
    // `OpenStreamEntStream`s implementation of `Stream`, which in turn should
50
    // only be used through `StreamPollSet`.
51
    #[pin]
52
    rx: StreamUnobtrusivePeeker<StreamMpscReceiver<AnyRelayMsg>>,
53
    /// Waker to be woken when more sending capacity becomes available (e.g.
54
    /// receiving a SENDME).
55
    flow_ctrl_waker: Option<Waker>,
56
}
57

            
58
impl OpenStreamEnt {
59
    /// Whether this stream is ready to send `msg`.
60
4032
    pub(crate) fn can_send<M: RelayMsg>(&self, msg: &M) -> bool {
61
4032
        self.flow_ctrl.can_send(msg)
62
4032
    }
63

            
64
    /// Handle an incoming sendme.
65
    ///
66
    /// On failure, return an error: the caller should close the stream or
67
    /// circuit with a protocol error.
68
4
    pub(crate) fn put_for_incoming_sendme(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
69
4
        self.flow_ctrl.put_for_incoming_sendme(msg)?;
70
        // Wake the stream if it was blocked on flow control.
71
4
        if let Some(waker) = self.flow_ctrl_waker.take() {
72
            waker.wake();
73
4
        }
74
4
        Ok(())
75
4
    }
76

            
77
    /// The approximate number of stream inbound data bytes buffered.
78
64
    fn approx_stream_bytes_buffered(&self) -> usize {
79
64
        // NOTE: Here we want to know the total number of buffered incoming stream data bytes. We
80
64
        // have access to the `StreamQueueSender` and can get how many bytes are buffered in that
81
64
        // queue.
82
64
        // But this isn't always the total number of buffered bytes since some bytes might be
83
64
        // buffered outside of this queue.
84
64
        // For example `DataReaderImpl` stores some stream bytes in its `pending` buffer, and we
85
64
        // have no way to access that from here in the reactor. So it's impossible to know the total
86
64
        // number of incoming stream data bytes that are buffered.
87
64
        //
88
64
        // This isn't really an issue in practice since *most* of the bytes will be queued in the
89
64
        // `StreamQueueSender`, the XOFF threshold is very large, and we don't need to be exact.
90
64
        self.sink.approx_stream_bytes()
91
64
    }
92

            
93
    /// Check if we should send an XON message.
94
    ///
95
    /// If we should, then returns the XON message that should be sent.
96
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
97
    pub(crate) fn maybe_send_xon(&mut self, rate: XonKbpsEwma) -> Result<Option<Xon>> {
98
        self.flow_ctrl
99
            .maybe_send_xon(rate, self.approx_stream_bytes_buffered())
100
    }
101

            
102
    /// Check if we should send an XOFF message.
103
    ///
104
    /// If we should, then returns the XOFF message that should be sent.
105
    /// Returns an error if XON/XOFF messages aren't supported for this type of flow control.
106
64
    pub(super) fn maybe_send_xoff(&mut self) -> Result<Option<Xoff>> {
107
64
        self.flow_ctrl
108
64
            .maybe_send_xoff(self.approx_stream_bytes_buffered())
109
64
    }
110

            
111
    /// Handle an incoming XON message.
112
    ///
113
    /// On failure, return an error: the caller should close the stream or
114
    /// circuit with a protocol error.
115
    pub(crate) fn handle_incoming_xon(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
116
        self.flow_ctrl.handle_incoming_xon(msg)
117
    }
118

            
119
    /// Handle an incoming XOFF message.
120
    ///
121
    /// On failure, return an error: the caller should close the stream or
122
    /// circuit with a protocol error.
123
    pub(crate) fn handle_incoming_xoff(&mut self, msg: UnparsedRelayMsg) -> Result<()> {
124
        self.flow_ctrl.handle_incoming_xoff(msg)
125
    }
126

            
127
    /// Take capacity to send `msg`. If there's insufficient capacity, returns
128
    /// an error. Should be called at the point we've fully committed to
129
    /// sending the message.
130
    //
131
    // TODO: Consider not exposing this, and instead taking the capacity in
132
    // `StreamMap::take_ready_msg`.
133
4016
    pub(crate) fn take_capacity_to_send<M: RelayMsg>(&mut self, msg: &M) -> Result<()> {
134
4016
        self.flow_ctrl.take_capacity_to_send(msg)
135
4016
    }
136
}
137

            
138
/// Private wrapper over `OpenStreamEnt`. We implement `futures::Stream` for
139
/// this wrapper, and not directly for `OpenStreamEnt`, so that client code
140
/// can't directly access the stream.
141
#[derive(Debug)]
142
20350
#[pin_project]
143
struct OpenStreamEntStream {
144
    /// Inner value.
145
    #[pin]
146
    inner: OpenStreamEnt,
147
}
148

            
149
impl futures::Stream for OpenStreamEntStream {
150
    type Item = AnyRelayMsg;
151

            
152
4032
    fn poll_next(
153
4032
        mut self: std::pin::Pin<&mut Self>,
154
4032
        cx: &mut std::task::Context<'_>,
155
4032
    ) -> Poll<Option<Self::Item>> {
156
4032
        if !self.as_mut().poll_peek_mut(cx).is_ready() {
157
            return Poll::Pending;
158
4032
        };
159
4032
        let res = self.project().inner.project().rx.poll_next(cx);
160
4032
        debug_assert!(res.is_ready());
161
        // TODO: consider calling `inner.flow_ctrl.take_capacity_to_send` here;
162
        // particularly if we change it to return a wrapper type that proves
163
        // we've taken the capacity. Otherwise it'd make it tricky in the reactor
164
        // to be sure we've correctly taken the capacity, since messages can originate
165
        // in other parts of the code (currently none of those should be of types that
166
        // count towards flow control, but that may change).
167
4032
        res
168
4032
    }
169
}
170

            
171
impl PeekableStream for OpenStreamEntStream {
172
12258
    fn poll_peek_mut(
173
12258
        self: Pin<&mut Self>,
174
12258
        cx: &mut std::task::Context<'_>,
175
12258
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
176
12258
        let s = self.project();
177
12258
        let inner = s.inner.project();
178
12258
        let m = match inner.rx.poll_peek_mut(cx) {
179
12104
            Poll::Ready(Some(m)) => m,
180
28
            Poll::Ready(None) => return Poll::Ready(None),
181
126
            Poll::Pending => return Poll::Pending,
182
        };
183
12104
        if !inner.flow_ctrl.can_send(m) {
184
            inner.flow_ctrl_waker.replace(cx.waker().clone());
185
            return Poll::Pending;
186
12104
        }
187
12104
        Poll::Ready(Some(m))
188
12258
    }
189
}
190

            
191
impl UnobtrusivePeekableStream for OpenStreamEntStream {
192
4060
    fn unobtrusive_peek_mut(
193
4060
        self: std::pin::Pin<&mut Self>,
194
4060
    ) -> Option<&mut <Self as futures::Stream>::Item> {
195
4060
        let s = self.project();
196
4060
        let inner = s.inner.project();
197
4060
        let m = inner.rx.unobtrusive_peek_mut()?;
198
4032
        if inner.flow_ctrl.can_send(m) {
199
4032
            Some(m)
200
        } else {
201
            None
202
        }
203
4060
    }
204
}
205

            
206
/// Entry for a stream where we have sent an END, or other message
207
/// indicating that the stream is terminated.
208
#[derive(Debug)]
209
pub(super) struct EndSentStreamEnt {
210
    /// A "half-stream" that we use to check the validity of incoming
211
    /// messages on this stream.
212
    pub(super) half_stream: HalfStream,
213
    /// True if the sender on this stream has been explicitly dropped;
214
    /// false if we got an explicit close from `close_pending`
215
    explicitly_dropped: bool,
216
}
217

            
218
/// The entry for a stream.
219
#[derive(Debug)]
220
enum ClosedStreamEnt {
221
    /// A stream for which we have received an END cell, but not yet
222
    /// had the stream object get dropped.
223
    EndReceived,
224
    /// A stream for which we have sent an END cell but not yet received an END
225
    /// cell.
226
    ///
227
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
228
    /// these?
229
    EndSent(EndSentStreamEnt),
230
}
231

            
232
/// Mutable reference to a stream entry.
233
pub(super) enum StreamEntMut<'a> {
234
    /// An open stream.
235
    Open(&'a mut OpenStreamEnt),
236
    /// A stream for which we have received an END cell, but not yet
237
    /// had the stream object get dropped.
238
    EndReceived,
239
    /// A stream for which we have sent an END cell but not yet received an END
240
    /// cell.
241
    EndSent(&'a mut EndSentStreamEnt),
242
}
243

            
244
impl<'a> From<&'a mut ClosedStreamEnt> for StreamEntMut<'a> {
245
24
    fn from(value: &'a mut ClosedStreamEnt) -> Self {
246
24
        match value {
247
14
            ClosedStreamEnt::EndReceived => Self::EndReceived,
248
10
            ClosedStreamEnt::EndSent(e) => Self::EndSent(e),
249
        }
250
24
    }
251
}
252

            
253
impl<'a> From<&'a mut OpenStreamEntStream> for StreamEntMut<'a> {
254
8282
    fn from(value: &'a mut OpenStreamEntStream) -> Self {
255
8282
        Self::Open(&mut value.inner)
256
8282
    }
257
}
258

            
259
/// Return value to indicate whether or not we send an END cell upon
260
/// terminating a given stream.
261
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
262
pub(super) enum ShouldSendEnd {
263
    /// An END cell should be sent.
264
    Send,
265
    /// An END cell should not be sent.
266
    DontSend,
267
}
268

            
269
/// Scheduling priority handed to [`StreamPollSet`].
///
/// Wraps a plain `u64`; ordering and equality are those of the wrapped value.
#[derive(Debug, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct Priority(u64);
272

            
273
/// A map from stream IDs to stream entries. Each circuit has one for each
274
/// hop.
275
pub(super) struct StreamMap {
276
    /// Open streams.
277
    // Invariants:
278
    // * Keys are disjoint with `closed_streams`.
279
    open_streams: StreamPollSet<StreamId, Priority, OpenStreamEntStream>,
280
    /// Closed streams.
281
    // Invariants:
282
    // * Keys are disjoint with `open_streams`.
283
    closed_streams: HashMap<StreamId, ClosedStreamEnt>,
284
    /// The next StreamId that we should use for a newly allocated
285
    /// circuit.
286
    next_stream_id: StreamId,
287
    /// Next priority to use in `rxs`. We implement round-robin scheduling of
288
    /// handling outgoing messages from streams by assigning a stream the next
289
    /// priority whenever an outgoing message is processed from that stream,
290
    /// putting it last in line.
291
    next_priority: Priority,
292
}
293

            
294
impl StreamMap {
295
    /// Make a new empty StreamMap.
296
746
    pub(super) fn new() -> Self {
297
746
        let mut rng = rand::rng();
298
746
        let next_stream_id: NonZeroU16 = rng.random();
299
746
        StreamMap {
300
746
            open_streams: StreamPollSet::new(),
301
746
            closed_streams: HashMap::new(),
302
746
            next_stream_id: next_stream_id.into(),
303
746
            next_priority: Priority(0),
304
746
        }
305
746
    }
306

            
307
    /// Return the number of open streams in this map.
308
470
    pub(super) fn n_open_streams(&self) -> usize {
309
470
        self.open_streams.len()
310
470
    }
311

            
312
    /// Return the next available priority.
313
4380
    fn take_next_priority(&mut self) -> Priority {
314
4380
        let rv = self.next_priority;
315
4380
        self.next_priority = Priority(rv.0 + 1);
316
4380
        rv
317
4380
    }
318

            
319
    /// Add an entry to this map; return the newly allocated StreamId.
320
324
    pub(super) fn add_ent(
321
324
        &mut self,
322
324
        sink: StreamQueueSender,
323
324
        rx: StreamMpscReceiver<AnyRelayMsg>,
324
324
        flow_ctrl: StreamFlowControl,
325
324
        cmd_checker: AnyCmdChecker,
326
324
    ) -> Result<StreamId> {
327
324
        let mut stream_ent = OpenStreamEntStream {
328
324
            inner: OpenStreamEnt {
329
324
                sink,
330
324
                flow_ctrl,
331
324
                dropped: 0,
332
324
                cmd_checker,
333
324
                rx: StreamUnobtrusivePeeker::new(rx),
334
324
                flow_ctrl_waker: None,
335
324
            },
336
324
        };
337
324
        let priority = self.take_next_priority();
338
        // This "65536" seems too aggressive, but it's what tor does.
339
        //
340
        // Also, going around in a loop here is (sadly) needed in order
341
        // to look like Tor clients.
342
324
        for _ in 1..=65536 {
343
324
            let id: StreamId = self.next_stream_id;
344
324
            self.next_stream_id = wrapping_next_stream_id(self.next_stream_id);
345
324
            stream_ent = match self.open_streams.try_insert(id, priority, stream_ent) {
346
324
                Ok(_) => return Ok(id),
347
                Err(KeyAlreadyInsertedError {
348
                    key: _,
349
                    priority: _,
350
                    stream,
351
                }) => stream,
352
            };
353
        }
354

            
355
        Err(Error::IdRangeFull)
356
324
    }
357

            
358
    /// Add an entry to this map using the specified StreamId.
359
    #[cfg(feature = "hs-service")]
360
24
    pub(super) fn add_ent_with_id(
361
24
        &mut self,
362
24
        sink: StreamQueueSender,
363
24
        rx: StreamMpscReceiver<AnyRelayMsg>,
364
24
        flow_ctrl: StreamFlowControl,
365
24
        id: StreamId,
366
24
        cmd_checker: AnyCmdChecker,
367
24
    ) -> Result<()> {
368
24
        let stream_ent = OpenStreamEntStream {
369
24
            inner: OpenStreamEnt {
370
24
                sink,
371
24
                flow_ctrl,
372
24
                dropped: 0,
373
24
                cmd_checker,
374
24
                rx: StreamUnobtrusivePeeker::new(rx),
375
24
                flow_ctrl_waker: None,
376
24
            },
377
24
        };
378
24
        let priority = self.take_next_priority();
379
24
        self.open_streams
380
24
            .try_insert(id, priority, stream_ent)
381
24
            .map_err(|_| Error::IdUnavailable(id))
382
24
    }
383

            
384
    /// Return the entry for `id` in this map, if any.
385
8336
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<StreamEntMut<'_>> {
386
8336
        if let Some(e) = self.open_streams.stream_mut(&id) {
387
8282
            return Some(e.into());
388
54
        }
389
54
        if let Some(e) = self.closed_streams.get_mut(&id) {
390
24
            return Some(e.into());
391
30
        }
392
30
        None
393
8336
    }
394

            
395
    /// Note that we received an END message (or other message indicating the end of
396
    /// the stream) on the stream with `id`.
397
    ///
398
    /// Returns true if there was really a stream there.
399
36
    pub(super) fn ending_msg_received(&mut self, id: StreamId) -> Result<()> {
400
36
        if self.open_streams.remove(&id).is_some() {
401
22
            let prev = self.closed_streams.insert(id, ClosedStreamEnt::EndReceived);
402
22
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
403
22
            return Ok(());
404
14
        }
405
14
        let hash_map::Entry::Occupied(closed_entry) = self.closed_streams.entry(id) else {
406
2
            return Err(Error::CircProto(
407
2
                "Received END cell on nonexistent stream".into(),
408
2
            ));
409
        };
410
        // Progress the stream's state machine accordingly
411
12
        match closed_entry.get() {
412
2
            ClosedStreamEnt::EndReceived => Err(Error::CircProto(
413
2
                "Received two END cells on same stream".into(),
414
2
            )),
415
            ClosedStreamEnt::EndSent { .. } => {
416
10
                debug!("Actually got an end cell on a half-closed stream!");
417
                // We got an END, and we already sent an END. Great!
418
                // we can forget about this stream.
419
10
                closed_entry.remove_entry();
420
10
                Ok(())
421
            }
422
        }
423
36
    }
424

            
425
    /// Handle a termination of the stream with `id` from this side of
426
    /// the circuit. Return true if the stream was open and an END
427
    /// ought to be sent.
428
42
    pub(super) fn terminate(
429
42
        &mut self,
430
42
        id: StreamId,
431
42
        why: TerminateReason,
432
42
    ) -> Result<ShouldSendEnd> {
433
        use TerminateReason as TR;
434

            
435
42
        if let Some((_id, _priority, ent)) = self.open_streams.remove(&id) {
436
            let OpenStreamEntStream {
437
                inner:
438
                    OpenStreamEnt {
439
38
                        flow_ctrl,
440
38
                        dropped,
441
38
                        cmd_checker,
442
38
                        // notably absent: the channels for sink and stream, which will get dropped and
443
38
                        // closed (meaning reads/writes from/to this stream will now fail)
444
38
                        ..
445
38
                    },
446
38
            } = ent;
447
38
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
448
38
            //             so a malicious peer can send us slightly more data than they should
449
38
            //             be able to; see arti#230.
450
38
            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
451
38
            recv_window.decrement_n(dropped)?;
452
            // TODO: would be nice to avoid new_ref.
453
38
            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
454
38
            let explicitly_dropped = why == TR::StreamTargetClosed;
455
38
            let prev = self.closed_streams.insert(
456
38
                id,
457
38
                ClosedStreamEnt::EndSent(EndSentStreamEnt {
458
38
                    half_stream,
459
38
                    explicitly_dropped,
460
38
                }),
461
38
            );
462
38
            debug_assert!(prev.is_none(), "Unexpected duplicate entry for {id}");
463
38
            return Ok(ShouldSendEnd::Send);
464
4
        }
465
4

            
466
4
        // Progress the stream's state machine accordingly
467
4
        match self
468
4
            .closed_streams
469
4
            .remove(&id)
470
5
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
471
        {
472
2
            ClosedStreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
473
            ClosedStreamEnt::EndSent(EndSentStreamEnt {
474
                ref mut explicitly_dropped,
475
                ..
476
            }) => match (*explicitly_dropped, why) {
477
                (false, TR::StreamTargetClosed) => {
478
                    *explicitly_dropped = true;
479
                    Ok(ShouldSendEnd::DontSend)
480
                }
481
                (true, TR::StreamTargetClosed) => {
482
                    Err(bad_api_usage!("Tried to close an already closed stream.").into())
483
                }
484
                (_, TR::ExplicitEnd) => Err(bad_api_usage!(
485
                    "Tried to end an already closed stream. (explicitly_dropped={:?})",
486
                    *explicitly_dropped
487
                )
488
                .into()),
489
            },
490
        }
491
42
    }
492

            
493
    /// Get an up-to-date iterator of streams with ready items. `Option<AnyRelayMsg>::None`
494
    /// indicates that the local sender has been dropped.
495
    ///
496
    /// Conceptually all streams are in a queue; new streams are added to the
497
    /// back of the queue, and a stream is sent to the back of the queue
498
    /// whenever a ready message is taken from it (via
499
    /// [`Self::take_ready_msg`]). The returned iterator is an ordered view of
500
    /// this queue, showing the subset of streams that have a message ready to
501
    /// send, or whose sender has been dropped.
502
15394
    pub(super) fn poll_ready_streams_iter<'a>(
503
15394
        &'a mut self,
504
15394
        cx: &mut std::task::Context,
505
15394
    ) -> impl Iterator<Item = (StreamId, Option<&'a AnyRelayMsg>)> + 'a {
506
15394
        self.open_streams
507
15394
            .poll_ready_iter_mut(cx)
508
17424
            .map(|(sid, _priority, ent)| {
509
4060
                let ent = Pin::new(ent);
510
4060
                let msg = ent.unobtrusive_peek();
511
4060
                (*sid, msg)
512
17424
            })
513
15394
    }
514

            
515
    /// If the stream `sid` has a message ready, take it, and reprioritize `sid`
516
    /// to the "back of the line" with respect to
517
    /// [`Self::poll_ready_streams_iter`].
518
4032
    pub(super) fn take_ready_msg(&mut self, sid: StreamId) -> Option<AnyRelayMsg> {
519
4032
        let new_priority = self.take_next_priority();
520
4032
        let (_prev_priority, val) = self
521
4032
            .open_streams
522
4032
            .take_ready_value_and_reprioritize(&sid, new_priority)?;
523
4032
        Some(val)
524
4032
    }
525

            
526
    // TODO: Eventually if we want relay support, we'll need to support
527
    // stream IDs chosen by somebody else. But for now, we don't need those.
528
}
529

            
530
/// A reason for terminating a stream.
531
///
532
/// We use this type in order to ensure that we obey the API restrictions of [`StreamMap::terminate`]
533
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
534
pub(super) enum TerminateReason {
535
    /// Closing a stream because the receiver got `Ok(None)`, indicating that the
536
    /// corresponding senders were all dropped.
537
    StreamTargetClosed,
538
    /// Closing a stream because we were explicitly told to end it via
539
    /// [`StreamTarget::close_pending`](crate::tunnel::StreamTarget::close_pending).
540
    ExplicitEnd,
541
}
542

            
543
/// Convenience function for doing a wrapping increment of a `StreamId`.
544
584
fn wrapping_next_stream_id(id: StreamId) -> StreamId {
545
584
    let next_val = NonZeroU16::from(id)
546
584
        .checked_add(1)
547
585
        .unwrap_or_else(|| NonZeroU16::new(1).expect("Impossibly got 0 value"));
548
584
    next_val.into()
549
584
}
550

            
551
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::stream::queue::fake_stream_queue;
    use crate::tunnel::circuit::test::fake_mpsc;
    use crate::{congestion::sendme::StreamSendWindow, stream::DataCmdChecker};

    /// Incrementing goes 1 -> 2 and wraps 0xffff -> 1, never producing 0.
    #[test]
    fn test_wrapping_next_stream_id() {
        let first = StreamId::new(1).unwrap();
        let second = StreamId::new(2).unwrap();
        let last = StreamId::new(0xffff).unwrap();
        assert_eq!(wrapping_next_stream_id(first), second);
        assert_eq!(wrapping_next_stream_id(last), first);
    }

    /// Walk a `StreamMap` through adding, looking up, ending and terminating
    /// streams.
    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        let mut next_id = map.next_stream_id;
        let mut ids = Vec::new();

        assert_eq!(map.n_open_streams(), 0);

        // Try add_ent: IDs must come out in sequence, starting from the
        // map's initial `next_stream_id`.
        for expected_open in 1..=128 {
            let (sink, _) = fake_stream_queue(128);
            let (_, rx) = fake_mpsc(2);
            let id = map.add_ent(
                sink,
                rx,
                StreamFlowControl::new_window_based(StreamSendWindow::new(500)),
                DataCmdChecker::new_any(),
            )?;
            let expect_id: StreamId = next_id;
            assert_eq!(expect_id, id);
            next_id = wrapping_next_stream_id(next_id);
            ids.push(id);
            assert_eq!(map.n_open_streams(), expected_open);
        }

        // Test get_mut.
        let nonesuch_id = next_id;
        assert!(matches!(
            map.get_mut(ids[0]),
            Some(StreamEntMut::Open { .. })
        ));
        assert!(map.get_mut(nonesuch_id).is_none());

        // Test ending_msg_received
        assert!(map.ending_msg_received(nonesuch_id).is_err());
        assert_eq!(map.n_open_streams(), 128);
        assert!(map.ending_msg_received(ids[1]).is_ok());
        assert_eq!(map.n_open_streams(), 127);
        assert!(matches!(
            map.get_mut(ids[1]),
            Some(StreamEntMut::EndReceived)
        ));
        assert!(map.ending_msg_received(ids[1]).is_err());

        // Test terminate
        use TerminateReason as TR;
        assert!(map.terminate(nonesuch_id, TR::ExplicitEnd).is_err());
        assert_eq!(map.n_open_streams(), 127);
        assert_eq!(
            map.terminate(ids[2], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::Send
        );
        assert_eq!(map.n_open_streams(), 126);
        assert!(matches!(
            map.get_mut(ids[2]),
            Some(StreamEntMut::EndSent { .. })
        ));
        assert_eq!(
            map.terminate(ids[1], TR::ExplicitEnd).unwrap(),
            ShouldSendEnd::DontSend
        );
        // This stream was already closed when we called `ending_msg_received`
        // above.
        assert_eq!(map.n_open_streams(), 126);
        assert!(map.get_mut(ids[1]).is_none());

        // Try receiving an end after a terminate.
        assert!(map.ending_msg_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());
        assert_eq!(map.n_open_streams(), 126);

        Ok(())
    }
}