tor_proto/
tunnel.rs

1//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2//! be single or multi path.
3
4pub mod circuit;
5mod halfstream;
6#[cfg(feature = "send-control-msg")]
7pub(crate) mod msghandler;
8pub(crate) mod reactor;
9mod streammap;
10
11use derive_deftly::Deftly;
12use derive_more::Display;
13use futures::SinkExt as _;
14use oneshot_fused_workaround as oneshot;
15use std::pin::Pin;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18
19use crate::circuit::UniqId;
20use crate::crypto::cell::HopNum;
21use crate::stream::StreamRateLimit;
22use crate::{Error, Result};
23use circuit::ClientCirc;
24use circuit::{handshake, StreamMpscSender};
25use reactor::CtrlMsg;
26
27use postage::watch;
28use tor_async_utils::SinkCloseChannel as _;
29use tor_cell::relaycell::msg::AnyRelayMsg;
30use tor_cell::relaycell::{RelayCellFormat, StreamId};
31use tor_memquota::derive_deftly_template_HasMemoryCost;
32
33/// The unique identifier of a tunnel.
34#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
35#[display("{}", _0)]
36pub(crate) struct TunnelId(u64);
37
38impl TunnelId {
39    /// Create a new TunnelId.
40    ///
41    /// # Panics
42    ///
43    /// Panics if we have exhausted the possible space of u64 IDs.
44    pub(crate) fn next() -> TunnelId {
45        /// The next unique tunnel ID.
46        static NEXT_TUNNEL_ID: AtomicU64 = AtomicU64::new(1);
47        let id = NEXT_TUNNEL_ID.fetch_add(1, Ordering::Relaxed);
48        assert!(id != 0, "Exhausted Tunnel ID space?!");
49        TunnelId(id)
50    }
51}
52
53/// The identifier of a circuit [`UniqId`] within a tunnel.
54///
55/// This type is only needed for logging purposes: a circuit's [`UniqId`] is
56/// process-unique, but in the logs it's often useful to display the
57/// owning tunnel's ID alongside the circuit identifier.
58#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
59#[display("Circ {}.{}", tunnel_id, circ_id.display_chan_circ())]
60pub(crate) struct TunnelScopedCircId {
61    /// The identifier of the owning tunnel
62    tunnel_id: TunnelId,
63    /// The process-unique identifier of the circuit
64    circ_id: UniqId,
65}
66
67impl TunnelScopedCircId {
68    /// Create a new [`TunnelScopedCircId`] from the specified identifiers.
69    pub(crate) fn new(tunnel_id: TunnelId, circ_id: UniqId) -> Self {
70        Self { tunnel_id, circ_id }
71    }
72
73    /// Return the [`UniqId`].
74    pub(crate) fn unique_id(&self) -> UniqId {
75        self.circ_id
76    }
77}
78
79/// A precise position in a tunnel.
80#[derive(Debug, Deftly, Copy, Clone, PartialEq, Eq)]
81#[derive_deftly(HasMemoryCost)]
82#[non_exhaustive]
83pub enum HopLocation {
84    /// A specific position in a tunnel.
85    Hop((UniqId, HopNum)),
86    /// The join point of a multi-path tunnel.
87    JoinPoint,
88}
89
90/// A position in a tunnel.
91#[derive(Debug, Copy, Clone, PartialEq, Eq)]
92#[non_exhaustive]
93pub enum TargetHop {
94    /// A specific position in a tunnel.
95    Hop(HopLocation),
96    /// The last hop of a tunnel.
97    ///
98    /// This should be used only when you don't care about what specific hop is used.
99    /// Some tunnels may be extended or truncated,
100    /// which means that the "last hop" may change at any time.
101    LastHop,
102}
103
104impl From<(UniqId, HopNum)> for HopLocation {
105    fn from(v: (UniqId, HopNum)) -> Self {
106        HopLocation::Hop(v)
107    }
108}
109
110impl From<(UniqId, HopNum)> for TargetHop {
111    fn from(v: (UniqId, HopNum)) -> Self {
112        TargetHop::Hop(v.into())
113    }
114}
115
116impl HopLocation {
117    /// Return the hop number if not a JointPoint.
118    pub fn hop_num(&self) -> Option<HopNum> {
119        match self {
120            Self::Hop((_, hop_num)) => Some(*hop_num),
121            Self::JoinPoint => None,
122        }
123    }
124}
125
126/// Internal handle, used to implement a stream on a particular circuit.
127///
128/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
129/// the reader should additionally hold an `mpsc::Receiver` to get
130/// relay messages for the stream.
131///
132/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
133/// close the stream by sending an END message to the other side.
134/// You can close a stream earlier by using [`StreamTarget::close`]
135/// or [`StreamTarget::close_pending`].
136#[derive(Clone, Debug)]
137pub(crate) struct StreamTarget {
138    /// Which hop of the circuit this stream is with.
139    hop: HopLocation,
140    /// Reactor ID for this stream.
141    stream_id: StreamId,
142    /// Encoding to use for relay cells sent on this stream.
143    ///
144    /// This is mostly irrelevant, except when deciding
145    /// how many bytes we can pack in a DATA message.
146    relay_cell_format: RelayCellFormat,
147    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
148    // TODO(arti#2068): we should consider making this an `Option`
149    rate_limit_stream: watch::Receiver<StreamRateLimit>,
150    /// Channel to send cells down.
151    tx: StreamMpscSender<AnyRelayMsg>,
152    /// Reference to the circuit that this stream is on.
153    // TODO(conflux): this should be a ClientTunnel
154    circ: Arc<ClientCirc>,
155}
156
157impl StreamTarget {
158    /// Deliver a relay message for the stream that owns this StreamTarget.
159    ///
160    /// The StreamTarget will set the correct stream ID and pick the
161    /// right hop, but will not validate that the message is well-formed
162    /// or meaningful in context.
163    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
164        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
165        Ok(())
166    }
167
168    /// Close the pending stream that owns this StreamTarget, delivering the specified
169    /// END message (if any)
170    ///
171    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
172    ///
173    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
174    ///
175    /// The StreamTarget will set the correct stream ID and pick the
176    /// right hop, but will not validate that the message is well-formed
177    /// or meaningful in context.
178    ///
179    /// Note that in many cases, the actual contents of an END message can leak unwanted
180    /// information. Please consider carefully before sending anything but an
181    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
182    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
183    ///
184    /// In addition to sending the END message, this function also ensures
185    /// the state of the stream map entry of this stream is updated
186    /// accordingly.
187    ///
188    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
189    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
190    /// function is for closing pending incoming streams (a stream is said to be pending if we have
191    /// received the message initiating the stream but have not responded to it yet).
192    ///
193    /// **NOTE**: This function should be called at most once per request.
194    /// Calling it twice is an error.
195    #[cfg(feature = "hs-service")]
196    pub(crate) fn close_pending(
197        &self,
198        message: reactor::CloseStreamBehavior,
199    ) -> Result<oneshot::Receiver<Result<()>>> {
200        let (tx, rx) = oneshot::channel();
201
202        self.circ
203            .control
204            .unbounded_send(CtrlMsg::ClosePendingStream {
205                stream_id: self.stream_id,
206                hop: self.hop,
207                message,
208                done: tx,
209            })
210            .map_err(|_| Error::CircuitClosed)?;
211
212        Ok(rx)
213    }
214
215    /// Queue a "close" for the stream corresponding to this StreamTarget.
216    ///
217    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
218    ///
219    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
220    /// on this `StreamTarget`` or any clone of it.
221    /// The reactor *will* try to flush any already-send messages before it closes the stream.
222    ///
223    /// You don't need to call this method if the stream is closing because all of its StreamTargets
224    /// have been dropped.
225    pub(crate) fn close(&mut self) {
226        Pin::new(&mut self.tx).close_channel();
227    }
228
229    /// Called when a circuit-level protocol error has occurred and the
230    /// circuit needs to shut down.
231    pub(crate) fn protocol_error(&mut self) {
232        self.circ.protocol_error();
233    }
234
235    /// Request to send a SENDME cell for this stream.
236    ///
237    /// This sends a request to the circuit reactor to send a stream-level SENDME, but it does not
238    /// block or wait for a response from the circuit reactor.
239    /// An error is only returned if we are unable to send the request.
240    /// This means that if the circuit reactor is unable to send the SENDME, we are not notified of
241    /// this here and an error will not be returned.
242    pub(crate) fn send_sendme(&mut self) -> Result<()> {
243        self.circ
244            .control
245            .unbounded_send(CtrlMsg::SendSendme {
246                stream_id: self.stream_id,
247                hop: self.hop,
248            })
249            .map_err(|_| Error::CircuitClosed)
250    }
251
252    /// Return a reference to the circuit that this `StreamTarget` is using.
253    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
254    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
255        &self.circ
256    }
257
258    /// Return the kind of relay cell in use on this `StreamTarget`.
259    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
260        self.relay_cell_format
261    }
262
263    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
264    pub(crate) fn rate_limit_stream(&self) -> &watch::Receiver<StreamRateLimit> {
265        &self.rate_limit_stream
266    }
267}