//! Provides utilities for peeking at items in [`futures::Stream`].
//!
//! # Stability of peeked values
//!
//! Implementors of the traits in this module guarantee that a peeked
//! `Poll::Ready` result remains at the head of the stream until
//! [`futures::Stream::poll_next`] or another method requiring a `&mut`
//! reference (and documented to potentially change the head of the stream) is
//! called. For example, a caller holding a `Pin<&mut Self>` that observes a
//! `Ready` value via [`PeekableStream::poll_peek`] is guaranteed to observe
//! that same value again on a subsequent call to
//! [`PeekableStream::poll_peek`], [`futures::Stream::poll_next`], etc.
//!
//! This property must not be relied upon to prove *soundness*, but can be
//! relied upon to prove correctness.
16

            
17
use std::pin::Pin;
18
use std::task::{Context, Poll};
19

            
20
/// A stream that provides the ability to peek at the next available item.
21
///
22
/// This provides an alternative to interfaces and data structure that would
23
/// otherwise want a [`futures::stream::Peekable<S>`], which can potentially
24
/// avoid multiple layers of buffering where one would do.
25
///
26
/// # Tasks, waking, and calling context
27
///
28
/// These methods should be called by the task that is reading from the stream.
29
/// If they are called by another task, the reading task would miss
30
/// notifications.
31
pub trait PeekableStream: futures::Stream {
32
    /// Poll for an item to be ready, and then inspect it.
33
    ///
34
    /// Equivalent to [`futures::stream::Peekable::poll_peek`].
35
    ///
36
    /// Guarantees that a returned `Ready` result is stable (See "Stability ..." in
37
    /// [`crate::peekable_stream`]).
38
    ///
39
    /// Should be called only by the task that is reading the stream (see
40
    /// "Tasks ..." in [`PeekableStream`]).
41
10546
    fn poll_peek(
42
10546
        self: Pin<&mut Self>,
43
10546
        cx: &mut Context<'_>,
44
10546
    ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
45
15748
        self.poll_peek_mut(cx).map(|x| x.map(|x| &*x))
46
10546
    }
47

            
48
    /// Poll for an item to be ready, and then inspect it.
49
    ///
50
    /// Equivalent to [`futures::stream::Peekable::poll_peek_mut`].
51
    ///
52
    /// Guarantees that a returned `Ready` result is stable (See "Stability" in
53
    /// [`crate::peekable_stream`]).
54
    ///
55
    /// Should be called only by the task that is reading the stream (see
56
    /// "Tasks ..." in [`PeekableStream`]).
57
    fn poll_peek_mut(
58
        self: Pin<&mut Self>,
59
        cx: &mut Context<'_>,
60
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>>;
61
}
62

            
63
impl<S> PeekableStream for futures::stream::Peekable<S>
64
where
65
    S: futures::Stream,
66
{
67
196
    fn poll_peek(
68
196
        self: Pin<&mut Self>,
69
196
        cx: &mut Context<'_>,
70
196
    ) -> Poll<Option<&<Self as futures::Stream>::Item>> {
71
196
        self.poll_peek(cx)
72
196
    }
73

            
74
    fn poll_peek_mut(
75
        self: Pin<&mut Self>,
76
        cx: &mut Context<'_>,
77
    ) -> Poll<Option<&mut <Self as futures::Stream>::Item>> {
78
        self.poll_peek_mut(cx)
79
    }
80
}
81

            
82
/// A stream that supports peeking without perturbing any registered waker.
83
///
84
/// # Tasks, waking, and calling context
85
///
86
/// These functions do not register the current task to be woken when an item
87
/// becomes available on the stream, and ensure that the most recent task that
88
/// was already registered remains so (or is woken if there was an item ready).
89
///
90
/// Therefore, avoiding calling (only) these functions from the task that is
91
/// reading from the stream, since they will not cause the current task to be
92
/// woken when an item arrives.
93
///
94
/// Conversely, you *may* call these function in *other* tasks, without
95
/// disturbing the task which is waiting for input.
96
pub trait UnobtrusivePeekableStream: futures::Stream {
97
    /// Peek at the next available value, while not losing a previously
98
    /// registered waker.
99
    ///
100
    /// Guarantees that a returned `Some` result is stable (See "Stability" in
101
    /// [`crate::peekable_stream`]).
102
    ///
103
    /// Does not register the current task to be notified when an item becomes
104
    /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
105
    ///
106
    /// The caller of `unobtrusive_peek` can't distinguish between a pending and terminated stream.
107
    // To address this we could return value in a `Poll` but normally returning `Poll::Pending`
108
    // implies a promise of future wakeup, which is precisely contrary to this function's purpose.
109
    // We could address that with imprecations in the docs but people don't always read docs.
110
    // We could invent a new type, but that seems quite heavyweight.
111
    // We'll cross this bridge when we have a requirement for this feature.
112
5208
    fn unobtrusive_peek(self: Pin<&mut Self>) -> Option<&<Self as futures::Stream>::Item> {
113
7793
        self.unobtrusive_peek_mut().map(|x| &*x)
114
5208
    }
115

            
116
    /// Peek at the next available value, while not losing a previously
117
    /// registered waker.
118
    ///
119
    /// Guarantees that a returned `Some` result is stable (See "Stability" in
120
    /// [`crate::peekable_stream`]).
121
    ///
122
    /// Does not register the current task to be notified when an item becomes
123
    /// available (see "Tasks ..." in [`UnobtrusivePeekableStream`]).
124
    ///
125
    /// The caller of `unobtrusive_peek_mut` can't distinguish between a pending and terminated stream.
126
    // (See comment on `unobtrusive_peek` about options if we need a caller to be able to do that.)
127
    fn unobtrusive_peek_mut(self: Pin<&mut Self>) -> Option<&mut <Self as futures::Stream>::Item>;
128
}