tor_circmgr/mgr/
streams.rs

1//! Stream utilities to help implement
2//! [`AbstractCircMgr`](`super::AbstractCircMgr.`)
3
4use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
5use futures::task::{Context, Poll};
6use pin_project::pin_project;
7use std::pin::Pin;
8
9/// Enumeration to indicate which of two streams provided a result.
10#[derive(Debug, Copy, Clone, Eq, PartialEq)]
11pub(super) enum Source {
12    /// Indicates a result coming from the left (preferred) stream.
13    Left,
14    /// Indicates a result coming from the right (secondary) stream.
15    Right,
16}
17
18/// A stream returned by [`select_biased`]
19///
20/// See that function for more documentation.
21#[pin_project]
22pub(super) struct SelectBiased<S, T> {
23    /// Preferred underlying stream.
24    ///
25    /// When results are available from both streams, we always yield them
26    /// from this one.  When this stream is exhausted, the `SelectBiased`
27    /// is exhausted too.
28    #[pin]
29    left: Fuse<S>,
30    /// Secondary underlying stream.
31    #[pin]
32    right: Fuse<T>,
33}
34
35/// Combine two instances of [`Stream`] into one.
36///
37/// This function is similar to [`futures::stream::select`], but differs
38/// in that it treats the two underlying streams asymmetrically.  Specifically:
39///
40///  * Each result is labeled with [`Source::Left`] or
41///    [`Source::Right`], depending on which of the two streams it came
42///    from.
43///  * If both the "left" and the "right" stream are ready, we always
44///    prefer the left stream.
45///  * We stop iterating over this stream when there are no more
46///    results on the left stream, regardless whether the right stream
47///    is exhausted or not.
48///
49/// # Future plans
50///
51/// This might need a better name, especially if we use it anywhere
52/// else.
53///
54/// If we do expose this function, we might want to split up the ways in
55/// which it differs from `select`.
56pub(super) fn select_biased<S, T>(left: S, right: T) -> SelectBiased<S, T>
57where
58    S: Stream,
59    T: Stream<Item = S::Item>,
60{
61    SelectBiased {
62        left: left.fuse(),
63        right: right.fuse(),
64    }
65}
66
67impl<S, T> FusedStream for SelectBiased<S, T>
68where
69    S: Stream,
70    T: Stream<Item = S::Item>,
71{
72    fn is_terminated(&self) -> bool {
73        // We're done if the left stream is done, whether the right stream
74        // is done or not.
75        self.left.is_terminated()
76    }
77}
78
79impl<S, T> Stream for SelectBiased<S, T>
80where
81    S: Stream,
82    T: Stream<Item = S::Item>,
83{
84    type Item = (Source, S::Item);
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        let this = self.project();
88        // We always check the left stream first.
89        match this.left.poll_next(cx) {
90            Poll::Ready(Some(val)) => {
91                // The left stream has an item: yield it.
92                return Poll::Ready(Some((Source::Left, val)));
93            }
94            Poll::Ready(None) => {
95                // The left stream is exhausted: don't even check the right.
96                return Poll::Ready(None);
97            }
98            Poll::Pending => {}
99        }
100
101        // The left stream is pending: see whether the right stream has
102        // anything to say.
103        match this.right.poll_next(cx) {
104            Poll::Ready(Some(val)) => {
105                // The right stream has an item: yield it.
106                Poll::Ready(Some((Source::Right, val)))
107            }
108            _ => {
109                // The right stream is exhausted or pending: in either case,
110                // we need to wait.
111                Poll::Pending
112            }
113        }
114    }
115}
116
117#[cfg(test)]
118mod test {
119    // @@ begin test lint list maintained by maint/add_warning @@
120    #![allow(clippy::bool_assert_comparison)]
121    #![allow(clippy::clone_on_copy)]
122    #![allow(clippy::dbg_macro)]
123    #![allow(clippy::mixed_attributes_style)]
124    #![allow(clippy::print_stderr)]
125    #![allow(clippy::print_stdout)]
126    #![allow(clippy::single_char_pattern)]
127    #![allow(clippy::unwrap_used)]
128    #![allow(clippy::unchecked_duration_subtraction)]
129    #![allow(clippy::useless_vec)]
130    #![allow(clippy::needless_pass_by_value)]
131    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
132    use super::*;
133    use futures_await_test::async_test;
134
135    // Tests where only elements from the left stream should be yielded.
136    #[async_test]
137    async fn left_only() {
138        use futures::stream::iter;
139        use Source::Left as L;
140        // If there's nothing in the right stream, we just yield the left.
141        let left = vec![1_usize, 2, 3];
142        let right = vec![];
143
144        let s = select_biased(iter(left), iter(right));
145        let result: Vec<_> = s.collect().await;
146        assert_eq!(result, vec![(L, 1_usize), (L, 2), (L, 3)]);
147
148        // If the left runs out (which this will), we don't yield anything
149        // from the right.
150        let left = vec![1_usize, 2, 3];
151        let right = vec![4, 5, 6];
152        let s = select_biased(iter(left), iter(right));
153        let result: Vec<_> = s.collect().await;
154        assert_eq!(result, vec![(L, 1_usize), (L, 2), (L, 3)]);
155
156        // The same thing happens if the left stream is completely empty!
157        let left = vec![];
158        let right = vec![4_usize, 5, 6];
159        let s = select_biased(iter(left), iter(right));
160        let result: Vec<_> = s.collect().await;
161        assert_eq!(result, vec![]);
162    }
163
164    // Tests where only elements from the right stream should be yielded.
165    #[async_test]
166    async fn right_only() {
167        use futures::stream::{iter, pending};
168        use Source::Right as R;
169
170        // Try a forever-pending stream for the left hand side.
171        let left = pending();
172        let right = vec![4_usize, 5, 6];
173        let mut s = select_biased(left, iter(right));
174        assert_eq!(s.next().await, Some((R, 4)));
175        assert_eq!(s.next().await, Some((R, 5)));
176        assert_eq!(s.next().await, Some((R, 6)));
177    }
178
179    // Tests where we can find elements from both streams.
180    #[async_test]
181    async fn multiplex() {
182        use futures::SinkExt;
183        use Source::{Left as L, Right as R};
184
185        let (mut snd_l, rcv_l) = futures::channel::mpsc::channel(5);
186        let (mut snd_r, rcv_r) = futures::channel::mpsc::channel(5);
187        let mut s = select_biased(rcv_l, rcv_r);
188
189        snd_l.send(1_usize).await.unwrap();
190        snd_r.send(4_usize).await.unwrap();
191        snd_l.send(2_usize).await.unwrap();
192
193        assert_eq!(s.next().await, Some((L, 1)));
194        assert_eq!(s.next().await, Some((L, 2)));
195        assert_eq!(s.next().await, Some((R, 4)));
196
197        snd_r.send(5_usize).await.unwrap();
198        snd_l.send(3_usize).await.unwrap();
199
200        assert!(!s.is_terminated());
201        drop(snd_r);
202
203        assert_eq!(s.next().await, Some((L, 3)));
204        assert_eq!(s.next().await, Some((R, 5)));
205
206        drop(snd_l);
207        assert_eq!(s.next().await, None);
208
209        assert!(s.is_terminated());
210    }
211}