tor_proto/tunnel/
reactor.rs

//! Code to handle incoming cells on a circuit.
//!
//! ## On message validation
//!
//! There are three steps for validating an incoming message on a stream:
//!
//! 1. Is the message contextually appropriate? (e.g., no more than one
//!    `CONNECTED` message per stream.) This is handled by calling
//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
//! 2. Does the message comply with flow-control rules? (e.g., no more data than
//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
//!    this; for half-closed streams, the reactor handles it using the
//!    `halfstream` module.
//! 3. Does the message have an acceptable command type, and is the message
//!    well-formed? For open streams, the streams themselves handle this check.
//!    For half-closed streams, the reactor handles it by calling
//!    `consume_checked_msg()`.
18
19pub(super) mod circuit;
20mod conflux;
21mod control;
22pub(super) mod syncview;
23
24use crate::crypto::cell::HopNum;
25use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26use crate::memquota::{CircuitAccount, StreamAccount};
27use crate::stream::AnyCmdChecker;
28#[cfg(feature = "hs-service")]
29use crate::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
30use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
31use crate::tunnel::circuit::unique_id::UniqId;
32use crate::tunnel::circuit::CircuitRxReceiver;
33use crate::tunnel::{streammap, HopLocation, TargetHop, TunnelId, TunnelScopedCircId};
34use crate::util::err::ReactorError;
35use crate::util::skew::ClockSkew;
36use crate::{Error, Result};
37use circuit::{Circuit, CircuitCmd};
38use conflux::ConfluxSet;
39use control::ControlHandler;
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42use std::mem::size_of;
43use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
44use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
45use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
46use tor_rtcompat::DynTimeProvider;
47
48use futures::channel::mpsc;
49use futures::StreamExt;
50use futures::{select_biased, FutureExt as _};
51use oneshot_fused_workaround as oneshot;
52
53use std::result::Result as StdResult;
54use std::sync::Arc;
55
56use crate::channel::Channel;
57use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
58use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
59use derive_deftly::Deftly;
60use derive_more::From;
61use tor_cell::chancell::CircId;
62use tor_llcrypto::pk;
63use tor_memquota::derive_deftly_template_HasMemoryCost;
64use tor_memquota::mq_queue::{self, MpscSpec};
65use tracing::{info, trace, warn};
66
67use super::circuit::{MutableState, TunnelMutableState};
68
69#[cfg(feature = "conflux")]
70use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
71
72pub(super) use control::CtrlCmd;
73pub(super) use control::CtrlMsg;
74
75/// The type of a oneshot channel used to inform reactor of the result of an operation.
76pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
77
/// Contains a list of conflux handshake results.
///
/// Each entry is `Ok(())` for a handshake that succeeded,
/// or the [`ConfluxHandshakeError`] describing why it failed.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
81
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
89
90pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
91
/// Sending half of the memory-quota-tracked MPSC queue that carries
/// incoming stream requests ([`StreamReqInfo`]).
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
95
96/// A handshake type, to be used when creating circuit hops.
97#[derive(Clone, Debug)]
98pub(crate) enum CircuitHandshake {
99    /// Use the CREATE_FAST handshake.
100    CreateFast,
101    /// Use the ntor handshake.
102    Ntor {
103        /// The public key of the relay.
104        public_key: NtorPublicKey,
105        /// The Ed25519 identity of the relay, which is verified against the
106        /// identity held in the circuit's channel.
107        ed_identity: pk::ed25519::Ed25519Identity,
108    },
109    /// Use the ntor-v3 handshake.
110    NtorV3 {
111        /// The public key of the relay.
112        public_key: NtorV3PublicKey,
113    },
114}
115
116/// A behavior to perform when closing a stream.
117///
118/// We don't use `Option<End>`` here, since the behavior of `SendNothing` is so surprising
119/// that we shouldn't let it pass unremarked.
120#[derive(Clone, Debug)]
121pub(crate) enum CloseStreamBehavior {
122    /// Send nothing at all, so that the other side will not realize we have
123    /// closed the stream.
124    ///
125    /// We should only do this for incoming onion service streams when we
126    /// want to black-hole the client's requests.
127    SendNothing,
128    /// Send an End cell, if we haven't already sent one.
129    SendEnd(End),
130}
131impl Default for CloseStreamBehavior {
132    fn default() -> Self {
133        Self::SendEnd(End::new_misc())
134    }
135}
136
// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
// proliferation is a bit bothersome, but unavoidable with the current design.
//
// We should consider getting rid of some of these enums (if possible),
// and coming up with more intuitive names.
142
143/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
144#[derive(From, Debug)]
145enum RunOnceCmd {
146    /// Run a single `RunOnceCmdInner` command.
147    Single(RunOnceCmdInner),
148    /// Run multiple `RunOnceCmdInner` commands.
149    //
150    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
151    // but most of the time we're only going to have *one* RunOnceCmdInner
152    // to run per run_once() loop. The enum enables us avoid the extra heap
153    // allocation for the `RunOnceCmd::Single` case.
154    Multiple(Vec<RunOnceCmdInner>),
155}
156
157/// Instructions for running something in the reactor loop.
158///
159/// Run at the end of [`Reactor::run_once`].
160//
161// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
162// We should consider making each variant a tuple variant and deduplicating the fields.
163#[derive(educe::Educe)]
164#[educe(Debug)]
165enum RunOnceCmdInner {
166    /// Send a RELAY cell.
167    Send {
168        /// The leg the cell should be sent on.
169        leg: UniqId,
170        /// The cell to send.
171        cell: SendRelayCell,
172        /// A channel for sending completion notifications.
173        done: Option<ReactorResultChannel<()>>,
174    },
175    /// Send a given control message on this circuit, and install a control-message handler to
176    /// receive responses.
177    #[cfg(feature = "send-control-msg")]
178    SendMsgAndInstallHandler {
179        /// The message to send, if any
180        msg: Option<AnyRelayMsgOuter>,
181        /// A message handler to install.
182        ///
183        /// If this is `None`, there must already be a message handler installed
184        #[educe(Debug(ignore))]
185        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
186        /// A sender that we use to tell the caller that the message was sent
187        /// and the handler installed.
188        done: oneshot::Sender<Result<()>>,
189    },
190    /// Handle a SENDME message.
191    HandleSendMe {
192        /// The leg the SENDME was received on.
193        leg: UniqId,
194        /// The hop number.
195        hop: HopNum,
196        /// The SENDME message to handle.
197        sendme: Sendme,
198    },
199    /// Begin a stream with the provided hop in this circuit.
200    ///
201    /// Uses the provided stream ID, and sends the provided message to that hop.
202    BeginStream {
203        /// The cell to send.
204        cell: Result<(SendRelayCell, StreamId)>,
205        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
206        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
207        /// caller.
208        hop: HopLocation,
209        /// The circuit leg to begin the stream on.
210        leg: UniqId,
211        /// Oneshot channel to notify on completion, with the allocated stream ID.
212        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
213    },
214    /// Close the specified stream.
215    CloseStream {
216        /// The hop number.
217        hop: HopLocation,
218        /// The ID of the stream to close.
219        sid: StreamId,
220        /// The stream-closing behavior.
221        behav: CloseStreamBehavior,
222        /// The reason for closing the stream.
223        reason: streammap::TerminateReason,
224        /// A channel for sending completion notifications.
225        done: Option<ReactorResultChannel<()>>,
226    },
227    /// Get the clock skew claimed by the first hop of the circuit.
228    FirstHopClockSkew {
229        /// Oneshot channel to return the clock skew.
230        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
231    },
232    /// Remove a circuit leg from the conflux set.
233    RemoveLeg {
234        /// The circuit leg to remove.
235        leg: UniqId,
236        /// The reason for removal.
237        ///
238        /// This is only used for conflux circuits that get removed
239        /// before the conflux handshake is complete.
240        ///
241        /// The [`RemoveLegReason`] is mapped by the reactor to a
242        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
243        /// handshake to indicate the reason the handshake failed.
244        reason: RemoveLegReason,
245    },
246    /// A circuit has completed the conflux handshake,
247    /// and wants to send the specified cell.
248    ///
249    /// This is similar to [`RunOnceCmdInner::Send`],
250    /// but needs to remain a separate variant,
251    /// because in addition to instructing the reactor to send a cell,
252    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
253    /// This enables the reactor to save the handshake result (`Ok(())`),
254    /// and, if there are no other legs still in the handshake phase,
255    /// send the result to the handshake initiator.
256    #[cfg(feature = "conflux")]
257    ConfluxHandshakeComplete {
258        /// The circuit leg that has completed the handshake,
259        /// This is the leg the cell should be sent on.
260        leg: UniqId,
261        /// The cell to send.
262        cell: SendRelayCell,
263    },
264    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
265    #[cfg(feature = "conflux")]
266    Link {
267        /// The circuits to link into the tunnel
268        #[educe(Debug(ignore))]
269        circuits: Vec<Circuit>,
270        /// Oneshot channel for notifying of conflux handshake completion.
271        answer: ConfluxLinkResultChannel,
272    },
273    /// Enqueue an out-of-order cell in ooo_msg.
274    #[cfg(feature = "conflux")]
275    Enqueue {
276        /// The leg the entry originated from.
277        leg: UniqId,
278        /// The out-of-order message.
279        msg: OooRelayMsg,
280    },
281    /// Perform a clean shutdown on this circuit.
282    CleanShutdown,
283}
284
285impl RunOnceCmdInner {
286    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
287    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
288        match cmd {
289            CircuitCmd::Send(cell) => Self::Send {
290                leg,
291                cell,
292                done: None,
293            },
294            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
295            CircuitCmd::CloseStream {
296                hop,
297                sid,
298                behav,
299                reason,
300            } => Self::CloseStream {
301                hop: HopLocation::Hop((leg, hop)),
302                sid,
303                behav,
304                reason,
305                done: None,
306            },
307            #[cfg(feature = "conflux")]
308            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg { leg, reason },
309            #[cfg(feature = "conflux")]
310            CircuitCmd::ConfluxHandshakeComplete(cell) => {
311                Self::ConfluxHandshakeComplete { leg, cell }
312            }
313            #[cfg(feature = "conflux")]
314            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
315            CircuitCmd::CleanShutdown => Self::CleanShutdown,
316        }
317    }
318}
319
320/// Cmd for sending a relay cell.
321///
322/// The contents of this struct are passed to `send_relay_cell`
323#[derive(educe::Educe)]
324#[educe(Debug)]
325pub(crate) struct SendRelayCell {
326    /// The hop number.
327    pub(crate) hop: HopNum,
328    /// Whether to use a RELAY_EARLY cell.
329    pub(crate) early: bool,
330    /// The cell to send.
331    pub(crate) cell: AnyRelayMsgOuter,
332}
333
334/// A command to execute at the end of [`Reactor::run_once`].
335#[derive(From, Debug)]
336#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
337enum CircuitAction {
338    /// Run a single `CircuitCmd` command.
339    RunCmd {
340        /// The unique identifier of the circuit leg to run the command on
341        leg: UniqId,
342        /// The command to run.
343        cmd: CircuitCmd,
344    },
345    /// Handle a control message
346    HandleControl(CtrlMsg),
347    /// Handle an input message.
348    HandleCell {
349        /// The unique identifier of the circuit leg the message was received on.
350        leg: UniqId,
351        /// The message to handle.
352        cell: ClientCircChanMsg,
353    },
354    /// Remove the specified circuit leg from the conflux set.
355    ///
356    /// Returned whenever a single circuit leg needs to be be removed
357    /// from the reactor's conflux set, without necessarily tearing down
358    /// the whole set or shutting down the reactor.
359    ///
360    /// Note: this action *can* cause the reactor to shut down
361    /// (and the conflux set to be closed).
362    ///
363    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
364    RemoveLeg {
365        /// The leg to remove.
366        leg: UniqId,
367        /// The reason for removal.
368        ///
369        /// This is only used for conflux circuits that get removed
370        /// before the conflux handshake is complete.
371        ///
372        /// The [`RemoveLegReason`] is mapped by the reactor to a
373        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
374        /// handshake to indicate the reason the handshake failed.
375        reason: RemoveLegReason,
376    },
377}
378
379/// The reason for removing a circuit leg from the conflux set.
380#[derive(Debug, derive_more::Display)]
381enum RemoveLegReason {
382    /// The conflux handshake timed out.
383    ///
384    /// On the client-side, this means we didn't receive
385    /// the CONFLUX_LINKED response in time.
386    #[display("conflux handshake timed out")]
387    ConfluxHandshakeTimeout,
388    /// An error occurred during conflux handshake.
389    #[display("{}", _0)]
390    ConfluxHandshakeErr(Error),
391    /// The channel was closed.
392    #[display("channel closed")]
393    ChannelClosed,
394}
395
396/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
397/// progress.
398///
399/// # Background
400///
401/// The `Reactor` can't have async functions that send and receive cells, because its job is to
402/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
403///
404/// To get around this problem, the reactor can send some cells, and then make one of these
405/// `MetaCellHandler` objects, which will be run when the reply arrives.
406pub(crate) trait MetaCellHandler: Send {
407    /// The hop we're expecting the message to come from. This is compared against the hop
408    /// from which we actually receive messages, and an error is thrown if the two don't match.
409    fn expected_hop(&self) -> HopNum;
410    /// Called when the message we were waiting for arrives.
411    ///
412    /// Gets a copy of the `Reactor` in order to do anything it likes there.
413    ///
414    /// If this function returns an error, the reactor will shut down.
415    fn handle_msg(
416        &mut self,
417        msg: UnparsedRelayMsg,
418        reactor: &mut Circuit,
419    ) -> Result<MetaCellDisposition>;
420}
421
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
439
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// Arguments:
///
/// * `$self`: a value with a `tunnel_id` field (used only for logging),
/// * `$res`: the `Option` to unwrap,
/// * `$reason`: a `Display`-able shutdown reason (used only for logging).
///
/// Evaluates to `Ok(v)` for `Some(v)`. For `None`, logs the reason at trace
/// level and evaluates to `Err(ReactorError::Shutdown)`; the macro does not
/// return on its own, so callers typically apply `?` to the result.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
459
460/// Object to handle incoming cells and background tasks on a circuit
461///
462/// This type is returned when you finish a circuit; you need to spawn a
463/// new task that calls `run()` on it.
464#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
465pub struct Reactor {
466    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
467    ///
468    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
469    /// is ready to accept cells.
470    control: mpsc::UnboundedReceiver<CtrlMsg>,
471    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
472    ///
473    /// This channel is polled in [`Reactor::run_once`].
474    ///
475    /// NOTE: this is a separate channel from `control`, because some messages
476    /// have higher priority and need to be handled even if the `chan_sender` is not
477    /// ready (whereas `control` messages are not read until the `chan_sender` sink
478    /// is ready to accept cells).
479    command: mpsc::UnboundedReceiver<CtrlCmd>,
480    /// A oneshot sender that is used to alert other tasks when this reactor is
481    /// finally dropped.
482    ///
483    /// It is a sender for Void because we never actually want to send anything here;
484    /// we only want to generate canceled events.
485    #[allow(dead_code)] // the only purpose of this field is to be dropped.
486    reactor_closed_tx: oneshot::Sender<void::Void>,
487    /// A set of circuits that form a tunnel.
488    ///
489    /// Contains 1 or more circuits.
490    ///
491    /// Circuits may be added to this set throughout the lifetime of the reactor.
492    ///
493    /// Sometimes, the reactor will remove circuits from this set,
494    /// for example if the `LINKED` message takes too long to arrive,
495    /// or if congestion control negotiation fails.
496    /// The reactor will continue running with the remaining circuits.
497    /// It will shut down if *all* the circuits are removed.
498    ///
499    // TODO(conflux): document all the reasons why the reactor might
500    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
501    circuits: ConfluxSet,
502    /// An identifier for logging about this tunnel reactor.
503    tunnel_id: TunnelId,
504    /// Handlers, shared with `Circuit`.
505    cell_handlers: CellHandlers,
506    /// The time provider, used for conflux handshake timeouts.
507    runtime: DynTimeProvider,
508    /// The conflux handshake context, if there is an on-going handshake.
509    ///
510    /// Set to `None` if this is a single-path tunnel,
511    /// or if none of the circuit legs from our conflux set
512    /// are currently in the conflux handshake phase.
513    #[cfg(feature = "conflux")]
514    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
515    /// A min-heap buffering all the out-of-order messages received so far.
516    ///
517    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
518    /// on its size. We should make this participate in the memquota memory
519    /// tracking system, somehow.
520    #[cfg(feature = "conflux")]
521    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
522}
523
/// The context for an on-going conflux handshake.
///
/// Stored in `Reactor::conflux_hs_ctx` while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
534
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Ordering and equality are determined by `msg` alone; `leg_id` is ignored.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}

