1
//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2
//! be single or multi path.
3

            
4
pub mod circuit;
5
mod halfstream;
6
#[cfg(feature = "send-control-msg")]
7
pub(crate) mod msghandler;
8
pub(crate) mod reactor;
9
mod streammap;
10

            
11
use derive_deftly::Deftly;
12
use derive_more::Display;
13
use futures::SinkExt as _;
14
use oneshot_fused_workaround as oneshot;
15
use std::pin::Pin;
16
use std::sync::atomic::{AtomicU64, Ordering};
17
use std::sync::Arc;
18

            
19
use crate::circuit::UniqId;
20
use crate::crypto::cell::HopNum;
21
use crate::stream::StreamRateLimit;
22
use crate::{Error, Result};
23
use circuit::ClientCirc;
24
use circuit::{handshake, StreamMpscSender};
25
use reactor::{CtrlMsg, FlowCtrlMsg};
26

            
27
use postage::watch;
28
use tor_async_utils::SinkCloseChannel as _;
29
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
30
use tor_cell::relaycell::msg::AnyRelayMsg;
31
use tor_cell::relaycell::{RelayCellFormat, StreamId};
32
use tor_memquota::derive_deftly_template_HasMemoryCost;
33

            
34
/// The unique identifier of a tunnel.
35
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
36
#[display("{}", _0)]
37
pub(crate) struct TunnelId(u64);
38

            
39
impl TunnelId {
40
    /// Create a new TunnelId.
41
    ///
42
    /// # Panics
43
    ///
44
    /// Panics if we have exhausted the possible space of u64 IDs.
45
272
    pub(crate) fn next() -> TunnelId {
46
        /// The next unique tunnel ID.
47
        static NEXT_TUNNEL_ID: AtomicU64 = AtomicU64::new(1);
48
272
        let id = NEXT_TUNNEL_ID.fetch_add(1, Ordering::Relaxed);
49
272
        assert!(id != 0, "Exhausted Tunnel ID space?!");
50
272
        TunnelId(id)
51
272
    }
52
}
53

            
54
/// The identifier of a circuit [`UniqId`] within a tunnel.
55
///
56
/// This type is only needed for logging purposes: a circuit's [`UniqId`] is
57
/// process-unique, but in the logs it's often useful to display the
58
/// owning tunnel's ID alongside the circuit identifier.
59
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
60
#[display("Circ {}.{}", tunnel_id, circ_id.display_chan_circ())]
61
pub(crate) struct TunnelScopedCircId {
62
    /// The identifier of the owning tunnel
63
    tunnel_id: TunnelId,
64
    /// The process-unique identifier of the circuit
65
    circ_id: UniqId,
66
}
67

            
68
impl TunnelScopedCircId {
69
    /// Create a new [`TunnelScopedCircId`] from the specified identifiers.
70
352
    pub(crate) fn new(tunnel_id: TunnelId, circ_id: UniqId) -> Self {
71
352
        Self { tunnel_id, circ_id }
72
352
    }
73

            
74
    /// Return the [`UniqId`].
75
17396
    pub(crate) fn unique_id(&self) -> UniqId {
76
17396
        self.circ_id
77
17396
    }
78
}
79

            
80
/// A precise position in a tunnel.
81
#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
82
#[derive_deftly(HasMemoryCost)]
83
#[non_exhaustive]
84
pub enum HopLocation {
85
    /// A specific position in a tunnel.
86
    Hop((UniqId, HopNum)),
87
    /// The join point of a multi-path tunnel.
88
    JoinPoint,
89
}
90

            
91
/// A position in a tunnel.
92
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
93
#[non_exhaustive]
94
pub enum TargetHop {
95
    /// A specific position in a tunnel.
96
    Hop(HopLocation),
97
    /// The last hop of a tunnel.
98
    ///
99
    /// This should be used only when you don't care about what specific hop is used.
100
    /// Some tunnels may be extended or truncated,
101
    /// which means that the "last hop" may change at any time.
102
    LastHop,
103
}
104

            
105
impl From<(UniqId, HopNum)> for HopLocation {
    /// Wrap a (circuit ID, hop number) pair as a [`HopLocation::Hop`].
    fn from(circ_and_hop: (UniqId, HopNum)) -> Self {
        Self::Hop(circ_and_hop)
    }
}
110

            
111
impl From<(UniqId, HopNum)> for TargetHop {
112
44
    fn from(v: (UniqId, HopNum)) -> Self {
113
44
        TargetHop::Hop(v.into())
114
44
    }
115
}
116

            
117
impl HopLocation {
118
    /// Return the hop number if not a JointPoint.
119
    pub fn hop_num(&self) -> Option<HopNum> {
120
        match self {
121
            Self::Hop((_, hop_num)) => Some(*hop_num),
122
            Self::JoinPoint => None,
123
        }
124
    }
125
}
126

            
127
/// Internal handle, used to implement a stream on a particular circuit.
128
///
129
/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
130
/// the reader should additionally hold an `mpsc::Receiver` to get
131
/// relay messages for the stream.
132
///
133
/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
134
/// close the stream by sending an END message to the other side.
135
/// You can close a stream earlier by using [`StreamTarget::close`]
136
/// or [`StreamTarget::close_pending`].
137
#[derive(Clone, Debug)]
138
pub(crate) struct StreamTarget {
139
    /// Which hop of the circuit this stream is with.
140
    hop: HopLocation,
141
    /// Reactor ID for this stream.
142
    stream_id: StreamId,
143
    /// Encoding to use for relay cells sent on this stream.
144
    ///
145
    /// This is mostly irrelevant, except when deciding
146
    /// how many bytes we can pack in a DATA message.
147
    relay_cell_format: RelayCellFormat,
148
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
149
    // TODO(arti#2068): we should consider making this an `Option`
150
    rate_limit_stream: watch::Receiver<StreamRateLimit>,
151
    /// Channel to send cells down.
152
    tx: StreamMpscSender<AnyRelayMsg>,
153
    /// Reference to the circuit that this stream is on.
154
    // TODO(conflux): this should be a ClientTunnel
155
    circ: Arc<ClientCirc>,
156
}
157

            
158
impl StreamTarget {
159
    /// Deliver a relay message for the stream that owns this StreamTarget.
160
    ///
161
    /// The StreamTarget will set the correct stream ID and pick the
162
    /// right hop, but will not validate that the message is well-formed
163
    /// or meaningful in context.
164
6132
    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
165
4088
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
166
4088
        Ok(())
167
4088
    }
