1
//! Stream utilities to help implement
2
//! [`AbstractCircMgr`](`super::AbstractCircMgr`).
3

            
4
use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
5
use futures::task::{Context, Poll};
6
use pin_project::pin_project;
7
use std::pin::Pin;
8

            
9
/// Enumeration to indicate which of two streams provided a result.
10
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
11
pub(super) enum Source {
12
    /// Indicates a result coming from the left (preferred) stream.
13
    Left,
14
    /// Indicates a result coming from the right (secondary) stream.
15
    Right,
16
}
17

            
18
/// A stream returned by [`select_biased`]
19
///
20
/// See that function for more documentation.
21
816
#[pin_project]
22
pub(super) struct SelectBiased<S, T> {
23
    /// Preferred underlying stream.
24
    ///
25
    /// When results are available from both streams, we always yield them
26
    /// from this one.  When this stream is exhausted, the `SelectBiased`
27
    /// is exhausted too.
28
    #[pin]
29
    left: Fuse<S>,
30
    /// Secondary underlying stream.
31
    #[pin]
32
    right: Fuse<T>,
33
}
34

            
35
/// Combine two instances of [`Stream`] into one.
36
///
37
/// This function is similar to [`futures::stream::select`], but differs
38
/// in that it treats the two underlying streams asymmetrically.  Specifically:
39
///
40
///  * Each result is labeled with [`Source::Left`] or
41
///    [`Source::Right`], depending on which of the two streams it came
42
///    from.
43
///  * If both the "left" and the "right" stream are ready, we always
44
///    prefer the left stream.
45
///  * We stop iterating over this stream when there are no more
46
///    results on the left stream, regardless whether the right stream
47
///    is exhausted or not.
48
///
49
/// # Future plans
50
///
51
/// This might need a better name, especially if we use it anywhere
52
/// else.
53
///
54
/// If we do expose this function, we might want to split up the ways in
55
/// which it differs from `select`.
56
160
pub(super) fn select_biased<S, T>(left: S, right: T) -> SelectBiased<S, T>
57
160
where
58
160
    S: Stream,
59
160
    T: Stream<Item = S::Item>,
60
160
{
61
160
    SelectBiased {
62
160
        left: left.fuse(),
63
160
        right: right.fuse(),
64
160
    }
65
160
}
66

            
67
impl<S, T> FusedStream for SelectBiased<S, T>
68
where
69
    S: Stream,
70
    T: Stream<Item = S::Item>,
71
{
72
4
    fn is_terminated(&self) -> bool {
73
4
        // We're done if the left stream is done, whether the right stream
74
4
        // is done or not.
75
4
        self.left.is_terminated()
76
4
    }
77
}
78

            
79
impl<S, T> Stream for SelectBiased<S, T>
80
where
81
    S: Stream,
82
    T: Stream<Item = S::Item>,
83
{
84
    type Item = (Source, S::Item);
85

            
86
816
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87
816
        let this = self.project();
88
816
        // We always check the left stream first.
89
816
        match this.left.poll_next(cx) {
90
160
            Poll::Ready(Some(val)) => {
91
160
                // The left stream has an item: yield it.
92
160
                return Poll::Ready(Some((Source::Left, val)));
93
            }
94
            Poll::Ready(None) => {
95
                // The left stream is exhausted: don't even check the right.
96
88
                return Poll::Ready(None);
97
            }
98
568
            Poll::Pending => {}
99
568
        }
100
568

            
101
568
        // The left stream is pending: see whether the right stream has
102
568
        // anything to say.
103
568
        match this.right.poll_next(cx) {
104
18
            Poll::Ready(Some(val)) => {
105
18
                // The right stream has an item: yield it.
106
18
                Poll::Ready(Some((Source::Right, val)))
107
            }
108
            _ => {
109
                // The right stream is exhausted or pending: in either case,
110
                // we need to wait.
111
550
                Poll::Pending
112
            }
113
        }
114
816
    }
115
}
116

            
117
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use futures_await_test::async_test;

    // Scenarios in which every yielded item must come from the left stream.
    #[async_test]
    async fn left_only() {
        use futures::stream::iter;
        use Source::Left as L;

        // With an empty right stream, we see exactly the left items.
        let preferred = vec![1_usize, 2, 3];
        let secondary = vec![];
        let yielded: Vec<_> = select_biased(iter(preferred), iter(secondary))
            .collect()
            .await;
        assert_eq!(yielded, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // The left stream runs out here, and once it does nothing from the
        // right stream is yielded.
        let preferred = vec![1_usize, 2, 3];
        let secondary = vec![4, 5, 6];
        let yielded: Vec<_> = select_biased(iter(preferred), iter(secondary))
            .collect()
            .await;
        assert_eq!(yielded, vec![(L, 1_usize), (L, 2), (L, 3)]);

        // A completely empty left stream ends things immediately, too.
        let preferred = vec![];
        let secondary = vec![4_usize, 5, 6];
        let yielded: Vec<_> = select_biased(iter(preferred), iter(secondary))
            .collect()
            .await;
        assert_eq!(yielded, vec![]);
    }

    // Scenarios in which every yielded item must come from the right stream.
    #[async_test]
    async fn right_only() {
        use futures::stream::{iter, pending};
        use Source::Right as R;

        // The left-hand side never produces anything and never finishes.
        let never_ready = pending();
        let secondary = vec![4_usize, 5, 6];
        let mut combined = select_biased(never_ready, iter(secondary));
        for expected in [4_usize, 5, 6] {
            assert_eq!(combined.next().await, Some((R, expected)));
        }
    }

    // Scenarios in which items arrive from both streams.
    #[async_test]
    async fn multiplex() {
        use futures::channel::mpsc::channel;
        use futures::SinkExt;
        use Source::{Left as L, Right as R};

        let (mut left_tx, left_rx) = channel(5);
        let (mut right_tx, right_rx) = channel(5);
        let mut combined = select_biased(left_rx, right_rx);

        left_tx.send(1_usize).await.unwrap();
        right_tx.send(4_usize).await.unwrap();
        left_tx.send(2_usize).await.unwrap();

        // Both queued left items come out before the right one, even though
        // the right item was sent in between them.
        assert_eq!(combined.next().await, Some((L, 1)));
        assert_eq!(combined.next().await, Some((L, 2)));
        assert_eq!(combined.next().await, Some((R, 4)));

        right_tx.send(5_usize).await.unwrap();
        left_tx.send(3_usize).await.unwrap();

        assert!(!combined.is_terminated());
        drop(right_tx);

        // The left item still wins, and the item already queued on the
        // right is delivered even though its sender has been dropped.
        assert_eq!(combined.next().await, Some((L, 3)));
        assert_eq!(combined.next().await, Some((R, 5)));

        // Closing the left side ends the combined stream.
        drop(left_tx);
        assert_eq!(combined.next().await, None);

        assert!(combined.is_terminated());
    }
}