tor_proto/stream/
raw.rs

1//! Declare the lowest level of stream: a stream that operates on raw
2//! cells.
3
4use crate::congestion::sendme;
5use crate::tunnel::StreamTarget;
6use crate::{Error, Result};
7use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
8
9use crate::tunnel::circuit::StreamMpscReceiver;
10use futures::stream::StreamExt;
11
12/// The read part of a stream on a particular circuit.
13#[derive(Debug)]
14pub struct StreamReader {
15    /// The underlying `StreamTarget` for this stream.
16    ///
17    /// A reader has this target in order to:
18    ///   * Make the reactor send SENDME messages.
19    ///   * Tell the reactor when there is a protocol error.
20    ///   * Keep the stream alive at least until the StreamReader
21    ///     is dropped.
22    pub(crate) target: StreamTarget,
23    /// Channel to receive stream messages from the reactor.
24    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
25    /// Congestion control receive window for this stream.
26    ///
27    /// Having this here means we're only going to update it when the end consumer of this stream
28    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
29    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
30    /// with having to buffer data).
31    pub(crate) recv_window: sendme::StreamRecvWindow,
32    /// Whether or not this stream has ended.
33    pub(crate) ended: bool,
34}
35
36impl StreamReader {
37    /// Try to read the next relay message from this stream.
38    async fn recv_raw(&mut self) -> Result<UnparsedRelayMsg> {
39        if self.ended {
40            // Prevent reading from streams after they've ended.
41            return Err(Error::NotConnected);
42        }
43        let msg = self
44            .receiver
45            .next()
46            .await
47            // This probably means that the other side closed the
48            // mpsc channel.  I'm not sure the error type is correct though?
49            .ok_or_else(|| {
50                Error::StreamProto("stream channel disappeared without END cell?".into())
51            })?;
52
53        if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
54            self.target.send_sendme().await?;
55            self.recv_window.put();
56        }
57
58        Ok(msg)
59    }
60
61    /// As recv_raw, but if there is an error or an end cell, note that this
62    /// stream has ended.
63    pub async fn recv(&mut self) -> Result<UnparsedRelayMsg> {
64        let val = self.recv_raw().await;
65        match &val {
66            Err(_) => {
67                self.ended = true;
68            }
69            Ok(m) if m.cmd() == RelayCmd::END => {
70                self.ended = true;
71            }
72            _ => {}
73        }
74        val
75    }
76
77    /// Shut down this stream.
78    pub fn protocol_error(&mut self) {
79        self.target.protocol_error();
80    }
81}