tor_proto/client/
reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::cmdcheck::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more SENDMEs
11//!    than we've sent data for.) This is handled within the reactor by the
12//!    `StreamFlowCtrl`. For half-closed streams which don't send stream
13//!    SENDMEs, an additional receive-window check is performed in the
14//!    `halfstream` module.
15//! 3. Does the message have an acceptable command type, and is the message
16//!    well-formed? For open streams, the streams themselves handle this check.
17//!    For half-closed streams, the reactor handles it by calling
18//!    `consume_checked_msg()`.
19
20pub(crate) mod circuit;
21mod conflux;
22mod control;
23pub(super) mod syncview;
24
25use crate::circuit::circhop::SendRelayCell;
26use crate::circuit::{CircuitRxReceiver, UniqId};
27use crate::client::circuit::padding::{PaddingController, PaddingEvent, PaddingEventStream};
28use crate::client::circuit::{ClientCircChanMsg, TimeoutEstimator};
29use crate::client::{HopLocation, TargetHop};
30use crate::crypto::cell::HopNum;
31use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
32use crate::memquota::CircuitAccount;
33use crate::stream::CloseStreamBehavior;
34use crate::streammap;
35use crate::tunnel::{TunnelId, TunnelScopedCircId};
36use crate::util::err::ReactorError;
37use crate::util::skew::ClockSkew;
38use crate::{Error, Result};
39use circuit::Circuit;
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use std::cmp::Ordering;
43use std::collections::BinaryHeap;
44use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
45use tor_cell::relaycell::msg::Sendme;
46use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
47use tor_error::{Bug, bad_api_usage, debug_report, internal, into_bad_api_usage};
48use tor_rtcompat::{DynTimeProvider, SleepProvider};
49
50use cfg_if::cfg_if;
51use futures::StreamExt;
52use futures::channel::mpsc;
53use futures::{FutureExt as _, select_biased};
54use oneshot_fused_workaround as oneshot;
55
56use std::result::Result as StdResult;
57use std::sync::Arc;
58use std::time::Duration;
59
60use crate::channel::Channel;
61use crate::conflux::msghandler::RemoveLegReason;
62use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
63use circuit::CircuitCmd;
64use derive_more::From;
65use smallvec::smallvec;
66use tor_cell::chancell::CircId;
67use tor_llcrypto::pk;
68use tracing::{debug, info, instrument, trace, warn};
69
70use super::circuit::{MutableState, TunnelMutableState};
71
72#[cfg(feature = "hs-service")]
73use crate::stream::incoming::IncomingStreamRequestHandler;
74
75#[cfg(feature = "conflux")]
76use {
77    crate::conflux::msghandler::{ConfluxCmd, OooRelayMsg},
78    crate::util::err::ConfluxHandshakeError,
79};
80
81pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
82
/// The type of a oneshot channel used to inform the reactor's caller of the result of an
/// operation.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;

/// Contains a list of conflux handshake results, one per circuit leg that took part in the
/// handshake.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;