#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Compare two entries by their messages, in *reverse* order.
    ///
    /// `std`'s `BinaryHeap` is a max-heap; reversing the comparison makes
    /// `Reactor::ooo_msgs` behave as a min-heap, so that `peek()`/`pop()`
    /// yield the entry whose `msg` compares lowest.
    fn cmp(&self, other: &Self) -> Ordering {
        self.msg.cmp(&other.msg).reverse()
    }
}

#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Delegates to [`Ord::cmp`], so the two orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal if their messages are equal,
    /// regardless of which leg they arrived on.
    fn eq(&self, other: &Self) -> bool {
        self.msg == other.msg
    }
}

#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
568
569/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
570struct CellHandlers {
571    /// A handler for a meta cell, together with a result channel to notify on completion.
572    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
573    /// A handler for incoming stream requests.
574    #[cfg(feature = "hs-service")]
575    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
576}
577
/// Information about an incoming stream request.
///
/// Values of this type travel through the [`StreamReqSender`] queue; the
/// `has_memory_cost` attributes below give the indirect memory size
/// accounted for each field.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// The [`UniqId`] of the circuit the request came on.
    pub(crate) leg: UniqId,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
610
/// Data required for handling an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// An [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    //
    // Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
627
628impl Reactor {
629    /// Create a new circuit reactor.
630    ///
631    /// The reactor will send outbound messages on `channel`, receive incoming
632    /// messages on `input`, and identify this circuit by the channel-local
633    /// [`CircId`] provided.
634    ///
635    /// The internal unique identifier for this circuit will be `unique_id`.
636    #[allow(clippy::type_complexity)] // TODO
637    pub(super) fn new(
638        channel: Arc<Channel>,
639        channel_id: CircId,
640        unique_id: UniqId,
641        input: CircuitRxReceiver,
642        runtime: DynTimeProvider,
643        memquota: CircuitAccount,
644    ) -> (
645        Self,
646        mpsc::UnboundedSender<CtrlMsg>,
647        mpsc::UnboundedSender<CtrlCmd>,
648        oneshot::Receiver<void::Void>,
649        Arc<TunnelMutableState>,
650    ) {
651        let tunnel_id = TunnelId::next();
652        let (control_tx, control_rx) = mpsc::unbounded();
653        let (command_tx, command_rx) = mpsc::unbounded();
654        let mutable = Arc::new(MutableState::default());
655
656        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
657
658        let cell_handlers = CellHandlers {
659            meta_handler: None,
660            #[cfg(feature = "hs-service")]
661            incoming_stream_req_handler: None,
662        };
663
664        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
665        let circuit_leg = Circuit::new(
666            channel,
667            channel_id,
668            unique_id,
669            input,
670            memquota,
671            Arc::clone(&mutable),
672        );
673
674        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
675
676        let reactor = Reactor {
677            circuits,
678            control: control_rx,
679            command: command_rx,
680            reactor_closed_tx,
681            tunnel_id,
682            cell_handlers,
683            runtime,
684            #[cfg(feature = "conflux")]
685            conflux_hs_ctx: None,
686            #[cfg(feature = "conflux")]
687            ooo_msgs: Default::default(),
688        };
689
690        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
691    }
692
693    /// Launch the reactor, and run until the circuit closes or we
694    /// encounter an error.
695    ///
696    /// Once this method returns, the circuit is dead and cannot be
697    /// used again.
698    pub async fn run(mut self) -> Result<()> {
699        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
700        let result: Result<()> = loop {
701            match self.run_once().await {
702                Ok(()) => (),
703                Err(ReactorError::Shutdown) => break Ok(()),
704                Err(ReactorError::Err(e)) => break Err(e),
705            }
706        };
707        trace!(tunnel_id = %self.tunnel_id, "Tunnel reactor stopped: {:?}", result);
708        result
709    }
710
711    /// Helper for run: doesn't mark the circuit closed on finish.  Only
712    /// processes one cell or control message.
713    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
714        // If all the circuits are closed, shut down the reactor
715        if self.circuits.is_empty() {
716            trace!(
717                tunnel_id = %self.tunnel_id,
718                "Tunnel reactor shutting down: all circuits have closed",
719            );
720
721            return Err(ReactorError::Shutdown);
722        }
723
724        // If this is a single path circuit, we need to wait until the first hop
725        // is created before doing anything else
726        let single_path_with_hops = self
727            .circuits
728            .single_leg_mut()
729            .is_ok_and(|leg| !leg.has_hops());
730        if single_path_with_hops {
731            self.wait_for_create().await?;
732
733            return Ok(());
734        }
735
736        // Prioritize the buffered messages.
737        //
738        // Note: if any of the messages are ready to be handled,
739        // this will block the reactor until we are done processing them
740        #[cfg(feature = "conflux")]
741        self.try_dequeue_ooo_msgs().await?;
742
743        let action = select_biased! {
744            res = self.command.next() => {
745                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
746                return ControlHandler::new(self).handle_cmd(cmd);
747            },
748            // Check whether we've got a control message pending.
749            //
750            // Note: unfortunately, reading from control here means we might start
751            // handling control messages before our chan_senders are ready.
752            // With the current design, this is inevitable: we can't know which circuit leg
753            // a control message is meant for without first reading the control message from
754            // the channel, and at that point, we can't know for sure whether that particular
755            // circuit is ready for sending.
756            ret = self.control.next() => {
757                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
758                CircuitAction::HandleControl(msg)
759            },
760            res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
761        };
762
763        let cmd = match action {
764            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
765                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
766            )),
767            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
768                .handle_msg(ctrl)?
769                .map(RunOnceCmd::Single),
770            CircuitAction::HandleCell { leg, cell } => {
771                let circ = self
772                    .circuits
773                    .leg_mut(leg)
774                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
775
776                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
777                if circ_cmds.is_empty() {
778                    None
779                } else {
780                    // TODO: we return RunOnceCmd::Multiple even if there's a single command.
781                    //
782                    // See the TODO on `Circuit::handle_cell`.
783                    let cmd = RunOnceCmd::Multiple(
784                        circ_cmds
785                            .into_iter()
786                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
787                            .collect(),
788                    );
789
790                    Some(cmd)
791                }
792            }
793            CircuitAction::RemoveLeg { leg, reason } => {
794                Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
795            }
796        };
797
798        if let Some(cmd) = cmd {
799            self.handle_run_once_cmd(cmd).await?;
800        }
801
802        Ok(())
803    }
804
805    /// Try to process the previously-out-of-order messages we might have buffered.
806    #[cfg(feature = "conflux")]
807    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
808        // Check if we're ready to dequeue any of the previously out-of-order cells.
809        while let Some(entry) = self.ooo_msgs.peek() {
810            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
811
812            if !should_pop {
813                break;
814            }
815
816            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
817
818            let circ = self
819                .circuits
820                .leg_mut(entry.leg_id)
821                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
822            let handlers = &mut self.cell_handlers;
823            let cmd = circ
824                .handle_in_order_relay_msg(
825                    handlers,
826                    entry.msg.hopnum,
827                    entry.leg_id,
828                    entry.msg.cell_counts_towards_windows,
829                    entry.msg.streamid,
830                    entry.msg.msg,
831                )?
832                .map(|cmd| {
833                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
834                });
835
836            if let Some(cmd) = cmd {
837                self.handle_run_once_cmd(cmd).await?;
838            }
839        }
840
841        Ok(())
842    }
843
844    /// Handle a [`RunOnceCmd`].
845    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
846        match cmd {
847            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
848            RunOnceCmd::Multiple(cmds) => {
849                // While we know `sendable` is ready to accept *one* cell,
850                // we can't be certain it will be able to accept *all* of the cells
851                // that need to be sent here. This means we *may* end up buffering
852                // in its underlying SometimesUnboundedSink! That is OK, because
853                // RunOnceCmd::Multiple is only used for handling packed cells.
854                for cmd in cmds {
855                    self.handle_single_run_once_cmd(cmd).await?;
856                }
857            }
858        }
859
860        Ok(())
861    }
862
    /// Handle a single [`RunOnceCmdInner`].
    ///
    /// Dispatches on the command variant. Variants that carry a `done`/`answer`
    /// channel report their outcome over it; a dropped receiver is never
    /// treated as an error.
    ///
    /// Returns `Err` when the failure must be propagated to the caller:
    /// a relay cell could not be sent, a leg or hop could not be found,
    /// a leg could not be removed, a malformed conflux handshake was detected,
    /// or a `CleanShutdown` was requested (reported as [`ReactorError::Shutdown`]).
    ///
    /// Failures to close a stream (`CloseStream`) are only reported over the
    /// `done` channel; they do not cause this function to return an error.
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmdInner::Send { leg, cell, done } => {
                // TODO: check the cc window
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away
                    let _ = done.send(res.clone());
                }
                // Report the outcome first, then propagate any send failure.
                res?;
            }
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // There was no message to send; only the handler (if any) was installed.
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::BeginStream {
                leg,
                cell,
                hop,
                done,
            } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        let circ = self
                            .circuits
                            .leg_mut(leg)
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
                        let cell_hop = cell.hop;
                        // Look up the relay cell format of the target hop *before* sending,
                        // so that it can be reported back together with the stream ID.
                        let relay_format = circ
                            .hop_mut(cell_hop)
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
                            .ok_or(Error::NoSuchHop)?
                            .relay_cell_format();

                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::CloseStream {
                hop,
                sid,
                behav,
                reason,
                done,
            } => {
                // Resolve the hop location to a concrete (leg, hop number) pair.
                let result = (move || {
                    // this is needed to force the closure to be FnOnce rather than FnMut :(
                    let self_ = self;
                    let (leg_id, hop_num) = self_
                        .resolve_hop_location(hop)
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                    // NOTE(review): `ok_or` builds the error eagerly, even when the leg exists;
                    // `ok_or_else` would avoid that.
                    let leg = self_
                        .circuits
                        .leg_mut(leg_id)
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
                    Ok::<_, Bug>((leg, hop_num))
                })();

                let (leg, hop_num) = match result {
                    Ok(x) => x,
                    Err(e) => {
                        if let Some(done) = done {
                            // NOTE(review): `e` was already wrapped by the closure above; this wraps
                            // it again in a "Could not resolve" error, even when the failure was the
                            // missing leg. Looks unintended -- confirm.
                            // don't care if the receiver goes away
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                            let _ = done.send(Err(e.into()));
                        }
                        // The failure is only reported to the requester; the reactor keeps running.
                        return Ok(());
                    }
                };

                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;

                if let Some(done) = done {
                    // don't care if the receiver goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());

                // don't care if the receiver goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");

                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();

                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);

                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.

                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    // Translate the removal reason into a handshake error.
                    // Only a handshake error counts as a protocol violation.
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };

                    // On a protocol violation the reactor is about to close,
                    // so the results collected so far are flushed right away.
                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;

                    if let Some(e) = proto_violation {
                        // TODO: make warn_report support structured logging
                        tor_error::warn_report!(
                            e,
                            "{}: Malformed conflux handshake, tearing down tunnel",
                            self.tunnel_id
                        );

                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;

                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;

                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                // Buffer the out-of-order message, remembering which leg it arrived on.
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
        }

        Ok(())
    }
