tor_proto/util/
stream_poll_set.rs

1//! Provides [`StreamPollSet`]
2
3// So that we can declare these things as if they were in their own crate.
4#![allow(unreachable_pub)]
5
6use std::{
7    collections::{hash_map, BTreeMap, HashMap},
8    future::Future,
9    hash::Hash,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use futures::{task::noop_waker_ref, FutureExt, StreamExt as _};
15use tor_async_utils::peekable_stream::PeekableStream;
16
17use crate::util::keyed_futures_unordered::KeyedFuturesUnordered;
18
/// Future adapter around a [`PeekableStream`].
///
/// It completes once the wrapped stream has something to peek at, and its
/// output is the stream itself.
struct PeekableReady<S> {
    /// The wrapped stream; `None` once the future has completed and handed
    /// the stream back.
    stream: Option<S>,
}

impl<S> PeekableReady<S> {
    /// Wrap `st` in a new, not-yet-completed [`PeekableReady`].
    fn new(st: S) -> Self {
        let stream = Some(st);
        Self { stream }
    }

    /// Borrow the wrapped stream.
    ///
    /// Returns `None` if the future has already completed.
    fn get_ref(&self) -> Option<&S> {
        let Self { stream } = self;
        stream.as_ref()
    }

    /// Mutably borrow the wrapped stream.
    ///
    /// Returns `None` if the future has already completed.
    fn get_mut(&mut self) -> Option<&mut S> {
        let Self { stream } = self;
        stream.as_mut()
    }

    /// Consume the future and return the wrapped stream.
    ///
    /// Returns `None` if the future has already completed.
    fn into_inner(self) -> Option<S> {
        let Self { stream } = self;
        stream
    }
}
53
54impl<S> Future for PeekableReady<S>
55where
56    S: PeekableStream + Unpin,
57{
58    type Output = S;
59
60    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
61        let Some(stream) = &mut self.stream else {
62            panic!("Polled completed future");
63        };
64        match Pin::new(stream).poll_peek(cx) {
65            Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
66            Poll::Pending => Poll::Pending,
67        }
68    }
69}
70
71/// Manages a dynamic set of [`futures::Stream`] with associated keys and
72/// priorities.
73///
74/// Notable features:
75///
76/// * Prioritization: streams have an associated priority, and ready-streams are
77///   iterated over in ascending priority order.
78/// * Efficient polling: an unready stream won't be polled again until it's
79///   ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
80///   dropped). A ready stream won't be polled again until the ready item has been
81///   removed.
82pub struct StreamPollSet<K, P, S>
83where
84    S: PeekableStream + Unpin,
85{
86    /// Priority for each stream in the set.
87    // We keep the priority for each stream here instead of bundling it together
88    // with the stream, so that the priority can easily be changed even while a
89    // future waiting on the stream is still pending (e.g. to support rescaling
90    // priorities for EWMA).
91    // Invariants:
92    // * Every key is also present in exactly one of `ready_values` or `pending_streams`.
93    priorities: HashMap<K, P>,
94    /// Streams that have a result ready, in ascending order by priority.
95    // Invariants:
96    // * Keys are a (non-strict) subset of those in `priorities`.
97    ready_streams: BTreeMap<(P, K), S>,
98    /// Streams for which we're still waiting for the next result.
99    // Invariants:
100    // * Keys are a (non-strict) subset of those in `priorities`.
101    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
102}
103
104impl<K, P, S> StreamPollSet<K, P, S>
105where
106    K: Ord + Hash + Clone + Send + Sync + 'static,
107    S: PeekableStream + Unpin,
108    P: Ord + Clone,
109{
110    /// Create a new, empty, `StreamPollSet`.
111    pub fn new() -> Self {
112        Self {
113            priorities: Default::default(),
114            ready_streams: Default::default(),
115            pending_streams: KeyedFuturesUnordered::new(),
116        }
117    }
118
119    /// Insert a `stream`, with an associated `key` and `priority`.
120    ///
121    /// If the `key` is already in use, the parameters are returned without altering `self`.
122    // To *replace* an existing key, we'd need to cancel any pending future and
123    // ensure that the cancellation is processed before inserting the new key, to
124    // ensure we don't assign a value from the previous key to the new key's
125    // stream.
126    pub fn try_insert(
127        &mut self,
128        key: K,
129        priority: P,
130        stream: S,
131    ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
132        let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
133            // We already have an entry for this key.
134            return Err(KeyAlreadyInsertedError {
135                key,
136                priority,
137                stream,
138            });
139        };
140        self.pending_streams
141            .try_insert(key, PeekableReady::new(stream))
142            // By `pending_streams` invariant that keys are a subset of those in
143            // `priorities`.
144            .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
145        v.insert(priority);
146        Ok(())
147    }
148
149    /// Remove the entry for `key`, if any. This is the key, priority, buffered
150    /// poll_next result, and stream.
151    pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
152        let priority = self.priorities.remove(key)?;
153        if let Some((key, fut)) = self.pending_streams.remove(key) {
154            // Validate `priorities` invariant that keys are also present in exactly one of
155            // `pending_streams` and `ready_values`.
156            debug_assert!(!self
157                .ready_streams
158                .contains_key(&(priority.clone(), key.clone())));
159            let stream = fut
160                .into_inner()
161                // We know the future hasn't completed, so the stream should be present.
162                .expect("Missing stream");
163            Some((key, priority, stream))
164        } else {
165            let ((_priority, key), stream) = self
166                .ready_streams
167                .remove_entry(&(priority.clone(), key.clone()))
168                // By
169                // * `pending_streams` invariant that keys are also present in
170                // exactly one of `pending_streams` and `ready_values`.
171                // * validated above that the key was in `pending_streams`, and
172                // not in `ready_values`.
173                .expect("Unexpectedly no value for key");
174            Some((key, priority, stream))
175        }
176    }
177
178    /// Polls streams that are ready to be polled, and returns an iterator over all streams
179    /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
180    ///
181    /// Registers the provided [`Context`] to be woken when
182    /// any of the internal streams that weren't ready in the previous call to
183    /// this method (and therefore wouldn't have appeared in the iterator
184    /// results) become potentially ready (based on when the inner stream wakes
185    /// the `Context` provided to its own `poll_next`).
186    ///
187    /// The same restrictions apply as for [`Self::stream_mut`].  e.g. do not
188    /// directly call [`PeekableStream::poll_peek`] to see what item is
189    /// available on the stream; instead use [`Self::peek_mut`]. (Or
190    /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
191    /// implemented for the stream).
192    ///
193    /// This method does *not* drain ready items. `Some` values can be removed
194    /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
195    /// be removed by removing the whole stream with [`Self::remove`].
196    ///
197    /// This API is meant to allow callers to find the first stream (in priority
198    /// order) that is ready, and that the caller is able to process now. i.e.
199    /// it's specifically to support the use-case where external factors may
200    /// prevent the processing of some streams but not others.
201    ///
202    /// Example:
203    ///
204    /// ```nocompile
205    /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
206    /// # // TODO: take away the nocompile if we make this pub or implement some
207    /// # // workaround to expose it to doc-tests.
208    /// # type Key=u64;
209    /// # type Value=u64;
210    /// # type Priority=u64;
211    /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
212    /// # fn can_process(key: &Key, val: &Value) -> bool { true }
213    /// # fn process(val: Value) { }
214    /// # fn new_priority(priority: &Priority) -> Priority { *priority }
215    /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
216    ///   let mut iter = sps.poll_ready_iter(cx);
217    ///   while let Some((key, priority, stream)) = iter.next() {
218    ///     let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
219    ///        // Stream exhausted. Remove the stream. We have to drop the iterator
220    ///        // first, though, so that we can mutate.
221    ///        let key = *key;
222    ///        drop(iter);
223    ///        sps.remove(&key).unwrap();
224    ///        return std::task::Poll::Ready(());
225    ///     };
226    ///     if can_process(key, value) {
227    ///        let key = *key;
228    ///        let priority = new_priority(priority);
229    ///        drop(iter);
230    ///        let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
231    ///        process(value);
232    ///        return std::task::Poll::Ready(());
233    ///     }
234    ///   }
235    ///   return std::task::Poll::Pending;
236    /// }
237    /// ```
238    // In the current implementation we *could* actually permit the caller to
239    // `poll_peek` a stream that we know is ready. But this may change as the
240    // impl evolves further, and it's probably better to blanket disallow it
241    // than to have complex rules for the caller about when it's ok.
242    //
243    // TODO: It would be nice if the returned iterator supported additional
244    // actions, e.g. allowing the user to consume the iterator and take and
245    // reprioritize the inner value, but this is tricky.
246    //
247    // I've sketched out a working "cursor" that holds the current position (K, P)
248    // and a &mut StreamPollSet. This can't implement the Iterator interface though
249    // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
250    // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
251    // we can't use the same technique again since the object would need a mut reference to the
252    // StreamMap *and* to this inner cursor object, which is illegal.
253    pub fn poll_ready_iter_mut<'a>(
254        &'a mut self,
255        cx: &mut Context,
256    ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a {
257        // First poll for ready streams
258        while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
259            let priority = self
260                .priorities
261                .get(&key)
262                // By `pending_streams` invariant that all keys are also in `priorities`.
263                .expect("Missing priority");
264            let prev = self.ready_streams.insert((priority.clone(), key), stream);
265            assert!(prev.is_none());
266        }
267        self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
268    }
269
270    /// If the stream for `key` has `Some(value)` ready, take that value and set the
271    /// priority for it to `new_priority`.
272    ///
273    /// This method doesn't register a waker with the polled stream. Use
274    /// `poll_ready_iter` to ensure streams make progress.
275    ///
276    /// If the key doesn't exist, the stream isn't ready, or the stream's value
277    /// is `None` (indicating the end of the stream), this function returns
278    /// `None` without mutating anything.
279    ///
280    /// Ended streams should be removed using [`Self::remove`].
281    pub fn take_ready_value_and_reprioritize(
282        &mut self,
283        key: &K,
284        new_priority: P,
285    ) -> Option<(P, S::Item)> {
286        // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
287        let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
288        else {
289            // Key isn't present at all.
290            return None;
291        };
292        let priority_mut = priority_entry.get_mut();
293        let Some(((_p, key), mut stream)) = self
294            .ready_streams
295            .remove_entry(&(priority_mut.clone(), key.clone()))
296        else {
297            // This stream isn't in the ready list.
298            return None;
299        };
300        match Pin::new(&mut stream)
301            .poll_peek(&mut Context::from_waker(&futures::task::noop_waker()))
302        {
303            Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
304            Poll::Ready(None) => {
305                // Stream is ready, but is terminated.
306                // Leave in place and return `None`.
307                return None;
308            }
309            Poll::Pending => {
310                // Stream wasn't actually ready, despite being on the ready
311                // list. This should be impossible by the stability guarantees
312                // of `PeekableStream` and our own internal logic, but we can
313                // recover.
314                tracing::error!("Bug: Stream unexpectedly unready");
315                self.pending_streams
316                    .try_insert(key.clone(), PeekableReady::new(stream))
317                    // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
318                    .unwrap_or_else(|_| {
319                        unreachable!("Key unexpectedly in both ready and unready list")
320                    });
321                return None;
322            }
323        }
324        let Some(Some(val)) = stream.next().now_or_never() else {
325            panic!("Polling stream returned a different result than peeking");
326        };
327        let prev_priority = std::mem::replace(priority_mut, new_priority);
328        self.pending_streams
329            .try_insert(key, PeekableReady::new(stream))
330            // We verified above that the key wasn't present in `priorities`,
331            // and `pending_streams` has the invariant that its keys are a
332            // subset of those in `priorities`.
333            .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
334        Some((prev_priority, val))
335    }
336
337    /// Get a mut reference to a ready value for key `key`, if one exists.
338    ///
339    /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
340    /// ensure streams make progress.
341    // This will be used for packing and fragmentation, to take part of a DATA message.
342    #[allow(unused)]
343    pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
344        let priority = self.priorities.get(key)?;
345        let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
346            return Some(Poll::Pending);
347        };
348        // We don't have a waker registered here, so we can just use the noop waker.
349        // TODO: Create a mut future for `PeekableStream`.
350        Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(noop_waker_ref())))
351    }
352
353    /// Get a reference to the stream for `key`.
354    ///
355    /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
356    /// interior mutability).
357    #[allow(dead_code)]
358    pub fn stream(&self, key: &K) -> Option<&S> {
359        if let Some(s) = self.pending_streams.get(key) {
360            let s = s.get_ref();
361            // Stream must be present since it's still pending.
362            debug_assert!(s.is_some(), "Unexpected missing pending stream");
363            return s;
364        }
365        let priority = self.priorities.get(key)?;
366        self.ready_streams.get(&(priority.clone(), key.clone()))
367    }
368
369    /// Get a mut reference to the stream for `key`.
370    ///
371    /// Polling the stream through this reference, or otherwise causing its
372    /// registered `Waker` to be removed without waking it, will result in
373    /// unspecified (but not unsound) behavior.
374    ///
375    /// This is mostly intended for accessing non-`Stream` functionality of the stream
376    /// object, though it *is* permitted to mutate it in a way that the stream becomes
377    /// ready (potentially removing and waking its registered Waker(s)).
378    //
379    // In particular:
380    // * Polling a stream in the pending list and getting a Pending result
381    //   will overwrite our Waker, resulting in us not polling it again.
382    // * Doing so with a stream on the pending list and getting a Ready result
383    //   might be ok if it had already woken our waker. Otoh it could potentially
384    //   result in our waker never getting woken, and hence us not polling it again.
385    // * Doing so with a stream on the ready list should actually be ok, since
386    //   we don't have a registered waker, and don't do our own buffering.
387    pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
388        if let Some(s) = self.pending_streams.get_mut(key) {
389            let s = s.get_mut();
390            // Stream must be present since it's still pending.
391            debug_assert!(s.is_some(), "Unexpected missing pending stream");
392            return s;
393        }
394        let priority = self.priorities.get(key)?;
395        self.ready_streams.get_mut(&(priority.clone(), key.clone()))
396    }
397
398    /// Number of streams managed by this object.
399    pub fn len(&self) -> usize {
400        self.priorities.len()
401    }
402}
403
404/// Error returned by [`StreamPollSet::try_insert`].
405#[derive(Debug, thiserror::Error)]
406#[allow(clippy::exhaustive_structs)]
407pub struct KeyAlreadyInsertedError<K, P, S> {
408    /// Key that caller tried to insert.
409    #[allow(dead_code)]
410    pub key: K,
411    /// Priority that caller tried to insert.
412    #[allow(dead_code)]
413    pub priority: P,
414    /// Stream that caller tried to insert.
415    #[allow(dead_code)]
416    pub stream: S,
417}
418
419#[cfg(test)]
420mod test {
421    // @@ begin test lint list maintained by maint/add_warning @@
422    #![allow(clippy::bool_assert_comparison)]
423    #![allow(clippy::clone_on_copy)]
424    #![allow(clippy::dbg_macro)]
425    #![allow(clippy::mixed_attributes_style)]
426    #![allow(clippy::print_stderr)]
427    #![allow(clippy::print_stdout)]
428    #![allow(clippy::single_char_pattern)]
429    #![allow(clippy::unwrap_used)]
430    #![allow(clippy::unchecked_duration_subtraction)]
431    #![allow(clippy::useless_vec)]
432    #![allow(clippy::needless_pass_by_value)]
433    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
434
435    use std::{
436        collections::VecDeque,
437        sync::{Arc, Mutex},
438        task::Poll,
439    };
440
441    use futures::{stream::Peekable, SinkExt as _};
442    use pin_project::pin_project;
443    use tor_rtmock::MockRuntime;
444
445    use super::*;
446
/// Key type used for the streams in these tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Key(u64);

/// Priority type used for the streams in these tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Priority(u64);

/// Item type yielded by the streams in these tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Value(u64);
455
456    /// Test stream that we can directly manipulate and examine.
457    #[derive(Debug)]
458    #[pin_project]
459    struct VecDequeStream<T> {
460        // Ready items.
461        vec: VecDeque<T>,
462        // Whether any more items will be written.
463        closed: bool,
464        // Registered waker.
465        waker: Option<std::task::Waker>,
466    }
467    impl<T> VecDequeStream<T> {
468        fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
469            Self {
470                vec: VecDeque::from_iter(values),
471                waker: None,
472                closed: false,
473            }
474        }
475        fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
476            Self {
477                vec: VecDeque::from_iter(values),
478                waker: None,
479                closed: true,
480            }
481        }
482        fn push(&mut self, value: T) {
483            assert!(!self.closed);
484            self.vec.push_back(value);
485            if let Some(waker) = self.waker.take() {
486                waker.wake();
487            }
488        }
489    }
490    impl<T> futures::Stream for VecDequeStream<T> {
491        type Item = T;
492
493        fn poll_next(
494            mut self: std::pin::Pin<&mut Self>,
495            cx: &mut std::task::Context<'_>,
496        ) -> Poll<Option<Self::Item>> {
497            if let Some(val) = self.as_mut().vec.pop_front() {
498                Poll::Ready(Some(val))
499            } else if self.as_mut().closed {
500                // No more items coming.
501                Poll::Ready(None)
502            } else {
503                self.as_mut().waker.replace(cx.waker().clone());
504                Poll::Pending
505            }
506        }
507    }
508    impl<T> PeekableStream for VecDequeStream<T> {
509        fn poll_peek_mut(
510            self: Pin<&mut Self>,
511            cx: &mut Context<'_>,
512        ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
513            let s = self.project();
514            if let Some(val) = s.vec.front_mut() {
515                Poll::Ready(Some(val))
516            } else if *s.closed {
517                // No more items coming.
518                Poll::Ready(None)
519            } else {
520                s.waker.replace(cx.waker().clone());
521                Poll::Pending
522            }
523        }
524    }
525    impl<T> std::cmp::PartialEq for VecDequeStream<T>
526    where
527        T: std::cmp::PartialEq,
528    {
529        fn eq(&self, other: &Self) -> bool {
530            // Ignore waker, which isn't comparable
531            self.vec == other.vec && self.closed == other.closed
532        }
533    }
534    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}
535
536    type TestStream = VecDequeStream<Value>;
537
/// An empty set reports no ready streams.
#[test]
fn test_empty() {
    let scenario = futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        let ready = pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>();
        assert_eq!(ready, vec![]);
        Poll::Ready(())
    });
    futures::executor::block_on(scenario);
}
546
/// A set whose only stream is open and empty reports no ready streams.
#[test]
fn test_one_pending() {
    let scenario = futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        let inserted = pollset.try_insert(Key(0), Priority(0), TestStream::new_open([]));
        inserted.unwrap();
        let ready = pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>();
        assert_eq!(ready, vec![]);
        Poll::Ready(())
    });
    futures::executor::block_on(scenario);
}
558
/// Walk a single closed stream through its whole life: ready with two
/// values, drained one value at a time (changing priority each time),
/// end-of-stream, and finally removal.
#[test]
fn test_one_ready() {
    let scenario = futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        let inserted = pollset.try_insert(
            Key(0),
            Priority(0),
            TestStream::new_closed([Value(1), Value(2)]),
        );
        inserted.unwrap();

        // The one stream is ready, and nothing has been taken from it yet.
        let mut untouched = TestStream::new_closed([Value(1), Value(2)]);
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(0), &mut untouched)],
        );

        // Polling again changes nothing: the same value is still at the head.
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(0), &mut untouched)],
        );

        // Take the head of the stream, giving it a new priority.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1));
        assert_eq!(taken, Some((Priority(0), Value(1))));

        // The next value is now at the head, under the new priority.
        let mut one_left = TestStream::new_closed([Value(2)]);
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(1), &mut one_left)],
        );

        // Take again.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2));
        assert_eq!(taken, Some((Priority(1), Value(2))));

        // The stream is still listed as ready, but has nothing left in it.
        let mut drained = TestStream::new_closed([]);
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(2), &mut drained)],
        );

        // Remove the now-ended stream.
        let removed = pollset.remove(&Key(0));
        assert_eq!(
            removed,
            Some((Key(0), Priority(2), TestStream::new_closed([])))
        );

        // Nothing is left in the set.
        assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);

        Poll::Ready(())
    });
    futures::executor::block_on(scenario);
}
631
/// Alternate between two ready streams by always sending the stream just
/// served to the back through its new priority.
#[test]
fn test_round_robin() {
    let scenario = futures::future::poll_fn(|ctx| {
        // Shorthand for a closed stream holding the given raw values.
        let stream_of = |raw: &[u64]| TestStream::new_closed(raw.iter().copied().map(Value));

        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();
        for (key, priority, values) in [
            (Key(0), Priority(0), [Value(1), Value(2)]),
            (Key(1), Priority(1), [Value(3), Value(4)]),
        ] {
            pollset
                .try_insert(key, priority, TestStream::new_closed(values))
                .unwrap();
        }

        // Both streams are ready, listed in priority order.
        let (mut first, mut second) = (stream_of(&[1, 2]), stream_of(&[3, 4]));
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![
                (&Key(0), &Priority(0), &mut first),
                (&Key(1), &Priority(1), &mut second),
            ]
        );

        // Take from the first stream and send it to the back via priority assignment.
        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2));
        assert_eq!(taken, Some((Priority(0), Value(1))));

        // Both streams are still ready, now in the new priority order.
        let (mut first, mut second) = (stream_of(&[3, 4]), stream_of(&[2]));
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![
                (&Key(1), &Priority(1), &mut first),
                (&Key(0), &Priority(2), &mut second),
            ]
        );

        // Keep going ...
        let taken = pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3));
        assert_eq!(taken, Some((Priority(1), Value(3))));
        let (mut first, mut second) = (stream_of(&[2]), stream_of(&[4]));
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![
                (&Key(0), &Priority(2), &mut first),
                (&Key(1), &Priority(3), &mut second),
            ]
        );

        let taken = pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4));
        assert_eq!(taken, Some((Priority(2), Value(2))));
        let (mut first, mut second) = (stream_of(&[4]), stream_of(&[]));
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![
                (&Key(1), &Priority(3), &mut first),
                (&Key(0), &Priority(4), &mut second),
            ]
        );

        let taken = pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5));
        assert_eq!(taken, Some((Priority(3), Value(4))));
        let (mut first, mut second) = (stream_of(&[]), stream_of(&[]));
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![
                (&Key(0), &Priority(4), &mut first),
                (&Key(1), &Priority(5), &mut second),
            ]
        );

        Poll::Ready(())
    });
    futures::executor::block_on(scenario);
}
741
/// After a stream is removed, its key can be reused for a different stream
/// without anything leaking over from the old one.
#[test]
fn test_remove_and_reuse_key() {
    let scenario = futures::future::poll_fn(|ctx| {
        let mut pollset: StreamPollSet<Key, Priority, TestStream> = StreamPollSet::new();

        // First occupant of the key.
        let inserted = pollset.try_insert(
            Key(0),
            Priority(0),
            TestStream::new_closed([Value(1), Value(2)]),
        );
        inserted.unwrap();
        let mut original = TestStream::new_closed([Value(1), Value(2)]);
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(0), &mut original),]
        );
        let removed = pollset.remove(&Key(0));
        assert_eq!(
            removed,
            Some((
                Key(0),
                Priority(0),
                TestStream::new_closed([Value(1), Value(2)])
            ))
        );

        // Second occupant of the same key.
        let inserted = pollset.try_insert(
            Key(0),
            Priority(1),
            TestStream::new_closed([Value(3), Value(4)]),
        );
        inserted.unwrap();
        // Ensure we see the ready value in the new stream, and *not* anything from the previous stream at that key.
        let mut replacement = TestStream::new_closed([Value(3), Value(4)]);
        assert_eq!(
            pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
            vec![(&Key(0), &Priority(1), &mut replacement),]
        );
        Poll::Ready(())
    });
    futures::executor::block_on(scenario);
}
788
789    #[test]
790    fn get_ready_stream() {
791        futures::executor::block_on(futures::future::poll_fn(|_ctx| {
792            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
793            pollset
794                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(1)]))
795                .unwrap();
796            assert_eq!(pollset.stream(&Key(0)).unwrap().vec[0], Value(1));
797            Poll::Ready(())
798        }));
799    }
800
801    #[test]
802    fn get_pending_stream() {
803        futures::executor::block_on(futures::future::poll_fn(|_ctx| {
804            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
805            pollset
806                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
807                .unwrap();
808            assert!(pollset.stream(&Key(0)).unwrap().vec.is_empty());
809            Poll::Ready(())
810        }));
811    }
812
813    #[test]
814    fn mutate_pending_stream() {
815        futures::executor::block_on(futures::future::poll_fn(|ctx| {
816            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
817            pollset
818                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
819                .unwrap();
820            assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
821
822            // This should cause the stream to become ready.
823            pollset.stream_mut(&Key(0)).unwrap().push(Value(0));
824
825            assert_eq!(
826                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
827                vec![(
828                    &Key(0),
829                    &Priority(0),
830                    &mut VecDequeStream::new_open([Value(0)])
831                ),]
832            );
833
834            Poll::Ready(())
835        }));
836    }
837
838    #[test]
839    fn mutate_ready_stream() {
840        futures::executor::block_on(futures::future::poll_fn(|ctx| {
841            let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
842            pollset
843                .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))
844                .unwrap();
845            assert_eq!(
846                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
847                vec![(
848                    &Key(0),
849                    &Priority(0),
850                    &mut VecDequeStream::new_open([Value(0)])
851                ),]
852            );
853
854            pollset.stream_mut(&Key(0)).unwrap().push(Value(1));
855
856            assert_eq!(
857                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
858                vec![(
859                    &Key(0),
860                    &Priority(0),
861                    &mut VecDequeStream::new_open([Value(0), Value(1)])
862                ),]
863            );
864
865            // Consume the value that was there.
866            assert_eq!(
867                pollset.take_ready_value_and_reprioritize(&Key(0), Priority(0)),
868                Some((Priority(0), Value(0)))
869            );
870
871            // We should now see the value we added.
872            assert_eq!(
873                pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
874                vec![(
875                    &Key(0),
876                    &Priority(0),
877                    &mut VecDequeStream::new_open([Value(1)])
878                ),]
879            );
880
881            Poll::Ready(())
882        }));
883    }
884
885    #[test]
886    fn test_async() {
887        MockRuntime::test_with_various(|rt| async move {
888            let mut pollset = StreamPollSet::<
889                Key,
890                Priority,
891                Peekable<futures::channel::mpsc::Receiver<Value>>,
892            >::new();
893
894            // Create 2 mpsc channels, bounded so that we can exercise back-pressure.
895            // These are analogous to Tor streams.
896            for streami in 1..=2 {
897                let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
898                pollset
899                    .try_insert(Key(streami), Priority(streami), recv.peekable())
900                    .unwrap();
901                rt.spawn_identified(format!("stream{streami}"), async move {
902                    for val in 0..10 {
903                        send.send(Value(val * streami)).await.unwrap();
904                    }
905                });
906            }
907
908            let output = Arc::new(Mutex::new(Vec::new()));
909
910            rt.spawn_identified("mux", {
911                let output = output.clone();
912                async move {
913                    loop {
914                        let (key, priority, value) = futures::future::poll_fn(|ctx| {
915                            match pollset.poll_ready_iter_mut(ctx).next() {
916                                Some((key, priority, stream)) => {
917                                    let Poll::Ready(value) = Pin::new(stream).poll_peek(ctx) else {
918                                        panic!("poll_ready_iter_mut returned non-ready stream")
919                                    };
920                                    Poll::Ready((*key, *priority, value.copied()))
921                                }
922                                // No streams ready, but there could be more items coming.
923                                // The current `ctx` should be registered to wake us
924                                // if and when there are.
925                                None => Poll::Pending,
926                            }
927                        })
928                        .await;
929                        if let Some(value) = value {
930                            // Take the value, and haphazardly set priority to push this stream "back".
931                            pollset
932                                .take_ready_value_and_reprioritize(&key, Priority(priority.0 + 10))
933                                .unwrap();
934                            output.lock().unwrap().push((key, value));
935                        } else {
936                            // Stream ended. Remove it.
937                            let _ = pollset.remove(&key).unwrap();
938                        }
939                    }
940                }
941            });
942
943            rt.advance_until_stalled().await;
944
945            let output = output.lock().unwrap();
946
947            // We can't predict exactly how the stream values will be
948            // interleaved, but we should get all items from each stream, with
949            // correct order within each stream.
950            for streami in 1..=2 {
951                let expected = (0..10).map(|val| Value(val * streami)).collect::<Vec<_>>();
952                let actual = output
953                    .iter()
954                    .filter_map(|(k, v)| (k == &Key(streami)).then_some(*v))
955                    .collect::<Vec<_>>();
956                assert_eq!(actual, expected);
957            }
958        });
959    }
960}