tor_proto/
tunnel.rs

1//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2//! be single or multi path.
3
4pub mod circuit;
5mod halfstream;
6#[cfg(feature = "send-control-msg")]
7pub(crate) mod msghandler;
8pub(crate) mod reactor;
9mod streammap;
10
11use derive_deftly::Deftly;
12use derive_more::Display;
13use futures::SinkExt as _;
14use oneshot_fused_workaround as oneshot;
15use std::pin::Pin;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18
19use crate::circuit::UniqId;
20use crate::crypto::cell::HopNum;
21use crate::stream::StreamRateLimit;
22use crate::{Error, Result};
23use circuit::ClientCirc;
24use circuit::StreamMpscSender;
25use reactor::{CtrlMsg, FlowCtrlMsg};
26
27use postage::watch;
28use tor_async_utils::SinkCloseChannel as _;
29use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
30use tor_cell::relaycell::msg::AnyRelayMsg;
31use tor_cell::relaycell::{RelayCellFormat, StreamId};
32use tor_memquota::derive_deftly_template_HasMemoryCost;
33
34/// The unique identifier of a tunnel.
35#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
36#[display("{}", _0)]
37pub(crate) struct TunnelId(u64);
38
39impl TunnelId {
40    /// Create a new TunnelId.
41    ///
42    /// # Panics
43    ///
44    /// Panics if we have exhausted the possible space of u64 IDs.
45    pub(crate) fn next() -> TunnelId {
46        /// The next unique tunnel ID.
47        static NEXT_TUNNEL_ID: AtomicU64 = AtomicU64::new(1);
48        let id = NEXT_TUNNEL_ID.fetch_add(1, Ordering::Relaxed);
49        assert!(id != 0, "Exhausted Tunnel ID space?!");
50        TunnelId(id)
51    }
52}
53
54/// The identifier of a circuit [`UniqId`] within a tunnel.
55///
56/// This type is only needed for logging purposes: a circuit's [`UniqId`] is
57/// process-unique, but in the logs it's often useful to display the
58/// owning tunnel's ID alongside the circuit identifier.
59#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
60#[display("Circ {}.{}", tunnel_id, circ_id.display_chan_circ())]
61pub(crate) struct TunnelScopedCircId {
62    /// The identifier of the owning tunnel
63    tunnel_id: TunnelId,
64    /// The process-unique identifier of the circuit
65    circ_id: UniqId,
66}
67
68impl TunnelScopedCircId {
69    /// Create a new [`TunnelScopedCircId`] from the specified identifiers.
70    pub(crate) fn new(tunnel_id: TunnelId, circ_id: UniqId) -> Self {
71        Self { tunnel_id, circ_id }
72    }
73
74    /// Return the [`UniqId`].
75    pub(crate) fn unique_id(&self) -> UniqId {
76        self.circ_id
77    }
78}
79
80/// A precise position in a tunnel.
81#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
82#[derive_deftly(HasMemoryCost)]
83#[non_exhaustive]
84pub enum HopLocation {
85    /// A specific position in a tunnel.
86    Hop((UniqId, HopNum)),
87    /// The join point of a multi-path tunnel.
88    JoinPoint,
89}
90
91/// A position in a tunnel.
92#[derive(Debug, Copy, Clone, PartialEq, Eq)]
93#[non_exhaustive]
94pub enum TargetHop {
95    /// A specific position in a tunnel.
96    Hop(HopLocation),
97    /// The last hop of a tunnel.
98    ///
99    /// This should be used only when you don't care about what specific hop is used.
100    /// Some tunnels may be extended or truncated,
101    /// which means that the "last hop" may change at any time.
102    LastHop,
103}
104
105impl From<(UniqId, HopNum)> for HopLocation {
106    fn from(v: (UniqId, HopNum)) -> Self {
107        HopLocation::Hop(v)
108    }
109}
110
111impl From<(UniqId, HopNum)> for TargetHop {
112    fn from(v: (UniqId, HopNum)) -> Self {
113        TargetHop::Hop(v.into())
114    }
115}
116
117impl HopLocation {
118    /// Return the hop number if not a JointPoint.
119    pub fn hop_num(&self) -> Option<HopNum> {
120        match self {
121            Self::Hop((_, hop_num)) => Some(*hop_num),
122            Self::JoinPoint => None,
123        }
124    }
125}
126
127/// Internal handle, used to implement a stream on a particular circuit.
128///
129/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
130/// the reader should additionally hold an `mpsc::Receiver` to get
131/// relay messages for the stream.
132///
133/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
134/// close the stream by sending an END message to the other side.
135/// You can close a stream earlier by using [`StreamTarget::close`]
136/// or [`StreamTarget::close_pending`].
137#[derive(Clone, Debug)]
138pub(crate) struct StreamTarget {
139    /// Which hop of the circuit this stream is with.
140    hop: HopLocation,
141    /// Reactor ID for this stream.
142    stream_id: StreamId,
143    /// Encoding to use for relay cells sent on this stream.
144    ///
145    /// This is mostly irrelevant, except when deciding
146    /// how many bytes we can pack in a DATA message.
147    relay_cell_format: RelayCellFormat,
148    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
149    // TODO(arti#2068): we should consider making this an `Option`
150    rate_limit_stream: watch::Receiver<StreamRateLimit>,
151    /// Channel to send cells down.
152    tx: StreamMpscSender<AnyRelayMsg>,
153    /// Reference to the circuit that this stream is on.
154    // TODO(conflux): this should be a ClientTunnel
155    circ: Arc<ClientCirc>,
156}
157
158impl StreamTarget {
159    /// Deliver a relay message for the stream that owns this StreamTarget.
160    ///
161    /// The StreamTarget will set the correct stream ID and pick the
162    /// right hop, but will not validate that the message is well-formed
163    /// or meaningful in context.
164    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
165        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
166        Ok(())
167    }
168
169    /// Close the pending stream that owns this StreamTarget, delivering the specified
170    /// END message (if any)
171    ///
172    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
173    ///
174    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
175    ///
176    /// The StreamTarget will set the correct stream ID and pick the
177    /// right hop, but will not validate that the message is well-formed
178    /// or meaningful in context.
179    ///
180    /// Note that in many cases, the actual contents of an END message can leak unwanted
181    /// information. Please consider carefully before sending anything but an
182    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
183    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
184    ///
185    /// In addition to sending the END message, this function also ensures
186    /// the state of the stream map entry of this stream is updated
187    /// accordingly.
188    ///
189    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
190    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
191    /// function is for closing pending incoming streams (a stream is said to be pending if we have
192    /// received the message initiating the stream but have not responded to it yet).
193    ///
194    /// **NOTE**: This function should be called at most once per request.
195    /// Calling it twice is an error.
196    #[cfg(feature = "hs-service")]
197    pub(crate) fn close_pending(
198        &self,
199        message: reactor::CloseStreamBehavior,
200    ) -> Result<oneshot::Receiver<Result<()>>> {
201        let (tx, rx) = oneshot::channel();
202
203        self.circ
204            .control
205            .unbounded_send(CtrlMsg::ClosePendingStream {
206                stream_id: self.stream_id,
207                hop: self.hop,
208                message,
209                done: tx,
210            })
211            .map_err(|_| Error::CircuitClosed)?;
212
213        Ok(rx)
214    }
215
216    /// Queue a "close" for the stream corresponding to this StreamTarget.
217    ///
218    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
219    ///
220    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
221    /// on this `StreamTarget`` or any clone of it.
222    /// The reactor *will* try to flush any already-send messages before it closes the stream.
223    ///
224    /// You don't need to call this method if the stream is closing because all of its StreamTargets
225    /// have been dropped.
226    pub(crate) fn close(&mut self) {
227        Pin::new(&mut self.tx).close_channel();
228    }
229
230    /// Called when a circuit-level protocol error has occurred and the
231    /// circuit needs to shut down.
232    pub(crate) fn protocol_error(&mut self) {
233        self.circ.protocol_error();
234    }
235
236    /// Request to send a SENDME cell for this stream.
237    ///
238    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
239    /// block or wait for a response from the circuit reactor.
240    /// An error is only returned if we are unable to send the request.
241    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
242    /// this here and an error will not be returned.
243    pub(crate) fn send_sendme(&mut self) -> Result<()> {
244        self.circ
245            .control
246            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
247                msg: FlowCtrlMsg::Sendme,
248                stream_id: self.stream_id,
249                hop: self.hop,
250            })
251            .map_err(|_| Error::CircuitClosed)
252    }
253
254    /// Inform the circuit reactor that there has been a change in the drain rate for this stream.
255    ///
256    /// Typically the circuit reactor would send this new rate in an XON message to the other end of
257    /// the stream.
258    /// But it may decide not to, and may discard this update.
259    /// For example the stream may have a large amount of buffered data, and the reactor may not
260    /// want to send an XON while the buffer is large.
261    ///
262    /// This sends a message to inform the circuit reactor of the new drain rate,
263    /// but it does not block or wait for a response from the reactor.
264    /// An error is only returned if we are unable to send the update.
265    pub(crate) fn drain_rate_update(&mut self, rate: XonKbpsEwma) -> Result<()> {
266        self.circ
267            .control
268            .unbounded_send(CtrlMsg::FlowCtrlUpdate {
269                msg: FlowCtrlMsg::Xon(rate),
270                stream_id: self.stream_id,
271                hop: self.hop,
272            })
273            .map_err(|_| Error::CircuitClosed)
274    }
275
276    /// Return a reference to the circuit that this `StreamTarget` is using.
277    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
278    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
279        &self.circ
280    }
281
282    /// Return the kind of relay cell in use on this `StreamTarget`.
283    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
284        self.relay_cell_format
285    }
286
287    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
288    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
289        &self.rate_limit_stream
290    }
291}