tor_async_utils/
peekable_stream.rs

1//! Provides utilities for peeking at items in [`futures::Stream`].
2//!
3//! # Stability of peeked values
4//!
//! Implementors of the traits in this module guarantee that a peeked value
//! (a `Poll::Ready` or `Some` result) remains at the head of the stream until
//! [`futures::Stream::poll_next`] or another method requiring a `&mut`
//! reference (and documented to potentially change the head of the stream) is
//! called. For example, a caller holding a `Pin<&mut Self>` that observes a
//! Ready value via [`PeekableStream::poll_peek`] is guaranteed to observe that
//! same value again on a subsequent call to [`PeekableStream::poll_peek`],
//! [`futures::Stream::poll_next`], etc.
13//!
//! This property must not be relied upon to prove *soundness*, but can be
//! relied upon to prove correctness.
16
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20/// A stream that provides the ability to peek at the next available item.
21///
22/// This provides an alternative to interfaces and data structure that would
23/// otherwise want a [`futures::stream::Peekable<S>`], which can potentially
24/// avoid multiple layers of buffering where one would do.
25///
26/// # Tasks, waking, and calling context
27///
28/// These methods should be called by the task that is reading from the stream.
29/// If they are called by another task, the reading task would miss
30/// notifications.
31pub trait PeekableStream: futures::Stream {
32    /// Poll for an item to be ready, and then inspect it.
33    ///
34    /// Equivalent to [`futures::stream::Peekable::poll_peek`].
35    ///
36    /// Guarantees that a returned `Ready` result is stable (See "Stability ..." in
37    /// [`crate::peekable_stream`]).
38    ///
39    /// Should be called only by the task that is reading the stream (see
40    /// "Tasks ..." in [`PeekableStream`]).
41    fn poll_peek(
42        self: Pin<&mut Self>,
43        cx: &mut Context<'_>,
44    ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
45        self.poll_peek_mut(cx).map(|x| x.map(|x| &*x))
46    }
47
48    /// Poll for an item to be ready, and then inspect it.
49    ///
50    /// Equivalent to [`futures::stream::Peekable::poll_peek_mut`].
51    ///
52    /// Guarantees that a returned `Ready` result is stable (See "Stability" in
53    /// [`crate::peekable_stream`]).
54    ///
55    /// Should be called only by the task that is reading the stream (see
56    /// "Tasks ..." in [`PeekableStream`]).
57    fn poll_peek_mut(
58        self: Pin<&mut Self>,
59        cx: &mut Context<'_>,
60    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>>;
61}
62
63impl<S> PeekableStream for futures::stream::Peekable<S>
64where
65    S: futures::Stream,
66{
67    fn poll_peek(
68        self: Pin<&mut Self>,
69        cx: &mut Context<'_>,
70    ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
71        self.poll_peek(cx)
72    }
73
74    fn poll_peek_mut(
75        self: Pin<&mut Self>,
76        cx: &mut Context<'_>,
77    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
78        self.poll_peek_mut(cx)
79    }
80}
81
82/// A stream that supports peeking without perturbing any registered waker.
83///
84/// # Tasks, waking, and calling context
85///
86/// These functions do not register the current task to be woken when an item
87/// becomes available on the stream, and ensure that the most recent task that
88/// was already registered remains so (or is woken if there was an item ready).
89///
90/// Therefore, avoiding calling (only) these functions from the task that is
91/// reading from the stream, since they will not cause the current task to be
92/// woken when an item arrives.
93///
94/// Conversely, you *may* call these function in *other* tasks, without
95/// disturbing the task which is waiting for input.
96pub trait UnobtrusivePeekableStream: futures::Stream {
97    /// Peek at the next available value, while not losing a previously
98    /// registered waker.
99    ///
100    /// Guarantees that a returned `Some` result is stable (See "Stability" in
101    /// [`crate::peekable_stream`]).
102    ///
103    /// Does not register the current task to be notified when an item becomes
104    /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
105    ///
106    /// The caller of `unobtrusive_peek` can't distinguish between a pending and terminated stream.
107    // To address this we could return value in a `Poll` but normally returning `Poll::Pending`
108    // implies a promise of future wakeup, which is precisely contrary to this function's purpose.
109    // We could address that with imprecations in the docs but people don't always read docs.
110    // We could invent a new type, but that seems quite heavyweight.
111    // We'll cross this bridge when we have a requirement for this feature.
112    fn unobtrusive_peek(self: Pin<&mut Self>) -> Option<&<Self as futures::Stream>::Item> {
113        self.unobtrusive_peek_mut().map(|x| &*x)
114    }
115
116    /// Peek at the next available value, while not losing a previously
117    /// registered waker.
118    ///
119    /// Guarantees that a returned `Some` result is stable (See "Stability" in
120    /// [`crate::peekable_stream`]).
121    ///
122    /// Does not register the current task to be notified when an item becomes
123    /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
124    ///
125    /// The caller of `unobtrusive_peek_mut` can't distinguish between a pending and terminated stream.
126    // (See comment on `unobtrusive_peek` about options if we need a caller to be able to do that.)
127    fn unobtrusive_peek_mut(self: Pin<&mut Self>) -> Option<&mut <Self as futures::Stream>::Item>;
128}