tor_proto/tunnel.rs
1//! Tunnel module that will encompass a generic tunnel wrapping around a circuit reactor that can
2//! be single or multi path.
3
4pub mod circuit;
5mod halfstream;
6#[cfg(feature = "send-control-msg")]
7pub(crate) mod msghandler;
8pub(crate) mod reactor;
9mod streammap;
10
11use futures::SinkExt as _;
12use oneshot_fused_workaround as oneshot;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use crate::crypto::cell::HopNum;
17use crate::{Error, Result};
18use circuit::ClientCirc;
19use circuit::{handshake, StreamMpscSender};
20use reactor::{CtrlMsg, LegId};
21
22use tor_async_utils::SinkCloseChannel as _;
23use tor_cell::relaycell::msg::AnyRelayMsg;
24use tor_cell::relaycell::{RelayCellFormat, StreamId};
25
26// TODO(#1857): Make this pub and not `allow(dead_code)`.
27/// A precise position in a tunnel.
28#[allow(dead_code)]
29#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30pub(crate) enum HopLocation {
31 /// A specific position in a tunnel.
32 Hop((LegId, HopNum)),
33 /// The join point of a multi-path tunnel.
34 JoinPoint,
35}
36
37// TODO(#1857): Make this pub and not `allow(dead_code)`.
38/// A position in a tunnel.
39#[allow(dead_code)]
40#[derive(Debug, Copy, Clone, PartialEq, Eq)]
41pub(crate) enum TargetHop {
42 /// A specific position in a tunnel.
43 Hop(HopLocation),
44 /// The last hop of a tunnel.
45 ///
46 /// This should be used only when you don't care about what specific hop is used.
47 /// Some tunnels may be extended or truncated,
48 /// which means that the "last hop" may change at any time.
49 LastHop,
50}
51
52/// Internal handle, used to implement a stream on a particular circuit.
53///
54/// The reader and the writer for a stream should hold a `StreamTarget` for the stream;
55/// the reader should additionally hold an `mpsc::Receiver` to get
56/// relay messages for the stream.
57///
58/// When all the `StreamTarget`s for a stream are dropped, the Reactor will
59/// close the stream by sending an END message to the other side.
60/// You can close a stream earlier by using [`StreamTarget::close`]
61/// or [`StreamTarget::close_pending`].
62#[derive(Clone, Debug)]
63pub(crate) struct StreamTarget {
64 /// Which hop of the circuit this stream is with.
65 hop: HopLocation,
66 /// Reactor ID for this stream.
67 stream_id: StreamId,
68 /// Encoding to use for relay cells sent on this stream.
69 ///
70 /// This is mostly irrelevant, except when deciding
71 /// how many bytes we can pack in a DATA message.
72 relay_cell_format: RelayCellFormat,
73 /// Channel to send cells down.
74 tx: StreamMpscSender<AnyRelayMsg>,
75 /// Reference to the circuit that this stream is on.
76 // TODO(conflux): this should be a ClientTunnel
77 circ: Arc<ClientCirc>,
78}
79
80impl StreamTarget {
81 /// Deliver a relay message for the stream that owns this StreamTarget.
82 ///
83 /// The StreamTarget will set the correct stream ID and pick the
84 /// right hop, but will not validate that the message is well-formed
85 /// or meaningful in context.
86 pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
87 self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
88 Ok(())
89 }
90
91 /// Close the pending stream that owns this StreamTarget, delivering the specified
92 /// END message (if any)
93 ///
94 /// The stream is closed by sending a [`CtrlMsg::ClosePendingStream`] message to the reactor.
95 ///
96 /// Returns a [`oneshot::Receiver`] that can be used to await the reactor's response.
97 ///
98 /// The StreamTarget will set the correct stream ID and pick the
99 /// right hop, but will not validate that the message is well-formed
100 /// or meaningful in context.
101 ///
102 /// Note that in many cases, the actual contents of an END message can leak unwanted
103 /// information. Please consider carefully before sending anything but an
104 /// [`End::new_misc()`](tor_cell::relaycell::msg::End::new_misc) message over a `ClientCirc`.
105 /// (For onion services, we send [`DONE`](tor_cell::relaycell::msg::EndReason::DONE) )
106 ///
107 /// In addition to sending the END message, this function also ensures
108 /// the state of the stream map entry of this stream is updated
109 /// accordingly.
110 ///
111 /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the
112 /// reactor when their corresponding `StreamTarget` is dropped. The only valid use of this
113 /// function is for closing pending incoming streams (a stream is said to be pending if we have
114 /// received the message initiating the stream but have not responded to it yet).
115 ///
116 /// **NOTE**: This function should be called at most once per request.
117 /// Calling it twice is an error.
118 #[cfg(feature = "hs-service")]
119 pub(crate) fn close_pending(
120 &self,
121 message: reactor::CloseStreamBehavior,
122 ) -> Result<oneshot::Receiver<Result<()>>> {
123 let (tx, rx) = oneshot::channel();
124
125 self.circ
126 .control
127 .unbounded_send(CtrlMsg::ClosePendingStream {
128 stream_id: self.stream_id,
129 hop: self.hop,
130 message,
131 done: tx,
132 })
133 .map_err(|_| Error::CircuitClosed)?;
134
135 Ok(rx)
136 }
137
138 /// Queue a "close" for the stream corresponding to this StreamTarget.
139 ///
140 /// Unlike `close_pending`, this method does not allow the caller to provide an `END` message.
141 ///
142 /// Once this method has been called, no more messages may be sent with [`StreamTarget::send`],
143 /// on this `StreamTarget`` or any clone of it.
144 /// The reactor *will* try to flush any already-send messages before it closes the stream.
145 ///
146 /// You don't need to call this method if the stream is closing because all of its StreamTargets
147 /// have been dropped.
148 pub(crate) fn close(&mut self) {
149 Pin::new(&mut self.tx).close_channel();
150 }
151
152 /// Called when a circuit-level protocol error has occurred and the
153 /// circuit needs to shut down.
154 pub(crate) fn protocol_error(&mut self) {
155 self.circ.protocol_error();
156 }
157
158 /// Send a SENDME cell for this stream.
159 pub(crate) async fn send_sendme(&mut self) -> Result<()> {
160 let (tx, rx) = oneshot::channel();
161
162 self.circ
163 .control
164 .unbounded_send(CtrlMsg::SendSendme {
165 stream_id: self.stream_id,
166 hop: self.hop,
167 sender: tx,
168 })
169 .map_err(|_| Error::CircuitClosed)?;
170
171 rx.await.map_err(|_| Error::CircuitClosed)?
172 }
173
174 /// Return a reference to the circuit that this `StreamTarget` is using.
175 #[cfg(any(feature = "experimental-api", feature = "stream-ctrl"))]
176 pub(crate) fn circuit(&self) -> &Arc<ClientCirc> {
177 &self.circ
178 }
179
180 /// Return the kind of relay cell in use on this `StreamTarget`.
181 pub(crate) fn relay_cell_format(&self) -> RelayCellFormat {
182 self.relay_cell_format
183 }
184}