tor_proto/tunnel/
reactor.rs

1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//!    `CONNECTED` message per stream.) This is handled by calling
9//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
12//!    this; for half-closed streams, the reactor handles it using the
13//!    `halfstream` module.
14//! 3. Does the message have an acceptable command type, and is the message
15//!    well-formed? For open streams, the streams themselves handle this check.
16//!    For half-closed streams, the reactor handles it by calling
17//!    `consume_checked_msg()`.
18
19pub(super) mod circuit;
20mod conflux;
21mod control;
22pub(super) mod syncview;
23
24use crate::crypto::cell::HopNum;
25use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26use crate::memquota::{CircuitAccount, StreamAccount};
27use crate::stream::queue::StreamQueueReceiver;
28use crate::stream::{AnyCmdChecker, StreamRateLimit};
29#[cfg(feature = "hs-service")]
30use crate::stream::{DrainRateRequest, IncomingStreamRequest, IncomingStreamRequestFilter};
31use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
32use crate::tunnel::circuit::unique_id::UniqId;
33use crate::tunnel::circuit::CircuitRxReceiver;
34use crate::tunnel::{streammap, HopLocation, TargetHop, TunnelId, TunnelScopedCircId};
35use crate::util::err::ReactorError;
36use crate::util::notify::NotifyReceiver;
37use crate::util::skew::ClockSkew;
38use crate::{Error, Result};
39use circuit::{Circuit, CircuitCmd};
40use conflux::ConfluxSet;
41use control::ControlHandler;
42use postage::watch;
43use std::cmp::Ordering;
44use std::collections::BinaryHeap;
45use std::mem::size_of;
46use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
47use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
48use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
49use tor_error::{bad_api_usage, internal, into_bad_api_usage, warn_report, Bug};
50use tor_rtcompat::DynTimeProvider;
51
52use futures::channel::mpsc;
53use futures::StreamExt;
54use futures::{select_biased, FutureExt as _};
55use oneshot_fused_workaround as oneshot;
56
57use std::result::Result as StdResult;
58use std::sync::Arc;
59
60use crate::channel::Channel;
61use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
62use crate::tunnel::circuit::StreamMpscSender;
63use derive_deftly::Deftly;
64use derive_more::From;
65use tor_cell::chancell::CircId;
66use tor_llcrypto::pk;
67use tor_memquota::derive_deftly_template_HasMemoryCost;
68use tor_memquota::mq_queue::{self, MpscSpec};
69use tracing::{debug, info, trace, warn};
70
71use super::circuit::{MutableState, TunnelMutableState};
72
73#[cfg(feature = "conflux")]
74use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
75
76pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};
77
78/// The type of a oneshot channel used to inform reactor of the result of an operation.
79pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
80
81/// Contains a list of conflux handshake results.
82#[cfg(feature = "conflux")]
83pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
84
85/// The type of oneshot channel used to inform reactor users of the outcome
86/// of a client-side conflux handshake.
87///
88/// Contains a list of handshake results, one for each circuit that we were asked
89/// to link in the tunnel.
90#[cfg(feature = "conflux")]
91pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
92
93pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
94
95/// MPSC queue containing stream requests
96#[cfg(feature = "hs-service")]
97type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
98
99/// A handshake type, to be used when creating circuit hops.
100#[derive(Clone, Debug)]
101pub(crate) enum CircuitHandshake {
102    /// Use the CREATE_FAST handshake.
103    CreateFast,
104    /// Use the ntor handshake.
105    Ntor {
106        /// The public key of the relay.
107        public_key: NtorPublicKey,
108        /// The Ed25519 identity of the relay, which is verified against the
109        /// identity held in the circuit's channel.
110        ed_identity: pk::ed25519::Ed25519Identity,
111    },
112    /// Use the ntor-v3 handshake.
113    NtorV3 {
114        /// The public key of the relay.
115        public_key: NtorV3PublicKey,
116    },
117}
118
119/// A behavior to perform when closing a stream.
120///
121/// We don't use `Option<End>`` here, since the behavior of `SendNothing` is so surprising
122/// that we shouldn't let it pass unremarked.
123#[derive(Clone, Debug)]
124pub(crate) enum CloseStreamBehavior {
125    /// Send nothing at all, so that the other side will not realize we have
126    /// closed the stream.
127    ///
128    /// We should only do this for incoming onion service streams when we
129    /// want to black-hole the client's requests.
130    SendNothing,
131    /// Send an End cell, if we haven't already sent one.
132    SendEnd(End),
133}
134impl Default for CloseStreamBehavior {
135    fn default() -> Self {
136        Self::SendEnd(End::new_misc())
137    }
138}
139
140// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
141// proliferation is a bit bothersome, but unavoidable with the current design.
142//
143// We should consider getting rid of some of these enums (if possible),
144// and coming up with more intuitive names.
145
146/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
147#[derive(From, Debug)]
148enum RunOnceCmd {
149    /// Run a single `RunOnceCmdInner` command.
150    Single(RunOnceCmdInner),
151    /// Run multiple `RunOnceCmdInner` commands.
152    //
153    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
154    // but most of the time we're only going to have *one* RunOnceCmdInner
155    // to run per run_once() loop. The enum enables us avoid the extra heap
156    // allocation for the `RunOnceCmd::Single` case.
157    Multiple(Vec<RunOnceCmdInner>),
158}
159
160/// Instructions for running something in the reactor loop.
161///
162/// Run at the end of [`Reactor::run_once`].
163//
164// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
165// We should consider making each variant a tuple variant and deduplicating the fields.
166#[derive(educe::Educe)]
167#[educe(Debug)]
168enum RunOnceCmdInner {
169    /// Send a RELAY cell.
170    Send {
171        /// The leg the cell should be sent on.
172        leg: UniqId,
173        /// The cell to send.
174        cell: SendRelayCell,
175        /// A channel for sending completion notifications.
176        done: Option<ReactorResultChannel<()>>,
177    },
178    /// Send a given control message on this circuit, and install a control-message handler to
179    /// receive responses.
180    #[cfg(feature = "send-control-msg")]
181    SendMsgAndInstallHandler {
182        /// The message to send, if any
183        msg: Option<AnyRelayMsgOuter>,
184        /// A message handler to install.
185        ///
186        /// If this is `None`, there must already be a message handler installed
187        #[educe(Debug(ignore))]
188        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
189        /// A sender that we use to tell the caller that the message was sent
190        /// and the handler installed.
191        done: oneshot::Sender<Result<()>>,
192    },
193    /// Handle a SENDME message.
194    HandleSendMe {
195        /// The leg the SENDME was received on.
196        leg: UniqId,
197        /// The hop number.
198        hop: HopNum,
199        /// The SENDME message to handle.
200        sendme: Sendme,
201    },
202    /// Begin a stream with the provided hop in this circuit.
203    ///
204    /// Uses the provided stream ID, and sends the provided message to that hop.
205    BeginStream {
206        /// The cell to send.
207        cell: Result<(SendRelayCell, StreamId)>,
208        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
209        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
210        /// caller.
211        hop: HopLocation,
212        /// The circuit leg to begin the stream on.
213        leg: UniqId,
214        /// Oneshot channel to notify on completion, with the allocated stream ID.
215        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
216    },
217    /// Consider sending an XON message with the given `rate`.
218    MaybeSendXon {
219        /// The drain rate to advertise in the XON message.
220        rate: XonKbpsEwma,
221        /// The ID of the stream to send the message on.
222        stream_id: StreamId,
223        /// The location of the hop on the tunnel.
224        hop: HopLocation,
225    },
226    /// Close the specified stream.
227    CloseStream {
228        /// The hop number.
229        hop: HopLocation,
230        /// The ID of the stream to close.
231        sid: StreamId,
232        /// The stream-closing behavior.
233        behav: CloseStreamBehavior,
234        /// The reason for closing the stream.
235        reason: streammap::TerminateReason,
236        /// A channel for sending completion notifications.
237        done: Option<ReactorResultChannel<()>>,
238    },
239    /// Get the clock skew claimed by the first hop of the circuit.
240    FirstHopClockSkew {
241        /// Oneshot channel to return the clock skew.
242        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
243    },
244    /// Remove a circuit leg from the conflux set.
245    RemoveLeg {
246        /// The circuit leg to remove.
247        leg: UniqId,
248        /// The reason for removal.
249        ///
250        /// This is only used for conflux circuits that get removed
251        /// before the conflux handshake is complete.
252        ///
253        /// The [`RemoveLegReason`] is mapped by the reactor to a
254        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
255        /// handshake to indicate the reason the handshake failed.
256        reason: RemoveLegReason,
257    },
258    /// A circuit has completed the conflux handshake,
259    /// and wants to send the specified cell.
260    ///
261    /// This is similar to [`RunOnceCmdInner::Send`],
262    /// but needs to remain a separate variant,
263    /// because in addition to instructing the reactor to send a cell,
264    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
265    /// This enables the reactor to save the handshake result (`Ok(())`),
266    /// and, if there are no other legs still in the handshake phase,
267    /// send the result to the handshake initiator.
268    #[cfg(feature = "conflux")]
269    ConfluxHandshakeComplete {
270        /// The circuit leg that has completed the handshake,
271        /// This is the leg the cell should be sent on.
272        leg: UniqId,
273        /// The cell to send.
274        cell: SendRelayCell,
275    },
276    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
277    #[cfg(feature = "conflux")]
278    Link {
279        /// The circuits to link into the tunnel
280        #[educe(Debug(ignore))]
281        circuits: Vec<Circuit>,
282        /// Oneshot channel for notifying of conflux handshake completion.
283        answer: ConfluxLinkResultChannel,
284    },
285    /// Enqueue an out-of-order cell in ooo_msg.
286    #[cfg(feature = "conflux")]
287    Enqueue {
288        /// The leg the entry originated from.
289        leg: UniqId,
290        /// The out-of-order message.
291        msg: OooRelayMsg,
292    },
293    /// Perform a clean shutdown on this circuit.
294    CleanShutdown,
295}
296
297impl RunOnceCmdInner {
298    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
299    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
300        match cmd {
301            CircuitCmd::Send(cell) => Self::Send {
302                leg,
303                cell,
304                done: None,
305            },
306            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
307            CircuitCmd::CloseStream {
308                hop,
309                sid,
310                behav,
311                reason,
312            } => Self::CloseStream {
313                hop: HopLocation::Hop((leg, hop)),
314                sid,
315                behav,
316                reason,
317                done: None,
318            },
319            #[cfg(feature = "conflux")]
320            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg { leg, reason },
321            #[cfg(feature = "conflux")]
322            CircuitCmd::ConfluxHandshakeComplete(cell) => {
323                Self::ConfluxHandshakeComplete { leg, cell }
324            }
325            #[cfg(feature = "conflux")]
326            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
327            CircuitCmd::CleanShutdown => Self::CleanShutdown,
328        }
329    }
330}
331
332/// Cmd for sending a relay cell.
333///
334/// The contents of this struct are passed to `send_relay_cell`
335#[derive(educe::Educe)]
336#[educe(Debug)]
337pub(crate) struct SendRelayCell {
338    /// The hop number.
339    pub(crate) hop: HopNum,
340    /// Whether to use a RELAY_EARLY cell.
341    pub(crate) early: bool,
342    /// The cell to send.
343    pub(crate) cell: AnyRelayMsgOuter,
344}
345
346/// A command to execute at the end of [`Reactor::run_once`].
347#[derive(From, Debug)]
348#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
349enum CircuitAction {
350    /// Run a single `CircuitCmd` command.
351    RunCmd {
352        /// The unique identifier of the circuit leg to run the command on
353        leg: UniqId,
354        /// The command to run.
355        cmd: CircuitCmd,
356    },
357    /// Handle a control message
358    HandleControl(CtrlMsg),
359    /// Handle an input message.
360    HandleCell {
361        /// The unique identifier of the circuit leg the message was received on.
362        leg: UniqId,
363        /// The message to handle.
364        cell: ClientCircChanMsg,
365    },
366    /// Remove the specified circuit leg from the conflux set.
367    ///
368    /// Returned whenever a single circuit leg needs to be be removed
369    /// from the reactor's conflux set, without necessarily tearing down
370    /// the whole set or shutting down the reactor.
371    ///
372    /// Note: this action *can* cause the reactor to shut down
373    /// (and the conflux set to be closed).
374    ///
375    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
376    RemoveLeg {
377        /// The leg to remove.
378        leg: UniqId,
379        /// The reason for removal.
380        ///
381        /// This is only used for conflux circuits that get removed
382        /// before the conflux handshake is complete.
383        ///
384        /// The [`RemoveLegReason`] is mapped by the reactor to a
385        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
386        /// handshake to indicate the reason the handshake failed.
387        reason: RemoveLegReason,
388    },
389}
390
391/// The reason for removing a circuit leg from the conflux set.
392#[derive(Debug, derive_more::Display)]
393enum RemoveLegReason {
394    /// The conflux handshake timed out.
395    ///
396    /// On the client-side, this means we didn't receive
397    /// the CONFLUX_LINKED response in time.
398    #[display("conflux handshake timed out")]
399    ConfluxHandshakeTimeout,
400    /// An error occurred during conflux handshake.
401    #[display("{}", _0)]
402    ConfluxHandshakeErr(Error),
403    /// The channel was closed.
404    #[display("channel closed")]
405    ChannelClosed,
406}
407
408/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
409/// progress.
410///
411/// # Background
412///
413/// The `Reactor` can't have async functions that send and receive cells, because its job is to
414/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
415///
416/// To get around this problem, the reactor can send some cells, and then make one of these
417/// `MetaCellHandler` objects, which will be run when the reply arrives.
418pub(crate) trait MetaCellHandler: Send {
419    /// The hop we're expecting the message to come from. This is compared against the hop
420    /// from which we actually receive messages, and an error is thrown if the two don't match.
421    fn expected_hop(&self) -> HopLocation;
422    /// Called when the message we were waiting for arrives.
423    ///
424    /// Gets a copy of the `Reactor` in order to do anything it likes there.
425    ///
426    /// If this function returns an error, the reactor will shut down.
427    fn handle_msg(
428        &mut self,
429        msg: UnparsedRelayMsg,
430        reactor: &mut Circuit,
431    ) -> Result<MetaCellDisposition>;
432}
433
434/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
435#[derive(Debug, Clone)]
436#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
437#[non_exhaustive]
438pub(crate) enum MetaCellDisposition {
439    /// The message was consumed; the handler should remain installed.
440    #[cfg(feature = "send-control-msg")]
441    Consumed,
442    /// The message was consumed; the handler should be uninstalled.
443    ConversationFinished,
444    /// The message was consumed; the circuit should be closed.
445    #[cfg(feature = "send-control-msg")]
446    CloseCirc,
447    // TODO: Eventually we might want the ability to have multiple handlers
448    // installed, and to let them say "not for me, maybe for somebody else?".
449    // But right now we don't need that.
450}
451
452/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
453///
454/// This is a macro instead of a function to work around borrowck errors
455/// in the select! from run_once().
456macro_rules! unwrap_or_shutdown {
457    ($self:expr, $res:expr, $reason:expr) => {{
458        match $res {
459            None => {
460                trace!(
461                    tunnel_id = %$self.tunnel_id,
462                    reason = %$reason,
463                    "reactor shutdown"
464                );
465                Err(ReactorError::Shutdown)
466            }
467            Some(v) => Ok(v),
468        }
469    }};
470}
471
472/// Object to handle incoming cells and background tasks on a circuit
473///
474/// This type is returned when you finish a circuit; you need to spawn a
475/// new task that calls `run()` on it.
476#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
477pub struct Reactor {
478    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
479    ///
480    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
481    /// is ready to accept cells.
482    control: mpsc::UnboundedReceiver<CtrlMsg>,
483    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
484    ///
485    /// This channel is polled in [`Reactor::run_once`].
486    ///
487    /// NOTE: this is a separate channel from `control`, because some messages
488    /// have higher priority and need to be handled even if the `chan_sender` is not
489    /// ready (whereas `control` messages are not read until the `chan_sender` sink
490    /// is ready to accept cells).
491    command: mpsc::UnboundedReceiver<CtrlCmd>,
492    /// A oneshot sender that is used to alert other tasks when this reactor is
493    /// finally dropped.
494    ///
495    /// It is a sender for Void because we never actually want to send anything here;
496    /// we only want to generate canceled events.
497    #[allow(dead_code)] // the only purpose of this field is to be dropped.
498    reactor_closed_tx: oneshot::Sender<void::Void>,
499    /// A set of circuits that form a tunnel.
500    ///
501    /// Contains 1 or more circuits.
502    ///
503    /// Circuits may be added to this set throughout the lifetime of the reactor.
504    ///
505    /// Sometimes, the reactor will remove circuits from this set,
506    /// for example if the `LINKED` message takes too long to arrive,
507    /// or if congestion control negotiation fails.
508    /// The reactor will continue running with the remaining circuits.
509    /// It will shut down if *all* the circuits are removed.
510    ///
511    // TODO(conflux): document all the reasons why the reactor might
512    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
513    circuits: ConfluxSet,
514    /// An identifier for logging about this tunnel reactor.
515    tunnel_id: TunnelId,
516    /// Handlers, shared with `Circuit`.
517    cell_handlers: CellHandlers,
518    /// The time provider, used for conflux handshake timeouts.
519    runtime: DynTimeProvider,
520    /// The conflux handshake context, if there is an on-going handshake.
521    ///
522    /// Set to `None` if this is a single-path tunnel,
523    /// or if none of the circuit legs from our conflux set
524    /// are currently in the conflux handshake phase.
525    #[cfg(feature = "conflux")]
526    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
527    /// A min-heap buffering all the out-of-order messages received so far.
528    ///
529    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
530    /// on its size. We should make this participate in the memquota memory
531    /// tracking system, somehow.
532    #[cfg(feature = "conflux")]
533    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
534}
535
536/// The context for an on-going conflux handshake.
537#[cfg(feature = "conflux")]
538struct ConfluxHandshakeCtx {
539    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
540    answer: ConfluxLinkResultChannel,
541    /// The number of legs that are currently doing the handshake.
542    num_legs: usize,
543    /// The handshake results we have collected so far.
544    results: ConfluxHandshakeResult,
545}
546
547/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
548#[derive(Debug)]
549#[cfg(feature = "conflux")]
550struct ConfluxHeapEntry {
551    /// The leg id this message came from.
552    leg_id: UniqId,
553    /// The out of order message
554    msg: OooRelayMsg,
555}
556
557#[cfg(feature = "conflux")]
558impl Ord for ConfluxHeapEntry {
559    fn cmp(&self, other: &Self) -> Ordering {
560        self.msg.cmp(&other.msg)
561    }
562}
563
564#[cfg(feature = "conflux")]
565impl PartialOrd for ConfluxHeapEntry {
566    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
567        Some(self.cmp(other))
568    }
569}
570
571#[cfg(feature = "conflux")]
572impl PartialEq for ConfluxHeapEntry {
573    fn eq(&self, other: &Self) -> bool {
574        self.msg == other.msg
575    }
576}
577
578#[cfg(feature = "conflux")]
579impl Eq for ConfluxHeapEntry {}
580
581/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
582struct CellHandlers {
583    /// A handler for a meta cell, together with a result channel to notify on completion.
584    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
585    /// A handler for incoming stream requests.
586    #[cfg(feature = "hs-service")]
587    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
588}
589
590/// Information about an incoming stream request.
591#[cfg(feature = "hs-service")]
592#[derive(Debug, Deftly)]
593#[derive_deftly(HasMemoryCost)]
594pub(crate) struct StreamReqInfo {
595    /// The [`IncomingStreamRequest`].
596    pub(crate) req: IncomingStreamRequest,
597    /// The ID of the stream being requested.
598    pub(crate) stream_id: StreamId,
599    /// The [`HopNum`].
600    //
601    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
602    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
603    //
604    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
605    // incoming stream request from two separate hops.  (There is only one that's valid.)
606    pub(crate) hop: HopLocation,
607    /// The format which must be used with this stream to encode messages.
608    #[deftly(has_memory_cost(indirect_size = "0"))]
609    pub(crate) relay_cell_format: RelayCellFormat,
610    /// A channel for receiving messages from this stream.
611    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
612    pub(crate) receiver: StreamQueueReceiver,
613    /// A channel for sending messages to be sent on this stream.
614    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
615    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
616    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
617    // TODO(arti#2068): we should consider making this an `Option`
618    // the `watch::Sender` owns the indirect data
619    #[deftly(has_memory_cost(indirect_size = "0"))]
620    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
621    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
622    /// requested.
623    #[deftly(has_memory_cost(indirect_size = "0"))]
624    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
625    /// The memory quota account to be used for this stream
626    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
627    pub(crate) memquota: StreamAccount,
628}
629
630/// Data required for handling an incoming stream request.
631#[cfg(feature = "hs-service")]
632#[derive(educe::Educe)]
633#[educe(Debug)]
634struct IncomingStreamRequestHandler {
635    /// A sender for sharing information about an incoming stream request.
636    incoming_sender: StreamReqSender,
637    /// A [`AnyCmdChecker`] for validating incoming stream requests.
638    cmd_checker: AnyCmdChecker,
639    /// The hop to expect incoming stream requests from.
640    hop_num: HopNum,
641    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
642    /// this request, or wants to reject it immediately.
643    #[educe(Debug(ignore))]
644    filter: Box<dyn IncomingStreamRequestFilter>,
645}
646
647impl Reactor {
648    /// Create a new circuit reactor.
649    ///
650    /// The reactor will send outbound messages on `channel`, receive incoming
651    /// messages on `input`, and identify this circuit by the channel-local
652    /// [`CircId`] provided.
653    ///
654    /// The internal unique identifier for this circuit will be `unique_id`.
655    #[allow(clippy::type_complexity)] // TODO
656    pub(super) fn new(
657        channel: Arc<Channel>,
658        channel_id: CircId,
659        unique_id: UniqId,
660        input: CircuitRxReceiver,
661        runtime: DynTimeProvider,
662        memquota: CircuitAccount,
663    ) -> (
664        Self,
665        mpsc::UnboundedSender<CtrlMsg>,
666        mpsc::UnboundedSender<CtrlCmd>,
667        oneshot::Receiver<void::Void>,
668        Arc<TunnelMutableState>,
669    ) {
670        let tunnel_id = TunnelId::next();
671        let (control_tx, control_rx) = mpsc::unbounded();
672        let (command_tx, command_rx) = mpsc::unbounded();
673        let mutable = Arc::new(MutableState::default());
674
675        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
676
677        let cell_handlers = CellHandlers {
678            meta_handler: None,
679            #[cfg(feature = "hs-service")]
680            incoming_stream_req_handler: None,
681        };
682
683        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
684        let circuit_leg = Circuit::new(
685            runtime.clone(),
686            channel,
687            channel_id,
688            unique_id,
689            input,
690            memquota,
691            Arc::clone(&mutable),
692        );
693
694        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
695
696        let reactor = Reactor {
697            circuits,
698            control: control_rx,
699            command: command_rx,
700            reactor_closed_tx,
701            tunnel_id,
702            cell_handlers,
703            runtime,
704            #[cfg(feature = "conflux")]
705            conflux_hs_ctx: None,
706            #[cfg(feature = "conflux")]
707            ooo_msgs: Default::default(),
708        };
709
710        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
711    }
712
713    /// Launch the reactor, and run until the circuit closes or we
714    /// encounter an error.
715    ///
716    /// Once this method returns, the circuit is dead and cannot be
717    /// used again.
718    pub async fn run(mut self) -> Result<()> {
719        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
720        let result: Result<()> = loop {
721            match self.run_once().await {
722                Ok(()) => (),
723                Err(ReactorError::Shutdown) => break Ok(()),
724                Err(ReactorError::Err(e)) => break Err(e),
725            }
726        };
727        trace!(tunnel_id = %self.tunnel_id, "Tunnel reactor stopped: {:?}", result);
728        result
729    }
730
731    /// Helper for run: doesn't mark the circuit closed on finish.  Only
732    /// processes one cell or control message.
733    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
734        // If all the circuits are closed, shut down the reactor
735        if self.circuits.is_empty() {
736            trace!(
737                tunnel_id = %self.tunnel_id,
738                "Tunnel reactor shutting down: all circuits have closed",
739            );
740
741            return Err(ReactorError::Shutdown);
742        }
743
744        // If this is a single path circuit, we need to wait until the first hop
745        // is created before doing anything else
746        let single_path_with_hops = self
747            .circuits
748            .single_leg_mut()
749            .is_ok_and(|leg| !leg.has_hops());
750        if single_path_with_hops {
751            self.wait_for_create().await?;
752
753            return Ok(());
754        }
755
756        // Prioritize the buffered messages.
757        //
758        // Note: if any of the messages are ready to be handled,
759        // this will block the reactor until we are done processing them
760        #[cfg(feature = "conflux")]
761        self.try_dequeue_ooo_msgs().await?;
762
763        let action = select_biased! {
764            res = self.command.next() => {
765                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
766                return ControlHandler::new(self).handle_cmd(cmd);
767            },
768            // Check whether we've got a control message pending.
769            //
770            // Note: unfortunately, reading from control here means we might start
771            // handling control messages before our chan_senders are ready.
772            // With the current design, this is inevitable: we can't know which circuit leg
773            // a control message is meant for without first reading the control message from
774            // the channel, and at that point, we can't know for sure whether that particular
775            // circuit is ready for sending.
776            ret = self.control.next() => {
777                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
778                CircuitAction::HandleControl(msg)
779            },
780            res = self.circuits.next_circ_action(&self.runtime)?.fuse() => res?,
781        };
782
783        let cmd = match action {
784            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
785                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
786            )),
787            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
788                .handle_msg(ctrl)?
789                .map(RunOnceCmd::Single),
790            CircuitAction::HandleCell { leg, cell } => {
791                let circ = self
792                    .circuits
793                    .leg_mut(leg)
794                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
795
796                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
797                if circ_cmds.is_empty() {
798                    None
799                } else {
800                    // TODO: we return RunOnceCmd::Multiple even if there's a single command.
801                    //
802                    // See the TODO on `Circuit::handle_cell`.
803                    let cmd = RunOnceCmd::Multiple(
804                        circ_cmds
805                            .into_iter()
806                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
807                            .collect(),
808                    );
809
810                    Some(cmd)
811                }
812            }
813            CircuitAction::RemoveLeg { leg, reason } => {
814                Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
815            }
816        };
817
818        if let Some(cmd) = cmd {
819            self.handle_run_once_cmd(cmd).await?;
820        }
821
822        Ok(())
823    }
824
825    /// Try to process the previously-out-of-order messages we might have buffered.
826    #[cfg(feature = "conflux")]
827    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
828        // Check if we're ready to dequeue any of the previously out-of-order cells.
829        while let Some(entry) = self.ooo_msgs.peek() {
830            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);
831
832            if !should_pop {
833                break;
834            }
835
836            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
837
838            let circ = self
839                .circuits
840                .leg_mut(entry.leg_id)
841                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
842            let handlers = &mut self.cell_handlers;
843            let cmd = circ
844                .handle_in_order_relay_msg(
845                    handlers,
846                    entry.msg.hopnum,
847                    entry.leg_id,
848                    entry.msg.cell_counts_towards_windows,
849                    entry.msg.streamid,
850                    entry.msg.msg,
851                )?
852                .map(|cmd| {
853                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
854                });
855
856            if let Some(cmd) = cmd {
857                self.handle_run_once_cmd(cmd).await?;
858            }
859        }
860
861        Ok(())
862    }
863
864    /// Handle a [`RunOnceCmd`].
865    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
866        match cmd {
867            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
868            RunOnceCmd::Multiple(cmds) => {
869                // While we know `sendable` is ready to accept *one* cell,
870                // we can't be certain it will be able to accept *all* of the cells
871                // that need to be sent here. This means we *may* end up buffering
872                // in its underlying SometimesUnboundedSink! That is OK, because
873                // RunOnceCmd::Multiple is only used for handling packed cells.
874                for cmd in cmds {
875                    self.handle_single_run_once_cmd(cmd).await?;
876                }
877            }
878        }
879
880        Ok(())
881    }
882
883    /// Handle a [`RunOnceCmd`].
884    async fn handle_single_run_once_cmd(
885        &mut self,
886        cmd: RunOnceCmdInner,
887    ) -> StdResult<(), ReactorError> {
888        match cmd {
889            RunOnceCmdInner::Send { leg, cell, done } => {
890                // TODO: check the cc window
891                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
892                if let Some(done) = done {
893                    // Don't care if the receiver goes away
894                    let _ = done.send(res.clone());
895                }
896                res?;
897            }
898            #[cfg(feature = "send-control-msg")]
899            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
900                let cell: Result<Option<SendRelayCell>> =
901                    self.prepare_msg_and_install_handler(msg, handler);
902
903                match cell {
904                    Ok(Some(cell)) => {
905                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
906                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
907                        // don't care if receiver goes away.
908                        let _ = done.send(outcome.clone());
909                        outcome?;
910                    }
911                    Ok(None) => {
912                        // don't care if receiver goes away.
913                        let _ = done.send(Ok(()));
914                    }
915                    Err(e) => {
916                        // don't care if receiver goes away.
917                        let _ = done.send(Err(e.clone()));
918                        return Err(e.into());
919                    }
920                }
921            }
922            RunOnceCmdInner::BeginStream {
923                leg,
924                cell,
925                hop,
926                done,
927            } => {
928                match cell {
929                    Ok((cell, stream_id)) => {
930                        let circ = self
931                            .circuits
932                            .leg_mut(leg)
933                            .ok_or_else(|| internal!("leg disappeared?!"))?;
934                        let cell_hop = cell.hop;
935                        let relay_format = circ
936                            .hop_mut(cell_hop)
937                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
938                            .ok_or(Error::NoSuchHop)?
939                            .relay_cell_format();
940
941                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
942                        // don't care if receiver goes away.
943                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
944                        outcome?;
945                    }
946                    Err(e) => {
947                        // don't care if receiver goes away.
948                        let _ = done.send(Err(e.clone()));
949                        return Err(e.into());
950                    }
951                }
952            }
953            RunOnceCmdInner::CloseStream {
954                hop,
955                sid,
956                behav,
957                reason,
958                done,
959            } => {
960                let result = (move || {
961                    // this is needed to force the closure to be FnOnce rather than FnMut :(
962                    let self_ = self;
963                    let (leg_id, hop_num) = self_
964                        .resolve_hop_location(hop)
965                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
966                    let leg = self_
967                        .circuits
968                        .leg_mut(leg_id)
969                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
970                    Ok::<_, Bug>((leg, hop_num))
971                })();
972
973                let (leg, hop_num) = match result {
974                    Ok(x) => x,
975                    Err(e) => {
976                        if let Some(done) = done {
977                            // don't care if the sender goes away
978                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
979                            let _ = done.send(Err(e.into()));
980                        }
981                        return Ok(());
982                    }
983                };
984
985                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
986
987                if let Some(done) = done {
988                    // don't care if the sender goes away
989                    let _ = done.send(res);
990                }
991            }
992            RunOnceCmdInner::MaybeSendXon {
993                rate,
994                stream_id,
995                hop,
996            } => {
997                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
998                    Ok(x) => x,
999                    Err(NoJoinPointError) => {
1000                        // A stream tried to send an XON message message to the join point of
1001                        // a tunnel that has never had a join point. Currently in arti, only a
1002                        // `StreamTarget` asks us to send an XON message, and this tunnel
1003                        // originally created the `StreamTarget` to begin with. So this is a
1004                        // legitimate bug somewhere in the tunnel code.
1005                        let err = internal!(
1006                            "Could not send an XON message to a join point on a tunnel without a join point",
1007                        );
1008                        // TODO: Rather than calling `warn_report` here, we should call
1009                        // `trace_report!` from `Reactor::run_once()`. Since this is an internal
1010                        // error, `trace_report!` should log it at "warn" level.
1011                        warn_report!(err, "Tunnel reactor error");
1012                        return Err(err.into());
1013                    }
1014                };
1015
1016                let Some(leg) = self.circuits.leg_mut(leg_id) else {
1017                    // The leg has disappeared. This is fine since the stream may have ended and
1018                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
1019                    // It is possible that is a bug and this is an incorrect leg number, but
1020                    // it's not currently possible to differentiate between an incorrect leg
1021                    // number and a tunnel leg that has been closed.
1022                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
1023                    return Ok(());
1024                };
1025
1026                let Some(hop) = leg.hop_mut(hop_num) else {
1027                    // The hop has disappeared. This is fine since the circuit may have been
1028                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
1029                    // It is possible that is a bug and this is an incorrect hop number, but
1030                    // it's not currently possible to differentiate between an incorrect hop
1031                    // number and a circuit hop that has been removed.
1032                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
1033                    return Ok(());
1034                };
1035
1036                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
1037                    // Nothing to do.
1038                    return Ok(());
1039                };
1040
1041                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
1042                let cell = SendRelayCell {
1043                    hop: hop_num,
1044                    early: false,
1045                    cell,
1046                };
1047
1048                leg.send_relay_cell(cell).await?;
1049            }
1050            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1051                let leg = self
1052                    .circuits
1053                    .leg_mut(leg)
1054                    .ok_or_else(|| internal!("leg disappeared?!"))?;
1055                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1056                // future which *should* resolve immediately
1057                let signals = leg.congestion_signals().await;
1058                leg.handle_sendme(hop, sendme, signals)?;
1059            }
1060            RunOnceCmdInner::FirstHopClockSkew { answer } => {
1061                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
1062
1063                // don't care if the sender goes away
1064                let _ = answer.send(res.map_err(Into::into));
1065            }
1066            RunOnceCmdInner::CleanShutdown => {
1067                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
1068                return Err(ReactorError::Shutdown);
1069            }
1070            RunOnceCmdInner::RemoveLeg { leg, reason } => {
1071                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
1072
1073                let circ = self.circuits.remove(leg)?;
1074                let is_conflux_pending = circ.is_conflux_pending();
1075
1076                // Drop the removed leg. This will cause it to close if it's not already closed.
1077                drop(circ);
1078
1079                // If we reach this point, it means we have more than one leg
1080                // (otherwise the .remove() would've returned a Shutdown error),
1081                // so we expect there to be a ConfluxHandshakeContext installed.
1082
1083                #[cfg(feature = "conflux")]
1084                if is_conflux_pending {
1085                    let (error, proto_violation): (_, Option<Error>) = match &reason {
1086                        RemoveLegReason::ConfluxHandshakeTimeout => {
1087                            (ConfluxHandshakeError::Timeout, None)
1088                        }
1089                        RemoveLegReason::ConfluxHandshakeErr(e) => {
1090                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
1091                        }
1092                        RemoveLegReason::ChannelClosed => {
1093                            (ConfluxHandshakeError::ChannelClosed, None)
1094                        }
1095                    };
1096
1097                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
1098
1099                    if let Some(e) = proto_violation {
1100                        // TODO: make warn_report support structured logging
1101                        tor_error::warn_report!(
1102                            e,
1103                            "{}: Malformed conflux handshake, tearing down tunnel",
1104                            self.tunnel_id
1105                        );
1106
1107                        return Err(e.into());
1108                    }
1109                }
1110            }
1111            #[cfg(feature = "conflux")]
1112            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1113                // Note: on the client-side, the handshake is considered complete once the
1114                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1115                //
1116                // We're optimistic here, and declare the handshake a success *before*
1117                // sending the LINKED_ACK response. I think this is OK though,
1118                // because if the send_relay_cell() below fails, the reactor will shut
1119                // down anyway. OTOH, marking the handshake as complete slightly early
1120                // means that on the happy path, the circuit is marked as usable sooner,
1121                // instead of blocking on the sending of the LINKED_ACK.
1122                self.note_conflux_handshake_result(Ok(()), false)?;
1123
1124                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
1125
1126                res?;
1127            }
1128            #[cfg(feature = "conflux")]
1129            RunOnceCmdInner::Link { circuits, answer } => {
1130                // Add the specified circuits to our conflux set,
1131                // and send a LINK cell down each unlinked leg.
1132                //
1133                // NOTE: this will block the reactor until all the cells are sent.
1134                self.handle_link_circuits(circuits, answer).await?;
1135            }
1136            #[cfg(feature = "conflux")]
1137            RunOnceCmdInner::Enqueue { leg, msg } => {
1138                let entry = ConfluxHeapEntry { leg_id: leg, msg };
1139                self.ooo_msgs.push(entry);
1140            }
1141        }
1142
1143        Ok(())
1144    }
1145
1146    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1147    ///
1148    /// Returns an error if an unexpected `CtrlMsg` is received.
1149    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1150        let msg = select_biased! {
1151            res = self.command.next() => {
1152                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1153                match cmd {
1154                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1155                    #[cfg(test)]
1156                    CtrlCmd::AddFakeHop {
1157                        relay_cell_format: format,
1158                        fwd_lasthop,
1159                        rev_lasthop,
1160                        peer_id,
1161                        params,
1162                        done,
1163                    } => {
1164                        let leg = self.circuits.single_leg_mut()?;
1165                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
1166                        return Ok(())
1167                    },
1168                    _ => {
1169                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1170                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1171                    }
1172                }
1173            },
1174            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1175        };
1176
1177        match msg {
1178            CtrlMsg::Create {
1179                recv_created,
1180                handshake,
1181                settings,
1182                done,
1183            } => {
1184                // TODO(conflux): instead of crashing the reactor, it might be better
1185                // to send the error via the done channel instead
1186                let leg = self.circuits.single_leg_mut()?;
1187                leg.handle_create(recv_created, handshake, settings, done)
1188                    .await
1189            }
1190            _ => {
1191                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1192
1193                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1194            }
1195        }
1196    }
1197
1198    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
1199    ///
1200    /// If all the circuits we were waiting on have finished the conflux handshake,
1201    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
1202    /// are sent to the handshake initiator.
1203    #[cfg(feature = "conflux")]
1204    fn note_conflux_handshake_result(
1205        &mut self,
1206        res: StdResult<(), ConfluxHandshakeError>,
1207        reactor_is_closing: bool,
1208    ) -> StdResult<(), ReactorError> {
1209        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
1210            Some(conflux_ctx) => {
1211                conflux_ctx.results.push(res);
1212                // Whether all the legs have finished linking:
1213                conflux_ctx.results.len() == conflux_ctx.num_legs
1214            }
1215            None => {
1216                return Err(internal!("no conflux handshake context").into());
1217            }
1218        };
1219
1220        if tunnel_complete || reactor_is_closing {
1221            // Time to remove the conflux handshake context
1222            // and extract the results we have collected
1223            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
1224
1225            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
1226            let leg_count = conflux_ctx.results.len();
1227
1228            info!(
1229                tunnel_id = %self.tunnel_id,
1230                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
1231            );
1232
1233            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
1234
1235            // We don't expect to receive any more handshake results,
1236            // at least not until we get another LinkCircuits control message,
1237            // which will install a new ConfluxHandshakeCtx with a channel
1238            // for us to send updates on
1239        }
1240
1241        Ok(())
1242    }
1243
1244    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1245    fn prepare_msg_and_install_handler(
1246        &mut self,
1247        msg: Option<AnyRelayMsgOuter>,
1248        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1249    ) -> Result<Option<SendRelayCell>> {
1250        let msg = msg
1251            .map(|msg| {
1252                let handlers = &mut self.cell_handlers;
1253                let handler = handler
1254                    .as_ref()
1255                    .or(handlers.meta_handler.as_ref())
1256                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1257                // We should always have a precise HopLocation here so this should never fails but
1258                // in case we have a ::JointPoint, we'll notice.
1259                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
1260                    "MsgHandler doesn't have a precise HopLocation"
1261                ))?;
1262                Ok::<_, crate::Error>(SendRelayCell {
1263                    hop,
1264                    early: false,
1265                    cell: msg,
1266                })
1267            })
1268            .transpose()?;
1269
1270        if let Some(handler) = handler {
1271            self.cell_handlers.set_meta_handler(handler)?;
1272        }
1273
1274        Ok(msg)
1275    }
1276
1277    /// Handle a shutdown request.
1278    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1279        trace!(
1280            tunnel_id = %self.tunnel_id,
1281            "reactor shutdown due to explicit request",
1282        );
1283
1284        Err(ReactorError::Shutdown)
1285    }
1286
1287    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
1288    ///
1289    /// Returns an error over the `answer` channel if the reactor has no circuits,
1290    /// or more than one circuit. The reactor will shut down regardless.
1291    #[cfg(feature = "conflux")]
1292    fn handle_shutdown_and_return_circuit(
1293        &mut self,
1294        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
1295    ) -> StdResult<(), ReactorError> {
1296        // Don't care if the receiver goes away
1297        let _ = answer.send(self.circuits.take_single_leg());
1298        self.handle_shutdown().map(|_| ())
1299    }
1300
1301    /// Resolves a [`TargetHop`] to a [`HopLocation`].
1302    ///
1303    /// After resolving a `TargetHop::LastHop`,
1304    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1305    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1306    ///
1307    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1308    /// if you need a fixed hop position in the tunnel.
1309    /// For example if we open a stream to `TargetHop::LastHop`,
1310    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1311    /// as we don't want the stream position to change as the tunnel is extended or truncated.
1312    ///
1313    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1314    /// and the tunnel has no hops
1315    /// (either has no legs, or has legs which contain no hops).
1316    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1317        match hop {
1318            TargetHop::Hop(hop) => Ok(hop),
1319            TargetHop::LastHop => {
1320                if let Ok(leg) = self.circuits.single_leg() {
1321                    let leg_id = leg.unique_id();
1322                    // single-path tunnel
1323                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1324                    Ok(HopLocation::Hop((leg_id, hop)))
1325                } else if !self.circuits.is_empty() {
1326                    // multi-path tunnel
1327                    Ok(HopLocation::JoinPoint)
1328                } else {
1329                    // no legs
1330                    Err(NoHopsBuiltError)
1331                }
1332            }
1333        }
1334    }
1335
1336    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
1337    ///
1338    /// After resolving a `HopLocation::JoinPoint`,
1339    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
1340    ///
1341    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
1342    /// need them,
1343    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
1344    /// iterations as the primary leg may change from one iteration to the next.
1345    ///
1346    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1347    /// but it does not have a join point.
1348    fn resolve_hop_location(
1349        &self,
1350        hop: HopLocation,
1351    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
1352        match hop {
1353            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1354            HopLocation::JoinPoint => {
1355                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1356                    Ok((leg_id, hop_num))
1357                } else {
1358                    // Attempted to get the join point of a non-multipath tunnel.
1359                    Err(NoJoinPointError)
1360                }
1361            }
1362        }
1363    }
1364
1365    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
1366    ///
1367    /// This is a helper function that basically calls both resolve_target_hop and
1368    /// resolve_hop_location back to back.
1369    ///
1370    /// It returns None on failure to resolve meaning that if you want more detailed error on why
1371    /// it failed, explicitly use the resolve_hop_location() and resolve_target_hop() functions.
1372    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
1373        self.resolve_target_hop(hop)
1374            .ok()
1375            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
1376    }
1377
1378    /// Does congestion control use stream SENDMEs for the given hop?
1379    ///
1380    /// Returns `None` if either the `leg` or `hop` don't exist.
1381    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
1382        self.circuits.uses_stream_sendme(leg, hop)
1383    }
1384
1385    /// Handle a request to link some extra circuits in the reactor's conflux set.
1386    ///
1387    /// The circuits are validated, and if they do not have the same length,
1388    /// or if they do not all have the same last hop, an error is returned on
1389    /// the `answer` channel, and the conflux handshake is *not* initiated.
1390    ///
1391    /// If validation succeeds, the circuits are added to this reactor's conflux set,
1392    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
1393    ///
1394    /// NOTE: this blocks the reactor main loop until all the cells are sent.
1395    #[cfg(feature = "conflux")]
1396    async fn handle_link_circuits(
1397        &mut self,
1398        circuits: Vec<Circuit>,
1399        answer: ConfluxLinkResultChannel,
1400    ) -> StdResult<(), ReactorError> {
1401        use tor_error::warn_report;
1402
1403        if self.conflux_hs_ctx.is_some() {
1404            let err = internal!("conflux linking already in progress");
1405            send_conflux_outcome(answer, Err(err.into()))?;
1406
1407            return Ok(());
1408        }
1409
1410        let unlinked_legs = self.circuits.num_unlinked();
1411
1412        // We need to send the LINK cell on each of the new circuits
1413        // and on each of the existing, unlinked legs from self.circuits.
1414        //
1415        // In reality, there can only be one such circuit
1416        // (the "initial" one from the previously single-path tunnel),
1417        // because any circuits that to complete the conflux handshake
1418        // get removed from the set.
1419        let num_legs = circuits.len() + unlinked_legs;
1420
1421        // Note: add_legs validates `circuits`
1422        let res = async {
1423            self.circuits.add_legs(circuits, &self.runtime)?;
1424            self.circuits.link_circuits(&self.runtime).await
1425        }
1426        .await;
1427
1428        if let Err(e) = res {
1429            warn_report!(e, "Failed to link conflux circuits");
1430
1431            send_conflux_outcome(answer, Err(e))?;
1432        } else {
1433            // Save the channel, to notify the user of completion.
1434            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
1435                answer,
1436                num_legs,
1437                results: Default::default(),
1438            });
1439        }
1440
1441        Ok(())
1442    }
1443}
1444
1445/// Notify the conflux handshake initiator of the handshake outcome.
1446///
1447/// Returns an error if the initiator has done away.
1448#[cfg(feature = "conflux")]
1449fn send_conflux_outcome(
1450    tx: ConfluxLinkResultChannel,
1451    res: Result<ConfluxHandshakeResult>,
1452) -> StdResult<(), ReactorError> {
1453    if tx.send(res).is_err() {
1454        tracing::warn!("conflux initiator went away before handshake completed?");
1455        return Err(ReactorError::Shutdown);
1456    }
1457
1458    Ok(())
1459}
1460
1461/// The tunnel does not have any hops.
1462#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1463#[non_exhaustive]
1464#[error("no hops have been built for this tunnel")]
1465pub(crate) struct NoHopsBuiltError;
1466
1467/// The tunnel does not have a join point.
1468#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1469#[non_exhaustive]
1470#[error("the tunnel does not have a join point")]
1471pub(crate) struct NoJoinPointError;
1472
1473impl CellHandlers {
1474    /// Try to install a given meta-cell handler to receive any unusual cells on
1475    /// this circuit, along with a result channel to notify on completion.
1476    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1477        if self.meta_handler.is_none() {
1478            self.meta_handler = Some(handler);
1479            Ok(())
1480        } else {
1481            Err(Error::from(internal!(
1482                "Tried to install a meta-cell handler before the old one was gone."
1483            )))
1484        }
1485    }
1486
1487    /// Try to install a given cell handler on this circuit.
1488    #[cfg(feature = "hs-service")]
1489    fn set_incoming_stream_req_handler(
1490        &mut self,
1491        handler: IncomingStreamRequestHandler,
1492    ) -> Result<()> {
1493        if self.incoming_stream_req_handler.is_none() {
1494            self.incoming_stream_req_handler = Some(handler);
1495            Ok(())
1496        } else {
1497            Err(Error::from(internal!(
1498                "Tried to install a BEGIN cell handler before the old one was gone."
1499            )))
1500        }
1501    }
1502}
1503
1504#[cfg(test)]
1505mod test {
1506    // Tested in [`crate::tunnel::circuit::test`].
1507}