1
//! Tunnel module that will encompass a generic tunnel wrapping a circuit reactor;
//! such a tunnel can be either single-path or multi-path.
3

            
4
pub mod circuit;
5
mod halfstream;
6
#[cfg(feature = "send-control-msg")]
7
pub(crate) mod msghandler;
8
pub(crate) mod reactor;
9
mod streammap;
10

            
11
use futures::SinkExt as _;
12
use oneshot_fused_workaround as oneshot;
13
use std::pin::Pin;
14
use std::sync::Arc;
15

            
16
use crate::crypto::cell::HopNum;
17
use crate::{Error, Result};
18
use circuit::ClientCirc;
19
use circuit::{handshake, StreamMpscSender};
20
use reactor::{CtrlMsg, LegId};
21

            
22
use tor_async_utils::SinkCloseChannel as _;
23
use tor_cell::relaycell::msg::AnyRelayMsg;
24
use tor_cell::relaycell::{RelayCellFormat, StreamId};
25

            
26
// TODO(#1857): Make this pub and not `allow(dead_code)`.
27
/// A precise position in a tunnel.
28
#[allow(dead_code)]
29
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30
pub(crate) enum HopLocation {
31
    /// A specific position in a tunnel.
32
    Hop((LegId, HopNum)),
33
    /// The join point of a multi-path tunnel.
34
    JoinPoint,
35
}
36

            
37
// TODO(#1857): Make this pub and not `allow(dead_code)`.
38
/// A position in a tunnel.
39
#[allow(dead_code)]
40
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
41
pub(crate) enum TargetHop {
42
    /// A specific position in a tunnel.
43
    Hop(HopLocation),
44
    /// The last hop of a tunnel.
45
    ///
46
    /// This should be used only when you don't care about what specific hop is used.
47
    /// Some tunnels may be extended or truncated,
48
    /// which means that the "last hop" may change at any time.
49
    LastHop,
50
}
51

            
52
/// Internal handle, used to implement a stream on a particular circuit.
53
///
54
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
55
/// the reader should additionally hold an `mpsc::Receiver` to get
56
/// relay messages for the stream.
57
///
58
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
59
/// close the stream by sending an END message to the other side.
60
/// You can close a stream earlier by using [`StreamTarget::close`]
61
/// or [`StreamTarget::close_pending`].
62
#[derive(Clone, Debug)]
63
pub(crate) struct StreamTarget {
64
    /// Which hop of the circuit this stream is with.
65
    hop: HopLocation,
66
    /// Reactor ID for this stream.
67
    stream_id: StreamId,
68
    /// Encoding to use for relay cells sent on this stream.
69
    ///
70
    /// This is mostly irrelevant, except when deciding
71
    /// how many bytes we can pack in a DATA message.
72
    relay_cell_format: RelayCellFormat,
73
    /// Channel to send cells down.
74
    tx: StreamMpscSender<AnyRelayMsg>,
75
    /// Reference to the circuit that this stream is on.
76
    // TODO(conflux): this should be a ClientTunnel
77
    circ: Arc<ClientCirc>,
78
}
79

            
80
impl StreamTarget {
81
    /// Deliver a relay message for the stream that owns this StreamTarget.
82
    ///
83
    /// The StreamTarget will set the correct stream ID and pick the
84
    /// right hop, but will not validate that the message is well-formed
85
    /// or meaningful in context.
86
4332
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
87
2888
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
88
2888
        Ok(())
89
2888
    }
90

            
91
    /// Close the pending stream that owns this StreamTarget, delivering the specified
92
    /// END message (if any)
93
    ///
94
    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
95
    ///
96
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
97
    ///
98
    /// The StreamTarget will set the correct stream ID and pick the
99
    /// right hop, but will not validate that the message is well-formed
100
    /// or meaningful in context.
101
    ///
102
    /// Note that in many cases, the actual contents of an END message can leak unwanted
103
    /// information. Please consider carefully before sending anything but an
104
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
105
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
106
    ///
107
    /// In addition to sending the END message, this function also ensures
108
    /// the state of the stream map entry of this stream is updated
109
    /// accordingly.
110
    ///
111
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
112
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
113
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
114
    /// received the message initiating the stream but have not responded to it yet).
115
    ///
116
    /// **NOTE**: This function should be called at most once per request.
117
    /// Calling it twice is an error.
118
    #[cfg(feature = "hs-service")]
119
8
    pub(crate) fn close_pending(
120
8
        &self,
121
8
        message: reactor::CloseStreamBehavior,
122
8
    ) -> Result<oneshot::Receiver<Result<()>>> {
123
8
        let (tx, rx) = oneshot::channel();
124
8

            
125
8
        self.circ
126
8
            .control
127
8
            .unbounded_send(CtrlMsg::ClosePendingStream {
128
8
                stream_id: self.stream_id,
129
8
                hop: self.hop,
130
8
                message,
131
8
                done: tx,
132
8
            })
133
8
            .map_err(|_| Error::CircuitClosed)?;
134

            
135
8
        Ok(rx)
136
8
    }
137

            
138
    /// Queue a "close" for the stream corresponding to this StreamTarget.
139
    ///
140
    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
141
    ///
142
    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
143
    /// on this `StreamTarget`` or any clone of it.
144
    /// The reactor *will* try to flush any already-send messages before it closes the stream.
145
    ///
146
    /// You don't need to call this method if the stream is closing because all of its StreamTargets
147
    /// have been dropped.
148
8
    pub(crate) fn close(&mut self) {
149
8
        Pin::new(&mut self.tx).close_channel();
150
8
    }
151

            
152
    /// Called when a circuit-level protocol error has occurred and the
153
    /// circuit needs to shut down.
154
    pub(crate) fn protocol_error(&mut self) {
155
        self.circ.protocol_error();
156
    }
157

            
158
    /// Send a SENDME cell for this stream.
159
    pub(crate) async fn send_sendme(&mut self) -> Result<()> {
160
        let (tx, rx) = oneshot::channel();
161

            
162
        self.circ
163
            .control
164
            .unbounded_send(CtrlMsg::SendSendme {
165
                stream_id: self.stream_id,
166
                hop: self.hop,
167
                sender: tx,
168
            })
169
            .map_err(|_| Error::CircuitClosed)?;
170

            
171
        rx.await.map_err(|_| Error::CircuitClosed)?
172
    }
173

            
174
    /// Return a reference to the circuit that this `StreamTarget` is using.
175
    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
176
72
    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
177
72
        &self.circ
178
72
    }
179

            
180
    /// Return the kind of relay cell in use on this `StreamTarget`.
181
72
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
182
72
        self.relay_cell_format
183
72
    }
184
}