tor_proto/util/
sometimes_unbounded_sink.rs

1//! [`SometimesUnboundedSink`]
2
3use std::collections::VecDeque;
4use std::pin::Pin;
5use std::task::{ready, Context, Poll, Poll::*};
6
7use futures::{future, Sink};
8
9use pin_project::pin_project;
10
11/// Wraps a [`Sink`], providing an only-sometimes-used unbounded buffer
12///
13/// For example, consider `SometimesUnboundedSink<T, mpsc::Receiver>`.
14/// The `Receiver` is not always ready for writing:
15/// if the capacity is exceeded, `send` will block.
16///
17/// `SometimesUnboundedSink`'s `Sink` implementation works the same way.
18/// But there are also two methods
19/// [`pollish_send_unbounded`](SometimesUnboundedSink::pollish_send_unbounded)
20/// and
21/// [`send_unbounded`](SometimesUnboundedSink::send_unbounded)
22/// which will always succeed immediately.
23/// Items which the underlying sink `S` is not ready to accept are queued,
24/// and will be delivered to `S` when possible.
25///
26/// ### You must poll this type
27///
28/// For queued items to be delivered,
29/// `SometimesUnboundedSink` must be polled,
30/// even if you don't have an item to send.
31///
32/// You can use [`Sink::poll_ready`] for this.
33/// Any [`Context`]-taking methods is suitable.
34///
35/// ### Error handling
36///
37/// Errors from the underlying sink may not be reported immediately,
38/// due to the buffering in `SometimesUnboundedSink`.
39///
40/// However, if the sink reports errors from `poll_ready`
41/// these will surface in a timely fashion.
42///
43/// After an error has been reported, there may still be buffered data,
44/// which will only be delivered if `SometimesUnboundedSink` is polled again
45/// (and the error in the underlying sink was transient).
46#[pin_project]
47pub(crate) struct SometimesUnboundedSink<T, S> {
48    /// Things we couldn't send_unbounded right away
49    ///
50    /// Invariants:
51    ///
52    ///  * Everything here must be fed to `inner` before any further user data
53    ///    (unbounded user data may be appended).
54    ///
55    ///  * If this is nonempty, the executor knows to wake this task.
56    ///    This is achieved as follows:
57    ///    If this is nonempty, `inner.poll_ready()` has been called.
58    buf: VecDeque<T>,
59
60    /// The actual sink
61    ///
62    /// This also has the relevant `Waker`.
63    ///
64    /// # Waker invariant
65    ///
66    /// Whenever either
67    ///
68    ///  * The last call to any of our public methods returned `Pending`, or
69    ///  * `buf` is nonempty,
70    ///
71    /// the last method call `inner` *also* returned `Pending`.
72    /// (Or, we have reported an error.)
73    ///
74    /// So, in those situations, this task has been recorded for wakeup
75    /// by `inner` (specifically, its other end, if it's a channel)
76    /// when `inner` becomes readable.
77    ///
78    /// Therefore this task will be woken up, and, if the caller actually
79    /// polls us again (as is usual and is required by our docs),
80    /// we'll drain any queued data.
81    #[pin]
82    inner: S,
83}
84
85impl<T, S: Sink<T>> SometimesUnboundedSink<T, S> {
86    /// Wrap an inner `Sink` with a `SometimesUnboundedSink`
87    //
88    // There is no method for unwrapping.  If we make this type more public,
89    // there should be, but that method will need `where S: Unpin`.
90    pub(crate) fn new(inner: S) -> Self {
91        SometimesUnboundedSink {
92            buf: VecDeque::new(),
93            inner,
94        }
95    }
96
97    /// Return the number of T queued in this sink.
98    pub(crate) fn n_queued(&self) -> usize {
99        self.buf.len()
100    }
101
102    /// Hand `item` to the inner Sink if possible, or queue it otherwise
103    ///
104    /// Like a `poll_...` method in that it takes a `Context`.
105    /// That's needed to make sure we get polled again
106    /// when the underlying sink can accept items.
107    ///
108    /// But unlike a `poll_...` method in that it doesn't return `Poll`,
109    /// since completion is always immediate.
110    pub(crate) fn pollish_send_unbounded(
111        mut self: Pin<&mut Self>,
112        cx: &mut Context<'_>,
113        item: T,
114    ) -> Result<(), S::Error> {
115        match self.as_mut().poll_ready(cx) {
116            // Waker invariant: poll_ready only returns Ready(Ok(())) if `buf` is empty
117            Ready(Ok(())) => self.as_mut().start_send(item),
118            // Waker invariant: if we report an error, we're then allowed to expect polling again
119            Ready(Err(e)) => Err(e),
120            Pending => {
121                // Waker invariant: poll_ready() returned Pending,
122                // so the task has indeed already been recorded.
123                self.as_mut().project().buf.push_back(item);
124                Ok(())
125            }
126        }
127    }
128
129    /// Hand `item` to the inner Sink if possible, or queue it otherwise (async fn)
130    ///
131    /// You must `.await` this, but it will never block.
132    /// (Its future is always `Ready`.)
133    pub(crate) async fn send_unbounded(mut self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
134        // Waker invariant: this is just a wrapper around `pollish_send_unbounded`
135        let mut item = Some(item);
136        future::poll_fn(move |cx| {
137            let item = item.take().expect("polled after Ready");
138            Ready(self.as_mut().pollish_send_unbounded(cx, item))
139        })
140        .await
141    }
142
143    /// Flush the buffer.  On a `Ready(())` return, it's empty.
144    ///
145    /// This satisfies the Waker invariant as if it were a public method.
146    fn flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
147        let mut self_ = self.as_mut().project();
148        while !self_.buf.is_empty() {
149            // Waker invariant:
150            // if inner gave Pending, we give Pending too: ok
151            // if inner gave Err, we're allowed to want polling again
152            ready!(self_.inner.as_mut().poll_ready(cx))?;
153            let item = self_.buf.pop_front().expect("suddenly empty!");
154            // Waker invariant: returning Err
155            self_.inner.as_mut().start_send(item)?;
156        }
157        // Waker invariant: buffer is empty, and we're not about to return Pending
158        Ready(Ok(()))
159    }
160
161    /// Obtain a reference to the inner `Sink`, `S`
162    ///
163    /// This method should be used with a little care, since it bypasses the wrapper.
164    /// For example, if `S` has interior mutability, and this method is used to
165    /// modify it, the `SometimesUnboundedSink` may malfunction.
166    pub(crate) fn as_inner(&self) -> &S {
167        &self.inner
168    }
169}
170
171// Waker invariant for all these impls:
172// returning Err or Pending from flush_buf: OK, flush_buf ensures the condition holds
173// returning from the inner method: trivially OK
174impl<T, S: Sink<T>> Sink<T> for SometimesUnboundedSink<T, S> {
175    type Error = S::Error;
176
177    // Only returns `Ready(Ok(()))` if `buf` is empty
178    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
179        ready!(self.as_mut().flush_buf(cx))?;
180        self.project().inner.poll_ready(cx)
181    }
182
183    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), S::Error> {
184        assert!(self.buf.is_empty(), "start_send without poll_ready");
185        self.project().inner.start_send(item)
186    }
187
188    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
189        ready!(self.as_mut().flush_buf(cx))?;
190        self.project().inner.poll_flush(cx)
191    }
192
193    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
194        ready!(self.as_mut().flush_buf(cx))?;
195        self.project().inner.poll_close(cx)
196    }
197}
198
199#[cfg(test)]
200mod test {
201    // @@ begin test lint list maintained by maint/add_warning @@
202    #![allow(clippy::bool_assert_comparison)]
203    #![allow(clippy::clone_on_copy)]
204    #![allow(clippy::dbg_macro)]
205    #![allow(clippy::mixed_attributes_style)]
206    #![allow(clippy::print_stderr)]
207    #![allow(clippy::print_stdout)]
208    #![allow(clippy::single_char_pattern)]
209    #![allow(clippy::unwrap_used)]
210    #![allow(clippy::unchecked_duration_subtraction)]
211    #![allow(clippy::useless_vec)]
212    #![allow(clippy::needless_pass_by_value)]
213    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
214    use super::*;
215    use futures::channel::mpsc;
216    use futures::{SinkExt as _, StreamExt as _};
217    use std::pin::pin;
218    use tor_rtmock::MockRuntime;
219
220    #[test]
221    fn cases() {
222        // `test_with_various` runs with both LIFO and FIFO scheduling policies,
223        // so should interleave the sending and receiving tasks
224        // in ways that exercise the corner cases we're interested in.
225        MockRuntime::test_with_various(|runtime| async move {
226            let (tx, rx) = mpsc::channel(1);
227            let tx = SometimesUnboundedSink::new(tx);
228
229            runtime.spawn_identified("sender", async move {
230                let mut tx = pin!(tx);
231                let mut n = 0..;
232                let mut n = move || n.next().unwrap();
233
234                // unbounded when we can send right away
235                tx.as_mut().send_unbounded(n()).await.unwrap();
236                tx.as_mut().send(n()).await.unwrap();
237                tx.as_mut().send(n()).await.unwrap();
238                tx.as_mut().send(n()).await.unwrap();
239                // unbounded when we maybe can't and might queue
240                tx.as_mut().send_unbounded(n()).await.unwrap();
241                tx.as_mut().send_unbounded(n()).await.unwrap();
242                tx.as_mut().send_unbounded(n()).await.unwrap();
243                // some interleaving
244                tx.as_mut().send(n()).await.unwrap();
245                tx.as_mut().send_unbounded(n()).await.unwrap();
246                // flush
247                tx.as_mut().flush().await.unwrap();
248                // close
249                tx.as_mut().close().await.unwrap();
250            });
251
252            runtime.spawn_identified("receiver", async move {
253                let mut rx = pin!(rx);
254                let mut exp = 0..;
255
256                while let Some(n) = rx.next().await {
257                    assert_eq!(n, exp.next().unwrap());
258                }
259                assert_eq!(exp.next().unwrap(), 9);
260            });
261
262            runtime.progress_until_stalled().await;
263        });
264    }
265}