tor_async_utils/
prepare_send.rs

1//! Extension trait for using [`Sink`] more safely.
2
3use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use futures::future::FusedFuture;
9use futures::ready;
10use futures::Sink;
11use pin_project::pin_project;
12
/// Debugging-output hook used by the code in this module.
///
/// This (default) definition discards its arguments and expands to `()`.
/// To get debugging output on stderr, swap it for the commented-out definition below.
macro_rules! dprintln {
    ($f:literal $($a:tt)*) => {
        ()
    };
}
//macro_rules! dprintln { ($f:literal $($a:tt)*) => { eprintln!(concat!("    ",$f) $($a)*) }; }
16
/// Extension trait for [`Sink`] to add a method for cancel-safe usage.
///
/// The type parameters are:
///
///  * `'w`: the lifetime of the mutable borrow of the output sink.
///  * `OS`: the type of the output sink.
///  * `OM`: the type of the output messages (items) accepted by `OS`.
pub trait SinkPrepareExt<'w, OS, OM>
where
    OS: Sink<OM>,
{
    /// For processing an item obtained from a future, avoiding async cancel lossage
    ///
    /// ```
    /// # use futures::channel::mpsc;
    /// # use tor_async_utils::SinkPrepareExt as _;
    /// #
    /// # #[tokio::main]
    /// # async fn main() -> Result<(),mpsc::SendError> {
    /// #   let (mut sink, sink_r) = mpsc::unbounded::<usize>();
    /// #   let message_generator_future = futures::future::ready(42);
    /// #   let process_message = |m| Ok::<_,mpsc::SendError>(m);
    ///     let (message, sendable) = sink.prepare_send_from(
    ///         message_generator_future
    ///     ).await?;
    ///     let message = process_message(message)?;
    ///     sendable.send(message);
    /// #   Ok(())
    /// # }
    /// ```
    ///
    /// Prepares to send an output message[^terminology] `OM` to an output sink `OS` (`self`),
    /// where the `OM` is made from an input message `IM`,
    /// and the `IM` is obtained from a future, `generator: IF`.
    ///
    /// [^terminology]: We sometimes use slightly inconsistent terminology,
    /// "item" vs "message".
    /// This avoids having to have the generic parameters be named `OI` and `II`
    /// where `I` is sometimes "item" and sometimes "input".
    ///
    /// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`.
    ///
    /// After processing `IM` into `OM`,
    /// use the [`SinkSendable`] to [`send`](SinkSendable::send) the `OM` to `OS`.
    ///
    /// # Why use this
    ///
    /// This avoids an async cancellation hazard
    /// which exists with naive use of `select!`
    /// followed by `OS.send().await`.  You might write this:
    ///
    /// ```rust,ignore
    /// select!{
    ///     message = input_stream.next() => {
    ///         if let Some(message) = message {
    ///             let message = do_our_processing(message);
    ///             output_sink.send(message).await; // <---**BUG**
    ///         }
    ///     }
    ///     control = something_else() => { .. }
    /// }
    /// ```
    ///
    /// If, when we reach `BUG`, the output sink is not ready to receive the message,
    /// the future for that particular `select!` branch will be suspended.
    /// But when `select!` finds that *any one* of the branches is ready,
    /// it *drops* the futures for the other branches.
    /// That drops all the local variables, including possibly `message`, losing it.
    ///
    /// For more about cancellation safety, see
    /// [Rust for the Polyglot Programmer](https://www.chiark.greenend.org.uk/~ianmdlvl/rust-polyglot/async.html#cancellation-safety)
    /// which has a general summary, and
    /// Matthias Einwag's
    /// [extensive discussion in his gist](https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8#cancellation-in-async-rust)
    /// with comparisons to other languages.
    ///
    /// ## Alternatives
    ///
    /// Unbounded mpsc channels, and certain other primitives,
    /// do not suffer from this problem because they do not block.
    /// `UnboundedSender` offers
    /// [`unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send)
    /// but only as an inherent method, so this does not compose with `Sink` combinators.
    /// And of course unbounded channels do not implement any backpressure.
    ///
    /// The problem can otherwise be avoided by completely eschewing use of `select!`
    /// and writing manual implementations of `Future`, `Sink`, and so on.
    /// However, such code is typically considerably more complex and involves
    /// entangling the primary logic with future machinery.
    /// It is normally better to write primary functionality in `async { }`
    /// using utilities (often "futures combinators") such as this one.
    ///
    // Personal note from @Diziet:
    // IMO it is generally accepted in the Rust community that
    // it is not good practice to write principal code at the manual futures level.
    // However, I have not been able to find very clear support for this proposition.
    // There are endless articles explaining how futures work internally,
    // often by describing how to reimplement standard combinators such as `map`.
    // ISTM that these exist to help understanding,
    // but it seems to be only rarely stated that doing this is not generally a good idea.
    //
    // I did find the following:
    //
    //  https://dev.to/mindflavor/rust-futures-an-uneducated-short-and-hopefully-not-boring-tutorial---part-4---a-real-future-from-scratch-734#conclusion
    //
    //    Of course you generally do not write a future manually. You use the ones provided by
    //    libraries and compose them as needed. It's important to understand how they work
    //    nevertheless.
    //
    // And of course the existence of the `futures` crate is indicative:
    // it consists almost entirely of combinators and utilities
    // whose purpose is to allow you to write many structures in async code
    // without needing to resort to manual future impls.
    //
    /// # Example
    ///
    /// This comprehensive example demonstrates how to read from possibly multiple sources
    /// and also be able to process other events:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use futures::select;
    /// use futures::{SinkExt as _, StreamExt as _};
    /// use tor_async_utils::SinkPrepareExt as _;
    ///
    /// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
    /// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
    /// input_w.send(42).await;
    /// select!{
    ///     ret = output_w.prepare_send_from(async {
    ///         select!{
    ///             got_input = input_r.next() => got_input.expect("input stream ended!"),
    ///             () = futures::future::pending() => panic!(), // other branches are OK here
    ///         }
    ///     }) => {
    ///         let (input_msg, sendable) = ret.unwrap();
    ///         let output_msg = input_msg.to_string();
    ///         let () = sendable.send(output_msg).unwrap();
    ///     },
    ///     () = futures::future::pending() => panic!(), // other branches are OK here
    /// }
    ///
    /// assert_eq!(output_r.next().await.unwrap(), "42");
    /// # }
    /// ```
    ///
    /// # Formally
    ///
    /// [`prepare_send_from`](SinkPrepareExt::prepare_send_from)
    /// returns a [`SinkPrepareSendFuture`] which, when awaited:
    ///
    ///  * Waits for `OS` to be ready to receive an item.
    ///  * Runs `message_generator` to obtain an `IM`.
    ///  * Returns the `IM` (for processing), and a [`SinkSendable`].
    ///
    /// The caller should then:
    ///
    ///  * Check the error from `prepare_send_from`
    ///    (which came from the *output* sink).
    ///  * Process the `IM`, making an `OM` out of it.
    ///  * Call [`sendable.send()`](SinkSendable::send) (and check its error).
    ///
    /// # Flushing
    ///
    /// `prepare_send_from` will (when awaited)
    /// [`flush`](futures::SinkExt::flush) the output sink
    /// when it finds the input is not ready yet.
    /// Until then items may be buffered
    /// (as if they had been written with [`feed`](futures::SinkExt::feed)).
    ///
    /// # Errors
    ///
    /// ## Output sink errors
    ///
    /// The call site can experience output sink errors in two places,
    /// [`prepare_send_from()`](SinkPrepareExt::prepare_send_from) and [`SinkSendable::send()`].
    /// The caller should typically handle them the same way regardless of when they occurred.
    ///
    /// If the error happens at [`SinkSendable::send()`],
    /// the call site will usually be forced to discard the item being processed.
    /// This will only occur if the sink is actually broken.
    ///
    /// ## Errors specific to the call site: fallible input, and fallible processing
    ///
    /// At some call sites, the input future may yield errors
    /// (perhaps it is reading from a `Stream` of [`Result`]s).
    /// In that case the value from the input future will be a [`Result`].
    /// Then `IM` is a `Result`, and is provided in the `.0` element
    /// of the "successful" return from `prepare_send_from`.
    ///
    /// And, at some call sites, the processing of an `IM` into an `OM` is fallible.
    ///
    /// Handling these latter two error cases is up to the caller,
    /// in the code which processes `IM`.
    /// The call site will often want to deal with such an error
    /// without sending anything into the output sink,
    /// and can then just drop the [`SinkSendable`].
    ///
    /// # Implementations
    ///
    /// This is an extension trait and you are not expected to need to implement it.
    ///
    /// There are provided implementations for `Pin<&mut impl Sink>`
    /// and `&mut impl Sink + Unpin`, for your convenience.
    fn prepare_send_from<IF, IM>(
        self,
        message_generator: IF,
    ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
    where
        IF: Future<Output = IM>;
}
223
224impl<'w, OS, OM> SinkPrepareExt<'w, OS, OM> for Pin<&'w mut OS>
225where
226    OS: Sink<OM>,
227{
228    fn prepare_send_from<'r, IF, IM>(
229        self,
230        message_generator: IF,
231    ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
232    where
233        IF: Future<Output = IM>,
234    {
235        SinkPrepareSendFuture {
236            output: Some(self),
237            generator: message_generator,
238            tw: PhantomData,
239        }
240    }
241}
242
243impl<'w, OS, OM> SinkPrepareExt<'w, OS, OM> for &'w mut OS
244where
245    OS: Sink<OM> + Unpin,
246{
247    fn prepare_send_from<'r, IF, IM>(
248        self,
249        message_generator: IF,
250    ) -> SinkPrepareSendFuture<'w, IF, OS, OM>
251    where
252        IF: Future<Output = IM>,
253    {
254        Pin::new(self).prepare_send_from(message_generator)
255    }
256}
257
/// Future for [`SinkPrepareExt::prepare_send_from`]
///
/// When awaited, yields `Ok((IM, SinkSendable))` once the output sink is ready
/// and the message generator has produced its value,
/// or `Err` with the output sink's error.
///
/// Polling this future again after it has returned `Ready(Ok(..))` will panic.
#[pin_project]
#[must_use]
pub struct SinkPrepareSendFuture<'w, IF, OS, OM> {
    /// Underlying future that will yield a message.
    #[pin]
    generator: IF,

    /// The output sink; `None` once it has been handed over to a `SinkSendable`.
    ///
    /// This Option exists because otherwise SinkPrepareSendFuture::poll()
    /// can't move `output` out of this struct to put it into the `SinkSendable`.
    /// (The poll() impl cannot borrow from SinkPrepareSendFuture.)
    output: Option<Pin<&'w mut OS>>,

    /// `fn(OM)` gives contravariance in OM.
    ///
    /// Variance is confusing.
    /// Loosely, a SinkPrepareSendFuture<..OM> consumes an OM.
    /// Actually, we don't really need to add any variance restrictions wrt OM,
    /// because the &mut OS already implies the correct variance,
    /// so we could have used the PhantomData<fn(*const OM)> trick.
    /// Happily there is no unsafe anywhere nearby, so it is not possible for us to write
    /// a bug due to getting the variance wrong - only to erroneously prevent some use
    /// case.
    tw: PhantomData<fn(OM)>,
}
283
/// A [`Sink`] which is ready to receive an item
///
/// Produced by [`SinkPrepareExt::prepare_send_from`].  See there for the overview docs.
///
/// This references an output sink `OS`.
/// It offers the ability to write into the sink without blocking
/// (and constitutes a proof token that the sink has declared itself ready for that).
///
/// The only useful method is [`send`](SinkSendable::send),
/// which consumes the `SinkSendable`: each one can be used to send at most one item.
///
/// `SinkSendable` has no drop glue and can be freely dropped,
/// for example if you prepare to send a message and then
/// encounter an error when producing the output message.
#[must_use]
pub struct SinkSendable<'w, OS, OM> {
    /// Reference to underlying output sink.
    output: Pin<&'w mut OS>,
    /// Marker to ensure that `OM` is used.
    ///
    /// This has the same type as the corresponding marker field in `SinkPrepareSendFuture`.
    tw: PhantomData<fn(OM)>,
}
304
305impl<'w, IF, OS, IM, OM> Future for SinkPrepareSendFuture<'w, IF, OS, OM>
306where
307    IF: Future<Output = IM>,
308    OS: Sink<OM>,
309{
310    type Output = Result<(IM, SinkSendable<'w, OS, OM>), OS::Error>;
311
312    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
313        let mut self_ = self.project();
314
315        /// returns `&mut Pin<&'w mut OS>` from self_.output
316        //
317        // macro because the closure's type parameters would be unnameable.
318        macro_rules! get_output {
319            ($self_:expr) => {
320                $self_.output.as_mut().expect(BAD_POLL_MSG).as_mut()
321            };
322        }
323        /// Message to give when panicking because of improper extra poll.
324        const BAD_POLL_MSG: &str =
325            "future from SinkPrepareExt::prepare_send_from (SinkPrepareSendFuture) \
326                 polled after returning Ready(Ok)";
327
328        let () = match ready!(get_output!(self_).poll_ready(cx)) {
329            Err(e) => {
330                dprintln!("poll: output poll = IF.Err    SO  IF.Err");
331                // Deliberately don't fuse by `take`ing output.  If we did that, we would expose
332                // our caller to an additional panic risk.  There is no harm in polling the output
333                // sink again: although `Sink` documents that a sink that returns errors will
334                // probably continue to do so, it is not forbidden to try it and see.  This is in
335                // any case better than definitely crashing if the `SinkPrepareSendFuture` is
336                // polled after it gave Ready.
337                return Poll::Ready(Err(e));
338            }
339            Ok(()) => {
340                dprintln!("poll: output poll = IF.Ok     calling generator");
341            }
342        };
343
344        let value = match self_.generator.as_mut().poll(cx) {
345            Poll::Pending => {
346                // We defer flushing the output until the input stops yielding.
347                // This allows our caller (which is typically a loop) to transfer multiple
348                // items from their input to their output between flushes.
349                //
350                // But we must not return `Pending` without flushing, or the caller could block
351                // without flushing output, leading to untimely delivery of buffered data.
352                dprintln!("poll: generator = Pending     calling output flush");
353                let flushed = get_output!(self_).poll_flush(cx);
354                return match flushed {
355                    Poll::Ready(Err(e)) => {
356                        dprintln!("poll: output flush = IF.Err   SO  IF.Err");
357                        Poll::Ready(Err(e))
358                    }
359                    Poll::Ready(Ok(())) => {
360                        dprintln!("poll: output flush = IF.Ok    SO  Pending");
361                        Poll::Pending
362                    }
363                    Poll::Pending => {
364                        dprintln!("poll: output flush = Pending  SO  Pending");
365                        Poll::Pending
366                    }
367                };
368            }
369            Poll::Ready(v) => {
370                dprintln!("poll: generator = Ready       SO  IF.Ok");
371                v
372            }
373        };
374
375        let sendable = SinkSendable {
376            output: self_.output.take().expect(BAD_POLL_MSG),
377            tw: PhantomData,
378        };
379
380        Poll::Ready(Ok((value, sendable)))
381    }
382}
383
384impl<'w, IF, OS, IM, OM> FusedFuture for SinkPrepareSendFuture<'w, IF, OS, OM>
385where
386    IF: Future<Output = IM>,
387    OS: Sink<OM>,
388{
389    fn is_terminated(&self) -> bool {
390        let r = self.output.is_none();
391        dprintln!("is_terminated = {}", r);
392        r
393    }
394}
395
396impl<'w, OS, OM> SinkSendable<'w, OS, OM>
397where
398    OS: Sink<OM>,
399{
400    /// Synchronously send an item into `OS`, which is a [`Sink`]
401    ///
402    /// Can fail if the sink `OS` reports an error.
403    ///
404    /// (However, the existence of the `SinkSendable` demonstrates that
405    /// the sink reported itself ready for sending,
406    /// so this call is synchronous, avoiding cancellation hazards.)
407    pub fn send(self, item: OM) -> Result<(), OS::Error> {
408        dprintln!("send ...");
409        let r = self.output.start_send(item);
410        dprintln!("send: {:?}", r.as_ref().map_err(|_| (())));
411        r
412    }
413}
414
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use futures::channel::mpsc;
    use futures::future::poll_fn;
    use futures::select_biased;
    use futures::SinkExt as _;
    use futures_await_test::async_test;
    use std::convert::Infallible;
    use std::sync::Arc;
    use std::sync::Mutex;

    /// Exercises each code path of `SinkPrepareSendFuture::poll`, and `SinkSendable::send`.
    #[async_test]
    async fn prepare_send() {
        // Early versions of this used unfold quite a lot more, but it is not really
        // convenient for testing.  It buffers one item internally, and is also buggy:
        //   https://github.com/rust-lang/futures-rs/issues/2600
        // So we use mpsc channels, which (perhaps with buffering) are quite controllable.

        // The eprintln!("FOR ...") calls correspond to the dprintln!() calls in the impl.
        // They can be used to check that each code path in the implementation is exercised,
        // by turning on the debug output and running the test with `--nocapture`.
        {
            // The receiver is dropped before we start, so preparing must fail right away.
            eprintln!("-- disconnected ---");
            eprintln!("FOR poll: output poll = IF.Err    SO  IF.Err");
            let (mut w, r) = mpsc::unbounded::<usize>();
            drop(r);
            let ret = w.prepare_send_from(async { Ok::<_, Infallible>(12) }).await;
            assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
        }

        {
            // One item is already buffered, and the generator drops the receiver and
            // then reports Pending; so the error is expected to surface from the flush.
            eprintln!("-- buffered late disconnect --");
            eprintln!("FOR poll: output poll = IF.Ok     calling generator");
            eprintln!("FOR poll: output flush = IF.Err   SO  IF.Err");
            let (w, r) = mpsc::unbounded::<usize>();
            let mut w = w.buffer(10);
            let mut r = Some(r);
            w.feed(66).await.unwrap();
            let ret = w
                .prepare_send_from(poll_fn(move |_cx| {
                    drop(r.take());
                    Poll::Pending::<usize>
                }))
                .await;
            assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
        }

        {
            // The generator is never ready, so the prepare future must not complete:
            // the second (always-ready) branch of the biased select is what finishes.
            eprintln!("-- flushing before wait --");
            eprintln!("FOR poll: output flush = IF.Ok    SO  Pending");
            let (mut w, _r) = mpsc::unbounded::<usize>();
            let () = select_biased! {
                _ = w.prepare_send_from(poll_fn(
                    move |_cx| {
                        Poll::Pending::<usize>
                    }
                )) => panic!(),
                _ = futures::future::ready(()) => { },
            };
        }

        {
            // As above, but with an item already fed into a zero-capacity channel whose
            // receiver never reads it; per the label below, the flush itself is Pending.
            eprintln!("-- flush before wait is pending --");
            eprintln!("FOR poll: output flush = Pending  SO  Pending");
            let (mut w, _r) = mpsc::channel::<usize>(0);
            let () = w.feed(77).await.unwrap();
            let mut w = w.buffer(10);
            let () = select_biased! {
                _ = w.prepare_send_from(poll_fn(
                    move |_cx| {
                        Poll::Pending::<usize>
                    }
                )) => panic!(),
                _ = futures::future::ready(()) => { },
            };
        }

        {
            // The success path: items are sent via the `SinkSendable`, stay buffered,
            // and are delivered only once a later prepare finds its generator pending.
            //
            // NOTE(review): the first label below repeats the previous scenario's label.
            eprintln!("-- flush before wait is pending --");
            eprintln!("FOR poll: generator = Ready       SO  IF.Ok");
            eprintln!("FOR send ...");
            eprintln!("ALSO check that bufferinrg works as expected");

            // Records every item which actually reaches the underlying sink.
            let sunk = Arc::new(Mutex::new(vec![]));
            let unfold = futures::sink::unfold((), |(), v| {
                let sunk = sunk.clone();
                async move {
                    dbg!();
                    sunk.lock().unwrap().push(v);
                    Ok::<_, Infallible>(())
                }
            });
            let mut unfold = Box::pin(unfold.buffer(10));
            for v in [42, 43] {
                // We can only do two here because that's how many we can actually buffer in Buffer
                // and Unfold.  Because our closure is always ready, the buffering isn't actually
                // as copious as all that.  This is fine, because the point of this test is to test
                // *flushing*.
                dbg!(v);
                let ret = unfold
                    .prepare_send_from(async move { Ok::<_, Infallible>(v) })
                    .await;
                let (msg, sendable) = ret.unwrap();
                let msg = msg.unwrap();
                assert_eq!(msg, v);
                let () = sendable.send(msg).unwrap();
                let expect: &[u8] = &[];
                assert_eq!(*sunk.lock().unwrap(), expect); // It's still buffered
            }
            // A prepare whose generator is never ready is what triggers the flush.
            select_biased! {
                _ = unfold.prepare_send_from(futures::future::pending::<()>()) => panic!(),
                _ = futures::future::ready(()) => { },
            };
            assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
        }
    }
}
544        }
545    }
546}