1067
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Both the command channel and the control channel are polled,
    /// with the command channel taking priority (`select_biased!`).
    ///
    /// Returns an error if an unexpected `CtrlMsg` or `CtrlCmd` is received,
    /// or if a shutdown was requested.
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    // `handle_shutdown` always returns an error, which stops the reactor.
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        // Test-only: add the fake hop and return
                        // without waiting for a `Create` message.
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    _ => {
                        // No other command is acceptable before the circuit has been created.
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };

        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            _ => {
                // Any control message other than `Create` is a protocol error at this point.
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);

                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }
1119
1120    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1121    ///
1122    /// If all the circuits we were waiting on have finished the conflux handshake,
1123    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1124    /// are sent to the handshake initiator.
1125    #[cfg(feature = "conflux")]
1126    fn note_conflux_handshake_result(
1127        &mut self,
1128        res: StdResult<(), ConfluxHandshakeError>,
1129        reactor_is_closing: bool,
1130    ) -> StdResult<(), ReactorError> {
1131        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1132            Some(conflux_ctx) => {
1133                conflux_ctx.results.push(res);
1134                // Whether all the legs have finished linking:
1135                conflux_ctx.results.len() == conflux_ctx.num_legs
1136            }
1137            None => {
1138                return Err(internal!("no conflux handshake context").into());
1139            }
1140        };
1141
1142        if tunnel_complete || reactor_is_closing {
1143            // Time to remove the conflux handshake context
1144            // and extract the results we have collected
1145            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1146
1147            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1148            let leg_count = conflux_ctx.results.len();
1149
1150            info!(
1151                tunnel_id = %self.tunnel_id,
1152                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1153            );
1154
1155            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1156
1157            // We don't expect to receive any more handshake results,
1158            // at least not until we get another LinkCircuits control message,
1159            // which will install a new ConfluxHandshakeCtx with a channel
1160            // for us to send updates on
1161        }
1162
1163        Ok(())
1164    }
1165
1166    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1167    fn prepare_msg_and_install_handler(
1168        &mut self,
1169        msg: Option<AnyRelayMsgOuter>,
1170        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1171    ) -> Result<Option<SendRelayCell>> {
1172        let msg = msg
1173            .map(|msg| {
1174                let handlers = &mut self.cell_handlers;
1175                let handler = handler
1176                    .as_ref()
1177                    .or(handlers.meta_handler.as_ref())
1178                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1179                Ok::<_, crate::Error>(SendRelayCell {
1180                    hop: handler.expected_hop(),
1181                    early: false,
1182                    cell: msg,
1183                })
1184            })
1185            .transpose()?;
1186
1187        if let Some(handler) = handler {
1188            self.cell_handlers.set_meta_handler(handler)?;
1189        }
1190
1191        Ok(msg)
1192    }
1193
    /// Handle a shutdown request.
    ///
    /// Logs the request at trace level and then *always* returns
    /// `Err(ReactorError::Shutdown)`; the `Ok` variant is never produced.
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
        trace!(
            tunnel_id = %self.tunnel_id,
            "reactor shutdown due to explicit request",
        );

        Err(ReactorError::Shutdown)
    }
