tor_proto/stream/
raw.rs

1//! Declare the lowest level of stream: a stream that operates on raw
2//! cells.
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8use pin_project::pin_project;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10use tracing::debug;
11
12use crate::congestion::sendme;
13use crate::tunnel::circuit::StreamMpscReceiver;
14use crate::tunnel::StreamTarget;
15use crate::{Error, Result};
16
17/// The read part of a stream on a particular circuit.
18///
19/// This [`Stream`](Stream) will return incoming messages for this Tor stream, excluding flow control
20/// related messages like SENDME, XON, and XOFF.
21///
22/// To avoid ambiguity, the following uses "stream" to refer to the `futures::Stream`, not the Tor
23/// stream.
24///
25/// If the stream ends unexpectedly (before an END message), the stream will return an error.
26/// After the stream returns an END message or an error, this stream will be "terminated" and future
27/// [`poll_next`](Stream::poll_next) calls will return `None`.
28// I think it would be better to *not* return an error if the stream ends before an END message is
29// received, and just return `None`. The caller will know if it received an END message or not, so
30// returning an error isn't very useful and is maybe unexpected.
31#[derive(Debug)]
32#[pin_project]
33pub struct StreamReceiver {
34    /// The underlying `StreamTarget` for this stream.
35    ///
36    /// A reader has this target in order to:
37    ///   * Make the reactor send SENDME messages.
38    ///   * Tell the reactor when there is a protocol error.
39    ///   * Keep the stream alive at least until the StreamReceiver
40    ///     is dropped.
41    pub(crate) target: StreamTarget,
42    /// Channel to receive stream messages from the reactor.
43    #[pin]
44    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
45    /// Congestion control receive window for this stream.
46    ///
47    /// Having this here means we're only going to update it when the end consumer of this stream
48    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
49    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
50    /// with having to buffer data).
51    pub(crate) recv_window: sendme::StreamRecvWindow,
52    /// Whether or not this stream has ended.
53    pub(crate) ended: bool,
54}
55
56impl StreamReceiver {
57    /// Try to read the next relay message from this stream.
58    fn poll_next_inner(
59        mut self: Pin<&mut Self>,
60        cx: &mut Context<'_>,
61    ) -> Result<Poll<UnparsedRelayMsg>> {
62        let msg = match self.as_mut().project().receiver.poll_next(cx) {
63            Poll::Ready(Some(msg)) => msg,
64            Poll::Ready(None) => {
65                // The channel is indicating that it has terminated, likely from a dropped sender.
66                // But if we're here, it means we never received an END cell.
67                // I don't think this is unexpected, since a circuit may be destroyed before the
68                // peer sends an END message.
69                // TODO: Is there a better message or error variant we could provide here?
70                return Err(Error::StreamProto(
71                    "stream channel disappeared without END cell?".into(),
72                ));
73            }
74            Poll::Pending => return Ok(Poll::Pending),
75        };
76
77        if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
78            if let Err(e) = self.target.send_sendme() {
79                if matches!(e, Error::CircuitClosed) {
80                    // If the tunnel has closed, sending a message to the tunnel reactor may fail.
81                    // But this is okay. We still want the user to be able to continue reading the
82                    // remaining queued data for this stream, and if the tunnel has closed it
83                    // wouldn't make sense to send a SENDME message anyways.
84                    debug!("Failed to send stream-level SENDME. Ignoring: {e}");
85                } else {
86                    // This error is unexpected. Let's return it to the user.
87                    return Err(e);
88                }
89            }
90            self.recv_window.put();
91        }
92
93        Ok(Poll::Ready(msg))
94    }
95
96    /// Shut down this stream.
97    pub fn protocol_error(&mut self) {
98        self.target.protocol_error();
99    }
100}
101
102impl Stream for StreamReceiver {
103    type Item = Result<UnparsedRelayMsg>;
104
105    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106        if self.ended {
107            // Prevent reading more messages from streams after they've ended. `None` indicates that
108            // the stream is complete/terminated.
109            return Poll::Ready(None);
110        }
111
112        match self.as_mut().poll_next_inner(cx) {
113            Ok(Poll::Pending) => Poll::Pending,
114            Ok(Poll::Ready(msg)) => {
115                if msg.cmd() == RelayCmd::END {
116                    // We return the END cell, and future polls will return `None`.
117                    self.ended = true;
118                }
119                Poll::Ready(Some(Ok(msg)))
120            }
121            Err(e) => {
122                // We return the error, and future polls will return `None`.
123                self.ended = true;
124                Poll::Ready(Some(Err(e)))
125            }
126        }
127    }
128}