tor_proto/
tunnel.rs

1//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2//! be single or multi path.
3
4pub mod circuit;
5mod halfstream;
6#[cfg(feature = "send-control-msg")]
7pub(crate) mod msghandler;
8pub(crate) mod reactor;
9mod streammap;
10
11use derive_more::Display;
12use futures::SinkExt as _;
13use oneshot_fused_workaround as oneshot;
14use std::pin::Pin;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use crate::circuit::UniqId;
19use crate::crypto::cell::HopNum;
20use crate::{Error, Result};
21use circuit::ClientCirc;
22use circuit::{handshake, StreamMpscSender};
23use reactor::CtrlMsg;
24
25use tor_async_utils::SinkCloseChannel as _;
26use tor_cell::relaycell::msg::AnyRelayMsg;
27use tor_cell::relaycell::{RelayCellFormat, StreamId};
28
29/// The unique identifier of a tunnel.
30#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
31#[display("{}", _0)]
32pub(crate) struct TunnelId(u64);
33
34impl TunnelId {
35    /// Create a new TunnelId.
36    ///
37    /// # Panics
38    ///
39    /// Panics if we have exhausted the possible space of u64 IDs.
40    pub(crate) fn next() -> TunnelId {
41        /// The next unique tunnel ID.
42        static NEXT_TUNNEL_ID: AtomicU64 = AtomicU64::new(1);
43        let id = NEXT_TUNNEL_ID.fetch_add(1, Ordering::Relaxed);
44        assert!(id != 0, "Exhausted Tunnel ID space?!");
45        TunnelId(id)
46    }
47}
48
49/// The identifier of a circuit [`UniqId`] within a tunnel.
50///
51/// This type is only needed for logging purposes: a circuit's [`UniqId`] is
52/// process-unique, but in the logs it's often useful to display the
53/// owning tunnel's ID alongside the circuit identifier.
54#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Display)]
55#[display("Circ {}.{}", tunnel_id, circ_id.display_chan_circ())]
56pub(crate) struct TunnelScopedCircId {
57    /// The identifier of the owning tunnel
58    tunnel_id: TunnelId,
59    /// The process-unique identifier of the circuit
60    circ_id: UniqId,
61}
62
63impl TunnelScopedCircId {
64    /// Create a new [`TunnelScopedCircId`] from the specified identifiers.
65    pub(crate) fn new(tunnel_id: TunnelId, circ_id: UniqId) -> Self {
66        Self { tunnel_id, circ_id }
67    }
68
69    /// Return the [`UniqId`].
70    pub(crate) fn unique_id(&self) -> UniqId {
71        self.circ_id
72    }
73}
74
75// TODO(#1857): Make this pub and not `allow(dead_code)`.
76/// A precise position in a tunnel.
77#[allow(dead_code)]
78#[derive(Debug, Copy, Clone, PartialEq, Eq)]
79pub(crate) enum HopLocation {
80    /// A specific position in a tunnel.
81    Hop((UniqId, HopNum)),
82    /// The join point of a multi-path tunnel.
83    JoinPoint,
84}
85
86// TODO(#1857): Make this pub and not `allow(dead_code)`.
87/// A position in a tunnel.
88#[allow(dead_code)]
89#[derive(Debug, Copy, Clone, PartialEq, Eq)]
90pub(crate) enum TargetHop {
91    /// A specific position in a tunnel.
92    Hop(HopLocation),
93    /// The last hop of a tunnel.
94    ///
95    /// This should be used only when you don't care about what specific hop is used.
96    /// Some tunnels may be extended or truncated,
97    /// which means that the "last hop" may change at any time.
98    LastHop,
99}
100
101/// Internal handle, used to implement a stream on a particular circuit.
102///
103/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
104/// the reader should additionally hold an `mpsc::Receiver` to get
105/// relay messages for the stream.
106///
107/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
108/// close the stream by sending an END message to the other side.
109/// You can close a stream earlier by using [`StreamTarget::close`]
110/// or [`StreamTarget::close_pending`].
111#[derive(Clone, Debug)]
112pub(crate) struct StreamTarget {
113    /// Which hop of the circuit this stream is with.
114    hop: HopLocation,
115    /// Reactor ID for this stream.
116    stream_id: StreamId,
117    /// Encoding to use for relay cells sent on this stream.
118    ///
119    /// This is mostly irrelevant, except when deciding
120    /// how many bytes we can pack in a DATA message.
121    relay_cell_format: RelayCellFormat,
122    /// Channel to send cells down.
123    tx: StreamMpscSender<AnyRelayMsg>,
124    /// Reference to the circuit that this stream is on.
125    // TODO(conflux): this should be a ClientTunnel
126    circ: Arc<ClientCirc>,
127}
128
129impl StreamTarget {
130    /// Deliver a relay message for the stream that owns this StreamTarget.
131    ///
132    /// The StreamTarget will set the correct stream ID and pick the
133    /// right hop, but will not validate that the message is well-formed
134    /// or meaningful in context.
135    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
136        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
137        Ok(())
138    }
139
140    /// Close the pending stream that owns this StreamTarget, delivering the specified
141    /// END message (if any)
142    ///
143    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
144    ///
145    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
146    ///
147    /// The StreamTarget will set the correct stream ID and pick the
148    /// right hop, but will not validate that the message is well-formed
149    /// or meaningful in context.
150    ///
151    /// Note that in many cases, the actual contents of an END message can leak unwanted
152    /// information. Please consider carefully before sending anything but an
153    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
154    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
155    ///
156    /// In addition to sending the END message, this function also ensures
157    /// the state of the stream map entry of this stream is updated
158    /// accordingly.
159    ///
160    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
161    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
162    /// function is for closing pending incoming streams (a stream is said to be pending if we have
163    /// received the message initiating the stream but have not responded to it yet).
164    ///
165    /// **NOTE**: This function should be called at most once per request.
166    /// Calling it twice is an error.
167    #[cfg(feature = "hs-service")]
168    pub(crate) fn close_pending(
169        &self,
170        message: reactor::CloseStreamBehavior,
171    ) -> Result<oneshot::Receiver<Result<()>>> {
172        let (tx, rx) = oneshot::channel();
173
174        self.circ
175            .control
176            .unbounded_send(CtrlMsg::ClosePendingStream {
177                stream_id: self.stream_id,
178                hop: self.hop,
179                message,
180                done: tx,
181            })
182            .map_err(|_| Error::CircuitClosed)?;
183
184        Ok(rx)
185    }
186
187    /// Queue a "close" for the stream corresponding to this StreamTarget.
188    ///
189    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
190    ///
191    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
192    /// on this `StreamTarget`` or any clone of it.
193    /// The reactor *will* try to flush any already-send messages before it closes the stream.
194    ///
195    /// You don't need to call this method if the stream is closing because all of its StreamTargets
196    /// have been dropped.
197    pub(crate) fn close(&mut self) {
198        Pin::new(&mut self.tx).close_channel();
199    }
200
201    /// Called when a circuit-level protocol error has occurred and the
202    /// circuit needs to shut down.
203    pub(crate) fn protocol_error(&mut self) {
204        self.circ.protocol_error();
205    }
206
207    /// Send a SENDME cell for this stream.
208    pub(crate) async fn send_sendme(&mut self) -> Result<()> {
209        let (tx, rx) = oneshot::channel();
210
211        self.circ
212            .control
213            .unbounded_send(CtrlMsg::SendSendme {
214                stream_id: self.stream_id,
215                hop: self.hop,
216                sender: tx,
217            })
218            .map_err(|_| Error::CircuitClosed)?;
219
220        rx.await.map_err(|_| Error::CircuitClosed)?
221    }
222
223    /// Return a reference to the circuit that this `StreamTarget` is using.
224    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
225    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
226        &self.circ
227    }
228
229    /// Return the kind of relay cell in use on this `StreamTarget`.
230    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
231        self.relay_cell_format
232    }
233}