tor_proto/tunnel/
reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
12//!    this; for half-closed streams, the reactor handles it using the
13//!    `halfstream` module.
14//! 3. Does the message have an acceptable command type, and is the message
15//!    well-formed? For open streams, the streams themselves handle this check.
16//!    For half-closed streams, the reactor handles it by calling
17//!    `consume_checked_msg()`.
18
19pub(super) mod circuit;
20mod conflux;
21mod control;
22pub(super) mod syncview;
23
24use crate::crypto::cell::HopNum;
25use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26use crate::memquota::{CircuitAccount, StreamAccount};
27use crate::stream::AnyCmdChecker;
28#[cfg(feature = "hs-service")]
29use crate::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
30use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
31use crate::tunnel::circuit::unique_id::UniqId;
32use crate::tunnel::circuit::CircuitRxReceiver;
33use crate::tunnel::{streammap, HopLocation, TargetHop};
34use crate::util::err::ReactorError;
35use crate::util::skew::ClockSkew;
36use crate::{Error, Result};
37use circuit::{Circuit, CircuitCmd};
38use conflux::ConfluxSet;
39use control::ControlHandler;
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42use std::mem::size_of;
43use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
44use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
45use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
46use tor_rtcompat::DynTimeProvider;
47
48use futures::channel::mpsc;
49use futures::StreamExt;
50use futures::{select_biased, FutureExt as _};
51use oneshot_fused_workaround as oneshot;
52
53use std::result::Result as StdResult;
54use std::sync::Arc;
55
56use crate::channel::Channel;
57use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
58use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
59use derive_deftly::Deftly;
60use derive_more::From;
61use tor_cell::chancell::CircId;
62use tor_llcrypto::pk;
63use tor_memquota::mq_queue::{self, MpscSpec};
64use tor_memquota::{derive_deftly_template_HasMemoryCost, memory_cost_structural_copy};
65use tracing::trace;
66
67use super::circuit::{MutableState, TunnelMutableState};
68
69#[cfg(feature = "conflux")]
70use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
71
72pub(super) use control::CtrlCmd;
73pub(super) use control::CtrlMsg;
74
/// The type of a oneshot channel used to inform reactor of the result of an operation.
///
/// The value sent is this crate's [`Result`]: either the operation's output `T`
/// or an [`Error`].
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;

/// Contains a list of conflux handshake results.
///
/// Each entry is either `Ok(())` or a [`ConfluxHandshakeError`].
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;

/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;

// Re-exported from the `circuit` submodule so the rest of the crate can name them.
pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};

/// MPSC queue containing stream requests
///
/// This is the sending half; the items it carries are [`StreamReqInfo`]s.
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
95
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material that its handshake needs;
/// `CreateFast` carries none.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115
/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
///
/// The [`Default`] for this type is `SendEnd` with the message built by
/// `End::new_misc()`.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send an End cell, if we haven't already sent one.
    SendEnd(End),
}
131impl Default for CloseStreamBehavior {
132    fn default() -> Self {
133        Self::SendEnd(End::new_misc())
134    }
135}
136
137// TODO(conflux): the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
138// proliferation is a bit bothersome, but unavoidable with the current design.
139//
140// We should consider getting rid of some of these enums (if possible),
141// and coming up with more intuitive names.
142
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// `From` is derived, so a lone `RunOnceCmdInner` or a `Vec<RunOnceCmdInner>`
/// can be turned into this type with `.into()`.
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
156
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
///
/// Several variants name the circuit leg they apply to with a [`LegId`];
/// [`RunOnceCmdInner::from_circuit_cmd`] builds those from a leg-local
/// [`CircuitCmd`].
///
/// `Debug` is derived through `educe` so that selected fields (the message
/// handler, the list of circuits to link) can be left out of the output.
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: LegId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, together with the stream ID to use for it.
        ///
        /// This is a `Result`, so an earlier failure can be carried here
        /// instead of a cell (presumably to be reported through `done` —
        /// the consumer is outside this excerpt).
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: LegId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop the stream is attached to.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        ///
        /// `None` if nobody is waiting for the outcome.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: LegId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake,
        /// This is the leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: LegId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
