tor_async_utils/peekable_stream.rs
1//! Provides utilities for peeking at items in [`futures::Stream`].
2//!
3//! # Stability of peeked values
4//!
5//! Implementors of this trait guarantee that a peeked `Poll::Ready` result is
6//! required to remain at the head of the stream until
7//! [`futures::Stream::poll_next`] or another method requiring a `&mut`
8//! reference (and documented to potentially change the head of the stream) is
9//! called. e.g. a caller holding a `Pin<&mut Self>` that observes a Ready value
10//! via [`PeekableStream::poll_peek`] is guaranteed to observe that same value
11//! again on a subsequent call to [`PeekableStream::poll_peek`],
12//! [`futures::Stream::poll_next`], etc.
13//!
14//! This property must not be relied up on to prove *soundness*, but can be
15//! relied upon to prove correctness.
16
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20/// A stream that provides the ability to peek at the next available item.
21///
22/// This provides an alternative to interfaces and data structure that would
23/// otherwise want a [`futures::stream::Peekable<S>`], which can potentially
24/// avoid multiple layers of buffering where one would do.
25///
26/// # Tasks, waking, and calling context
27///
28/// These methods should be called by the task that is reading from the stream.
29/// If they are called by another task, the reading task would miss
30/// notifications.
31pub trait PeekableStream: futures::Stream {
32 /// Poll for an item to be ready, and then inspect it.
33 ///
34 /// Equivalent to [`futures::stream::Peekable::poll_peek`].
35 ///
36 /// Guarantees that a returned `Ready` result is stable (See "Stability ..." in
37 /// [`crate::peekable_stream`]).
38 ///
39 /// Should be called only by the task that is reading the stream (see
40 /// "Tasks ..." in [`PeekableStream`]).
41 fn poll_peek(
42 self: Pin<&mut Self>,
43 cx: &mut Context<'_>,
44 ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
45 self.poll_peek_mut(cx).map(|x| x.map(|x| &*x))
46 }
47
48 /// Poll for an item to be ready, and then inspect it.
49 ///
50 /// Equivalent to [`futures::stream::Peekable::poll_peek_mut`].
51 ///
52 /// Guarantees that a returned `Ready` result is stable (See "Stability" in
53 /// [`crate::peekable_stream`]).
54 ///
55 /// Should be called only by the task that is reading the stream (see
56 /// "Tasks ..." in [`PeekableStream`]).
57 fn poll_peek_mut(
58 self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>>;
61}
62
63impl<S> PeekableStream for futures::stream::Peekable<S>
64where
65 S: futures::Stream,
66{
67 fn poll_peek(
68 self: Pin<&mut Self>,
69 cx: &mut Context<'_>,
70 ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
71 self.poll_peek(cx)
72 }
73
74 fn poll_peek_mut(
75 self: Pin<&mut Self>,
76 cx: &mut Context<'_>,
77 ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
78 self.poll_peek_mut(cx)
79 }
80}
81
82/// A stream that supports peeking without perturbing any registered waker.
83///
84/// # Tasks, waking, and calling context
85///
86/// These functions do not register the current task to be woken when an item
87/// becomes available on the stream, and ensure that the most recent task that
88/// was already registered remains so (or is woken if there was an item ready).
89///
90/// Therefore, avoiding calling (only) these functions from the task that is
91/// reading from the stream, since they will not cause the current task to be
92/// woken when an item arrives.
93///
94/// Conversely, you *may* call these function in *other* tasks, without
95/// disturbing the task which is waiting for input.
96pub trait UnobtrusivePeekableStream: futures::Stream {
97 /// Peek at the next available value, while not losing a previously
98 /// registered waker.
99 ///
100 /// Guarantees that a returned `Some` result is stable (See "Stability" in
101 /// [`crate::peekable_stream`]).
102 ///
103 /// Does not register the current task to be notified when an item becomes
104 /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
105 ///
106 /// The caller of `unobtrusive_peek` can't distinguish between a pending and terminated stream.
107 // To address this we could return value in a `Poll` but normally returning `Poll::Pending`
108 // implies a promise of future wakeup, which is precisely contrary to this function's purpose.
109 // We could address that with imprecations in the docs but people don't always read docs.
110 // We could invent a new type, but that seems quite heavyweight.
111 // We'll cross this bridge when we have a requirement for this feature.
112 fn unobtrusive_peek(self: Pin<&mut Self>) -> Option<&<Self as futures::Stream>::Item> {
113 self.unobtrusive_peek_mut().map(|x| &*x)
114 }
115
116 /// Peek at the next available value, while not losing a previously
117 /// registered waker.
118 ///
119 /// Guarantees that a returned `Some` result is stable (See "Stability" in
120 /// [`crate::peekable_stream`]).
121 ///
122 /// Does not register the current task to be notified when an item becomes
123 /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
124 ///
125 /// The caller of `unobtrusive_peek_mut` can't distinguish between a pending and terminated stream.
126 // (See comment on `unobtrusive_peek` about options if we need a caller to be able to do that.)
127 fn unobtrusive_peek_mut(self: Pin<&mut Self>) -> Option<&mut <Self as futures::Stream>::Item>;
128}