tor_proto/client/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//! than we've sent data for.) This is handled within the reactor by the
12//! `StreamFlowCtrl`. For half-closed streams which don't send stream
13//! SENDMEs, an additional receive-window check is performed in the
14//! `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//! well-formed? For open streams, the streams themselves handle this check.
17//! For half-closed streams, the reactor handles it by calling
18//! `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::circhop::SendRelayCell;
26use crate::circuit::{CircuitRxReceiver, UniqId};
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::circuit::{ClientCircChanMsg, TimeoutEstimator};
29use crate::client::{HopLocation, TargetHop};
30use crate::crypto::cell::HopNum;
31use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
32use crate::memquota::CircuitAccount;
33use crate::stream::CloseStreamBehavior;
34use crate::streammap;
35use crate::tunnel::{TunnelId, TunnelScopedCircId};
36use crate::util::err::ReactorError;
37use crate::util::skew::ClockSkew;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71
72#[cfg(feature = "hs-service")]
73use crate::stream::incoming::IncomingStreamRequestHandler;
74
75#[cfg(feature = "conflux")]
76use {
77 crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
78 crate::util::err::ConfluxHandshakeError,
79};
80
81pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
82
83/// The type of a oneshot channel used to inform reactor of the result of an operation.
84pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
85
86/// Contains a list of conflux handshake results.
87#[cfg(feature = "conflux")]
88pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
89
90/// The type of oneshot channel used to inform reactor users of the outcome
91/// of a client-side conflux handshake.
92///
93/// Contains a list of handshake results, one for each circuit that we were asked
94/// to link in the tunnel.
95#[cfg(feature = "conflux")]
96pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
97
98/// A handshake type, to be used when creating circuit hops.
99#[derive(Clone, Debug)]
100pub(crate) enum CircuitHandshake {
101 /// Use the CREATE_FAST handshake.
102 CreateFast,
103 /// Use the ntor handshake.
104 Ntor {
105 /// The public key of the relay.
106 public_key: NtorPublicKey,
107 /// The Ed25519 identity of the relay, which is verified against the
108 /// identity held in the circuit's channel.
109 ed_identity: pk::ed25519::Ed25519Identity,
110 },
111 /// Use the ntor-v3 handshake.
112 NtorV3 {
113 /// The public key of the relay.
114 public_key: NtorV3PublicKey,
115 },
116}
117
118// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
119// proliferation is a bit bothersome, but unavoidable with the current design.
120//
121// We should consider getting rid of some of these enums (if possible),
122// and coming up with more intuitive names.
123
124/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
125#[derive(From, Debug)]
126#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
127enum RunOnceCmd {
128 /// Run a single `RunOnceCmdInner` command.
129 Single(RunOnceCmdInner),
130 /// Run multiple `RunOnceCmdInner` commands.
131 //
132 // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
133 // but most of the time we're only going to have *one* RunOnceCmdInner
134 // to run per run_once() loop. The enum enables us avoid the extra heap
135 // allocation for the `RunOnceCmd::Single` case.
136 Multiple(Vec<RunOnceCmdInner>),
137}
138
139/// Instructions for running something in the reactor loop.
140///
141/// Run at the end of [`Reactor::run_once`].
142//
143// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
144// We should consider making each variant a tuple variant and deduplicating the fields.
145#[derive(educe::Educe)]
146#[educe(Debug)]
147enum RunOnceCmdInner {
148 /// Send a RELAY cell.
149 Send {
150 /// The leg the cell should be sent on.
151 leg: UniqId,
152 /// The cell to send.
153 cell: SendRelayCell,
154 /// A channel for sending completion notifications.
155 done: Option<ReactorResultChannel<()>>,
156 },
157 /// Send a given control message on this circuit, and install a control-message handler to
158 /// receive responses.
159 #[cfg(feature = "send-control-msg")]
160 SendMsgAndInstallHandler {
161 /// The message to send, if any
162 msg: Option<AnyRelayMsgOuter>,
163 /// A message handler to install.
164 ///
165 /// If this is `None`, there must already be a message handler installed
166 #[educe(Debug(ignore))]
167 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
168 /// A sender that we use to tell the caller that the message was sent
169 /// and the handler installed.
170 done: oneshot::Sender<Result<()>>,
171 },
172 /// Handle a SENDME message.
173 HandleSendMe {
174 /// The leg the SENDME was received on.
175 leg: UniqId,
176 /// The hop number.
177 hop: HopNum,
178 /// The SENDME message to handle.
179 sendme: Sendme,
180 },
181 /// Begin a stream with the provided hop in this circuit.
182 ///
183 /// Uses the provided stream ID, and sends the provided message to that hop.
184 BeginStream {
185 /// The cell to send.
186 cell: Result<(SendRelayCell, StreamId)>,
187 /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
188 /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
189 /// caller.
190 hop: HopLocation,
191 /// The circuit leg to begin the stream on.
192 leg: UniqId,
193 /// Oneshot channel to notify on completion, with the allocated stream ID.
194 done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
195 },
196 /// Consider sending an XON message with the given `rate`.
197 MaybeSendXon {
198 /// The drain rate to advertise in the XON message.
199 rate: XonKbpsEwma,
200 /// The ID of the stream to send the message on.
201 stream_id: StreamId,
202 /// The location of the hop on the tunnel.
203 hop: HopLocation,
204 },
205 /// Close the specified stream.
206 CloseStream {
207 /// The hop number.
208 hop: HopLocation,
209 /// The ID of the stream to close.
210 sid: StreamId,
211 /// The stream-closing behavior.
212 behav: CloseStreamBehavior,
213 /// The reason for closing the stream.
214 reason: streammap::TerminateReason,
215 /// A channel for sending completion notifications.
216 done: Option<ReactorResultChannel<()>>,
217 },
218 /// Get the clock skew claimed by the first hop of the circuit.
219 FirstHopClockSkew {
220 /// Oneshot channel to return the clock skew.
221 answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
222 },
223 /// Remove a circuit leg from the conflux set.
224 RemoveLeg {
225 /// The circuit leg to remove.
226 leg: UniqId,
227 /// The reason for removal.
228 ///
229 /// This is only used for conflux circuits that get removed
230 /// before the conflux handshake is complete.
231 ///
232 /// The [`RemoveLegReason`] is mapped by the reactor to a
233 /// [`ConfluxHandshakeError`] that is sent to the initiator of the
234 /// handshake to indicate the reason the handshake failed.
235 reason: RemoveLegReason,
236 },
237 /// A circuit has completed the conflux handshake,
238 /// and wants to send the specified cell.
239 ///
240 /// This is similar to [`RunOnceCmdInner::Send`],
241 /// but needs to remain a separate variant,
242 /// because in addition to instructing the reactor to send a cell,
243 /// it also notifies it that the conflux handshake is complete on the specified `leg`.
244 /// This enables the reactor to save the handshake result (`Ok(())`),
245 /// and, if there are no other legs still in the handshake phase,
246 /// send the result to the handshake initiator.
247 #[cfg(feature = "conflux")]
248 ConfluxHandshakeComplete {
249 /// The circuit leg that has completed the handshake,
250 /// This is the leg the cell should be sent on.
251 leg: UniqId,
252 /// The cell to send.
253 cell: SendRelayCell,
254 },
255 /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
256 #[cfg(feature = "conflux")]
257 Link {
258 /// The circuits to link into the tunnel
259 #[educe(Debug(ignore))]
260 circuits: Vec<Circuit>,
261 /// Oneshot channel for notifying of conflux handshake completion.
262 answer: ConfluxLinkResultChannel,
263 },
264 /// Enqueue an out-of-order cell in ooo_msg.
265 #[cfg(feature = "conflux")]
266 Enqueue {
267 /// The leg the entry originated from.
268 leg: UniqId,
269 /// The out-of-order message.
270 msg: OooRelayMsg,
271 },
272 /// Take a padding-related event on a circuit leg.
273 #[cfg(feature = "circ-padding")]
274 PaddingAction {
275 /// The leg to event on.
276 leg: UniqId,
277 /// The event to take.
278 padding_event: PaddingEvent,
279 },
280 /// Perform a clean shutdown on this circuit.
281 CleanShutdown,
282}
283
284impl RunOnceCmdInner {
285 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
286 fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
287 match cmd {
288 CircuitCmd::Send(cell) => Self::Send {
289 leg,
290 cell,
291 done: None,
292 },
293 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
294 CircuitCmd::CloseStream {
295 hop,
296 sid,
297 behav,
298 reason,
299 } => Self::CloseStream {
300 hop: HopLocation::Hop((leg, hop)),
301 sid,
302 behav,
303 reason,
304 done: None,
305 },
306 #[cfg(feature = "conflux")]
307 CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
308 #[cfg(feature = "conflux")]
309 CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
310 let cell = SendRelayCell {
311 hop: Some(hop),
312 early,
313 cell,
314 };
315 Self::ConfluxHandshakeComplete { leg, cell }
316 }
317 #[cfg(feature = "conflux")]
318 CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
319 CircuitCmd::CleanShutdown => Self::CleanShutdown,
320 }
321 }
322}
323
324/// A command to execute at the end of [`Reactor::run_once`].
325#[derive(From, Debug)]
326#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
327enum CircuitEvent {
328 /// Run a single `CircuitCmd` command.
329 RunCmd {
330 /// The unique identifier of the circuit leg to run the command on
331 leg: UniqId,
332 /// The command to run.
333 cmd: CircuitCmd,
334 },
335 /// Handle a control message
336 HandleControl(CtrlMsg),
337 /// Handle an input message.
338 HandleCell {
339 /// The unique identifier of the circuit leg the message was received on.
340 leg: UniqId,
341 /// The message to handle.
342 cell: ClientCircChanMsg,
343 },
344 /// Remove the specified circuit leg from the conflux set.
345 ///
346 /// Returned whenever a single circuit leg needs to be be removed
347 /// from the reactor's conflux set, without necessarily tearing down
348 /// the whole set or shutting down the reactor.
349 ///
350 /// Note: this event *can* cause the reactor to shut down
351 /// (and the conflux set to be closed).
352 ///
353 /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
354 RemoveLeg {
355 /// The leg to remove.
356 leg: UniqId,
357 /// The reason for removal.
358 ///
359 /// This is only used for conflux circuits that get removed
360 /// before the conflux handshake is complete.
361 ///
362 /// The [`RemoveLegReason`] is mapped by the reactor to a
363 /// [`ConfluxHandshakeError`] that is sent to the initiator of the
364 /// handshake to indicate the reason the handshake failed.
365 reason: RemoveLegReason,
366 },
367 /// Take some event (blocking or unblocking a circuit, or sending padding)
368 /// based on the circuit padding backend code.
369 PaddingAction {
370 /// The leg on which to take the padding event .
371 leg: UniqId,
372 /// The event to take.
373 padding_event: PaddingEvent,
374 },
375 /// Protocol violation. This leads for now to the close of the circuit reactor. The
376 /// error causing the violation is set in err.
377 ProtoViolation {
378 /// The error that causes this protocol violation.
379 err: crate::Error,
380 },
381}
382
383impl CircuitEvent {
384 /// Return the ordering with which we should handle this event
385 /// within a list of events returned by a single call to next_circ_event().
386 ///
387 /// NOTE: Please do not make this any more complicated:
388 /// It is a consequence of a kludge that we need this sorting at all.
389 /// Assuming that eventually, we switch away from the current
390 /// poll-oriented `next_circ_event` design,
391 /// we may be able to get rid of this entirely.
392 fn order_within_batch(&self) -> u8 {
393 use CircuitEvent as CA;
394 use PaddingEvent as PE;
395 // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
396 // only used by the protocol violation event which leads to a shutdown of the reactor.
397 const IMMEDIATE: u8 = 0;
398 const EARLY: u8 = 1;
399 const NORMAL: u8 = 2;
400 const LATE: u8 = 3;
401
402 // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
403 // "StopBlocking" to the start.
404 //
405 // This way, we can be sure that we will handle any "send data" operations
406 // (and tell the Padder about them) _before_ we tell the Padder
407 // that we have blocked the circuit.
408 //
409 // This keeps things a bit more logical.
410 match self {
411 CA::RunCmd { .. } => NORMAL,
412 CA::HandleControl(..) => NORMAL,
413 CA::HandleCell { .. } => NORMAL,
414 CA::RemoveLeg { .. } => NORMAL,
415 #[cfg(feature = "circ-padding")]
416 CA::PaddingAction { padding_event, .. } => match padding_event {
417 PE::StopBlocking => EARLY,
418 PE::SendPadding(..) => NORMAL,
419 PE::StartBlocking(..) => LATE,
420 },
421 #[cfg(not(feature = "circ-padding"))]
422 CA::PaddingAction { .. } => NORMAL,
423 CA::ProtoViolation { .. } => IMMEDIATE,
424 }
425 }
426}
427
428/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
429/// progress.
430///
431/// # Background
432///
433/// The `Reactor` can't have async functions that send and receive cells, because its job is to
434/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
435///
436/// To get around this problem, the reactor can send some cells, and then make one of these
437/// `MetaCellHandler` objects, which will be run when the reply arrives.
438pub(crate) trait MetaCellHandler: Send {
439 /// The hop we're expecting the message to come from. This is compared against the hop
440 /// from which we actually receive messages, and an error is thrown if the two don't match.
441 fn expected_hop(&self) -> HopLocation;
442 /// Called when the message we were waiting for arrives.
443 ///
444 /// Gets a copy of the `Reactor` in order to do anything it likes there.
445 ///
446 /// If this function returns an error, the reactor will shut down.
447 fn handle_msg(
448 &mut self,
449 msg: UnparsedRelayMsg,
450 reactor: &mut Circuit,
451 ) -> Result<MetaCellDisposition>;
452}
453
454/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
455#[derive(Debug, Clone)]
456#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
457#[non_exhaustive]
458pub(crate) enum MetaCellDisposition {
459 /// The message was consumed; the handler should remain installed.
460 #[cfg(feature = "send-control-msg")]
461 Consumed,
462 /// The message was consumed; the handler should be uninstalled.
463 ConversationFinished,
464 /// The message was consumed; the circuit should be closed.
465 #[cfg(feature = "send-control-msg")]
466 CloseCirc,
467 // TODO: Eventually we might want the ability to have multiple handlers
468 // installed, and to let them say "not for me, maybe for somebody else?".
469 // But right now we don't need that.
470}
471
472/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
473///
474/// This is a macro instead of a function to work around borrowck errors
475/// in the select! from run_once().
476macro_rules! unwrap_or_shutdown {
477 ($self:expr, $res:expr, $reason:expr) => {{
478 match $res {
479 None => {
480 trace!(
481 tunnel_id = %$self.tunnel_id,
482 reason = %$reason,
483 "reactor shutdown"
484 );
485 Err(ReactorError::Shutdown)
486 }
487 Some(v) => Ok(v),
488 }
489 }};
490}
491
492/// Object to handle incoming cells and background tasks on a circuit
493///
494/// This type is returned when you finish a circuit; you need to spawn a
495/// new task that calls `run()` on it.
496#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
497pub struct Reactor {
498 /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
499 ///
500 /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
501 /// is ready to accept cells.
502 control: mpsc::UnboundedReceiver<CtrlMsg>,
503 /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
504 ///
505 /// This channel is polled in [`Reactor::run_once`].
506 ///
507 /// NOTE: this is a separate channel from `control`, because some messages
508 /// have higher priority and need to be handled even if the `chan_sender` is not
509 /// ready (whereas `control` messages are not read until the `chan_sender` sink
510 /// is ready to accept cells).
511 command: mpsc::UnboundedReceiver<CtrlCmd>,
512 /// A oneshot sender that is used to alert other tasks when this reactor is
513 /// finally dropped.
514 ///
515 /// It is a sender for Void because we never actually want to send anything here;
516 /// we only want to generate canceled events.
517 #[allow(dead_code)] // the only purpose of this field is to be dropped.
518 reactor_closed_tx: oneshot::Sender<void::Void>,
519 /// A set of circuits that form a tunnel.
520 ///
521 /// Contains 1 or more circuits.
522 ///
523 /// Circuits may be added to this set throughout the lifetime of the reactor.
524 ///
525 /// Sometimes, the reactor will remove circuits from this set,
526 /// for example if the `LINKED` message takes too long to arrive,
527 /// or if congestion control negotiation fails.
528 /// The reactor will continue running with the remaining circuits.
529 /// It will shut down if *all* the circuits are removed.
530 ///
531 // TODO(conflux): document all the reasons why the reactor might
532 // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
533 circuits: ConfluxSet,
534 /// An identifier for logging about this tunnel reactor.
535 tunnel_id: TunnelId,
536 /// Handlers, shared with `Circuit`.
537 cell_handlers: CellHandlers,
538 /// The time provider, used for conflux handshake timeouts.
539 runtime: DynTimeProvider,
540 /// The conflux handshake context, if there is an on-going handshake.
541 ///
542 /// Set to `None` if this is a single-path tunnel,
543 /// or if none of the circuit legs from our conflux set
544 /// are currently in the conflux handshake phase.
545 #[cfg(feature = "conflux")]
546 conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
547 /// A min-heap buffering all the out-of-order messages received so far.
548 ///
549 /// TODO(conflux): this becomes a DoS vector unless we impose a limit
550 /// on its size. We should make this participate in the memquota memory
551 /// tracking system, somehow.
552 #[cfg(feature = "conflux")]
553 ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
554}
555
556/// The context for an on-going conflux handshake.
557#[cfg(feature = "conflux")]
558struct ConfluxHandshakeCtx {
559 /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
560 answer: ConfluxLinkResultChannel,
561 /// The number of legs that are currently doing the handshake.
562 num_legs: usize,
563 /// The handshake results we have collected so far.
564 results: ConfluxHandshakeResult,
565}
566
567/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
568#[derive(Debug)]
569#[cfg(feature = "conflux")]
570struct ConfluxHeapEntry {
571 /// The leg id this message came from.
572 leg_id: UniqId,
573 /// The out of order message
574 msg: OooRelayMsg,
575}
576
577#[cfg(feature = "conflux")]
578impl Ord for ConfluxHeapEntry {
579 fn cmp(&self, other: &Self) -> Ordering {
580 self.msg.cmp(&other.msg)
581 }
582}
583
584#[cfg(feature = "conflux")]
585impl PartialOrd for ConfluxHeapEntry {
586 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
587 Some(self.cmp(other))
588 }
589}
590
591#[cfg(feature = "conflux")]
592impl PartialEq for ConfluxHeapEntry {
593 fn eq(&self, other: &Self) -> bool {
594 self.msg == other.msg
595 }
596}
597
598#[cfg(feature = "conflux")]
599impl Eq for ConfluxHeapEntry {}
600
601/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
602struct CellHandlers {
603 /// A handler for a meta cell, together with a result channel to notify on completion.
604 ///
605 /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
606 ///
607 /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
608 /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
609 /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
610 /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
611 /// (which are handled separately) or conflux cells
612 /// (which are handled by the conflux handlers).
613 ///
614 /// The handler is uninstalled after the receipt of the EXTENDED cell,
615 /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
616 meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
617 /// A handler for incoming stream requests.
618 #[cfg(feature = "hs-service")]
619 incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
620}
621
622impl Reactor {
623 /// Create a new circuit reactor.
624 ///
625 /// The reactor will send outbound messages on `channel`, receive incoming
626 /// messages on `input`, and identify this circuit by the channel-local
627 /// [`CircId`] provided.
628 ///
629 /// The internal unique identifier for this circuit will be `unique_id`.
630 #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
631 pub(super) fn new(
632 channel: Arc<Channel>,
633 channel_id: CircId,
634 unique_id: UniqId,
635 input: CircuitRxReceiver,
636 runtime: DynTimeProvider,
637 memquota: CircuitAccount,
638 padding_ctrl: PaddingController,
639 padding_stream: PaddingEventStream,
640 timeouts: Arc<dyn TimeoutEstimator + Send>,
641 ) -> (
642 Self,
643 mpsc::UnboundedSender<CtrlMsg>,
644 mpsc::UnboundedSender<CtrlCmd>,
645 oneshot::Receiver<void::Void>,
646 Arc<TunnelMutableState>,
647 ) {
648 let tunnel_id = TunnelId::next();
649 let (control_tx, control_rx) = mpsc::unbounded();
650 let (command_tx, command_rx) = mpsc::unbounded();
651 let mutable = Arc::new(MutableState::default());
652
653 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
654
655 let cell_handlers = CellHandlers {
656 meta_handler: None,
657 #[cfg(feature = "hs-service")]
658 incoming_stream_req_handler: None,
659 };
660
661 let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
662 let circuit_leg = Circuit::new(
663 runtime.clone(),
664 channel,
665 channel_id,
666 unique_id,
667 input,
668 memquota,
669 Arc::clone(&mutable),
670 padding_ctrl,
671 padding_stream,
672 timeouts,
673 );
674
675 let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
676
677 let reactor = Reactor {
678 circuits,
679 control: control_rx,
680 command: command_rx,
681 reactor_closed_tx,
682 tunnel_id,
683 cell_handlers,
684 runtime,
685 #[cfg(feature = "conflux")]
686 conflux_hs_ctx: None,
687 #[cfg(feature = "conflux")]
688 ooo_msgs: Default::default(),
689 };
690
691 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
692 }
693
694 /// Launch the reactor, and run until the circuit closes or we
695 /// encounter an error.
696 ///
697 /// Once this method returns, the circuit is dead and cannot be
698 /// used again.
699 #[instrument(level = "trace", skip_all)]
700 pub async fn run(mut self) -> Result<()> {
701 trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
702 let result: Result<()> = loop {
703 match self.run_once().await {
704 Ok(()) => (),
705 Err(ReactorError::Shutdown) => break Ok(()),
706 Err(ReactorError::Err(e)) => break Err(e),
707 }
708 };
709
710 // Log that the reactor stopped, possibly with the associated error as a report.
711 // May log at a higher level depending on the error kind.
712 const MSG: &str = "Tunnel reactor stopped";
713 match &result {
714 Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
715 Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
716 }
717
718 result
719 }
720
721 /// Helper for run: doesn't mark the circuit closed on finish. Only
722 /// processes one cell or control message.
723 #[instrument(level = "trace", skip_all)]
724 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
725 // If all the circuits are closed, shut down the reactor
726 if self.circuits.is_empty() {
727 trace!(
728 tunnel_id = %self.tunnel_id,
729 "Tunnel reactor shutting down: all circuits have closed",
730 );
731
732 return Err(ReactorError::Shutdown);
733 }
734
735 // If this is a single path circuit, we need to wait until the first hop
736 // is created before doing anything else
737 let single_path_with_hops = self
738 .circuits
739 .single_leg_mut()
740 .is_ok_and(|leg| !leg.has_hops());
741 if single_path_with_hops {
742 self.wait_for_create().await?;
743
744 return Ok(());
745 }
746
747 // Prioritize the buffered messages.
748 //
749 // Note: if any of the messages are ready to be handled,
750 // this will block the reactor until we are done processing them
751 //
752 // TODO circpad: If this is a problem, we might want to re-order things so that we
753 // prioritize padding instead. On the other hand, this should be fixed by refactoring
754 // circuit and tunnel reactors, so we might do well to just leave it alone for now.
755 #[cfg(feature = "conflux")]
756 self.try_dequeue_ooo_msgs().await?;
757
758 let mut events = select_biased! {
759 res = self.command.next() => {
760 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
761 return ControlHandler::new(self).handle_cmd(cmd);
762 },
763 // Check whether we've got a control message pending.
764 //
765 // Note: unfortunately, reading from control here means we might start
766 // handling control messages before our chan_senders are ready.
767 // With the current design, this is inevitable: we can't know which circuit leg
768 // a control message is meant for without first reading the control message from
769 // the channel, and at that point, we can't know for sure whether that particular
770 // circuit is ready for sending.
771 ret = self.control.next() => {
772 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
773 smallvec![CircuitEvent::HandleControl(msg)]
774 },
775 res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
776 };
777
778 // Put the events into the order that we need to execute them in.
779 //
780 // (Yes, this _does_ have to be a stable sort. Not all events may be freely re-ordered
781 // with respect to one another.)
782 events.sort_by_key(|a| a.order_within_batch());
783
784 for event in events {
785 let cmd = match event {
786 CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
787 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
788 )),
789 CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
790 .handle_msg(ctrl)?
791 .map(RunOnceCmd::Single),
792 CircuitEvent::HandleCell { leg, cell } => {
793 let circ = self
794 .circuits
795 .leg_mut(leg)
796 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
797
798 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
799 if circ_cmds.is_empty() {
800 None
801 } else {
802 // TODO: we return RunOnceCmd::Multiple even if there's a single command.
803 //
804 // See the TODO on `Circuit::handle_cell`.
805 let cmd = RunOnceCmd::Multiple(
806 circ_cmds
807 .into_iter()
808 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
809 .collect(),
810 );
811
812 Some(cmd)
813 }
814 }
815 CircuitEvent::RemoveLeg { leg, reason } => {
816 Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
817 }
818 CircuitEvent::PaddingAction { leg, padding_event } => {
819 cfg_if! {
820 if #[cfg(feature = "circ-padding")] {
821 Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
822 } else {
823 // If padding isn't enabled, we never generate a padding event,
824 // so we can be sure this case will never be called.
825 void::unreachable(padding_event.0);
826 }
827 }
828 }
829 CircuitEvent::ProtoViolation { err } => {
830 return Err(err.into());
831 }
832 };
833
834 if let Some(cmd) = cmd {
835 self.handle_run_once_cmd(cmd).await?;
836 }
837 }
838
839 Ok(())
840 }
841
842 /// Try to process the previously-out-of-order messages we might have buffered.
843 #[cfg(feature = "conflux")]
844 #[instrument(level = "trace", skip_all)]
845 async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
846 // Check if we're ready to dequeue any of the previously out-of-order cells.
847 while let Some(entry) = self.ooo_msgs.peek() {
848 let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
849
850 if !should_pop {
851 break;
852 }
853
854 let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
855
856 let circ = self
857 .circuits
858 .leg_mut(entry.leg_id)
859 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
860 let handlers = &mut self.cell_handlers;
861 let cmd = circ
862 .handle_in_order_relay_msg(
863 handlers,
864 entry.msg.hopnum,
865 entry.leg_id,
866 entry.msg.cell_counts_towards_windows,
867 entry.msg.streamid,
868 entry.msg.msg,
869 )?
870 .map(|cmd| {
871 RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
872 });
873
874 if let Some(cmd) = cmd {
875 self.handle_run_once_cmd(cmd).await?;
876 }
877 }
878
879 Ok(())
880 }
881
882 /// Handle a [`RunOnceCmd`].
883 #[instrument(level = "trace", skip_all)]
884 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
885 match cmd {
886 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
887 RunOnceCmd::Multiple(cmds) => {
888 // While we know `sendable` is ready to accept *one* cell,
889 // we can't be certain it will be able to accept *all* of the cells
890 // that need to be sent here. This means we *may* end up buffering
891 // in its underlying SometimesUnboundedSink! That is OK, because
892 // RunOnceCmd::Multiple is only used for handling packed cells.
893 for cmd in cmds {
894 self.handle_single_run_once_cmd(cmd).await?;
895 }
896 }
897 }
898
899 Ok(())
900 }
901
902 /// Handle a [`RunOnceCmd`].
903 #[instrument(level = "trace", skip_all)]
904 async fn handle_single_run_once_cmd(
905 &mut self,
906 cmd: RunOnceCmdInner,
907 ) -> StdResult<(), ReactorError> {
908 match cmd {
909 RunOnceCmdInner::Send { leg, cell, done } => {
910 // TODO: check the cc window
911 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
912 if let Some(done) = done {
913 // Don't care if the receiver goes away
914 let _ = done.send(res.clone());
915 }
916 res?;
917 }
918 #[cfg(feature = "send-control-msg")]
919 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
920 let cell: Result<Option<SendRelayCell>> =
921 self.prepare_msg_and_install_handler(msg, handler);
922
923 match cell {
924 Ok(Some(cell)) => {
925 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
926 let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
927 // don't care if receiver goes away.
928 let _ = done.send(outcome.clone());
929 outcome?;
930 }
931 Ok(None) => {
932 // don't care if receiver goes away.
933 let _ = done.send(Ok(()));
934 }
935 Err(e) => {
936 // don't care if receiver goes away.
937 let _ = done.send(Err(e.clone()));
938 return Err(e.into());
939 }
940 }
941 }
942 RunOnceCmdInner::BeginStream {
943 leg,
944 cell,
945 hop,
946 done,
947 } => {
948 match cell {
949 Ok((cell, stream_id)) => {
950 let circ = self
951 .circuits
952 .leg_mut(leg)
953 .ok_or_else(|| internal!("leg disappeared?!"))?;
954 let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
955 let relay_format = circ
956 .hop_mut(cell_hop)
957 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
958 .ok_or(Error::NoSuchHop)?
959 .relay_cell_format();
960
961 let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
962 // don't care if receiver goes away.
963 let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
964 outcome?;
965 }
966 Err(e) => {
967 // don't care if receiver goes away.
968 let _ = done.send(Err(e.clone()));
969 return Err(e.into());
970 }
971 }
972 }
973 RunOnceCmdInner::CloseStream {
974 hop,
975 sid,
976 behav,
977 reason,
978 done,
979 } => {
980 let result = {
981 let (leg_id, hop_num) = self
982 .resolve_hop_location(hop)
983 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
984 let leg = self
985 .circuits
986 .leg_mut(leg_id)
987 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
988 Ok::<_, Bug>((leg, hop_num))
989 };
990
991 let (leg, hop_num) = match result {
992 Ok(x) => x,
993 Err(e) => {
994 if let Some(done) = done {
995 // don't care if the sender goes away
996 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
997 let _ = done.send(Err(e.into()));
998 }
999 return Ok(());
1000 }
1001 };
1002
1003 let max_rtt = {
1004 let hop = leg
1005 .hop(hop_num)
1006 .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
1007 let ccontrol = hop.ccontrol();
1008
1009 // Note: if we have no measurements for the RTT, this will be set to 0,
1010 // and the timeout will be 2 * CBT.
1011 ccontrol
1012 .rtt()
1013 .max_rtt_usec()
1014 .map(|rtt| Duration::from_millis(u64::from(rtt)))
1015 .unwrap_or_default()
1016 };
1017
1018 // The length of the circuit up until the hop that has the half-streeam.
1019 //
1020 // +1, because HopNums are zero-based.
1021 let circ_len = usize::from(hop_num) + 1;
1022
1023 // We double the CBT to account for rend circuits,
1024 // which are twice as long (otherwise we risk expiring
1025 // the rend half-streams too soon).
1026 let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
1027 let expire_at = self.runtime.now() + timeout;
1028
1029 let res: Result<()> = leg
1030 .close_stream(hop_num, sid, behav, reason, expire_at)
1031 .await;
1032
1033 if let Some(done) = done {
1034 // don't care if the sender goes away
1035 let _ = done.send(res);
1036 }
1037 }
1038 RunOnceCmdInner::MaybeSendXon {
1039 rate,
1040 stream_id,
1041 hop,
1042 } => {
1043 let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
1044 Ok(x) => x,
1045 Err(NoJoinPointError) => {
1046 // A stream tried to send an XON message message to the join point of
1047 // a tunnel that has never had a join point. Currently in arti, only a
1048 // `StreamTarget` asks us to send an XON message, and this tunnel
1049 // originally created the `StreamTarget` to begin with. So this is a
1050 // legitimate bug somewhere in the tunnel code.
1051 return Err(
1052 internal!(
1053 "Could not send an XON message to a join point on a tunnel without a join point",
1054 )
1055 .into()
1056 );
1057 }
1058 };
1059
1060 let Some(leg) = self.circuits.leg_mut(leg_id) else {
1061 // The leg has disappeared. This is fine since the stream may have ended and
1062 // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1063 // It is possible that is a bug and this is an incorrect leg number, but
1064 // it's not currently possible to differentiate between an incorrect leg
1065 // number and a tunnel leg that has been closed.
1066 debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1067 return Ok(());
1068 };
1069
1070 let Some(hop) = leg.hop_mut(hop_num) else {
1071 // The hop has disappeared. This is fine since the circuit may have been
1072 // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1073 // It is possible that is a bug and this is an incorrect hop number, but
1074 // it's not currently possible to differentiate between an incorrect hop
1075 // number and a circuit hop that has been removed.
1076 debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1077 return Ok(());
1078 };
1079
1080 let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1081 // Nothing to do.
1082 return Ok(());
1083 };
1084
1085 let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1086 let cell = SendRelayCell {
1087 hop: Some(hop_num),
1088 early: false,
1089 cell,
1090 };
1091
1092 leg.send_relay_cell(cell).await?;
1093 }
1094 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1095 let leg = self
1096 .circuits
1097 .leg_mut(leg)
1098 .ok_or_else(|| internal!("leg disappeared?!"))?;
1099 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1100 // future which *should* resolve immediately
1101 let signals = leg.chan_sender.congestion_signals().await;
1102 leg.handle_sendme(hop, sendme, signals)?;
1103 }
1104 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1105 let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1106
1107 // don't care if the sender goes away
1108 let _ = answer.send(res.map_err(Into::into));
1109 }
1110 RunOnceCmdInner::CleanShutdown => {
1111 trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1112 return Err(ReactorError::Shutdown);
1113 }
1114 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1115 warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1116
1117 let circ = self.circuits.remove(leg)?;
1118 let is_conflux_pending = circ.is_conflux_pending();
1119
1120 // Drop the removed leg. This will cause it to close if it's not already closed.
1121 drop(circ);
1122
1123 // If we reach this point, it means we have more than one leg
1124 // (otherwise the .remove() would've returned a Shutdown error),
1125 // so we expect there to be a ConfluxHandshakeContext installed.
1126
1127 #[cfg(feature = "conflux")]
1128 if is_conflux_pending {
1129 let (error, proto_violation): (_, Option<Error>) = match &reason {
1130 RemoveLegReason::ConfluxHandshakeTimeout => {
1131 (ConfluxHandshakeError::Timeout, None)
1132 }
1133 RemoveLegReason::ConfluxHandshakeErr(e) => {
1134 (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1135 }
1136 RemoveLegReason::ChannelClosed => {
1137 (ConfluxHandshakeError::ChannelClosed, None)
1138 }
1139 };
1140
1141 self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1142
1143 if let Some(e) = proto_violation {
1144 tor_error::warn_report!(
1145 e,
1146 tunnel_id = %self.tunnel_id,
1147 "Malformed conflux handshake, tearing down tunnel",
1148 );
1149
1150 return Err(e.into());
1151 }
1152 }
1153 }
1154 #[cfg(feature = "conflux")]
1155 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1156 // Note: on the client-side, the handshake is considered complete once the
1157 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1158 //
1159 // We're optimistic here, and declare the handshake a success *before*
1160 // sending the LINKED_ACK response. I think this is OK though,
1161 // because if the send_relay_cell() below fails, the reactor will shut
1162 // down anyway. OTOH, marking the handshake as complete slightly early
1163 // means that on the happy path, the circuit is marked as usable sooner,
1164 // instead of blocking on the sending of the LINKED_ACK.
1165 self.note_conflux_handshake_result(Ok(()), false)?;
1166
1167 let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1168
1169 res?;
1170 }
1171 #[cfg(feature = "conflux")]
1172 RunOnceCmdInner::Link { circuits, answer } => {
1173 // Add the specified circuits to our conflux set,
1174 // and send a LINK cell down each unlinked leg.
1175 //
1176 // NOTE: this will block the reactor until all the cells are sent.
1177 self.handle_link_circuits(circuits, answer).await?;
1178 }
1179 #[cfg(feature = "conflux")]
1180 RunOnceCmdInner::Enqueue { leg, msg } => {
1181 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1182 self.ooo_msgs.push(entry);
1183 }
1184 #[cfg(feature = "circ-padding")]
1185 RunOnceCmdInner::PaddingAction { leg, padding_event } => {
1186 // TODO: If we someday move back to having a per-circuit reactor,
1187 // this event would logically belong there, not on the tunnel reactor.
1188 self.circuits.run_padding_event(leg, padding_event).await?;
1189 }
1190 }
1191
1192 Ok(())
1193 }
1194
1195 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1196 ///
1197 /// Returns an error if an unexpected `CtrlMsg` is received.
1198 #[instrument(level = "trace", skip_all)]
1199 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1200 let msg = select_biased! {
1201 res = self.command.next() => {
1202 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1203 match cmd {
1204 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1205 #[cfg(test)]
1206 CtrlCmd::AddFakeHop {
1207 relay_cell_format: format,
1208 fwd_lasthop,
1209 rev_lasthop,
1210 peer_id,
1211 params,
1212 done,
1213 } => {
1214 let leg = self.circuits.single_leg_mut()?;
1215 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, ¶ms, done);
1216 return Ok(())
1217 },
1218 _ => {
1219 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1220 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1221 }
1222 }
1223 },
1224 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1225 };
1226
1227 match msg {
1228 CtrlMsg::Create {
1229 recv_created,
1230 handshake,
1231 settings,
1232 done,
1233 } => {
1234 // TODO(conflux): instead of crashing the reactor, it might be better
1235 // to send the error via the done channel instead
1236 let leg = self.circuits.single_leg_mut()?;
1237 leg.handle_create(recv_created, handshake, settings, done)
1238 .await
1239 }
1240 _ => {
1241 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1242
1243 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1244 }
1245 }
1246 }
1247
1248 /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1249 ///
1250 /// If all the circuits we were waiting on have finished the conflux handshake,
1251 /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1252 /// are sent to the handshake initiator.
1253 #[cfg(feature = "conflux")]
1254 #[instrument(level = "trace", skip_all)]
1255 fn note_conflux_handshake_result(
1256 &mut self,
1257 res: StdResult<(), ConfluxHandshakeError>,
1258 reactor_is_closing: bool,
1259 ) -> StdResult<(), ReactorError> {
1260 let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1261 Some(conflux_ctx) => {
1262 conflux_ctx.results.push(res);
1263 // Whether all the legs have finished linking:
1264 conflux_ctx.results.len() == conflux_ctx.num_legs
1265 }
1266 None => {
1267 return Err(internal!("no conflux handshake context").into());
1268 }
1269 };
1270
1271 if tunnel_complete || reactor_is_closing {
1272 // Time to remove the conflux handshake context
1273 // and extract the results we have collected
1274 let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1275
1276 let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1277 let leg_count = conflux_ctx.results.len();
1278
1279 info!(
1280 tunnel_id = %self.tunnel_id,
1281 "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1282 );
1283
1284 send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1285
1286 // We don't expect to receive any more handshake results,
1287 // at least not until we get another LinkCircuits control message,
1288 // which will install a new ConfluxHandshakeCtx with a channel
1289 // for us to send updates on
1290 }
1291
1292 Ok(())
1293 }
1294
1295 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1296 fn prepare_msg_and_install_handler(
1297 &mut self,
1298 msg: Option<AnyRelayMsgOuter>,
1299 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1300 ) -> Result<Option<SendRelayCell>> {
1301 let msg = msg
1302 .map(|msg| {
1303 let handlers = &mut self.cell_handlers;
1304 let handler = handler
1305 .as_ref()
1306 .or(handlers.meta_handler.as_ref())
1307 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1308 // We should always have a precise HopLocation here so this should never fails but
1309 // in case we have a ::JointPoint, we'll notice.
1310 let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1311 "MsgHandler doesn't have a precise HopLocation"
1312 ))?;
1313 Ok::<_, crate::Error>(SendRelayCell {
1314 hop: Some(hop),
1315 early: false,
1316 cell: msg,
1317 })
1318 })
1319 .transpose()?;
1320
1321 if let Some(handler) = handler {
1322 self.cell_handlers.set_meta_handler(handler)?;
1323 }
1324
1325 Ok(msg)
1326 }
1327
1328 /// Handle a shutdown request.
1329 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1330 trace!(
1331 tunnel_id = %self.tunnel_id,
1332 "reactor shutdown due to explicit request",
1333 );
1334
1335 Err(ReactorError::Shutdown)
1336 }
1337
1338 /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1339 ///
1340 /// Returns an error over the `answer` channel if the reactor has no circuits,
1341 /// or more than one circuit. The reactor will shut down regardless.
1342 #[cfg(feature = "conflux")]
1343 fn handle_shutdown_and_return_circuit(
1344 &mut self,
1345 answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1346 ) -> StdResult<(), ReactorError> {
1347 // Don't care if the receiver goes away
1348 let _ = answer.send(self.circuits.take_single_leg());
1349 self.handle_shutdown().map(|_| ())
1350 }
1351
1352 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1353 ///
1354 /// After resolving a `TargetHop::LastHop`,
1355 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1356 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1357 ///
1358 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1359 /// if you need a fixed hop position in the tunnel.
1360 /// For example if we open a stream to `TargetHop::LastHop`,
1361 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1362 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1363 ///
1364 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1365 /// and the tunnel has no hops
1366 /// (either has no legs, or has legs which contain no hops).
1367 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1368 match hop {
1369 TargetHop::Hop(hop) => Ok(hop),
1370 TargetHop::LastHop => {
1371 if let Ok(leg) = self.circuits.single_leg() {
1372 let leg_id = leg.unique_id();
1373 // single-path tunnel
1374 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1375 Ok(HopLocation::Hop((leg_id, hop)))
1376 } else if !self.circuits.is_empty() {
1377 // multi-path tunnel
1378 Ok(HopLocation::JoinPoint)
1379 } else {
1380 // no legs
1381 Err(NoHopsBuiltError)
1382 }
1383 }
1384 }
1385 }
1386
1387 /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1388 ///
1389 /// After resolving a `HopLocation::JoinPoint`,
1390 /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1391 ///
1392 /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1393 /// need them,
1394 /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1395 /// iterations as the primary leg may change from one iteration to the next.
1396 ///
1397 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1398 /// but it does not have a join point.
1399 #[instrument(level = "trace", skip_all)]
1400 fn resolve_hop_location(
1401 &self,
1402 hop: HopLocation,
1403 ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1404 match hop {
1405 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1406 HopLocation::JoinPoint => {
1407 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1408 Ok((leg_id, hop_num))
1409 } else {
1410 // Attempted to get the join point of a non-multipath tunnel.
1411 Err(NoJoinPointError)
1412 }
1413 }
1414 }
1415 }
1416
1417 /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1418 ///
1419 /// This is a helper function that basically calls both resolve_target_hop and
1420 /// resolve_hop_location back to back.
1421 ///
1422 /// It returns None on failure to resolve meaning that if you want more detailed error on why
1423 /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1424 pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1425 self.resolve_target_hop(hop)
1426 .ok()
1427 .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1428 }
1429
1430 /// Install or remove a padder at a given hop.
1431 #[cfg(feature = "circ-padding-manual")]
1432 fn set_padding_at_hop(
1433 &self,
1434 hop: HopLocation,
1435 padder: Option<super::circuit::padding::CircuitPadder>,
1436 ) -> Result<()> {
1437 let HopLocation::Hop((leg_id, hop_num)) = hop else {
1438 return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1439 };
1440 let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1441 circ.set_padding_at_hop(hop_num, padder)?;
1442 Ok(())
1443 }
1444
1445 /// Does congestion control use stream SENDMEs for the given hop?
1446 ///
1447 /// Returns `None` if either the `leg` or `hop` don't exist.
1448 fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1449 self.circuits.uses_stream_sendme(leg, hop)
1450 }
1451
1452 /// Handle a request to link some extra circuits in the reactor's conflux set.
1453 ///
1454 /// The circuits are validated, and if they do not have the same length,
1455 /// or if they do not all have the same last hop, an error is returned on
1456 /// the `answer` channel, and the conflux handshake is *not* initiated.
1457 ///
1458 /// If validation succeeds, the circuits are added to this reactor's conflux set,
1459 /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1460 ///
1461 /// NOTE: this blocks the reactor main loop until all the cells are sent.
1462 #[cfg(feature = "conflux")]
1463 #[instrument(level = "trace", skip_all)]
1464 async fn handle_link_circuits(
1465 &mut self,
1466 circuits: Vec<Circuit>,
1467 answer: ConfluxLinkResultChannel,
1468 ) -> StdResult<(), ReactorError> {
1469 use tor_error::warn_report;
1470
1471 if self.conflux_hs_ctx.is_some() {
1472 let err = internal!("conflux linking already in progress");
1473 send_conflux_outcome(answer, Err(err.into()))?;
1474
1475 return Ok(());
1476 }
1477
1478 let unlinked_legs = self.circuits.num_unlinked();
1479
1480 // We need to send the LINK cell on each of the new circuits
1481 // and on each of the existing, unlinked legs from self.circuits.
1482 //
1483 // In reality, there can only be one such circuit
1484 // (the "initial" one from the previously single-path tunnel),
1485 // because any circuits that to complete the conflux handshake
1486 // get removed from the set.
1487 let num_legs = circuits.len() + unlinked_legs;
1488
1489 // Note: add_legs validates `circuits`
1490 let res = async {
1491 self.circuits.add_legs(circuits, &self.runtime)?;
1492 self.circuits.link_circuits(&self.runtime).await
1493 }
1494 .await;
1495
1496 if let Err(e) = res {
1497 warn_report!(e, "Failed to link conflux circuits");
1498
1499 send_conflux_outcome(answer, Err(e))?;
1500 } else {
1501 // Save the channel, to notify the user of completion.
1502 self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1503 answer,
1504 num_legs,
1505 results: Default::default(),
1506 });
1507 }
1508
1509 Ok(())
1510 }
1511}
1512
1513/// Notify the conflux handshake initiator of the handshake outcome.
1514///
1515/// Returns an error if the initiator has done away.
1516#[cfg(feature = "conflux")]
1517fn send_conflux_outcome(
1518 tx: ConfluxLinkResultChannel,
1519 res: Result<ConfluxHandshakeResult>,
1520) -> StdResult<(), ReactorError> {
1521 if tx.send(res).is_err() {
1522 tracing::warn!("conflux initiator went away before handshake completed?");
1523 return Err(ReactorError::Shutdown);
1524 }
1525
1526 Ok(())
1527}
1528
1529/// The tunnel does not have any hops.
1530#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1531#[non_exhaustive]
1532#[error("no hops have been built for this tunnel")]
1533pub(crate) struct NoHopsBuiltError;
1534
1535/// The tunnel does not have a join point.
1536#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1537#[non_exhaustive]
1538#[error("the tunnel does not have a join point")]
1539pub(crate) struct NoJoinPointError;
1540
1541impl CellHandlers {
1542 /// Try to install a given meta-cell handler to receive any unusual cells on
1543 /// this circuit, along with a result channel to notify on completion.
1544 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1545 if self.meta_handler.is_none() {
1546 self.meta_handler = Some(handler);
1547 Ok(())
1548 } else {
1549 Err(Error::from(internal!(
1550 "Tried to install a meta-cell handler before the old one was gone."
1551 )))
1552 }
1553 }
1554
1555 /// Try to install a given cell handler on this circuit.
1556 #[cfg(feature = "hs-service")]
1557 fn set_incoming_stream_req_handler(
1558 &mut self,
1559 handler: IncomingStreamRequestHandler,
1560 ) -> Result<()> {
1561 if self.incoming_stream_req_handler.is_none() {
1562 self.incoming_stream_req_handler = Some(handler);
1563 Ok(())
1564 } else {
1565 Err(Error::from(internal!(
1566 "Tried to install a BEGIN cell handler before the old one was gone."
1567 )))
1568 }
1569 }
1570}
1571
1572#[cfg(test)]
1573mod test {
1574 // Tested in [`crate::client::circuit::test`].
1575}