284
285impl RunOnceCmdInner {
286    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`LegIdKey`].
287    fn from_circuit_cmd(leg: LegIdKey, cmd: CircuitCmd) -> Self {
288        match cmd {
289            CircuitCmd::Send(cell) => Self::Send {
290                leg: LegId(leg),
291                cell,
292                done: None,
293            },
294            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe {
295                leg: LegId(leg),
296                hop,
297                sendme,
298            },
299            CircuitCmd::CloseStream {
300                hop,
301                sid,
302                behav,
303                reason,
304            } => Self::CloseStream {
305                hop: HopLocation::Hop((LegId(leg), hop)),
306                sid,
307                behav,
308                reason,
309                done: None,
310            },
311            #[cfg(feature = "conflux")]
312            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg {
313                leg: LegId(leg),
314                reason,
315            },
316            #[cfg(feature = "conflux")]
317            CircuitCmd::ConfluxHandshakeComplete(cell) => Self::ConfluxHandshakeComplete {
318                leg: LegId(leg),
319                cell,
320            },
321            #[cfg(feature = "conflux")]
322            CircuitCmd::Enqueue(msg) => Self::Enqueue {
323                leg: LegId(leg),
324                msg,
325            },
326            CircuitCmd::CleanShutdown => Self::CleanShutdown,
327        }
328    }
329}
330
/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`
///
/// Note that this names only the target hop; commands that also need to say
/// which circuit leg to use (e.g. [`RunOnceCmdInner::Send`]) carry that separately.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}
344
/// A command to execute at the end of [`Reactor::run_once`].
///
/// `run_once` obtains one of these from its `select_biased!` (a control
/// message, or whatever the conflux set reports next) and converts it into an
/// optional [`RunOnceCmd`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitAction {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: LegIdKey,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: LegIdKey,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this action *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: LegIdKey,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
}
389
/// The reason for removing a circuit leg from the conflux set.
///
/// Carried by [`CircuitAction::RemoveLeg`] and [`RunOnceCmdInner::RemoveLeg`].
#[derive(Debug)]
enum RemoveLegReason {
    /// The conflux handshake timed out.
    ///
    /// On the client-side, this means we didn't receive
    /// the CONFLUX_LINKED response in time.
    ConfluxHandshakeTimeout,
    /// An error occurred during conflux handshake.
    ///
    /// The payload is the underlying [`Error`].
    ConfluxHandshakeErr(Error),
    /// The channel was closed.
    ChannelClosed,
}
403
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopNum;
    /// Called when the message we were waiting for arrives.
    ///
    /// Receives mutable access to a [`Circuit`] (the parameter is still named
    /// `reactor`) in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen
    /// to this handler (and to the circuit) next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
429
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
///
/// The type is `pub(crate)` unless the `send-control-msg` feature is enabled,
/// in which case the `visibility::make` attribute makes it `pub`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
447
/// A unique identifier for a circuit leg.
///
/// After the circuit is torn down, its `LegId` becomes invalid.
/// The same `LegId` won't be reused for a future circuit.
///
/// This is a thin `Copy` wrapper around a [`LegIdKey`]; a key can be converted
/// with `LegId::from` or by calling the tuple constructor directly.
//
// TODO(#1857): make this pub
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct LegId(pub(crate) LegIdKey);
458
// TODO(conflux): can we use `UniqId` as the key instead of this newtype?
//
// See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2996#note_3199069
slotmap_careful::new_key_type! {
    /// A key type for the circuit leg slotmap
    ///
    /// This is the raw key; code outside the reactor usually sees it wrapped
    /// in a [`LegId`].
    ///
    /// See [`LegId`].
    pub(crate) struct LegIdKey;
}
468
469impl From<LegIdKey> for LegId {
470    fn from(leg_id: LegIdKey) -> Self {
471        LegId(leg_id)
472    }
473}
474
475memory_cost_structural_copy!(LegIdKey);
476
/// Convert an [`Option`] into a `Result`, mapping `None` to [`ReactorError::Shutdown`].
///
/// When `$res` is `None`, a trace-level message mentioning `$self.unique_id`
/// and `$reason` is logged before the error value is produced. The macro
/// evaluates to the `Result`; it does not return from the enclosing function,
/// so callers apply `?` themselves.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(value) = $res {
            Ok(value)
        } else {
            trace!("{}: reactor shutdown due to {}", $self.unique_id, $reason);
            Err(ReactorError::Shutdown)
        }
    }};
}
492
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    ///
    // NOTE(review): the comment on the `control` arm of the `select_biased!` in
    // `run_once` says control messages may be read *before* the chan_senders are
    // ready. One of the two descriptions is out of date — confirm which.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// (`BinaryHeap` is a max-heap; the min-heap behavior comes from the
    /// reversed `Ord` impl on [`ConfluxHeapEntry`].)
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
556
/// The context for an on-going conflux handshake.
///
/// Stored in `Reactor::conflux_hs_ctx` while at least one leg is still in the
/// handshake phase.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    ///
    /// One entry per leg whose handshake has finished (successfully or not).
    results: ConfluxHandshakeResult,
}
567
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Ordering and equality of entries are defined solely by `msg`
/// (`leg_id` is ignored), and the ordering is reversed so that a
/// `BinaryHeap` of entries behaves as a min-heap.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: LegId,
    /// The out of order message
    msg: OooRelayMsg,
}
577
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order entries by their message, inverted.
    ///
    /// `BinaryHeap` pops its greatest element first, so inverting the
    /// message ordering makes the heap yield the smallest message first.
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self.msg.cmp(&other.msg);
        natural.reverse()
    }
}

#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Always `Some`, delegating to the total order from [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their messages are equal; the leg is ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}

#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
601
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
///
/// The reactor owns this value and lends it (`&mut`) to a circuit leg when
/// that leg handles an incoming cell.
struct CellHandlers {
    /// A handler for a meta cell, if one is currently installed.
    ///
    /// Note: this field holds only the handler itself; no completion channel
    /// is stored alongside it.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
610
/// Information about an incoming stream request.
///
/// This is the item type carried by the `StreamReqSender` queue.
/// The `has_memory_cost` attributes below give per-field estimates for the
/// derived `HasMemoryCost` impl.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// The [`LegId`] of the circuit the request came on.
    pub(crate) leg: LegId,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
643
/// Data required for handling an incoming stream request.
///
/// Stored in `CellHandlers::incoming_stream_req_handler` when one is installed.
/// `Debug` is derived through `educe` so that `filter` can be left out of the output.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// A [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
660
661impl Reactor {
662    /// Create a new circuit reactor.
663    ///
664    /// The reactor will send outbound messages on `channel`, receive incoming
665    /// messages on `input`, and identify this circuit by the channel-local
666    /// [`CircId`] provided.
667    ///
668    /// The internal unique identifier for this circuit will be `unique_id`.
669    #[allow(clippy::type_complexity)] // TODO
670    pub(super) fn new(
671        channel: Arc<Channel>,
672        channel_id: CircId,
673        unique_id: UniqId,
674        input: CircuitRxReceiver,
675        runtime: DynTimeProvider,
676        memquota: CircuitAccount,
677    ) -> (
678        Self,
679        mpsc::UnboundedSender<CtrlMsg>,
680        mpsc::UnboundedSender<CtrlCmd>,
681        oneshot::Receiver<void::Void>,
682        Arc<TunnelMutableState>,
683    ) {
684        let (control_tx, control_rx) = mpsc::unbounded();
685        let (command_tx, command_rx) = mpsc::unbounded();
686        let mutable = Arc::new(MutableState::default());
687
688        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
689
690        let cell_handlers = CellHandlers {
691            meta_handler: None,
692            #[cfg(feature = "hs-service")]
693            incoming_stream_req_handler: None,
694        };
695
696        let circuit_leg = Circuit::new(
697            channel,
698            channel_id,
699            unique_id,
700            input,
701            memquota,
702            Arc::clone(&mutable),
703        );
704
705        let (circuits, mutable) = ConfluxSet::new(circuit_leg);
706
707        let reactor = Reactor {
708            circuits,
709            control: control_rx,
710            command: command_rx,
711            reactor_closed_tx,
712            unique_id,
713            cell_handlers,
714            runtime,
715            #[cfg(feature = "conflux")]
716            conflux_hs_ctx: None,
717            #[cfg(feature = "conflux")]
718            ooo_msgs: Default::default(),
719        };
720
721        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
722    }
723
724    /// Launch the reactor, and run until the circuit closes or we
725    /// encounter an error.
726    ///
727    /// Once this method returns, the circuit is dead and cannot be
728    /// used again.
729    pub async fn run(mut self) -> Result<()> {
730        trace!("{}: Running circuit reactor", self.unique_id);
731        let result: Result<()> = loop {
732            match self.run_once().await {
733                Ok(()) => (),
734                Err(ReactorError::Shutdown) => break Ok(()),
735                Err(ReactorError::Err(e)) => break Err(e),
736            }
737        };
738        trace!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
739        result
740    }
741
742    /// Helper for run: doesn't mark the circuit closed on finish.  Only
743    /// processes one cell or control message.
744    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
745        // If all the circuits are closed, shut down the reactor
746        //
747        // TODO(conflux): we might need to rethink this behavior
748        if self.circuits.is_empty() {
749            trace!(
750                "{}: Circuit reactor shutting down: all circuits have closed",
751                self.unique_id
752            );
753
754            return Err(ReactorError::Shutdown);
755        }
756
757        // If this is a single path circuit, we need to wait until the first hop
758        // is created before doing anything else
759        let single_path_with_hops = self
760            .circuits
761            .single_leg_mut()
762            .is_ok_and(|(_id, leg)| !leg.has_hops());
763        if single_path_with_hops {
764            self.wait_for_create().await?;
765
766            return Ok(());
767        }
768
769        // Prioritize the buffered messages.
770        //
771        // Note: if any of the messages are ready to be handled,
772        // this will block the reactor until we are done processing them
773        #[cfg(feature = "conflux")]
774        self.try_dequeue_ooo_msgs().await?;
775
776        let action = select_biased! {
777            res = self.command.next() => {
778                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
779                return ControlHandler::new(self).handle_cmd(cmd);
780            },
781            // Check whether we've got a control message pending.
782            //
783            // Note: unfortunately, reading from control here means we might start
784            // handling control messages before our chan_senders are ready.
785            // With the current design, this is inevitable: we can't know which circuit leg
786            // a control message is meant for without first reading the control message from
787            // the channel, and at that point, we can't know for sure whether that particular
788            // circuit is ready for sending.
789            ret = self.control.next() => {
790                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
791                CircuitAction::HandleControl(msg)
792            },
793            res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
794        };
795
796        let cmd = match action {
797            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
798                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
799            )),
800            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
801                .handle_msg(ctrl)?
802                .map(RunOnceCmd::Single),
803            CircuitAction::HandleCell { leg, cell } => {
804                let circ = self
805                    .circuits
806                    .leg_mut(LegId(leg))
807                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
808
809                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg.into(), cell)?;
810                if circ_cmds.is_empty() {
811                    None
812                } else {
813                    // TODO(conflux): we return RunOnceCmd::Multiple even if there's a single command.
814                    //
815                    // See the TODO(conflux) on `Circuit::handle_cell`.
816                    let cmd = RunOnceCmd::Multiple(
817                        circ_cmds
818                            .into_iter()
819                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
820                            .collect(),
821                    );
822
823                    Some(cmd)
824                }
825            }
826            CircuitAction::RemoveLeg { leg, reason } => Some(
827                RunOnceCmdInner::RemoveLeg {
828                    leg: LegId(leg),
829                    reason,
830                }
831                .into(),
832            ),
833        };
834
835        if let Some(cmd) = cmd {
836            self.handle_run_once_cmd(cmd).await?;
837        }
838
839        // Check if it's time to switch our primary leg.
840        //
841        // TODO(conflux): this should only be done just before sending a cell.
842        //
843        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2946#note_3192013
844        #[cfg(feature = "conflux")]
845        if let Some(switch_cell) = self.circuits.maybe_update_primary_leg()? {
846            self.circuits
847                .primary_leg_mut()?
848                .send_relay_cell(switch_cell)
849                .await?;
850        }
851
852        Ok(())
853    }
854
/// Deliver any buffered, previously-out-of-order messages that are now in order.
///
/// The next entry of `ooo_msgs` (as returned by `peek`) is handled for as long as
/// the conflux set reports its sequence number as in-order. The loop ends as soon
/// as the buffer is empty or the next entry is still out of order.
///
/// Any command produced by handling a message is executed before the next
/// entry is examined.
#[cfg(feature = "conflux")]
async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
    loop {
        // Only the next buffered entry matters: if it can't be delivered yet, stop.
        match self.ooo_msgs.peek() {
            Some(head) if self.circuits.is_seqno_in_order(head.msg.seqno) => {}
            _ => break,
        }

        // We just peeked at this entry, so it must still be there.
        let ConfluxHeapEntry { leg_id, msg } =
            self.ooo_msgs.pop().expect("item just disappeared?!");

        let leg = self
            .circuits
            .leg_mut(leg_id)
            .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
        let circ_cmd = leg.handle_in_order_relay_msg(
            &mut self.cell_handlers,
            msg.hopnum,
            leg_id,
            msg.cell_counts_towards_windows,
            msg.streamid,
            msg.msg,
        )?;

        if let Some(circ_cmd) = circ_cmd {
            let cmd = RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(leg_id.0, circ_cmd));
            self.handle_run_once_cmd(cmd).await?;
        }
    }

    Ok(())
}
893
894    /// Handle a [`RunOnceCmd`].
895    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
896        match cmd {
897            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
898            RunOnceCmd::Multiple(cmds) => {
899                // While we know `sendable` is ready to accept *one* cell,
900                // we can't be certain it will be able to accept *all* of the cells
901                // that need to be sent here. This means we *may* end up buffering
902                // in its underlying SometimesUnboundedSink! That is OK, because
903                // RunOnceCmd::Multiple is only used for handling packed cells.
904                for cmd in cmds {
905                    self.handle_single_run_once_cmd(cmd).await?;
906                }
907            }
908        }
909
910        Ok(())
911    }
912
913    /// Handle a [`RunOnceCmd`].
914    async fn handle_single_run_once_cmd(
915        &mut self,
916        cmd: RunOnceCmdInner,
917    ) -> StdResult<(), ReactorError> {
918        match cmd {
919            RunOnceCmdInner::Send { leg, cell, done } => {
920                // TODO: check the cc window
921                let leg = self
922                    .circuits
923                    .leg_mut(leg)
924                    .ok_or_else(|| internal!("leg disappeared?!"))?;
925                let res = leg.send_relay_cell(cell).await;
926                if let Some(done) = done {
927                    // Don't care if the receiver goes away
928                    let _ = done.send(res.clone());
929                }
930                res?;
931            }
932            #[cfg(feature = "send-control-msg")]
933            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
934                let cell: Result<Option<SendRelayCell>> =
935                    self.prepare_msg_and_install_handler(msg, handler);
936
937                match cell {
938                    Ok(Some(cell)) => {
939                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
940                        let outcome = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
941                        // don't care if receiver goes away.
942                        let _ = done.send(outcome.clone());
943                        outcome?;
944                    }
945                    Ok(None) => {
946                        // don't care if receiver goes away.
947                        let _ = done.send(Ok(()));
948                    }
949                    Err(e) => {
950                        // don't care if receiver goes away.
951                        let _ = done.send(Err(e.clone()));
952                        return Err(e.into());
953                    }
954                }
955            }
956            RunOnceCmdInner::BeginStream {
957                leg,
958                cell,
959                hop,
960                done,
961            } => {
962                match cell {
963                    Ok((cell, stream_id)) => {
964                        let leg = self
965                            .circuits
966                            .leg_mut(leg)
967                            .ok_or_else(|| internal!("leg disappeared?!"))?;
968                        let cell_hop = cell.hop;
969                        let relay_format = leg
970                            .hop_mut(cell_hop)
971                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
972                            .ok_or(Error::NoSuchHop)?
973                            .relay_cell_format();
974                        let outcome = leg.send_relay_cell(cell).await;
975                        // don't care if receiver goes away.
976                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
977                        outcome?;
978                    }
979                    Err(e) => {
980                        // don't care if receiver goes away.
981                        let _ = done.send(Err(e.clone()));
982                        return Err(e.into());
983                    }
984                }
985            }
986            RunOnceCmdInner::CloseStream {
987                hop,
988                sid,
989                behav,
990                reason,
991                done,
992            } => {
993                let result = (move || {
994                    // this is needed to force the closure to be FnOnce rather than FnMut :(
995                    let self_ = self;
996                    let (leg_id, hop_num) = self_
997                        .resolve_hop_location(hop)
998                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
999                    let leg = self_
1000                        .circuits
1001                        .leg_mut(leg_id)
1002                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
1003                    Ok::<_, Bug>((leg, hop_num))
1004                })();
1005
1006                let (leg, hop_num) = match result {
1007                    Ok(x) => x,
1008                    Err(e) => {
1009                        if let Some(done) = done {
1010                            // don't care if the sender goes away
1011                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1012                            let _ = done.send(Err(e.into()));
1013                        }
1014                        return Ok(());
1015                    }
1016                };
1017
1018                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
1019
1020                if let Some(done) = done {
1021                    // don't care if the sender goes away
1022                    let _ = done.send(res);
1023                }
1024            }
1025            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1026                let leg = self
1027                    .circuits
1028                    .leg_mut(leg)
1029                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1030                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1031                // future which *should* resolve immediately
1032                let signals = leg.congestion_signals().await;
1033                leg.handle_sendme(hop, sendme, signals)?;
1034            }
1035            RunOnceCmdInner::FirstHopClockSkew { answer } => {
1036                let res = self
1037                    .circuits
1038                    .single_leg_mut()
1039                    .map(|(_id, leg)| leg.clock_skew());
1040
1041                // don't care if the sender goes away
1042                let _ = answer.send(res.map_err(Into::into));
1043            }
1044            RunOnceCmdInner::CleanShutdown => {
1045                trace!("{}: reactor shutdown due to handled cell", self.unique_id);
1046                return Err(ReactorError::Shutdown);
1047            }
1048            RunOnceCmdInner::RemoveLeg { leg, reason } => {
1049                let circ = self.circuits.remove(leg.0)?;
1050                let is_conflux_pending = circ.is_conflux_pending();
1051
1052                // Drop the removed leg. This will cause it to close if it's not already closed.
1053                drop(circ);
1054
1055                // If we reach this point, it means we have more than one leg
1056                // (otherwise the .remove() would've returned a Shutdown error),
1057                // so we expect there to be a ConfluxHandshakeContext installed.
1058
1059                #[cfg(feature = "conflux")]
1060                if is_conflux_pending {
1061                    let error = match reason {
1062                        RemoveLegReason::ConfluxHandshakeTimeout => ConfluxHandshakeError::Timeout,
1063                        RemoveLegReason::ConfluxHandshakeErr(e) => ConfluxHandshakeError::Link(e),
1064                        RemoveLegReason::ChannelClosed => ConfluxHandshakeError::ChannelClosed,
1065                    };
1066
1067                    self.note_conflux_handshake_result(Err(error))?;
1068                }
1069            }
1070            #[cfg(feature = "conflux")]
1071            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1072                // Note: on the client-side, the handshake is considered complete once the
1073                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1074                //
1075                // We're optimistic here, and declare the handshake a success *before*
1076                // sending the LINKED_ACK response. I think this is OK though,
1077                // because if the send_relay_cell() below fails, the reactor will shut
1078                // down anyway. OTOH, marking the handshake as complete slightly early
1079                // means that on the happy path, the circuit is marked as usable sooner,
1080                // instead of blocking on the sending of the LINKED_ACK.
1081                self.note_conflux_handshake_result(Ok(()))?;
1082
1083                let leg = self
1084                    .circuits
1085                    .leg_mut(leg)
1086                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1087                let res = leg.send_relay_cell(cell).await;
1088
1089                res?;
1090            }
1091            #[cfg(feature = "conflux")]
1092            RunOnceCmdInner::Link { circuits, answer } => {
1093                // Add the specified circuits to our conflux set,
1094                // and send a LINK cell down each unlinked leg.
1095                //
1096                // NOTE: this will block the reactor until all the cells are sent.
1097                self.handle_link_circuits(circuits, answer).await?;
1098            }
1099            #[cfg(feature = "conflux")]
1100            RunOnceCmdInner::Enqueue { leg, msg } => {
1101                let entry = ConfluxHeapEntry { leg_id: leg, msg };
1102                self.ooo_msgs.push(entry);
1103            }
1104        }
1105
1106        Ok(())
1107    }
1108
1109    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1110    ///
1111    /// Returns an error if an unexpected `CtrlMsg` is received.
1112    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1113        let msg = select_biased! {
1114            res = self.command.next() => {
1115                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1116                match cmd {
1117                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1118                    #[cfg(test)]
1119                    CtrlCmd::AddFakeHop {
1120                        relay_cell_format: format,
1121                        fwd_lasthop,
1122                        rev_lasthop,
1123                        params,
1124                        done,
1125                    } => {
1126                        let (_id, leg) = self.circuits.single_leg_mut()?;
1127                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, &params, done);
1128                        return Ok(())
1129                    },
1130                    _ => {
1131                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1132                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1133                    }
1134                }
1135            },
1136            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1137        };
1138
1139        match msg {
1140            CtrlMsg::Create {
1141                recv_created,
1142                handshake,
1143                mut params,
1144                done,
1145            } => {
1146                // TODO(conflux): instead of crashing the reactor, it might be better
1147                // to send the error via the done channel instead
1148                let (_id, leg) = self.circuits.single_leg_mut()?;
1149                leg.handle_create(recv_created, handshake, &mut params, done)
1150                    .await
1151            }
1152            _ => {
1153                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1154
1155                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1156            }
1157        }
1158    }
1159
/// Record one leg's handshake result in our `ConfluxHandshakeContext`.
///
/// Once the number of recorded results equals the context's `num_legs`,
/// the context is removed and the collected results are sent
/// to the handshake initiator.
///
/// Returns an internal error if no handshake context is installed.
#[cfg(feature = "conflux")]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
) -> StdResult<(), ReactorError> {
    let Some(ctx) = self.conflux_hs_ctx.as_mut() else {
        return Err(internal!("no conflux handshake context").into());
    };

    ctx.results.push(res);

    // Some legs have not finished linking yet: keep collecting.
    if ctx.results.len() != ctx.num_legs {
        return Ok(());
    }

    // Time to remove the conflux handshake context
    // and extract the results we have collected.
    //
    // We don't expect to receive any more handshake results,
    // at least not until we get another LinkCircuits control message,
    // which will install a new ConfluxHandshakeCtx with a channel
    // for us to send updates on
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");

    send_conflux_outcome(finished.answer, Ok(finished.results))
}
1196
1197    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1198    fn prepare_msg_and_install_handler(
1199        &mut self,
1200        msg: Option<AnyRelayMsgOuter>,
1201        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1202    ) -> Result<Option<SendRelayCell>> {
1203        let msg = msg
1204            .map(|msg| {
1205                let handlers = &mut self.cell_handlers;
1206                let handler = handler
1207                    .as_ref()
1208                    .or(handlers.meta_handler.as_ref())
1209                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1210                Ok::<_, crate::Error>(SendRelayCell {
1211                    hop: handler.expected_hop(),
1212                    early: false,
1213                    cell: msg,
1214                })
1215            })
1216            .transpose()?;
1217
1218        if let Some(handler) = handler {
1219            self.cell_handlers.set_meta_handler(handler)?;
1220        }
1221
1222        Ok(msg)
1223    }
1224
1225    /// Handle a shutdown request.
1226    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1227        trace!(
1228            "{}: reactor shutdown due to explicit request",
1229            self.unique_id
1230        );
1231
1232        Err(ReactorError::Shutdown)
1233    }
1234
/// Shut down the reactor, handing its only [`Circuit`] back over `answer`.
///
/// If the reactor has no circuits, or more than one circuit, an error is sent
/// over the `answer` channel instead. The reactor will shut down regardless.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let outcome: StdResult<Circuit, Bug> = self.circuits.take_single_leg().map_err(Into::into);

    // Don't care if the receiver goes away
    let _ = answer.send(outcome);

    self.handle_shutdown()?;
    Ok(())
}
1248
1249    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1250    ///
1251    /// After resolving a `TargetHop::LastHop`,
1252    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1253    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1254    ///
1255    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1256    /// if you need a fixed hop position in the tunnel.
1257    /// For example if we open a stream to `TargetHop::LastHop`,
1258    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1259    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1260    ///
1261    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1262    /// and the tunnel has no hops
1263    /// (either has no legs, or has legs which contain no hops).
1264    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1265        match hop {
1266            TargetHop::Hop(hop) => Ok(hop),
1267            TargetHop::LastHop => {
1268                if let Ok((leg_id, leg)) = self.circuits.single_leg() {
1269                    // single-path tunnel
1270                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1271                    Ok(HopLocation::Hop((leg_id, hop)))
1272                } else if !self.circuits.is_empty() {
1273                    // multi-path tunnel
1274                    return Ok(HopLocation::JoinPoint);
1275                } else {
1276                    // no legs
1277                    Err(NoHopsBuiltError)
1278                }
1279            }
1280        }
1281    }
1282
1283    /// Resolves a [`HopLocation`] to a [`LegId`] and [`HopNum`].
1284    ///
1285    /// After resolving a `HopLocation::JoinPoint`,
1286    /// the [`LegId`] and [`HopNum`] can become stale if the primary leg changes.
1287    ///
1288    /// You should try to only resolve to a specific [`LegId`] and [`HopNum`] immediately before you
1289    /// need them,
1290    /// and you should not hold on to the resolved [`LegId`] and [`HopNum`] between reactor
1291    /// iterations as the primary leg may change from one iteration to the next.
1292    ///
1293    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1294    /// but it does not have a join point.
1295    fn resolve_hop_location(
1296        &self,
1297        hop: HopLocation,
1298    ) -> StdResult<(LegId, HopNum), NoJoinPointError> {
1299        match hop {
1300            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1301            HopLocation::JoinPoint => {
1302                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1303                    Ok((leg_id, hop_num))
1304                } else {
1305                    // Attempted to get the join point of a non-multipath tunnel.
1306                    Err(NoJoinPointError)
1307                }
1308            }
1309        }
1310    }
1311
1312    /// Does congestion control use stream SENDMEs for the given hop?
1313    ///
1314    /// Returns `None` if either the `leg` or `hop` don't exist.
1315    fn uses_stream_sendme(&self, leg: LegId, hop: HopNum) -> Option<bool> {
1316        self.circuits.uses_stream_sendme(leg, hop)
1317    }
1318
/// Link some extra circuits into the reactor's conflux set.
///
/// If a conflux handshake is already in progress, an error is sent on the
/// `answer` channel and nothing else happens.
///
/// The circuits are validated, and if they do not have the same length,
/// or if they do not all have the same last hop, an error is returned on
/// the `answer` channel, and the conflux handshake is *not* initiated.
///
/// If validation succeeds, the circuits are added to this reactor's conflux set,
/// and the conflux handshake is initiated (by sending a LINK cell on each leg).
/// The `answer` channel is then saved, so the outcome can be reported once the
/// handshake completes.
///
/// NOTE: this blocks the reactor main loop until all the cells are sent.
#[cfg(feature = "conflux")]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    if self.conflux_hs_ctx.is_some() {
        let err = internal!("conflux linking already in progress");
        return send_conflux_outcome(answer, Err(err.into()));
    }

    let num_legs = circuits.len();

    // Note: add_legs validates `circuits`
    if let Err(e) = self.circuits.add_legs(circuits, &self.runtime) {
        return send_conflux_outcome(answer, Err(e.into()));
    }

    // TODO(conflux): check if we negotiated prop324 cc on *all* circuits,
    // returning an error if not?

    let link_outcome = self.circuits.link_circuits(&self.runtime).await;

    match link_outcome {
        Err(e) => send_conflux_outcome(answer, Err(e)),
        Ok(_) => {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });

            Ok(())
        }
    }
}
1368}
1369
/// Report the outcome of a conflux handshake to whoever initiated it.
///
/// Returns [`ReactorError::Shutdown`] if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    if tx.send(res).is_ok() {
        return Ok(());
    }

    // Nobody is listening for the result any more.
    tracing::warn!("conflux initiator went away before handshake completed?");
    Err(ReactorError::Shutdown)
}
1385
1386/// The tunnel does not have any hops.
1387#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1388#[non_exhaustive]
1389#[error("no hops have been built for this tunnel")]
1390pub(crate) struct NoHopsBuiltError;
1391
1392/// The tunnel does not have a join point.
1393#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1394#[non_exhaustive]
1395#[error("the tunnel does not have a join point")]
1396pub(crate) struct NoJoinPointError;
1397
1398impl CellHandlers {
1399    /// Try to install a given meta-cell handler to receive any unusual cells on
1400    /// this circuit, along with a result channel to notify on completion.
1401    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1402        if self.meta_handler.is_none() {
1403            self.meta_handler = Some(handler);
1404            Ok(())
1405        } else {
1406            Err(Error::from(internal!(
1407                "Tried to install a meta-cell handler before the old one was gone."
1408            )))
1409        }
1410    }
1411
1412    /// Try to install a given cell handler on this circuit.
1413    #[cfg(feature = "hs-service")]
1414    fn set_incoming_stream_req_handler(
1415        &mut self,
1416        handler: IncomingStreamRequestHandler,
1417    ) -> Result<()> {
1418        if self.incoming_stream_req_handler.is_none() {
1419            self.incoming_stream_req_handler = Some(handler);
1420            Ok(())
1421        } else {
1422            Err(Error::from(internal!(
1423                "Tried to install a BEGIN cell handler before the old one was gone."
1424            )))
1425        }
1426    }
1427}
1428
#[cfg(test)]
mod test {
    // Intentionally empty: this module is tested in [`crate::tunnel::circuit::test`].
}