tor_proto/
tunnel.rs

1//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2//! be single or multi path.
3
4pub mod circuit;
5mod halfstream;
6#[cfg(feature = "send-control-msg")]
7pub(crate) mod msghandler;
8pub(crate) mod reactor;
9mod streammap;
10
11use futures::SinkExt as _;
12use oneshot_fused_workaround as oneshot;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use crate::crypto::cell::HopNum;
17use crate::{Error, Result};
18use circuit::ClientCirc;
19use circuit::{handshake, StreamMpscSender};
20use reactor::{CtrlMsg, LegId};
21
22use tor_async_utils::SinkCloseChannel as _;
23use tor_cell::relaycell::msg::AnyRelayMsg;
24use tor_cell::relaycell::{RelayCellFormat, StreamId};
25
26// TODO(#1857): Make this pub and not `allow(dead_code)`.
27/// A precise position in a tunnel.
28#[allow(dead_code)]
29#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30pub(crate) enum HopLocation {
31    /// A specific position in a tunnel.
32    Hop((LegId, HopNum)),
33    /// The join point of a multi-path tunnel.
34    JoinPoint,
35}
36
37// TODO(#1857): Make this pub and not `allow(dead_code)`.
38/// A position in a tunnel.
39#[allow(dead_code)]
40#[derive(Debug, Copy, Clone, PartialEq, Eq)]
41pub(crate) enum TargetHop {
42    /// A specific position in a tunnel.
43    Hop(HopLocation),
44    /// The last hop of a tunnel.
45    ///
46    /// This should be used only when you don't care about what specific hop is used.
47    /// Some tunnels may be extended or truncated,
48    /// which means that the "last hop" may change at any time.
49    LastHop,
50}
51
52/// Internal handle, used to implement a stream on a particular circuit.
53///
54/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
55/// the reader should additionally hold an `mpsc::Receiver` to get
56/// relay messages for the stream.
57///
58/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
59/// close the stream by sending an END message to the other side.
60/// You can close a stream earlier by using [`StreamTarget::close`]
61/// or [`StreamTarget::close_pending`].
62#[derive(Clone, Debug)]
63pub(crate) struct StreamTarget {
64    /// Which hop of the circuit this stream is with.
65    hop: HopLocation,
66    /// Reactor ID for this stream.
67    stream_id: StreamId,
68    /// Encoding to use for relay cells sent on this stream.
69    ///
70    /// This is mostly irrelevant, except when deciding
71    /// how many bytes we can pack in a DATA message.
72    relay_cell_format: RelayCellFormat,
73    /// Channel to send cells down.
74    tx: StreamMpscSender<AnyRelayMsg>,
75    /// Reference to the circuit that this stream is on.
76    // TODO(conflux): this should be a ClientTunnel
77    circ: Arc<ClientCirc>,
78}
79
80impl StreamTarget {
81    /// Deliver a relay message for the stream that owns this StreamTarget.
82    ///
83    /// The StreamTarget will set the correct stream ID and pick the
84    /// right hop, but will not validate that the message is well-formed
85    /// or meaningful in context.
86    pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
87        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
88        Ok(())
89    }
90
91    /// Close the pending stream that owns this StreamTarget, delivering the specified
92    /// END message (if any)
93    ///
94    /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
95    ///
96    /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
97    ///
98    /// The StreamTarget will set the correct stream ID and pick the
99    /// right hop, but will not validate that the message is well-formed
100    /// or meaningful in context.
101    ///
102    /// Note that in many cases, the actual contents of an END message can leak unwanted
103    /// information. Please consider carefully before sending anything but an
104    /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
105    /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
106    ///
107    /// In addition to sending the END message, this function also ensures
108    /// the state of the stream map entry of this stream is updated
109    /// accordingly.
110    ///
111    /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
112    /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
113    /// function is for closing pending incoming streams (a stream is said to be pending if we have
114    /// received the message initiating the stream but have not responded to it yet).
115    ///
116    /// **NOTE**: This function should be called at most once per request.
117    /// Calling it twice is an error.
118    #[cfg(feature = "hs-service")]
119    pub(crate) fn close_pending(
120        &self,
121        message: reactor::CloseStreamBehavior,
122    ) -> Result<oneshot::Receiver<Result<()>>> {
123        let (tx, rx) = oneshot::channel();
124
125        self.circ
126            .control
127            .unbounded_send(CtrlMsg::ClosePendingStream {
128                stream_id: self.stream_id,
129                hop: self.hop,
130                message,
131                done: tx,
132            })
133            .map_err(|_| Error::CircuitClosed)?;
134
135        Ok(rx)
136    }
137
138    /// Queue a "close" for the stream corresponding to this StreamTarget.
139    ///
140    /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
141    ///
142    /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
143    /// on this `StreamTarget`` or any clone of it.
144    /// The reactor *will* try to flush any already-send messages before it closes the stream.
145    ///
146    /// You don't need to call this method if the stream is closing because all of its StreamTargets
147    /// have been dropped.
148    pub(crate) fn close(&mut self) {
149        Pin::new(&mut self.tx).close_channel();
150    }
151
152    /// Called when a circuit-level protocol error has occurred and the
153    /// circuit needs to shut down.
154    pub(crate) fn protocol_error(&mut self) {
155        self.circ.protocol_error();
156    }
157
158    /// Send a SENDME cell for this stream.
159    pub(crate) async fn send_sendme(&mut self) -> Result<()> {
160        let (tx, rx) = oneshot::channel();
161
162        self.circ
163            .control
164            .unbounded_send(CtrlMsg::SendSendme {
165                stream_id: self.stream_id,
166                hop: self.hop,
167                sender: tx,
168            })
169            .map_err(|_| Error::CircuitClosed)?;
170
171        rx.await.map_err(|_| Error::CircuitClosed)?
172    }
173
174    /// Return a reference to the circuit that this `StreamTarget` is using.
175    #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
176    pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
177        &self.circ
178    }
179
180    /// Return the kind of relay cell in use on this `StreamTarget`.
181    pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
182        self.relay_cell_format
183    }
184}