168

            
169
    /// Close the pending stream that owns this StreamTarget, delivering the specified
170
    /// END message (if any)
171
    ///
172
    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
173
    ///
174
    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
175
    ///
176
    /// The StreamTarget will set the correct stream ID and pick the
177
    /// right hop, but will not validate that the message is well-formed
178
    /// or meaningful in context.
179
    ///
180
    /// Note that in many cases, the actual contents of an END message can leak unwanted
181
    /// information. Please consider carefully before sending anything but an
182
    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
183
    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
184
    ///
185
    /// In addition to sending the END message, this function also ensures
186
    /// the state of the stream map entry of this stream is updated
187
    /// accordingly.
188
    ///
189
    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
190
    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
191
    /// function is for closing pending incoming streams (a stream is said to be pending if we have
192
    /// received the message initiating the stream but have not responded to it yet).
193
    ///
194
    /// **NOTE**: This function should be called at most once per request.
195
    /// Calling it twice is an error.
196
    #[cfg(feature = "hs-service")]
197
8
    pub(crate) fn close_pending(
198
8
        &self,
199
8
        message: reactor::CloseStreamBehavior,
200
8
    ) -> Result<oneshot::Receiver<Result<()>>> {
201
8
        let (tx, rx) = oneshot::channel();
202
8

            
203
8
        self.circ
204
8
            .control
205
8
            .unbounded_send(CtrlMsg::ClosePendingStream {
206
8
                stream_id: self.stream_id,
207
8
                hop: self.hop,
208
8
                message,
209
8
                done: tx,
210
8
            })
211
8
            .map_err(|_| Error::CircuitClosed)?;
212

            
213
8
        Ok(rx)
214
8
    }
215

            
216
    /// Queue a "close" for the stream corresponding to this StreamTarget.
217
    ///
218
    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
219
    ///
220
    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
221
    /// on this `StreamTarget`` or any clone of it.
222
    /// The reactor *will* try to flush any already-send messages before it closes the stream.
223
    ///
224
    /// You don't need to call this method if the stream is closing because all of its StreamTargets
225
    /// have been dropped.
226
8
    pub(crate) fn close(&mut self) {
227
8
        Pin::new(&mut self.tx).close_channel();
228
8
    }
229

            
230
    /// Called when a circuit-level protocol error has occurred and the
231
    /// circuit needs to shut down.
232
    pub(crate) fn protocol_error(&mut self) {
233
        self.circ.protocol_error();
234
    }
235

            
236
    /// Request to send a SENDME cell for this stream.
237
    ///
238
    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
239
    /// block or wait for a response from the circuit reactor.
240
    /// An error is only returned if we are unable to send the request.
241
    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
242
    /// this here and an error will not be returned.
243
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
244
        self.circ
245
            .control
246
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
247
                msg: FlowCtrlMsg::Sendme,
248
                stream_id: self.stream_id,
249
                hop: self.hop,
250
            })
251
            .map_err(|_| Error::CircuitClosed)
252
    }
253

            
254
    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
255
    ///
256
    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
257
    /// the stream.
258
    /// But it may decide not to, and may discard this update.
259
    /// For example the stream may have a large amount of buffered data, and the reactor may not
260
    /// want to send an XON while the buffer is large.
261
    ///
262
    /// This sends a message to inform the circuit reactor of the new drain rate,
263
    /// but it does not block or wait for a response from the reactor.
264
    /// An error is only returned if we are unable to send the update.
265
    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
266
        self.circ
267
            .control
268
            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
269
                msg: FlowCtrlMsg::Xon(rate),
270
                stream_id: self.stream_id,
271
                hop: self.hop,
272
            })
273
            .map_err(|_| Error::CircuitClosed)
274
    }
275

            
276
    /// Return a reference to the circuit that this `StreamTarget` is using.
277
    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
278
84
    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
279
84
        &self.circ
280
84
    }
281

            
282
    /// Return the kind of relay cell in use on this `StreamTarget`.
283
84
    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
284
84
        self.relay_cell_format
285
84
    }
286

            
287
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
288
84
    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
289
84
        &self.rate_limit_stream
290
84
    }
291
}