/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
97
/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
117
118// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitEvent enum
119// proliferation is a bit bothersome, but unavoidable with the current design.
120//
121// We should consider getting rid of some of these enums (if possible),
122// and coming up with more intuitive names.
123
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: resolve this
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
138
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any.
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop that the stream is attached to.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Take a padding-related action on a circuit leg.
    #[cfg(feature = "circ-padding")]
    PaddingAction {
        /// The leg on which to take the padding action.
        leg: UniqId,
        /// The padding event to act on.
        padding_event: PaddingEvent,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
283
284impl RunOnceCmdInner {
285    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
286    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
287        match cmd {
288            CircuitCmd::Send(cell) => Self::Send {
289                leg,
290                cell,
291                done: None,
292            },
293            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
294            CircuitCmd::CloseStream {
295                hop,
296                sid,
297                behav,
298                reason,
299            } => Self::CloseStream {
300                hop: HopLocation::Hop((leg, hop)),
301                sid,
302                behav,
303                reason,
304                done: None,
305            },
306            #[cfg(feature = "conflux")]
307            CircuitCmd::Conflux(ConfluxCmd::RemoveLeg(reason)) => Self::RemoveLeg { leg, reason },
308            #[cfg(feature = "conflux")]
309            CircuitCmd::Conflux(ConfluxCmd::HandshakeComplete { hop, early, cell }) => {
310                let cell = SendRelayCell {
311                    hop: Some(hop),
312                    early,
313                    cell,
314                };
315                Self::ConfluxHandshakeComplete { leg, cell }
316            }
317            #[cfg(feature = "conflux")]
318            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
319            CircuitCmd::CleanShutdown => Self::CleanShutdown,
320        }
321    }
322}
323
/// An event for the reactor to handle within [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitEvent {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on.
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message.
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this event *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// Take some action (blocking or unblocking a circuit, or sending padding)
    /// based on the circuit padding backend code.
    PaddingAction {
        /// The leg on which to take the padding action.
        leg: UniqId,
        /// The padding event to act on.
        padding_event: PaddingEvent,
    },
    /// Protocol violation. This leads for now to the close of the circuit reactor. The
    /// error causing the violation is set in `err`.
    ProtoViolation {
        /// The error that causes this protocol violation.
        err: crate::Error,
    },
}
382
383impl CircuitEvent {
384    /// Return the ordering with which we should handle this event
385    /// within a list of events returned by a single call to next_circ_event().
386    ///
387    /// NOTE: Please do not make this any more complicated:
388    /// It is a consequence of a kludge that we need this sorting at all.
389    /// Assuming that eventually, we switch away from the current
390    /// poll-oriented `next_circ_event` design,
391    /// we may be able to get rid of this entirely.
392    fn order_within_batch(&self) -> u8 {
393        use CircuitEvent as CA;
394        use PaddingEvent as PE;
395        // This immediate state MUST NOT be used for events emitting cells. At the moment, it is
396        // only used by the protocol violation event which leads to a shutdown of the reactor.
397        const IMMEDIATE: u8 = 0;
398        const EARLY: u8 = 1;
399        const NORMAL: u8 = 2;
400        const LATE: u8 = 3;
401
402        // We use this ordering to move any "StartBlocking" to the _end_ of a batch and
403        // "StopBlocking" to the start.
404        //
405        // This way, we can be sure that we will handle any "send data" operations
406        // (and tell the Padder about them) _before_  we tell the Padder
407        // that we have blocked the circuit.
408        //
409        // This keeps things a bit more logical.
410        match self {
411            CA::RunCmd { .. } => NORMAL,
412            CA::HandleControl(..) => NORMAL,
413            CA::HandleCell { .. } => NORMAL,
414            CA::RemoveLeg { .. } => NORMAL,
415            #[cfg(feature = "circ-padding")]
416            CA::PaddingAction { padding_event, .. } => match padding_event {
417                PE::StopBlocking => EARLY,
418                PE::SendPadding(..) => NORMAL,
419                PE::StartBlocking(..) => LATE,
420            },
421            #[cfg(not(feature = "circ-padding"))]
422            CA::PaddingAction { .. } => NORMAL,
423            CA::ProtoViolation { .. } => IMMEDIATE,
424        }
425    }
426}
427
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets a mutable reference to the [`Circuit`] the message arrived on,
    /// in order to do anything it likes there.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
453
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
471
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// Expands to an expression of type `StdResult<_, ReactorError>`; on `None`, the shutdown
/// is traced (with the tunnel id and the given `$reason`) before the error is produced.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
491
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
555
/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    ///
    /// Once every leg has reported a result, the full list is sent on `answer`.
    results: ConfluxHandshakeResult,
}
566
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Heap entries are ordered by `msg` alone (see the `Ord`/`PartialEq` impls);
/// `leg_id` is carried along only to remember where the message came from.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message.
    msg: OooRelayMsg,
}
576
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Entries are ordered solely by the buffered message;
        // `leg_id` plays no part in the heap ordering.
        Ord::cmp(&self.msg, &other.msg)
    }
}
583
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Entries are always comparable: defer to the total order from `Ord`.
        Some(Ord::cmp(self, other))
    }
}
590
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    fn eq(&self, other: &Self) -> bool {
        // Consistent with `Ord`: equality considers the message only.
        self.msg.eq(&other.msg)
    }
}
597
// Total equality for heap entries; consistent with the manual `PartialEq`/`Ord`
// impls for this type, which compare only `msg`.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
600
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// NOTE(prop349): this is part of Arti's "Base Circuit Hop Handler".
    ///
    /// Upon sending an EXTEND cell, the [`ControlHandler`] sets this handler
    /// to [`CircuitExtender`](circuit::extender::CircuitExtender).
    /// The handler is then used in [`Circuit::handle_meta_cell`] for handling
    /// all the meta cells received on the circuit that are not SENDMEs or TRUNCATE
    /// (which are handled separately) or conflux cells
    /// (which are handled by the conflux handlers).
    ///
    /// The handler is uninstalled after the receipt of the EXTENDED cell,
    /// so any subsequent EXTENDED cells will cause the circuit to be torn down.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
