tor_proto/stream/raw.rs
1//! Declare the lowest level of stream: a stream that operates on raw
2//! cells.
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::stream::Stream;
8use pin_project::pin_project;
9use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
10use tracing::debug;
11
12use crate::congestion::sendme;
13use crate::tunnel::circuit::StreamMpscReceiver;
14use crate::tunnel::StreamTarget;
15use crate::{Error, Result};
16
17/// The read part of a stream on a particular circuit.
18///
19/// This [`Stream`](Stream) will return incoming messages for this Tor stream, excluding flow control
20/// related messages like SENDME, XON, and XOFF.
21///
22/// To avoid ambiguity, the following uses "stream" to refer to the `futures::Stream`, not the Tor
23/// stream.
24///
25/// If the stream ends unexpectedly (before an END message), the stream will return an error.
26/// After the stream returns an END message or an error, this stream will be "terminated" and future
27/// [`poll_next`](Stream::poll_next) calls will return `None`.
28// I think it would be better to *not* return an error if the stream ends before an END message is
29// received, and just return `None`. The caller will know if it received an END message or not, so
30// returning an error isn't very useful and is maybe unexpected.
31#[derive(Debug)]
32#[pin_project]
33pub struct StreamReceiver {
34 /// The underlying `StreamTarget` for this stream.
35 ///
36 /// A reader has this target in order to:
37 /// * Make the reactor send SENDME messages.
38 /// * Tell the reactor when there is a protocol error.
39 /// * Keep the stream alive at least until the StreamReceiver
40 /// is dropped.
41 pub(crate) target: StreamTarget,
42 /// Channel to receive stream messages from the reactor.
43 #[pin]
44 pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
45 /// Congestion control receive window for this stream.
46 ///
47 /// Having this here means we're only going to update it when the end consumer of this stream
48 /// actually reads things, meaning we don't ask for more data until it's actually needed (as
49 /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
50 /// with having to buffer data).
51 pub(crate) recv_window: sendme::StreamRecvWindow,
52 /// Whether or not this stream has ended.
53 pub(crate) ended: bool,
54}
55
56impl StreamReceiver {
57 /// Try to read the next relay message from this stream.
58 fn poll_next_inner(
59 mut self: Pin<&mut Self>,
60 cx: &mut Context<'_>,
61 ) -> Result<Poll<UnparsedRelayMsg>> {
62 let msg = match self.as_mut().project().receiver.poll_next(cx) {
63 Poll::Ready(Some(msg)) => msg,
64 Poll::Ready(None) => {
65 // The channel is indicating that it has terminated, likely from a dropped sender.
66 // But if we're here, it means we never received an END cell.
67 // I don't think this is unexpected, since a circuit may be destroyed before the
68 // peer sends an END message.
69 // TODO: Is there a better message or error variant we could provide here?
70 return Err(Error::StreamProto(
71 "stream channel disappeared without END cell?".into(),
72 ));
73 }
74 Poll::Pending => return Ok(Poll::Pending),
75 };
76
77 if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
78 if let Err(e) = self.target.send_sendme() {
79 if matches!(e, Error::CircuitClosed) {
80 // If the tunnel has closed, sending a message to the tunnel reactor may fail.
81 // But this is okay. We still want the user to be able to continue reading the
82 // remaining queued data for this stream, and if the tunnel has closed it
83 // wouldn't make sense to send a SENDME message anyways.
84 debug!("Failed to send stream-level SENDME. Ignoring: {e}");
85 } else {
86 // This error is unexpected. Let's return it to the user.
87 return Err(e);
88 }
89 }
90 self.recv_window.put();
91 }
92
93 Ok(Poll::Ready(msg))
94 }
95
96 /// Shut down this stream.
97 pub fn protocol_error(&mut self) {
98 self.target.protocol_error();
99 }
100}
101
102impl Stream for StreamReceiver {
103 type Item = Result<UnparsedRelayMsg>;
104
105 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106 if self.ended {
107 // Prevent reading more messages from streams after they've ended. `None` indicates that
108 // the stream is complete/terminated.
109 return Poll::Ready(None);
110 }
111
112 match self.as_mut().poll_next_inner(cx) {
113 Ok(Poll::Pending) => Poll::Pending,
114 Ok(Poll::Ready(msg)) => {
115 if msg.cmd() == RelayCmd::END {
116 // We return the END cell, and future polls will return `None`.
117 self.ended = true;
118 }
119 Poll::Ready(Some(Ok(msg)))
120 }
121 Err(e) => {
122 // We return the error, and future polls will return `None`.
123 self.ended = true;
124 Poll::Ready(Some(Err(e)))
125 }
126 }
127 }
128}