tor_proto/stream/raw.rs
1//! Declare the lowest level of stream: a stream that operates on raw
2//! cells.
3
4use crate::congestion::sendme;
5use crate::tunnel::StreamTarget;
6use crate::{Error, Result};
7use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
8
9use crate::tunnel::circuit::StreamMpscReceiver;
10use futures::stream::StreamExt;
11
12/// The read part of a stream on a particular circuit.
13#[derive(Debug)]
14pub struct StreamReader {
15 /// The underlying `StreamTarget` for this stream.
16 ///
17 /// A reader has this target in order to:
18 /// * Make the reactor send SENDME messages.
19 /// * Tell the reactor when there is a protocol error.
20 /// * Keep the stream alive at least until the StreamReader
21 /// is dropped.
22 pub(crate) target: StreamTarget,
23 /// Channel to receive stream messages from the reactor.
24 pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
25 /// Congestion control receive window for this stream.
26 ///
27 /// Having this here means we're only going to update it when the end consumer of this stream
28 /// actually reads things, meaning we don't ask for more data until it's actually needed (as
29 /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
30 /// with having to buffer data).
31 pub(crate) recv_window: sendme::StreamRecvWindow,
32 /// Whether or not this stream has ended.
33 pub(crate) ended: bool,
34}
35
36impl StreamReader {
37 /// Try to read the next relay message from this stream.
38 async fn recv_raw(&mut self) -> Result<UnparsedRelayMsg> {
39 if self.ended {
40 // Prevent reading from streams after they've ended.
41 return Err(Error::NotConnected);
42 }
43 let msg = self
44 .receiver
45 .next()
46 .await
47 // This probably means that the other side closed the
48 // mpsc channel. I'm not sure the error type is correct though?
49 .ok_or_else(|| {
50 Error::StreamProto("stream channel disappeared without END cell?".into())
51 })?;
52
53 if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
54 self.target.send_sendme().await?;
55 self.recv_window.put();
56 }
57
58 Ok(msg)
59 }
60
61 /// As recv_raw, but if there is an error or an end cell, note that this
62 /// stream has ended.
63 pub async fn recv(&mut self) -> Result<UnparsedRelayMsg> {
64 let val = self.recv_raw().await;
65 match &val {
66 Err(_) => {
67 self.ended = true;
68 }
69 Ok(m) if m.cmd() == RelayCmd::END => {
70 self.ended = true;
71 }
72 _ => {}
73 }
74 val
75 }
76
77 /// Shut down this stream.
78 pub fn protocol_error(&mut self) {
79 self.target.protocol_error();
80 }
81}