621
622impl Reactor {
623    /// Create a new circuit reactor.
624    ///
625    /// The reactor will send outbound messages on `channel`, receive incoming
626    /// messages on `input`, and identify this circuit by the channel-local
627    /// [`CircId`] provided.
628    ///
629    /// The internal unique identifier for this circuit will be `unique_id`.
630    #[allow(clippy::type_complexity, clippy::too_many_arguments)] // TODO
631    pub(super) fn new(
632        channel: Arc<Channel>,
633        channel_id: CircId,
634        unique_id: UniqId,
635        input: CircuitRxReceiver,
636        runtime: DynTimeProvider,
637        memquota: CircuitAccount,
638        padding_ctrl: PaddingController,
639        padding_stream: PaddingEventStream,
640        timeouts: Arc<dyn TimeoutEstimator + Send>,
641    ) -> (
642        Self,
643        mpsc::UnboundedSender<CtrlMsg>,
644        mpsc::UnboundedSender<CtrlCmd>,
645        oneshot::Receiver<void::Void>,
646        Arc<TunnelMutableState>,
647    ) {
648        let tunnel_id = TunnelId::next();
649        let (control_tx, control_rx) = mpsc::unbounded();
650        let (command_tx, command_rx) = mpsc::unbounded();
651        let mutable = Arc::new(MutableState::default());
652
653        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
654
655        let cell_handlers = CellHandlers {
656            meta_handler: None,
657            #[cfg(feature = "hs-service")]
658            incoming_stream_req_handler: None,
659        };
660
661        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
662        let circuit_leg = Circuit::new(
663            runtime.clone(),
664            channel,
665            channel_id,
666            unique_id,
667            input,
668            memquota,
669            Arc::clone(&mutable),
670            padding_ctrl,
671            padding_stream,
672            timeouts,
673        );
674
675        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
676
677        let reactor = Reactor {
678            circuits,
679            control: control_rx,
680            command: command_rx,
681            reactor_closed_tx,
682            tunnel_id,
683            cell_handlers,
684            runtime,
685            #[cfg(feature = "conflux")]
686            conflux_hs_ctx: None,
687            #[cfg(feature = "conflux")]
688            ooo_msgs: Default::default(),
689        };
690
691        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
692    }
693
694    /// Launch the reactor, and run until the circuit closes or we
695    /// encounter an error.
696    ///
697    /// Once this method returns, the circuit is dead and cannot be
698    /// used again.
699    #[instrument(level = "trace", skip_all)]
700    pub async fn run(mut self) -> Result<()> {
701        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
702        let result: Result<()> = loop {
703            match self.run_once().await {
704                Ok(()) => (),
705                Err(ReactorError::Shutdown) => break Ok(()),
706                Err(ReactorError::Err(e)) => break Err(e),
707            }
708        };
709
710        // Log that the reactor stopped, possibly with the associated error as a report.
711        // May log at a higher level depending on the error kind.
712        const MSG: &str = "Tunnel reactor stopped";
713        match &result {
714            Ok(()) => trace!(tunnel_id = %self.tunnel_id, "{MSG}"),
715            Err(e) => debug_report!(e, tunnel_id = %self.tunnel_id, "{MSG}"),
716        }
717
718        result
719    }
720
721    /// Helper for run: doesn't mark the circuit closed on finish.  Only
722    /// processes one cell or control message.
723    #[instrument(level = "trace", skip_all)]
724    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
725        // If all the circuits are closed, shut down the reactor
726        if self.circuits.is_empty() {
727            trace!(
728                tunnel_id = %self.tunnel_id,
729                "Tunnel reactor shutting down: all circuits have closed",
730            );
731
732            return Err(ReactorError::Shutdown);
733        }
734
735        // If this is a single path circuit, we need to wait until the first hop
736        // is created before doing anything else
737        let single_path_with_hops = self
738            .circuits
739            .single_leg_mut()
740            .is_ok_and(|leg| !leg.has_hops());
741        if single_path_with_hops {
742            self.wait_for_create().await?;
743
744            return Ok(());
745        }
746
747        // Prioritize the buffered messages.
748        //
749        // Note: if any of the messages are ready to be handled,
750        // this will block the reactor until we are done processing them
751        //
752        // TODO circpad: If this is a problem, we might want to re-order things so that we
753        // prioritize padding instead.  On the other hand, this should be fixed by refactoring
754        // circuit and tunnel reactors, so we might do well to just leave it alone for now.
755        #[cfg(feature = "conflux")]
756        self.try_dequeue_ooo_msgs().await?;
757
758        let mut events = select_biased! {
759            res = self.command.next() => {
760                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
761                return ControlHandler::new(self).handle_cmd(cmd);
762            },
763            // Check whether we've got a control message pending.
764            //
765            // Note: unfortunately, reading from control here means we might start
766            // handling control messages before our chan_senders are ready.
767            // With the current design, this is inevitable: we can't know which circuit leg
768            // a control message is meant for without first reading the control message from
769            // the channel, and at that point, we can't know for sure whether that particular
770            // circuit is ready for sending.
771            ret = self.control.next() => {
772                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
773                smallvec![CircuitEvent::HandleControl(msg)]
774            },
775            res = self.circuits.next_circ_event(&self.runtime).fuse() => res?,
776        };
777
778        // Put the events into the order that we need to execute them in.
779        //
780        // (Yes, this _does_ have to be a stable sort.  Not all events may be freely re-ordered
781        // with respect to one another.)
782        events.sort_by_key(|a| a.order_within_batch());
783
784        for event in events {
785            let cmd = match event {
786                CircuitEvent::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
787                    RunOnceCmdInner::from_circuit_cmd(leg, cmd),
788                )),
789                CircuitEvent::HandleControl(ctrl) => ControlHandler::new(self)
790                    .handle_msg(ctrl)?
791                    .map(RunOnceCmd::Single),
792                CircuitEvent::HandleCell { leg, cell } => {
793                    let circ = self
794                        .circuits
795                        .leg_mut(leg)
796                        .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
797
798                    let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
799                    if circ_cmds.is_empty() {
800                        None
801                    } else {
802                        // TODO: we return RunOnceCmd::Multiple even if there's a single command.
803                        //
804                        // See the TODO on `Circuit::handle_cell`.
805                        let cmd = RunOnceCmd::Multiple(
806                            circ_cmds
807                                .into_iter()
808                                .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
809                                .collect(),
810                        );
811
812                        Some(cmd)
813                    }
814                }
815                CircuitEvent::RemoveLeg { leg, reason } => {
816                    Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
817                }
818                CircuitEvent::PaddingAction { leg, padding_event } => {
819                    cfg_if! {
820                        if #[cfg(feature = "circ-padding")] {
821                            Some(RunOnceCmdInner::PaddingAction { leg, padding_event }.into())
822                        } else {
823                            // If padding isn't enabled, we never generate a padding event,
824                            // so we can be sure this case will never be called.
825                            void::unreachable(padding_event.0);
826                        }
827                    }
828                }
829                CircuitEvent::ProtoViolation { err } => {
830                    return Err(err.into());
831                }
832            };
833
834            if let Some(cmd) = cmd {
835                self.handle_run_once_cmd(cmd).await?;
836            }
837        }
838
839        Ok(())
840    }
841
842    /// Try to process the previously-out-of-order messages we might have buffered.
843    #[cfg(feature = "conflux")]
844    #[instrument(level = "trace", skip_all)]
845    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
846        // Check if we're ready to dequeue any of the previously out-of-order cells.
847        while let Some(entry) = self.ooo_msgs.peek() {
848            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
849
850            if !should_pop {
851                break;
852            }
853
854            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
855
856            let circ = self
857                .circuits
858                .leg_mut(entry.leg_id)
859                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
860            let handlers = &mut self.cell_handlers;
861            let cmd = circ
862                .handle_in_order_relay_msg(
863                    handlers,
864                    entry.msg.hopnum,
865                    entry.leg_id,
866                    entry.msg.cell_counts_towards_windows,
867                    entry.msg.streamid,
868                    entry.msg.msg,
869                )?
870                .map(|cmd| {
871                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
872                });
873
874            if let Some(cmd) = cmd {
875                self.handle_run_once_cmd(cmd).await?;
876            }
877        }
878
879        Ok(())
880    }
881
882    /// Handle a [`RunOnceCmd`].
883    #[instrument(level = "trace", skip_all)]
884    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
885        match cmd {
886            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
887            RunOnceCmd::Multiple(cmds) => {
888                // While we know `sendable` is ready to accept *one* cell,
889                // we can't be certain it will be able to accept *all* of the cells
890                // that need to be sent here. This means we *may* end up buffering
891                // in its underlying SometimesUnboundedSink! That is OK, because
892                // RunOnceCmd::Multiple is only used for handling packed cells.
893                for cmd in cmds {
894                    self.handle_single_run_once_cmd(cmd).await?;
895                }
896            }
897        }
898
899        Ok(())
900    }
901
    /// Handle a single [`RunOnceCmdInner`] command.
    #[instrument(level = "trace", skip_all)]
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            // Send a relay cell on the specified leg, reporting the outcome
            // over `done` (if a channel was supplied).
            RunOnceCmdInner::Send { leg, cell, done } => {
                // TODO: check the cc window
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away
                    let _ = done.send(res.clone());
                }
                res?;
            }
            // Optionally send a message and/or install a meta-cell handler,
            // reporting the outcome over `done`.
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // No message to send; only the handler was installed.
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            // Send the stream-opening cell, reporting the allocated stream ID,
            // hop location, and relay cell format over `done`.
            RunOnceCmdInner::BeginStream {
                leg,
                cell,
                hop,
                done,
            } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        let circ = self
                            .circuits
                            .leg_mut(leg)
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
                        let cell_hop = cell.hop.expect("missing hop in client SendRelayCell?!");
                        // Look up the relay cell format in use at the target hop,
                        // so the caller knows how to encode stream messages.
                        let relay_format = circ
                            .hop_mut(cell_hop)
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
                            .ok_or(Error::NoSuchHop)?
                            .relay_cell_format();

                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            // Close a stream, computing an expiry deadline for its half-closed
            // state from the hop's RTT and the circuit build timeout.
            RunOnceCmdInner::CloseStream {
                hop,
                sid,
                behav,
                reason,
                done,
            } => {
                let result = {
                    let (leg_id, hop_num) = self
                        .resolve_hop_location(hop)
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                    let leg = self
                        .circuits
                        .leg_mut(leg_id)
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
                    Ok::<_, Bug>((leg, hop_num))
                };

                // NOTE(review): the `?`s inside the block above propagate out of
                // this *function*, not out of the block, so `result` appears to
                // always be `Ok` here, which would make the `Err` arm below
                // unreachable. Confirm whether those `?`s were instead meant to
                // produce an `Err` value collected into `result`.
                let (leg, hop_num) = match result {
                    Ok(x) => x,
                    Err(e) => {
                        if let Some(done) = done {
                            // don't care if the sender goes away
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                            let _ = done.send(Err(e.into()));
                        }
                        return Ok(());
                    }
                };

                // Use the hop's maximum observed round-trip time (if any) as a
                // lower bound for the half-stream expiry timeout.
                let max_rtt = {
                    let hop = leg
                        .hop(hop_num)
                        .ok_or_else(|| internal!("the hop we resolved disappeared?!"))?;
                    let ccontrol = hop.ccontrol();

                    // Note: if we have no measurements for the RTT, this will be set to 0,
                    // and the timeout will be 2 * CBT.
                    ccontrol
                        .rtt()
                        .max_rtt_usec()
                        .map(|rtt| Duration::from_millis(u64::from(rtt)))
                        .unwrap_or_default()
                };

                // The length of the circuit up until the hop that has the half-stream.
                //
                // +1, because HopNums are zero-based.
                let circ_len = usize::from(hop_num) + 1;

                // We double the CBT to account for rend circuits,
                // which are twice as long (otherwise we risk expiring
                // the rend half-streams too soon).
                let timeout = std::cmp::max(max_rtt, 2 * leg.estimate_cbt(circ_len));
                let expire_at = self.runtime.now() + timeout;

                let res: Result<()> = leg
                    .close_stream(hop_num, sid, behav, reason, expire_at)
                    .await;

                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            // Possibly send an XON flow-control message for a stream.
            RunOnceCmdInner::MaybeSendXon {
                rate,
                stream_id,
                hop,
            } => {
                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(NoJoinPointError) => {
                        // A stream tried to send an XON message to the join point of
                        // a tunnel that has never had a join point. Currently in arti, only a
                        // `StreamTarget` asks us to send an XON message, and this tunnel
                        // originally created the `StreamTarget` to begin with. So this is a
                        // legitimate bug somewhere in the tunnel code.
                        return Err(
                            internal!(
                                "Could not send an XON message to a join point on a tunnel without a join point",
                            )
                            .into()
                        );
                    }
                };

                let Some(leg) = self.circuits.leg_mut(leg_id) else {
                    // The leg has disappeared. This is fine since the stream may have ended and
                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and an incorrect leg number, but
                    // it's not currently possible to differentiate between an incorrect leg
                    // number and a tunnel leg that has been closed.
                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                    return Ok(());
                };

                let Some(hop) = leg.hop_mut(hop_num) else {
                    // The hop has disappeared. This is fine since the circuit may have
                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and an incorrect hop number, but
                    // it's not currently possible to differentiate between an incorrect hop
                    // number and a circuit hop that has been removed.
                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                    return Ok(());
                };

                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                    // Nothing to do.
                    return Ok(());
                };

                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
                let cell = SendRelayCell {
                    hop: Some(hop_num),
                    early: false,
                    cell,
                };

                leg.send_relay_cell(cell).await?;
            }
            // Process a circuit-level SENDME, feeding current congestion
            // signals into the congestion-control state.
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.chan_sender.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            // Report the clock skew observed on the first hop; only meaningful
            // (and only permitted) for a single-leg tunnel.
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());

                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            // Shut down the reactor in response to a handled cell.
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            // Remove a circuit leg from the conflux set, possibly reporting a
            // failed conflux handshake (and possibly tearing down the tunnel).
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");

                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();

                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);

                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.

                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    // Translate the removal reason into a handshake error,
                    // remembering whether it was a protocol violation.
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };

                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;

                    if let Some(e) = proto_violation {
                        tor_error::warn_report!(
                            e,
                            tunnel_id = %self.tunnel_id,
                            "Malformed conflux handshake, tearing down tunnel",
                        );

                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;

                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;

                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                // Buffer an out-of-order conflux message until its sequence
                // number becomes deliverable (see `try_dequeue_ooo_msgs`).
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
            #[cfg(feature = "circ-padding")]
            RunOnceCmdInner::PaddingAction { leg, padding_event } => {
                // TODO: If we someday move back to having a per-circuit reactor,
                // this event would logically belong there, not on the tunnel reactor.
                self.circuits.run_padding_event(leg, padding_event).await?;
            }
        }

        Ok(())
    }