1203
1204    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1205    ///
1206    /// Returns an error over the `answer` channel if the reactor has no circuits,
1207    /// or more than one circuit. The reactor will shut down regardless.
1208    #[cfg(feature = "conflux")]
1209    fn handle_shutdown_and_return_circuit(
1210        &mut self,
1211        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1212    ) -> StdResult<(), ReactorError> {
1213        // Don't care if the receiver goes away
1214        let _ = answer.send(self.circuits.take_single_leg().map_err(Into::into));
1215        self.handle_shutdown().map(|_| ())
1216    }
1217
1218    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1219    ///
1220    /// After resolving a `TargetHop::LastHop`,
1221    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1222    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1223    ///
1224    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1225    /// if you need a fixed hop position in the tunnel.
1226    /// For example if we open a stream to `TargetHop::LastHop`,
1227    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1228    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1229    ///
1230    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1231    /// and the tunnel has no hops
1232    /// (either has no legs, or has legs which contain no hops).
1233    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1234        match hop {
1235            TargetHop::Hop(hop) => Ok(hop),
1236            TargetHop::LastHop => {
1237                if let Ok(leg) = self.circuits.single_leg() {
1238                    let leg_id = leg.unique_id();
1239                    // single-path tunnel
1240                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1241                    Ok(HopLocation::Hop((leg_id, hop)))
1242                } else if !self.circuits.is_empty() {
1243                    // multi-path tunnel
1244                    return Ok(HopLocation::JoinPoint);
1245                } else {
1246                    // no legs
1247                    Err(NoHopsBuiltError)
1248                }
1249            }
1250        }
1251    }
1252
1253    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1254    ///
1255    /// After resolving a `HopLocation::JoinPoint`,
1256    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1257    ///
1258    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1259    /// need them,
1260    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1261    /// iterations as the primary leg may change from one iteration to the next.
1262    ///
1263    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1264    /// but it does not have a join point.
1265    fn resolve_hop_location(
1266        &self,
1267        hop: HopLocation,
1268    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1269        match hop {
1270            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1271            HopLocation::JoinPoint => {
1272                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1273                    Ok((leg_id, hop_num))
1274                } else {
1275                    // Attempted to get the join point of a non-multipath tunnel.
1276                    Err(NoJoinPointError)
1277                }
1278            }
1279        }
1280    }
1281
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// This simply forwards the question to the conflux set (`self.circuits`).
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.circuits.uses_stream_sendme(leg, hop)
    }
