tor_proto/util/stream_poll_set.rs
1//! Provides [`StreamPollSet`]
2
3// So that we can declare these things as if they were in their own crate.
4#![allow(unreachable_pub)]
5
6use std::{
7 collections::{hash_map, BTreeMap, HashMap},
8 future::Future,
9 hash::Hash,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use futures::{task::noop_waker_ref, FutureExt, StreamExt as _};
15use tor_async_utils::peekable_stream::PeekableStream;
16
17use crate::util::keyed_futures_unordered::KeyedFuturesUnordered;
18
/// A future that wraps a [`PeekableStream`], and yields the stream
/// when an item becomes available.
///
/// The future completes as soon as peeking the stream returns `Poll::Ready`,
/// which includes the end-of-stream case.
struct PeekableReady<S> {
    /// The stream to be peeked.
    ///
    /// `None` once the future has completed and handed the stream back.
    stream: Option<S>,
}
25
26impl<S> PeekableReady<S> {
27 /// Create a new [`PeekableReady`].
28 fn new(st: S) -> Self {
29 Self { stream: Some(st) }
30 }
31
32 /// Get a reference to the inner `S`.
33 ///
34 /// None if the future has already completed.
35 fn get_ref(&self) -> Option<&S> {
36 self.stream.as_ref()
37 }
38
39 /// Get a mut reference to the inner `S`.
40 ///
41 /// None if the future has already completed.
42 fn get_mut(&mut self) -> Option<&mut S> {
43 self.stream.as_mut()
44 }
45
46 /// Unwrap inner `S`.
47 ///
48 /// None if the future has already completed.
49 fn into_inner(self) -> Option<S> {
50 self.stream
51 }
52}
53
54impl<S> Future for PeekableReady<S>
55where
56 S: PeekableStream + Unpin,
57{
58 type Output = S;
59
60 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
61 let Some(stream) = &mut self.stream else {
62 panic!("Polled completed future");
63 };
64 match Pin::new(stream).poll_peek(cx) {
65 Poll::Ready(_) => Poll::Ready(self.stream.take().expect("Stream disappeared")),
66 Poll::Pending => Poll::Pending,
67 }
68 }
69}
70
/// Manages a dynamic set of [`futures::Stream`] with associated keys and
/// priorities.
///
/// Notable features:
///
/// * Prioritization: streams have an associated priority, and ready-streams are
///   iterated over in ascending priority order.
/// * Efficient polling: an unready stream won't be polled again until it's
///   ready or exhausted (e.g. a corresponding [`futures::Sink`] is written-to or
///   dropped). A ready stream won't be polled again until the ready item has been
///   removed.
pub struct StreamPollSet<K, P, S>
where
    S: PeekableStream + Unpin,
{
    /// Priority for each stream in the set.
    // We keep the priority for each stream here instead of bundling it together
    // with the stream, so that the priority can easily be changed even while a
    // future waiting on the stream is still pending (e.g. to support rescaling
    // priorities for EWMA).
    // Invariants:
    // * Every key is also present in exactly one of `ready_streams` or `pending_streams`.
    priorities: HashMap<K, P>,
    /// Streams that have a result ready, in ascending order by priority.
    // The map key is `(priority, key)`, so the priority stored here must always
    // match the one recorded in `priorities` for the same key.
    // Invariants:
    // * Keys are a (non-strict) subset of those in `priorities`.
    ready_streams: BTreeMap<(P, K), S>,
    /// Streams for which we're still waiting for the next result.
    // Invariants:
    // * Keys are a (non-strict) subset of those in `priorities`.
    pending_streams: KeyedFuturesUnordered<K, PeekableReady<S>>,
}
103
104impl<K, P, S> StreamPollSet<K, P, S>
105where
106 K: Ord + Hash + Clone + Send + Sync + 'static,
107 S: PeekableStream + Unpin,
108 P: Ord + Clone,
109{
110 /// Create a new, empty, `StreamPollSet`.
111 pub fn new() -> Self {
112 Self {
113 priorities: Default::default(),
114 ready_streams: Default::default(),
115 pending_streams: KeyedFuturesUnordered::new(),
116 }
117 }
118
119 /// Insert a `stream`, with an associated `key` and `priority`.
120 ///
121 /// If the `key` is already in use, the parameters are returned without altering `self`.
122 // To *replace* an existing key, we'd need to cancel any pending future and
123 // ensure that the cancellation is processed before inserting the new key, to
124 // ensure we don't assign a value from the previous key to the new key's
125 // stream.
126 pub fn try_insert(
127 &mut self,
128 key: K,
129 priority: P,
130 stream: S,
131 ) -> Result<(), KeyAlreadyInsertedError<K, P, S>> {
132 let hash_map::Entry::Vacant(v) = self.priorities.entry(key.clone()) else {
133 // We already have an entry for this key.
134 return Err(KeyAlreadyInsertedError {
135 key,
136 priority,
137 stream,
138 });
139 };
140 self.pending_streams
141 .try_insert(key, PeekableReady::new(stream))
142 // By `pending_streams` invariant that keys are a subset of those in
143 // `priorities`.
144 .unwrap_or_else(|_| panic!("Unexpected duplicate key"));
145 v.insert(priority);
146 Ok(())
147 }
148
149 /// Remove the entry for `key`, if any. This is the key, priority, buffered
150 /// poll_next result, and stream.
151 pub fn remove(&mut self, key: &K) -> Option<(K, P, S)> {
152 let priority = self.priorities.remove(key)?;
153 if let Some((key, fut)) = self.pending_streams.remove(key) {
154 // Validate `priorities` invariant that keys are also present in exactly one of
155 // `pending_streams` and `ready_values`.
156 debug_assert!(!self
157 .ready_streams
158 .contains_key(&(priority.clone(), key.clone())));
159 let stream = fut
160 .into_inner()
161 // We know the future hasn't completed, so the stream should be present.
162 .expect("Missing stream");
163 Some((key, priority, stream))
164 } else {
165 let ((_priority, key), stream) = self
166 .ready_streams
167 .remove_entry(&(priority.clone(), key.clone()))
168 // By
169 // * `pending_streams` invariant that keys are also present in
170 // exactly one of `pending_streams` and `ready_values`.
171 // * validated above that the key was in `pending_streams`, and
172 // not in `ready_values`.
173 .expect("Unexpectedly no value for key");
174 Some((key, priority, stream))
175 }
176 }
177
178 /// Polls streams that are ready to be polled, and returns an iterator over all streams
179 /// for which we have a buffered `Poll::Ready` result, in ascending priority order.
180 ///
181 /// Registers the provided [`Context`] to be woken when
182 /// any of the internal streams that weren't ready in the previous call to
183 /// this method (and therefore wouldn't have appeared in the iterator
184 /// results) become potentially ready (based on when the inner stream wakes
185 /// the `Context` provided to its own `poll_next`).
186 ///
187 /// The same restrictions apply as for [`Self::stream_mut`]. e.g. do not
188 /// directly call [`PeekableStream::poll_peek`] to see what item is
189 /// available on the stream; instead use [`Self::peek_mut`]. (Or
190 /// [`tor_async_utils::peekable_stream::UnobtrusivePeekableStream`] if
191 /// implemented for the stream).
192 ///
193 /// This method does *not* drain ready items. `Some` values can be removed
194 /// with [`Self::take_ready_value_and_reprioritize`]. `None` values can only
195 /// be removed by removing the whole stream with [`Self::remove`].
196 ///
197 /// This API is meant to allow callers to find the first stream (in priority
198 /// order) that is ready, and that the caller is able to process now. i.e.
199 /// it's specifically to support the use-case where external factors may
200 /// prevent the processing of some streams but not others.
201 ///
202 /// Example:
203 ///
204 /// ```nocompile
205 /// # // We need the `nocompile` since `StreamPollSet` is non-pub.
206 /// # // TODO: take away the nocompile if we make this pub or implement some
207 /// # // workaround to expose it to doc-tests.
208 /// # type Key=u64;
209 /// # type Value=u64;
210 /// # type Priority=u64;
211 /// # type MyStream=Box<dyn futures::Stream<Item=Value> + Unpin>;
212 /// # fn can_process(key: &Key, val: &Value) -> bool { true }
213 /// # fn process(val: Value) { }
214 /// # fn new_priority(priority: &Priority) -> Priority { *priority }
215 /// fn process_a_ready_stream(sps: &mut StreamPollSet<Key, Value, Priority, MyStream>, cx: &mut std::task::Context) -> std::task::Poll<()> {
216 /// let mut iter = sps.poll_ready_iter(cx);
217 /// while let Some((key, priority, stream)) = iter.next() {
218 /// let Some(value) = stream.unobtrusive_peek(Pin::new(stream)) else {
219 /// // Stream exhausted. Remove the stream. We have to drop the iterator
220 /// // first, though, so that we can mutate.
221 /// let key = *key;
222 /// drop(iter);
223 /// sps.remove(&key).unwrap();
224 /// return std::task::Poll::Ready(());
225 /// };
226 /// if can_process(key, value) {
227 /// let key = *key;
228 /// let priority = new_priority(priority);
229 /// drop(iter);
230 /// let (_old_priority, value) = sps.take_ready_value_and_reprioritize(&key, priority).unwrap();
231 /// process(value);
232 /// return std::task::Poll::Ready(());
233 /// }
234 /// }
235 /// return std::task::Poll::Pending;
236 /// }
237 /// ```
238 // In the current implementation we *could* actually permit the caller to
239 // `poll_peek` a stream that we know is ready. But this may change as the
240 // impl evolves further, and it's probably better to blanket disallow it
241 // than to have complex rules for the caller about when it's ok.
242 //
243 // TODO: It would be nice if the returned iterator supported additional
244 // actions, e.g. allowing the user to consume the iterator and take and
245 // reprioritize the inner value, but this is tricky.
246 //
247 // I've sketched out a working "cursor" that holds the current position (K, P)
248 // and a &mut StreamPollSet. This can't implement the Iterator interface though
249 // since it needs to borrow from self. I was able to implement an Iterator-*like* interface
250 // that does borrow from self, but this doesn't compose well. e.g. in StreamMap
251 // we can't use the same technique again since the object would need a mut reference to the
252 // StreamMap *and* to this inner cursor object, which is illegal.
253 pub fn poll_ready_iter_mut<'a>(
254 &'a mut self,
255 cx: &mut Context,
256 ) -> impl Iterator<Item = (&'a K, &'a P, &'a mut S)> + 'a {
257 // First poll for ready streams
258 while let Poll::Ready(Some((key, stream))) = self.pending_streams.poll_next_unpin(cx) {
259 let priority = self
260 .priorities
261 .get(&key)
262 // By `pending_streams` invariant that all keys are also in `priorities`.
263 .expect("Missing priority");
264 let prev = self.ready_streams.insert((priority.clone(), key), stream);
265 assert!(prev.is_none());
266 }
267 self.ready_streams.iter_mut().map(|((p, k), s)| (k, p, s))
268 }
269
270 /// If the stream for `key` has `Some(value)` ready, take that value and set the
271 /// priority for it to `new_priority`.
272 ///
273 /// This method doesn't register a waker with the polled stream. Use
274 /// `poll_ready_iter` to ensure streams make progress.
275 ///
276 /// If the key doesn't exist, the stream isn't ready, or the stream's value
277 /// is `None` (indicating the end of the stream), this function returns
278 /// `None` without mutating anything.
279 ///
280 /// Ended streams should be removed using [`Self::remove`].
281 pub fn take_ready_value_and_reprioritize(
282 &mut self,
283 key: &K,
284 new_priority: P,
285 ) -> Option<(P, S::Item)> {
286 // Get the priority entry, but don't replace until the lookup in ready_streams is confirmed.
287 let hash_map::Entry::Occupied(mut priority_entry) = self.priorities.entry(key.clone())
288 else {
289 // Key isn't present at all.
290 return None;
291 };
292 let priority_mut = priority_entry.get_mut();
293 let Some(((_p, key), mut stream)) = self
294 .ready_streams
295 .remove_entry(&(priority_mut.clone(), key.clone()))
296 else {
297 // This stream isn't in the ready list.
298 return None;
299 };
300 match Pin::new(&mut stream)
301 .poll_peek(&mut Context::from_waker(&futures::task::noop_waker()))
302 {
303 Poll::Ready(Some(_val)) => (), // Stream is ready, and has an item. Proceed.
304 Poll::Ready(None) => {
305 // Stream is ready, but is terminated.
306 // Leave in place and return `None`.
307 return None;
308 }
309 Poll::Pending => {
310 // Stream wasn't actually ready, despite being on the ready
311 // list. This should be impossible by the stability guarantees
312 // of `PeekableStream` and our own internal logic, but we can
313 // recover.
314 tracing::error!("Bug: Stream unexpectedly unready");
315 self.pending_streams
316 .try_insert(key.clone(), PeekableReady::new(stream))
317 // By invariant on `priorities` that keys are in exactly one of the ready or pending lists.
318 .unwrap_or_else(|_| {
319 unreachable!("Key unexpectedly in both ready and unready list")
320 });
321 return None;
322 }
323 }
324 let Some(Some(val)) = stream.next().now_or_never() else {
325 panic!("Polling stream returned a different result than peeking");
326 };
327 let prev_priority = std::mem::replace(priority_mut, new_priority);
328 self.pending_streams
329 .try_insert(key, PeekableReady::new(stream))
330 // We verified above that the key wasn't present in `priorities`,
331 // and `pending_streams` has the invariant that its keys are a
332 // subset of those in `priorities`.
333 .unwrap_or_else(|_| panic!("Unexpected pending stream entry"));
334 Some((prev_priority, val))
335 }
336
337 /// Get a mut reference to a ready value for key `key`, if one exists.
338 ///
339 /// This method doesn't poll the internal streams. Use `poll_ready_iter` to
340 /// ensure streams make progress.
341 // This will be used for packing and fragmentation, to take part of a DATA message.
342 #[allow(unused)]
343 pub fn peek_mut<'a>(&'a mut self, key: &K) -> Option<Poll<Option<&'a mut S::Item>>> {
344 let priority = self.priorities.get(key)?;
345 let Some(peekable) = self.ready_streams.get_mut(&(priority.clone(), key.clone())) else {
346 return Some(Poll::Pending);
347 };
348 // We don't have a waker registered here, so we can just use the noop waker.
349 // TODO: Create a mut future for `PeekableStream`.
350 Some(Pin::new(peekable).poll_peek_mut(&mut Context::from_waker(noop_waker_ref())))
351 }
352
353 /// Get a reference to the stream for `key`.
354 ///
355 /// The same restrictions apply as for [`Self::stream_mut`] (e.g. using
356 /// interior mutability).
357 #[allow(dead_code)]
358 pub fn stream(&self, key: &K) -> Option<&S> {
359 if let Some(s) = self.pending_streams.get(key) {
360 let s = s.get_ref();
361 // Stream must be present since it's still pending.
362 debug_assert!(s.is_some(), "Unexpected missing pending stream");
363 return s;
364 }
365 let priority = self.priorities.get(key)?;
366 self.ready_streams.get(&(priority.clone(), key.clone()))
367 }
368
369 /// Get a mut reference to the stream for `key`.
370 ///
371 /// Polling the stream through this reference, or otherwise causing its
372 /// registered `Waker` to be removed without waking it, will result in
373 /// unspecified (but not unsound) behavior.
374 ///
375 /// This is mostly intended for accessing non-`Stream` functionality of the stream
376 /// object, though it *is* permitted to mutate it in a way that the stream becomes
377 /// ready (potentially removing and waking its registered Waker(s)).
378 //
379 // In particular:
380 // * Polling a stream in the pending list and getting a Pending result
381 // will overwrite our Waker, resulting in us not polling it again.
382 // * Doing so with a stream on the pending list and getting a Ready result
383 // might be ok if it had already woken our waker. Otoh it could potentially
384 // result in our waker never getting woken, and hence us not polling it again.
385 // * Doing so with a stream on the ready list should actually be ok, since
386 // we don't have a registered waker, and don't do our own buffering.
387 pub fn stream_mut(&mut self, key: &K) -> Option<&mut S> {
388 if let Some(s) = self.pending_streams.get_mut(key) {
389 let s = s.get_mut();
390 // Stream must be present since it's still pending.
391 debug_assert!(s.is_some(), "Unexpected missing pending stream");
392 return s;
393 }
394 let priority = self.priorities.get(key)?;
395 self.ready_streams.get_mut(&(priority.clone(), key.clone()))
396 }
397
398 /// Number of streams managed by this object.
399 pub fn len(&self) -> usize {
400 self.priorities.len()
401 }
402}
403
404/// Error returned by [`StreamPollSet::try_insert`].
405#[derive(Debug, thiserror::Error)]
406#[allow(clippy::exhaustive_structs)]
407pub struct KeyAlreadyInsertedError<K, P, S> {
408 /// Key that caller tried to insert.
409 #[allow(dead_code)]
410 pub key: K,
411 /// Priority that caller tried to insert.
412 #[allow(dead_code)]
413 pub priority: P,
414 /// Stream that caller tried to insert.
415 #[allow(dead_code)]
416 pub stream: S,
417}
418
419#[cfg(test)]
420mod test {
421 // @@ begin test lint list maintained by maint/add_warning @@
422 #![allow(clippy::bool_assert_comparison)]
423 #![allow(clippy::clone_on_copy)]
424 #![allow(clippy::dbg_macro)]
425 #![allow(clippy::mixed_attributes_style)]
426 #![allow(clippy::print_stderr)]
427 #![allow(clippy::print_stdout)]
428 #![allow(clippy::single_char_pattern)]
429 #![allow(clippy::unwrap_used)]
430 #![allow(clippy::unchecked_duration_subtraction)]
431 #![allow(clippy::useless_vec)]
432 #![allow(clippy::needless_pass_by_value)]
433 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
434
435 use std::{
436 collections::VecDeque,
437 sync::{Arc, Mutex},
438 task::Poll,
439 };
440
441 use futures::{stream::Peekable, SinkExt as _};
442 use pin_project::pin_project;
443 use tor_rtmock::MockRuntime;
444
445 use super::*;
446
    /// Key type used to identify streams in these tests.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Key(u64);
449
    /// Priority type used in these tests; lower values are iterated first.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
    struct Priority(u64);
452
    /// Item type carried by the test streams.
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
    struct Value(u64);
455
    /// Test stream that we can directly manipulate and examine.
    #[derive(Debug)]
    #[pin_project]
    struct VecDequeStream<T> {
        /// Ready items, in the order they will be yielded.
        vec: VecDeque<T>,
        /// Whether the stream is closed, i.e. no more items will be written.
        closed: bool,
        /// Waker registered by the most recent poll that returned `Pending`.
        waker: Option<std::task::Waker>,
    }
467 impl<T> VecDequeStream<T> {
468 fn new_open<I: IntoIterator<Item = T>>(values: I) -> Self {
469 Self {
470 vec: VecDeque::from_iter(values),
471 waker: None,
472 closed: false,
473 }
474 }
475 fn new_closed<I: IntoIterator<Item = T>>(values: I) -> Self {
476 Self {
477 vec: VecDeque::from_iter(values),
478 waker: None,
479 closed: true,
480 }
481 }
482 fn push(&mut self, value: T) {
483 assert!(!self.closed);
484 self.vec.push_back(value);
485 if let Some(waker) = self.waker.take() {
486 waker.wake();
487 }
488 }
489 }
490 impl<T> futures::Stream for VecDequeStream<T> {
491 type Item = T;
492
493 fn poll_next(
494 mut self: std::pin::Pin<&mut Self>,
495 cx: &mut std::task::Context<'_>,
496 ) -> Poll<Option<Self::Item>> {
497 if let Some(val) = self.as_mut().vec.pop_front() {
498 Poll::Ready(Some(val))
499 } else if self.as_mut().closed {
500 // No more items coming.
501 Poll::Ready(None)
502 } else {
503 self.as_mut().waker.replace(cx.waker().clone());
504 Poll::Pending
505 }
506 }
507 }
508 impl<T> PeekableStream for VecDequeStream<T> {
509 fn poll_peek_mut(
510 self: Pin<&mut Self>,
511 cx: &mut Context<'_>,
512 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
513 let s = self.project();
514 if let Some(val) = s.vec.front_mut() {
515 Poll::Ready(Some(val))
516 } else if *s.closed {
517 // No more items coming.
518 Poll::Ready(None)
519 } else {
520 s.waker.replace(cx.waker().clone());
521 Poll::Pending
522 }
523 }
524 }
525 impl<T> std::cmp::PartialEq for VecDequeStream<T>
526 where
527 T: std::cmp::PartialEq,
528 {
529 fn eq(&self, other: &Self) -> bool {
530 // Ignore waker, which isn't comparable
531 self.vec == other.vec && self.closed == other.closed
532 }
533 }
    // Marker impl only; the comparison itself is defined by the `PartialEq` impl.
    impl<T> std::cmp::Eq for VecDequeStream<T> where T: std::cmp::Eq {}
535
    /// The stream type most tests put into the poll set.
    type TestStream = VecDequeStream<Value>;
537
538 #[test]
539 fn test_empty() {
540 futures::executor::block_on(futures::future::poll_fn(|ctx| {
541 let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
542 assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
543 Poll::Ready(())
544 }));
545 }
546
547 #[test]
548 fn test_one_pending() {
549 futures::executor::block_on(futures::future::poll_fn(|ctx| {
550 let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
551 pollset
552 .try_insert(Key(0), Priority(0), TestStream::new_open([]))
553 .unwrap();
554 assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
555 Poll::Ready(())
556 }));
557 }
558
559 #[test]
560 fn test_one_ready() {
561 futures::executor::block_on(futures::future::poll_fn(|ctx| {
562 let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
563 pollset
564 .try_insert(
565 Key(0),
566 Priority(0),
567 TestStream::new_closed([Value(1), Value(2)]),
568 )
569 .unwrap();
570
571 // We only see the first value of the one ready stream.
572 assert_eq!(
573 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
574 vec![(
575 &Key(0),
576 &Priority(0),
577 &mut TestStream::new_closed([Value(1), Value(2)])
578 )],
579 );
580
581 // Same result, the same value is still at the head of the stream..
582 assert_eq!(
583 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
584 vec![(
585 &Key(0),
586 &Priority(0),
587 &mut TestStream::new_closed([Value(1), Value(2)])
588 )]
589 );
590
591 // Take the head of the stream.
592 assert_eq!(
593 pollset.take_ready_value_and_reprioritize(&Key(0), Priority(1)),
594 Some((Priority(0), Value(1)))
595 );
596
597 // Should see the next value, with the new priority.
598 assert_eq!(
599 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
600 vec![(
601 &Key(0),
602 &Priority(1),
603 &mut TestStream::new_closed([Value(2)])
604 )]
605 );
606
607 // Take again.
608 assert_eq!(
609 pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
610 Some((Priority(1), Value(2)))
611 );
612
613 // Should see end-of-stream.
614 assert_eq!(
615 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
616 vec![(&Key(0), &Priority(2), &mut TestStream::new_closed([]))]
617 );
618
619 // Remove the now-ended stream.
620 assert_eq!(
621 pollset.remove(&Key(0)),
622 Some((Key(0), Priority(2), TestStream::new_closed([])))
623 );
624
625 // Should now be empty.
626 assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
627
628 Poll::Ready(())
629 }));
630 }
631
632 #[test]
633 fn test_round_robin() {
634 futures::executor::block_on(futures::future::poll_fn(|ctx| {
635 let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
636 pollset
637 .try_insert(
638 Key(0),
639 Priority(0),
640 TestStream::new_closed([Value(1), Value(2)]),
641 )
642 .unwrap();
643 pollset
644 .try_insert(
645 Key(1),
646 Priority(1),
647 TestStream::new_closed([Value(3), Value(4)]),
648 )
649 .unwrap();
650
651 // Should see both ready streams, in priority order.
652 assert_eq!(
653 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
654 vec![
655 (
656 &Key(0),
657 &Priority(0),
658 &mut TestStream::new_closed([Value(1), Value(2)])
659 ),
660 (
661 &Key(1),
662 &Priority(1),
663 &mut TestStream::new_closed([Value(3), Value(4)])
664 ),
665 ]
666 );
667
668 // Take from the first stream and send it to the back via priority assignment.
669 assert_eq!(
670 pollset.take_ready_value_and_reprioritize(&Key(0), Priority(2)),
671 Some((Priority(0), Value(1)))
672 );
673
674 // Should see both ready streams, in the new priority order.
675 assert_eq!(
676 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
677 vec![
678 (
679 &Key(1),
680 &Priority(1),
681 &mut TestStream::new_closed([Value(3), Value(4)])
682 ),
683 (
684 &Key(0),
685 &Priority(2),
686 &mut TestStream::new_closed([Value(2)])
687 ),
688 ]
689 );
690
691 // Keep going ...
692 assert_eq!(
693 pollset.take_ready_value_and_reprioritize(&Key(1), Priority(3)),
694 Some((Priority(1), Value(3)))
695 );
696 assert_eq!(
697 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
698 vec![
699 (
700 &Key(0),
701 &Priority(2),
702 &mut TestStream::new_closed([Value(2)])
703 ),
704 (
705 &Key(1),
706 &Priority(3),
707 &mut TestStream::new_closed([Value(4)])
708 ),
709 ]
710 );
711 assert_eq!(
712 pollset.take_ready_value_and_reprioritize(&Key(0), Priority(4)),
713 Some((Priority(2), Value(2)))
714 );
715 assert_eq!(
716 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
717 vec![
718 (
719 &Key(1),
720 &Priority(3),
721 &mut TestStream::new_closed([Value(4)])
722 ),
723 (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
724 ]
725 );
726 assert_eq!(
727 pollset.take_ready_value_and_reprioritize(&Key(1), Priority(5)),
728 Some((Priority(3), Value(4)))
729 );
730 assert_eq!(
731 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
732 vec![
733 (&Key(0), &Priority(4), &mut TestStream::new_closed([])),
734 (&Key(1), &Priority(5), &mut TestStream::new_closed([])),
735 ]
736 );
737
738 Poll::Ready(())
739 }));
740 }
741
742 #[test]
743 fn test_remove_and_reuse_key() {
744 futures::executor::block_on(futures::future::poll_fn(|ctx| {
745 let mut pollset = StreamPollSet::<Key, Priority, TestStream>::new();
746 pollset
747 .try_insert(
748 Key(0),
749 Priority(0),
750 TestStream::new_closed([Value(1), Value(2)]),
751 )
752 .unwrap();
753 assert_eq!(
754 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
755 vec![(
756 &Key(0),
757 &Priority(0),
758 &mut TestStream::new_closed([Value(1), Value(2)])
759 ),]
760 );
761 assert_eq!(
762 pollset.remove(&Key(0)),
763 Some((
764 Key(0),
765 Priority(0),
766 TestStream::new_closed([Value(1), Value(2)])
767 ))
768 );
769 pollset
770 .try_insert(
771 Key(0),
772 Priority(1),
773 TestStream::new_closed([Value(3), Value(4)]),
774 )
775 .unwrap();
776 // Ensure we see the ready value in the new stream, and *not* anything from the previous stream at that key.
777 assert_eq!(
778 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
779 vec![(
780 &Key(0),
781 &Priority(1),
782 &mut TestStream::new_closed([Value(3), Value(4)])
783 ),]
784 );
785 Poll::Ready(())
786 }));
787 }
788
789 #[test]
790 fn get_ready_stream() {
791 futures::executor::block_on(futures::future::poll_fn(|_ctx| {
792 let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
793 pollset
794 .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(1)]))
795 .unwrap();
796 assert_eq!(pollset.stream(&Key(0)).unwrap().vec[0], Value(1));
797 Poll::Ready(())
798 }));
799 }
800
801 #[test]
802 fn get_pending_stream() {
803 futures::executor::block_on(futures::future::poll_fn(|_ctx| {
804 let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
805 pollset
806 .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
807 .unwrap();
808 assert!(pollset.stream(&Key(0)).unwrap().vec.is_empty());
809 Poll::Ready(())
810 }));
811 }
812
813 #[test]
814 fn mutate_pending_stream() {
815 futures::executor::block_on(futures::future::poll_fn(|ctx| {
816 let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
817 pollset
818 .try_insert(Key(0), Priority(0), VecDequeStream::new_open([]))
819 .unwrap();
820 assert_eq!(pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(), vec![]);
821
822 // This should cause the stream to become ready.
823 pollset.stream_mut(&Key(0)).unwrap().push(Value(0));
824
825 assert_eq!(
826 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
827 vec![(
828 &Key(0),
829 &Priority(0),
830 &mut VecDequeStream::new_open([Value(0)])
831 ),]
832 );
833
834 Poll::Ready(())
835 }));
836 }
837
838 #[test]
839 fn mutate_ready_stream() {
840 futures::executor::block_on(futures::future::poll_fn(|ctx| {
841 let mut pollset = StreamPollSet::<Key, Priority, VecDequeStream<Value>>::new();
842 pollset
843 .try_insert(Key(0), Priority(0), VecDequeStream::new_open([Value(0)]))
844 .unwrap();
845 assert_eq!(
846 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
847 vec![(
848 &Key(0),
849 &Priority(0),
850 &mut VecDequeStream::new_open([Value(0)])
851 ),]
852 );
853
854 pollset.stream_mut(&Key(0)).unwrap().push(Value(1));
855
856 assert_eq!(
857 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
858 vec![(
859 &Key(0),
860 &Priority(0),
861 &mut VecDequeStream::new_open([Value(0), Value(1)])
862 ),]
863 );
864
865 // Consume the value that was there.
866 assert_eq!(
867 pollset.take_ready_value_and_reprioritize(&Key(0), Priority(0)),
868 Some((Priority(0), Value(0)))
869 );
870
871 // We should now see the value we added.
872 assert_eq!(
873 pollset.poll_ready_iter_mut(ctx).collect::<Vec<_>>(),
874 vec![(
875 &Key(0),
876 &Priority(0),
877 &mut VecDequeStream::new_open([Value(1)])
878 ),]
879 );
880
881 Poll::Ready(())
882 }));
883 }
884
885 #[test]
886 fn test_async() {
887 MockRuntime::test_with_various(|rt| async move {
888 let mut pollset = StreamPollSet::<
889 Key,
890 Priority,
891 Peekable<futures::channel::mpsc::Receiver<Value>>,
892 >::new();
893
894 // Create 2 mpsc channels, bounded so that we can exercise back-pressure.
895 // These are analogous to Tor streams.
896 for streami in 1..=2 {
897 let (mut send, recv) = futures::channel::mpsc::channel::<Value>(2);
898 pollset
899 .try_insert(Key(streami), Priority(streami), recv.peekable())
900 .unwrap();
901 rt.spawn_identified(format!("stream{streami}"), async move {
902 for val in 0..10 {
903 send.send(Value(val * streami)).await.unwrap();
904 }
905 });
906 }
907
908 let output = Arc::new(Mutex::new(Vec::new()));
909
910 rt.spawn_identified("mux", {
911 let output = output.clone();
912 async move {
913 loop {
914 let (key, priority, value) = futures::future::poll_fn(|ctx| {
915 match pollset.poll_ready_iter_mut(ctx).next() {
916 Some((key, priority, stream)) => {
917 let Poll::Ready(value) = Pin::new(stream).poll_peek(ctx) else {
918 panic!("poll_ready_iter_mut returned non-ready stream")
919 };
920 Poll::Ready((*key, *priority, value.copied()))
921 }
922 // No streams ready, but there could be more items coming.
923 // The current `ctx` should be registered to wake us
924 // if and when there are.
925 None => Poll::Pending,
926 }
927 })
928 .await;
929 if let Some(value) = value {
930 // Take the value, and haphazardly set priority to push this stream "back".
931 pollset
932 .take_ready_value_and_reprioritize(&key, Priority(priority.0 + 10))
933 .unwrap();
934 output.lock().unwrap().push((key, value));
935 } else {
936 // Stream ended. Remove it.
937 let _ = pollset.remove(&key).unwrap();
938 }
939 }
940 }
941 });
942
943 rt.advance_until_stalled().await;
944
945 let output = output.lock().unwrap();
946
947 // We can't predict exactly how the stream values will be
948 // interleaved, but we should get all items from each stream, with
949 // correct order within each stream.
950 for streami in 1..=2 {
951 let expected = (0..10).map(|val| Value(val * streami)).collect::<Vec<_>>();
952 let actual = output
953 .iter()
954 .filter_map(|(k, v)| (k == &Key(streami)).then_some(*v))
955 .collect::<Vec<_>>();
956 assert_eq!(actual, expected);
957 }
958 });
959 }
960}