1194
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    #[instrument(level = "trace", skip_all)]
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        // Biased select: commands (shutdown, test hooks) take priority over
        // control messages.
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    // Test-only: install a fake hop without running a real handshake.
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    // Any other command before the circuit is created is a
                    // protocol violation; shut the reactor down.
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };

        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            // Only a Create message is acceptable here; anything else is a
            // protocol violation that shuts the reactor down.
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);

                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }
1247
1248    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1249    ///
1250    /// If all the circuits we were waiting on have finished the conflux handshake,
1251    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1252    /// are sent to the handshake initiator.
1253    #[cfg(feature = "conflux")]
1254    #[instrument(level = "trace", skip_all)]
1255    fn note_conflux_handshake_result(
1256        &mut self,
1257        res: StdResult<(), ConfluxHandshakeError>,
1258        reactor_is_closing: bool,
1259    ) -> StdResult<(), ReactorError> {
1260        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1261            Some(conflux_ctx) => {
1262                conflux_ctx.results.push(res);
1263                // Whether all the legs have finished linking:
1264                conflux_ctx.results.len() == conflux_ctx.num_legs
1265            }
1266            None => {
1267                return Err(internal!("no conflux handshake context").into());
1268            }
1269        };
1270
1271        if tunnel_complete || reactor_is_closing {
1272            // Time to remove the conflux handshake context
1273            // and extract the results we have collected
1274            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1275
1276            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1277            let leg_count = conflux_ctx.results.len();
1278
1279            info!(
1280                tunnel_id = %self.tunnel_id,
1281                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1282            );
1283
1284            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1285
1286            // We don't expect to receive any more handshake results,
1287            // at least not until we get another LinkCircuits control message,
1288            // which will install a new ConfluxHandshakeCtx with a channel
1289            // for us to send updates on
1290        }
1291
1292        Ok(())
1293    }
1294
1295    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1296    fn prepare_msg_and_install_handler(
1297        &mut self,
1298        msg: Option<AnyRelayMsgOuter>,
1299        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1300    ) -> Result<Option<SendRelayCell>> {
1301        let msg = msg
1302            .map(|msg| {
1303                let handlers = &mut self.cell_handlers;
1304                let handler = handler
1305                    .as_ref()
1306                    .or(handlers.meta_handler.as_ref())
1307                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1308                // We should always have a precise HopLocation here so this should never fails but
1309                // in case we have a ::JointPoint, we'll notice.
1310                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1311                    "MsgHandler doesn't have a precise HopLocation"
1312                ))?;
1313                Ok::<_, crate::Error>(SendRelayCell {
1314                    hop: Some(hop),
1315                    early: false,
1316                    cell: msg,
1317                })
1318            })
1319            .transpose()?;
1320
1321        if let Some(handler) = handler {
1322            self.cell_handlers.set_meta_handler(handler)?;
1323        }
1324
1325        Ok(msg)
1326    }
1327
1328    /// Handle a shutdown request.
1329    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1330        trace!(
1331            tunnel_id = %self.tunnel_id,
1332            "reactor shutdown due to explicit request",
1333        );
1334
1335        Err(ReactorError::Shutdown)
1336    }
1337
1338    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1339    ///
1340    /// Returns an error over the `answer` channel if the reactor has no circuits,
1341    /// or more than one circuit. The reactor will shut down regardless.
1342    #[cfg(feature = "conflux")]
1343    fn handle_shutdown_and_return_circuit(
1344        &mut self,
1345        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1346    ) -> StdResult<(), ReactorError> {
1347        // Don't care if the receiver goes away
1348        let _ = answer.send(self.circuits.take_single_leg());
1349        self.handle_shutdown().map(|_| ())
1350    }
1351
1352    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1353    ///
1354    /// After resolving a `TargetHop::LastHop`,
1355    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1356    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1357    ///
1358    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1359    /// if you need a fixed hop position in the tunnel.
1360    /// For example if we open a stream to `TargetHop::LastHop`,
1361    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1362    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1363    ///
1364    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1365    /// and the tunnel has no hops
1366    /// (either has no legs, or has legs which contain no hops).
1367    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1368        match hop {
1369            TargetHop::Hop(hop) => Ok(hop),
1370            TargetHop::LastHop => {
1371                if let Ok(leg) = self.circuits.single_leg() {
1372                    let leg_id = leg.unique_id();
1373                    // single-path tunnel
1374                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1375                    Ok(HopLocation::Hop((leg_id, hop)))
1376                } else if !self.circuits.is_empty() {
1377                    // multi-path tunnel
1378                    Ok(HopLocation::JoinPoint)
1379                } else {
1380                    // no legs
1381                    Err(NoHopsBuiltError)
1382                }
1383            }
1384        }
1385    }
1386
1387    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1388    ///
1389    /// After resolving a `HopLocation::JoinPoint`,
1390    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1391    ///
1392    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1393    /// need them,
1394    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1395    /// iterations as the primary leg may change from one iteration to the next.
1396    ///
1397    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1398    /// but it does not have a join point.
1399    #[instrument(level = "trace", skip_all)]
1400    fn resolve_hop_location(
1401        &self,
1402        hop: HopLocation,
1403    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1404        match hop {
1405            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1406            HopLocation::JoinPoint => {
1407                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1408                    Ok((leg_id, hop_num))
1409                } else {
1410                    // Attempted to get the join point of a non-multipath tunnel.
1411                    Err(NoJoinPointError)
1412                }
1413            }
1414        }
1415    }
1416
1417    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1418    ///
1419    /// This is a helper function that basically calls both resolve_target_hop and
1420    /// resolve_hop_location back to back.
1421    ///
1422    /// It returns None on failure to resolve meaning that if you want more detailed error on why
1423    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1424    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1425        self.resolve_target_hop(hop)
1426            .ok()
1427            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1428    }
1429
1430    /// Install or remove a padder at a given hop.
1431    #[cfg(feature = "circ-padding-manual")]
1432    fn set_padding_at_hop(
1433        &self,
1434        hop: HopLocation,
1435        padder: Option<super::circuit::padding::CircuitPadder>,
1436    ) -> Result<()> {
1437        let HopLocation::Hop((leg_id, hop_num)) = hop else {
1438            return Err(bad_api_usage!("Padding to the join point is not supported.").into());
1439        };
1440        let circ = self.circuits.leg(leg_id).ok_or(Error::NoSuchHop)?;
1441        circ.set_padding_at_hop(hop_num, padder)?;
1442        Ok(())
1443    }
1444
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        // Thin delegation: the conflux set owns the per-leg circuit state and
        // answers this per-(leg, hop) congestion-control question.
        self.circuits.uses_stream_sendme(leg, hop)
    }