1288
1289    /// Handle a request to link some extra circuits in the reactor's conflux set.
1290    ///
1291    /// The circuits are validated, and if they do not have the same length,
1292    /// or if they do not all have the same last hop, an error is returned on
1293    /// the `answer` channel, and the conflux handshake is *not* initiated.
1294    ///
1295    /// If validation succeeds, the circuits are added to this reactor's conflux set,
1296    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1297    ///
1298    /// NOTE: this blocks the reactor main loop until all the cells are sent.
1299    #[cfg(feature = "conflux")]
1300    async fn handle_link_circuits(
1301        &mut self,
1302        circuits: Vec<Circuit>,
1303        answer: ConfluxLinkResultChannel,
1304    ) -> StdResult<(), ReactorError> {
1305        use tor_error::warn_report;
1306
1307        if self.conflux_hs_ctx.is_some() {
1308            let err = internal!("conflux linking already in progress");
1309            send_conflux_outcome(answer, Err(err.into()))?;
1310
1311            return Ok(());
1312        }
1313
1314        let unlinked_legs = self.circuits.num_unlinked();
1315
1316        // We need to send the LINK cell on each of the new circuits
1317        // and on each of the existing, unlinked legs from self.circuits.
1318        //
1319        // In reality, there can only be one such circuit
1320        // (the "initial" one from the previously single-path tunnel),
1321        // because any circuits that to complete the conflux handshake
1322        // get removed from the set.
1323        let num_legs = circuits.len() + unlinked_legs;
1324
1325        // Note: add_legs validates `circuits`
1326        let res = async {
1327            self.circuits.add_legs(circuits, &self.runtime)?;
1328            self.circuits.link_circuits(&self.runtime).await
1329        }
1330        .await;
1331
1332        if let Err(e) = res {
1333            warn_report!(e, "Failed to link conflux circuits");
1334
1335            send_conflux_outcome(answer, Err(e))?;
1336        } else {
1337            // Save the channel, to notify the user of completion.
1338            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1339                answer,
1340                num_legs,
1341                results: Default::default(),
1342            });
1343        }
1344
1345        Ok(())
1346    }
1347}
1348
/// Deliver the outcome of a conflux handshake to whoever initiated it.
///
/// Returns [`ReactorError::Shutdown`] (after logging a warning)
/// if the initiator has gone away and the outcome could not be delivered.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(_) => Ok(()),
        Err(_) => {
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
1364
/// The tunnel does not have any hops.
///
/// Returned when a `TargetHop::LastHop` cannot be resolved because the tunnel
/// has no legs, or because its only leg has no hops yet.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
1370
/// The tunnel does not have a join point.
///
/// Returned when a `HopLocation::JoinPoint` cannot be resolved
/// because the conflux set reports no primary join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
1376
1377impl CellHandlers {
1378    /// Try to install a given meta-cell handler to receive any unusual cells on
1379    /// this circuit, along with a result channel to notify on completion.
1380    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1381        if self.meta_handler.is_none() {
1382            self.meta_handler = Some(handler);
1383            Ok(())
1384        } else {
1385            Err(Error::from(internal!(
1386                "Tried to install a meta-cell handler before the old one was gone."
1387            )))
1388        }
1389    }
1390
1391    /// Try to install a given cell handler on this circuit.
1392    #[cfg(feature = "hs-service")]
1393    fn set_incoming_stream_req_handler(
1394        &mut self,
1395        handler: IncomingStreamRequestHandler,
1396    ) -> Result<()> {
1397        if self.incoming_stream_req_handler.is_none() {
1398            self.incoming_stream_req_handler = Some(handler);
1399            Ok(())
1400        } else {
1401            Err(Error::from(internal!(
1402                "Tried to install a BEGIN cell handler before the old one was gone."
1403            )))
1404        }
1405    }
1406}
1407
#[cfg(test)]
mod test {
    // This module is intentionally left empty.
    // Tested in [`crate::tunnel::circuit::test`].
}