1//! Stream utilities to help implement
//! [`AbstractCircMgr`](`super::AbstractCircMgr`).
34use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
5use futures::task::{Context, Poll};
6use pin_project::pin_project;
7use std::pin::Pin;
89/// Enumeration to indicate which of two streams provided a result.
10#[derive(Debug, Copy, Clone, Eq, PartialEq)]
11pub(super) enum Source {
12/// Indicates a result coming from the left (preferred) stream.
13Left,
14/// Indicates a result coming from the right (secondary) stream.
15Right,
16}
1718/// A stream returned by [`select_biased`]
19///
20/// See that function for more documentation.
21#[pin_project]
22pub(super) struct SelectBiased<S, T> {
23/// Preferred underlying stream.
24 ///
25 /// When results are available from both streams, we always yield them
26 /// from this one. When this stream is exhausted, the `SelectBiased`
27 /// is exhausted too.
28#[pin]
29left: Fuse<S>,
30/// Secondary underlying stream.
31#[pin]
32right: Fuse<T>,
33}
3435/// Combine two instances of [`Stream`] into one.
36///
37/// This function is similar to [`futures::stream::select`], but differs
38/// in that it treats the two underlying streams asymmetrically. Specifically:
39///
40/// * Each result is labeled with [`Source::Left`] or
41/// [`Source::Right`], depending on which of the two streams it came
42/// from.
43/// * If both the "left" and the "right" stream are ready, we always
44/// prefer the left stream.
45/// * We stop iterating over this stream when there are no more
46/// results on the left stream, regardless whether the right stream
47/// is exhausted or not.
48///
49/// # Future plans
50///
51/// This might need a better name, especially if we use it anywhere
52/// else.
53///
54/// If we do expose this function, we might want to split up the ways in
55/// which it differs from `select`.
56pub(super) fn select_biased<S, T>(left: S, right: T) -> SelectBiased<S, T>
57where
58S: Stream,
59 T: Stream<Item = S::Item>,
60{
61 SelectBiased {
62 left: left.fuse(),
63 right: right.fuse(),
64 }
65}
6667impl<S, T> FusedStream for SelectBiased<S, T>
68where
69S: Stream,
70 T: Stream<Item = S::Item>,
71{
72fn is_terminated(&self) -> bool {
73// We're done if the left stream is done, whether the right stream
74 // is done or not.
75self.left.is_terminated()
76 }
77}
7879impl<S, T> Stream for SelectBiased<S, T>
80where
81S: Stream,
82 T: Stream<Item = S::Item>,
83{
84type Item = (Source, S::Item);
8586fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87let this = self.project();
88// We always check the left stream first.
89match this.left.poll_next(cx) {
90 Poll::Ready(Some(val)) => {
91// The left stream has an item: yield it.
92return Poll::Ready(Some((Source::Left, val)));
93 }
94 Poll::Ready(None) => {
95// The left stream is exhausted: don't even check the right.
96return Poll::Ready(None);
97 }
98 Poll::Pending => {}
99 }
100101// The left stream is pending: see whether the right stream has
102 // anything to say.
103match this.right.poll_next(cx) {
104 Poll::Ready(Some(val)) => {
105// The right stream has an item: yield it.
106Poll::Ready(Some((Source::Right, val)))
107 }
108_ => {
109// The right stream is exhausted or pending: in either case,
110 // we need to wait.
111Poll::Pending
112 }
113 }
114 }
115}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use futures_await_test::async_test;

    // Cases in which every yielded item must come from the left stream.
    #[async_test]
    async fn left_only() {
        use futures::stream::iter;
        use Source::Left as L;

        // With an empty right stream, the output is just the left stream.
        let primary = vec![1_usize, 2, 3];
        let secondary = vec![];
        let merged = select_biased(iter(primary), iter(secondary));
        let got = merged.collect::<Vec<_>>().await;
        assert_eq!(got, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // The left stream is always ready here and then runs out, so the
        // right stream never gets a turn.
        let primary = vec![1_usize, 2, 3];
        let secondary = vec![4, 5, 6];
        let merged = select_biased(iter(primary), iter(secondary));
        let got = merged.collect::<Vec<_>>().await;
        assert_eq!(got, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // An empty left stream ends the combined stream immediately, too.
        let primary = vec![];
        let secondary = vec![4_usize, 5, 6];
        let merged = select_biased(iter(primary), iter(secondary));
        let got = merged.collect::<Vec<_>>().await;
        assert_eq!(got, vec![]);
    }

    // Cases in which every yielded item must come from the right stream.
    #[async_test]
    async fn right_only() {
        use futures::stream::{iter, pending};
        use Source::Right as R;

        // The left-hand side never becomes ready, so the right-hand side
        // supplies everything.
        let never_ready = pending();
        let secondary = vec![4_usize, 5, 6];
        let mut merged = select_biased(never_ready, iter(secondary));
        for expected in 4_usize..=6 {
            assert_eq!(merged.next().await, Some((R, expected)));
        }
    }

    // Cases in which items arrive on both streams.
    #[async_test]
    async fn multiplex() {
        use futures::SinkExt;
        use Source::{Left as L, Right as R};

        let (mut tx_left, rx_left) = futures::channel::mpsc::channel(5);
        let (mut tx_right, rx_right) = futures::channel::mpsc::channel(5);
        let mut merged = select_biased(rx_left, rx_right);

        tx_left.send(1_usize).await.unwrap();
        tx_right.send(4_usize).await.unwrap();
        tx_left.send(2_usize).await.unwrap();

        // Both queued left items come out before the queued right item.
        assert_eq!(merged.next().await, Some((L, 1)));
        assert_eq!(merged.next().await, Some((L, 2)));
        assert_eq!(merged.next().await, Some((R, 4)));

        tx_right.send(5_usize).await.unwrap();
        tx_left.send(3_usize).await.unwrap();

        assert!(!merged.is_terminated());
        // Closing the right side must not end the combined stream.
        drop(tx_right);

        assert_eq!(merged.next().await, Some((L, 3)));
        assert_eq!(merged.next().await, Some((R, 5)));

        // Closing the left side does end it.
        drop(tx_left);
        assert_eq!(merged.next().await, None);

        assert!(merged.is_terminated());
    }
}