1451
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    #[instrument(level = "trace", skip_all)]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;

        // Reject concurrent link requests: we track only one set of handshake
        // results (and one `answer` channel) at a time, so a second request
        // while a handshake is in flight is an internal error.
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;

            return Ok(());
        }

        let unlinked_legs = self.circuits.num_unlinked();

        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that complete the conflux handshake
        // get removed from the set.
        let num_legs = circuits.len() + unlinked_legs;

        // Note: add_legs validates `circuits`.
        //
        // The inner async block acts as a "try" block: an error from
        // `add_legs` is captured into `res` rather than propagated out of
        // this function, because either way the outcome must be reported on
        // the `answer` channel below.
        let res = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;

        if let Err(e) = res {
            warn_report!(e, "Failed to link conflux circuits");

            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }

        Ok(())
    }
1511}
1512
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        // The receiving end was dropped: nobody is waiting for this outcome
        // anymore, so the reactor should shut down.
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1528
/// The tunnel does not have any hops.
///
/// A zero-sized, `Copy` error type; the user-facing message is supplied by
/// the `#[error(...)]` attribute via `thiserror`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
1534
/// The tunnel does not have a join point.
///
/// Returned when trying to resolve [`HopLocation::JoinPoint`] on a tunnel
/// that is not a multipath (conflux) tunnel.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
1540
1541impl CellHandlers {
1542    /// Try to install a given meta-cell handler to receive any unusual cells on
1543    /// this circuit, along with a result channel to notify on completion.
1544    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1545        if self.meta_handler.is_none() {
1546            self.meta_handler = Some(handler);
1547            Ok(())
1548        } else {
1549            Err(Error::from(internal!(
1550                "Tried to install a meta-cell handler before the old one was gone."
1551            )))
1552        }
1553    }
1554
1555    /// Try to install a given cell handler on this circuit.
1556    #[cfg(feature = "hs-service")]
1557    fn set_incoming_stream_req_handler(
1558        &mut self,
1559        handler: IncomingStreamRequestHandler,
1560    ) -> Result<()> {
1561        if self.incoming_stream_req_handler.is_none() {
1562            self.incoming_stream_req_handler = Some(handler);
1563            Ok(())
1564        } else {
1565            Err(Error::from(internal!(
1566                "Tried to install a BEGIN cell handler before the old one was gone."
1567            )))
1568        }
1569    }
1570}
1571
#[cfg(test)]
mod test {
    // Deliberately empty: this code is tested in [`crate::client::circuit::test`].
}