//! Declare the lowest level of stream: a stream that operates on raw
//! cells.

            
4
use crate::congestion::sendme;
5
use crate::tunnel::StreamTarget;
6
use crate::{Error, Result};
7
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
8

            
9
use crate::tunnel::circuit::StreamMpscReceiver;
10
use futures::stream::StreamExt;
11

            
12
/// The read part of a stream on a particular circuit.
13
#[derive(Debug)]
14
pub struct StreamReader {
15
    /// The underlying `StreamTarget` for this stream.
16
    ///
17
    /// A reader has this target in order to:
18
    ///   * Make the reactor send SENDME messages.
19
    ///   * Tell the reactor when there is a protocol error.
20
    ///   * Keep the stream alive at least until the StreamReader
21
    ///     is dropped.
22
    pub(crate) target: StreamTarget,
23
    /// Channel to receive stream messages from the reactor.
24
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
25
    /// Congestion control receive window for this stream.
26
    ///
27
    /// Having this here means we're only going to update it when the end consumer of this stream
28
    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
29
    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
30
    /// with having to buffer data).
31
    pub(crate) recv_window: sendme::StreamRecvWindow,
32
    /// Whether or not this stream has ended.
33
    pub(crate) ended: bool,
34
}
35

            
36
impl StreamReader {
37
    /// Try to read the next relay message from this stream.
38
132
    async fn recv_raw(&mut self) -> Result<UnparsedRelayMsg> {
39
88
        if self.ended {
40
            // Prevent reading from streams after they've ended.
41
            return Err(Error::NotConnected);
42
88
        }
43
88
        let msg = self
44
88
            .receiver
45
88
            .next()
46
88
            .await
47
            // This probably means that the other side closed the
48
            // mpsc channel.  I'm not sure the error type is correct though?
49
88
            .ok_or_else(|| {
50
                Error::StreamProto("stream channel disappeared without END cell?".into())
51
88
            })?;
52

            
53
88
        if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
54
            self.target.send_sendme().await?;
55
            self.recv_window.put();
56
88
        }
57

            
58
88
        Ok(msg)
59
88
    }
60

            
61
    /// As recv_raw, but if there is an error or an end cell, note that this
62
    /// stream has ended.
63
132
    pub async fn recv(&mut self) -> Result<UnparsedRelayMsg> {
64
88
        let val = self.recv_raw().await;
65
88
        match &val {
66
            Err(_) => {
67
                self.ended = true;
68
            }
69
88
            Ok(m) if m.cmd() == RelayCmd::END => {
70
8
                self.ended = true;
71
8
            }
72
80
            _ => {}
73
        }
74
88
        val
75
88
    }
76

            
77
    /// Shut down this stream.
78
    pub fn protocol_error(&mut self) {
79
        self.target.protocol_error();
80
    }
81
}