tor_proto/tunnel/reactor/
control.rs

1//! Module providing [`CtrlMsg`].
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6    RunOnceCmdInner, SendRelayCell,
7};
8use crate::circuit::HopSettings;
9use crate::crypto::binding::CircuitBinding;
10use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer, Tor1RelayCrypto};
11use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
12use crate::stream::queue::StreamQueueSender;
13use crate::stream::{AnyCmdChecker, DrainRateRequest, StreamRateLimit};
14use crate::tunnel::circuit::celltypes::CreateResponse;
15use crate::tunnel::circuit::path;
16use crate::tunnel::reactor::circuit::circ_extensions_from_settings;
17use crate::tunnel::reactor::{NoJoinPointError, NtorClient, ReactorError};
18use crate::tunnel::{streammap, HopLocation, TargetHop};
19use crate::util::notify::NotifySender;
20use crate::util::skew::ClockSkew;
21use crate::Result;
22#[cfg(test)]
23use crate::{circuit::CircParameters, circuit::UniqId, crypto::cell::HopNum};
24use postage::watch;
25use tor_cell::chancell::msg::HandshakeType;
26use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
27use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
28use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
29use tor_error::{bad_api_usage, internal, into_bad_api_usage, warn_report, Bug};
30use tracing::{debug, trace};
31#[cfg(feature = "hs-service")]
32use {
33    super::StreamReqSender, crate::stream::IncomingStreamRequestFilter,
34    crate::tunnel::reactor::IncomingStreamRequestHandler,
35};
36
37#[cfg(test)]
38use tor_cell::relaycell::msg::SendmeTag;
39
40#[cfg(feature = "conflux")]
41use super::{Circuit, ConfluxLinkResultChannel};
42
43use oneshot_fused_workaround as oneshot;
44
45use crate::crypto::handshake::ntor::NtorPublicKey;
46use crate::tunnel::circuit::StreamMpscReceiver;
47use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
48
49use std::result::Result as StdResult;
50
/// A message telling the reactor to do something.
///
/// Handling a `CtrlMsg` may cause the reactor to send a cell on the underlying channel.
///
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
/// can cause the reactor to send cells on the reactor's `chan_sender`,
/// whereas `CtrlCmd`s do not.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlMsg {
    /// Create the first hop of this circuit.
    Create {
        /// A oneshot channel on which we'll receive the creation response.
        recv_created: oneshot::Receiver<CreateResponse>,
        /// The handshake type to use for the first hop.
        handshake: CircuitHandshake,
        /// Other parameters relevant for circuit creation.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntor handshake.
    ExtendNtor {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The public key to use for the ntor handshake with this hop.
        public_key: NtorPublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntorv3 handshake.
    ExtendNtorV3 {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The public key to use for the ntorv3 handshake with this hop.
        public_key: NtorV3PublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Allocates a stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The hop to begin the stream with.
        hop: TargetHop,
        /// The message to send.
        message: AnyRelayMsg,
        /// A channel to send messages on this stream down.
        ///
        /// This sender shouldn't ever block, because we use congestion control and only send
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
        /// can assume someone is trying to send us more cells than they should, and abort
        /// the stream.
        sender: StreamQueueSender,
        /// A channel to receive messages to send on this stream from.
        rx: StreamMpscReceiver<AnyRelayMsg>,
        /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
        rate_limit_notifier: watch::Sender<StreamRateLimit>,
        /// Notifies the stream reader when it should send a new drain rate.
        drain_rate_requester: NotifySender<DrainRateRequest>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
    },
    /// Close the specified pending incoming stream, sending the provided END message.
    ///
    /// A stream is said to be pending if the message for initiating the stream was received but
    /// has not been responded to yet.
    ///
    /// This should be used by responders for closing pending incoming streams initiated by the
    /// other party on the circuit.
    #[cfg(feature = "hs-service")]
    ClosePendingStream {
        /// The hop the stream is on.
        hop: HopLocation,
        /// The stream ID to send the END for.
        stream_id: StreamId,
        /// The END message to send, if any.
        message: CloseStreamBehavior,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Send a given control message on this circuit.
    #[cfg(feature = "send-control-msg")]
    SendMsg {
        /// The hop to receive this message.
        hop: TargetHop,
        /// The message to send.
        msg: AnyRelayMsg,
        /// A sender that we use to tell the caller that the message was sent.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any.
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Inform the reactor that there's a flow control update for a given stream.
    ///
    /// The reactor will decide how to handle this update depending on the type of flow control and
    /// the current state of the stream.
    FlowCtrlUpdate {
        /// The type of flow control update, and any associated metadata.
        msg: FlowCtrlMsg,
        /// The stream ID that the update is for.
        stream_id: StreamId,
        /// The hop that the stream is on.
        hop: HopLocation,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Link the specified circuits into the current tunnel,
    /// to form a multi-path tunnel.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    LinkCircuits {
        /// The circuits to link into the tunnel.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
        /// or have failed to link.
        ///
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
        /// (see [set construction]).
        ///
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
        answer: ConfluxLinkResultChannel,
    },
}
207
208/// A message telling the reactor to do something.
209///
210/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
211/// never cause cells to sent on the channel,
212/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
213/// some of which instruct the reactor to send cells down the channel.
214#[derive(educe::Educe)]
215#[educe(Debug)]
216pub(crate) enum CtrlCmd {
217    /// Shut down the reactor.
218    Shutdown,
219    /// Extend the circuit by one hop, in response to an out-of-band handshake.
220    ///
221    /// (This is used for onion services, where the negotiation takes place in
222    /// INTRODUCE and RENDEZVOUS messages.)
223    #[cfg(feature = "hs-common")]
224    ExtendVirtual {
225        /// The cryptographic algorithms and keys to use when communicating with
226        /// the newly added hop.
227        #[educe(Debug(ignore))]
228        cell_crypto: (
229            Box<dyn OutboundClientLayer + Send>,
230            Box<dyn InboundClientLayer + Send>,
231            Option<CircuitBinding>,
232        ),
233        /// A set of parameters to negotiate with this hop.
234        settings: HopSettings,
235        /// Oneshot channel to notify on completion.
236        done: ReactorResultChannel<()>,
237    },
238    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
239    ResolveTargetHop {
240        /// The target hop to resolve.
241        hop: TargetHop,
242        /// Oneshot channel to notify on completion.
243        done: ReactorResultChannel<HopLocation>,
244    },
245    /// Begin accepting streams on this circuit.
246    #[cfg(feature = "hs-service")]
247    AwaitStreamRequest {
248        /// A channel for sending information about an incoming stream request.
249        incoming_sender: StreamReqSender,
250        /// A `CmdChecker` to keep track of which message types are acceptable.
251        cmd_checker: AnyCmdChecker,
252        /// Oneshot channel to notify on completion.
253        done: ReactorResultChannel<()>,
254        /// The hop that is allowed to create streams.
255        hop: TargetHop,
256        /// A filter used to check requests before passing them on.
257        #[educe(Debug(ignore))]
258        #[cfg(feature = "hs-service")]
259        filter: Box<dyn IncomingStreamRequestFilter>,
260    },
261    /// Request the binding key of a target hop.
262    #[cfg(feature = "hs-service")]
263    GetBindingKey {
264        /// The hop for which we want the key.
265        hop: TargetHop,
266        /// Oneshot channel to notify on completion.
267        done: ReactorResultChannel<Option<CircuitBinding>>,
268    },
269    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
270    #[cfg(test)]
271    AddFakeHop {
272        relay_cell_format: RelayCellFormat,
273        fwd_lasthop: bool,
274        rev_lasthop: bool,
275        peer_id: path::HopDetail,
276        params: CircParameters,
277        done: ReactorResultChannel<()>,
278    },
279    /// (tests only) Get the send window and expected tags for a given hop.
280    #[cfg(test)]
281    QuerySendWindow {
282        hop: HopNum,
283        leg: UniqId,
284        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
285    },
286    /// Shut down the reactor, and return the underlying [`Circuit`],
287    /// if the tunnel is not multi-path.
288    ///
289    /// Returns an error if called on a multi-path reactor.
290    #[cfg(feature = "conflux")]
291    #[allow(unused)] // TODO(conflux)
292    ShutdownAndReturnCircuit {
293        /// Oneshot channel to return the underlying [`Circuit`],
294        /// or an error if the reactor's tunnel is multi-path.
295        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
296    },
297}
298
/// A flow control update message.
///
/// Carried in the `msg` field of [`CtrlMsg::FlowCtrlUpdate`].
#[derive(Debug)]
pub(crate) enum FlowCtrlMsg {
    /// Send a SENDME message on this stream.
    Sendme,
    /// Send an XON message on this stream with the given rate.
    Xon(XonKbpsEwma),
}
307
/// A control message handler object.
///
/// It keeps a mutable reference to the [`Reactor`], so a handler cannot outlive
/// the borrow of the reactor it was created from.
///
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
/// respectively, are handled.
pub(crate) struct ControlHandler<'a> {
    /// Reference to the reactor that this handler acts on.
    reactor: &'a mut Reactor,
}
316
317impl<'a> ControlHandler<'a> {
318    /// Constructor.
319    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
320        Self { reactor }
321    }
322
323    /// Handle a control message.
324    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
325        trace!(
326            tunnel_id = %self.reactor.tunnel_id,
327            msg = ?msg,
328            "reactor received control message"
329        );
330
331        match msg {
332            // This is handled earlier, since it requires blocking.
333            CtrlMsg::Create { done, .. } => {
334                if self.reactor.circuits.len() == 1 {
335                    // This should've been handled in Reactor::run_once()
336                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
337                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
338                    // Don't care if the receiver goes away
339                    let _ = done.send(Err(tor_error::bad_api_usage!(
340                        "cannot create first hop twice"
341                    )
342                    .into()));
343                } else {
344                    // Don't care if the receiver goes away
345                    let _ = done.send(Err(tor_error::bad_api_usage!(
346                        "cannot create first hop on multipath tunnel"
347                    )
348                    .into()));
349                }
350
351                Ok(None)
352            }
353            CtrlMsg::ExtendNtor {
354                peer_id,
355                public_key,
356                linkspecs,
357                settings,
358                done,
359            } => {
360                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
361                    // Don't care if the receiver goes away
362                    let _ = done.send(Err(tor_error::bad_api_usage!(
363                        "cannot extend multipath tunnel"
364                    )
365                    .into()));
366
367                    return Ok(None);
368                };
369
370                let (extender, cell) = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
371                    peer_id,
372                    HandshakeType::NTOR,
373                    &public_key,
374                    linkspecs,
375                    settings,
376                    &(),
377                    circ,
378                    done,
379                )?;
380                self.reactor
381                    .cell_handlers
382                    .set_meta_handler(Box::new(extender))?;
383
384                Ok(Some(RunOnceCmdInner::Send {
385                    leg: circ.unique_id(),
386                    cell,
387                    done: None,
388                }))
389            }
390            CtrlMsg::ExtendNtorV3 {
391                peer_id,
392                public_key,
393                linkspecs,
394                settings,
395                done,
396            } => {
397                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
398                    // Don't care if the receiver goes away
399                    let _ = done.send(Err(tor_error::bad_api_usage!(
400                        "cannot extend multipath tunnel"
401                    )
402                    .into()));
403
404                    return Ok(None);
405                };
406
407                let client_extensions = circ_extensions_from_settings(&settings)?;
408
409                let (extender, cell) =
410                    CircuitExtender::<NtorV3Client, Tor1RelayCrypto, _, _>::begin(
411                        peer_id,
412                        HandshakeType::NTOR_V3,
413                        &public_key,
414                        linkspecs,
415                        settings,
416                        &client_extensions,
417                        circ,
418                        done,
419                    )?;
420                self.reactor
421                    .cell_handlers
422                    .set_meta_handler(Box::new(extender))?;
423
424                Ok(Some(RunOnceCmdInner::Send {
425                    leg: circ.unique_id(),
426                    cell,
427                    done: None,
428                }))
429            }
430            CtrlMsg::BeginStream {
431                hop,
432                message,
433                sender,
434                rx,
435                rate_limit_notifier,
436                drain_rate_requester,
437                done,
438                cmd_checker,
439            } => {
440                // If resolving the hop fails,
441                // we want to report an error back to the initiator and not shut down the reactor.
442                let hop_location = match self.reactor.resolve_target_hop(hop) {
443                    Ok(x) => x,
444                    Err(e) => {
445                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
446                        // don't care if receiver goes away
447                        let _ = done.send(Err(e.into()));
448                        return Ok(None);
449                    }
450                };
451                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
452                    Ok(x) => x,
453                    Err(e) => {
454                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
455                        // don't care if receiver goes away
456                        let _ = done.send(Err(e.into()));
457                        return Ok(None);
458                    }
459                };
460                let circ = match self.reactor.circuits.leg_mut(leg_id) {
461                    Some(x) => x,
462                    None => {
463                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
464                        // don't care if receiver goes away
465                        let _ = done.send(Err(e.into()));
466                        return Ok(None);
467                    }
468                };
469
470                let cell = circ.begin_stream(
471                    hop_num,
472                    message,
473                    sender,
474                    rx,
475                    rate_limit_notifier,
476                    drain_rate_requester,
477                    cmd_checker,
478                )?;
479                Ok(Some(RunOnceCmdInner::BeginStream {
480                    leg: leg_id,
481                    cell,
482                    hop: hop_location,
483                    done,
484                }))
485            }
486            #[cfg(feature = "hs-service")]
487            CtrlMsg::ClosePendingStream {
488                hop,
489                stream_id,
490                message,
491                done,
492            } => Ok(Some(RunOnceCmdInner::CloseStream {
493                hop,
494                sid: stream_id,
495                behav: message,
496                reason: streammap::TerminateReason::ExplicitEnd,
497                done: Some(done),
498            })),
499            CtrlMsg::FlowCtrlUpdate {
500                msg,
501                stream_id,
502                hop,
503            } => {
504                match msg {
505                    FlowCtrlMsg::Sendme => {
506                        let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
507                            Ok(x) => x,
508                            Err(NoJoinPointError) => {
509                                // A stream tried to send a stream-level SENDME message to the join point of
510                                // a tunnel that has never had a join point. Currently in arti, only a
511                                // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
512                                // originally created the `StreamTarget` to begin with. So this is a
513                                // legitimate bug somewhere in the tunnel code.
514                                let err = internal!(
515                                    "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
516                                );
517                                // TODO: Rather than calling `warn_report` here, we should call
518                                // `trace_report!` from `Reactor::run_once()`. Since this is an internal
519                                // error, `trace_report!` should log it at "warn" level.
520                                warn_report!(err, "Tunnel reactor error");
521                                return Err(err.into());
522                            }
523                        };
524
525                        // Congestion control decides if we can send stream level SENDMEs or not.
526                        let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
527                        {
528                            Some(x) => x,
529                            None => {
530                                // The leg/hop has disappeared. This is fine since the stream may have ended
531                                // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
532                                // It is possible that is a bug and this is an incorrect leg/hop number, but
533                                // it's not currently possible to differentiate between an incorrect leg/hop
534                                // number and a circuit hop that has been closed.
535                                debug!("Could not send a stream-level SENDME on a hop that does not exist. Ignoring.");
536                                return Ok(None);
537                            }
538                        };
539
540                        if !sendme_required {
541                            // Nothing to do, so discard the SENDME.
542                            return Ok(None);
543                        }
544
545                        let sendme = Sendme::new_empty();
546                        let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
547
548                        let cell = SendRelayCell {
549                            hop: hop_num,
550                            early: false,
551                            cell,
552                        };
553
554                        Ok(Some(RunOnceCmdInner::Send {
555                            leg: leg_id,
556                            cell,
557                            done: None,
558                        }))
559                    }
560                    FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
561                        rate,
562                        hop,
563                        stream_id,
564                    })),
565                }
566            }
567            // TODO(conflux): this should specify which leg to send the msg on
568            // (currently we send it down the primary leg).
569            //
570            // This will involve updating ClientCIrc::send_raw_msg() to take a
571            // leg id argument (which is a breaking change.
572            #[cfg(feature = "send-control-msg")]
573            CtrlMsg::SendMsg { hop, msg, sender } => {
574                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
575                    // Don't care if receiver goes away
576                    let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
577                    return Ok(None);
578                };
579
580                let cell = AnyRelayMsgOuter::new(None, msg);
581                let cell = SendRelayCell {
582                    hop: hop_num,
583                    early: false,
584                    cell,
585                };
586
587                Ok(Some(RunOnceCmdInner::Send {
588                    leg: leg_id,
589                    cell,
590                    done: Some(sender),
591                }))
592            }
593            // TODO(conflux): this should specify which leg to send the msg on
594            // (currently we send it down the primary leg)
595            #[cfg(feature = "send-control-msg")]
596            CtrlMsg::SendMsgAndInstallHandler {
597                msg,
598                handler,
599                sender,
600            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
601                msg,
602                handler,
603                done: sender,
604            })),
605            CtrlMsg::FirstHopClockSkew { answer } => {
606                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
607            }
608            #[cfg(feature = "conflux")]
609            CtrlMsg::LinkCircuits { circuits, answer } => {
610                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
611            }
612        }
613    }
614
615    /// Handle a control command.
616    #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
617    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
618        trace!(
619            tunnel_id = %self.reactor.tunnel_id,
620            msg = ?msg,
621            "reactor received control command"
622        );
623
624        match msg {
625            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
626            #[cfg(feature = "hs-common")]
627            #[allow(unreachable_code)]
628            CtrlCmd::ExtendVirtual {
629                cell_crypto,
630                settings,
631                done,
632            } => {
633                let (outbound, inbound, binding) = cell_crypto;
634
635                // TODO HS: Perhaps this should describe the onion service, or
636                // describe why the virtual hop was added, or something?
637                let peer_id = path::HopDetail::Virtual;
638
639                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
640                    // Don't care if the receiver goes away
641                    let _ = done.send(Err(tor_error::bad_api_usage!(
642                        "cannot extend multipath tunnel"
643                    )
644                    .into()));
645
646                    return Ok(());
647                };
648
649                leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
650                let _ = done.send(Ok(()));
651
652                Ok(())
653            }
654            CtrlCmd::ResolveTargetHop { hop, done } => {
655                let _ = done.send(
656                    self.reactor
657                        .resolve_target_hop(hop)
658                        .map_err(|_| crate::util::err::Error::NoSuchHop),
659                );
660                Ok(())
661            }
662            #[cfg(feature = "hs-service")]
663            CtrlCmd::AwaitStreamRequest {
664                cmd_checker,
665                incoming_sender,
666                hop,
667                done,
668                filter,
669            } => {
670                let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
671                    let _ = done.send(Err(crate::Error::NoSuchHop));
672                    return Ok(());
673                };
674                // TODO: At some point we might want to add a CtrlCmd for
675                // de-registering the handler.  See comments on `allow_stream_requests`.
676                let handler = IncomingStreamRequestHandler {
677                    incoming_sender,
678                    cmd_checker,
679                    hop_num,
680                    filter,
681                };
682
683                let ret = self
684                    .reactor
685                    .cell_handlers
686                    .set_incoming_stream_req_handler(handler);
687                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
688
689                Ok(())
690            }
691            #[cfg(feature = "hs-service")]
692            CtrlCmd::GetBindingKey { hop, done } => {
693                let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
694                    let _ = done.send(Err(tor_error::internal!(
695                        "Unknown TargetHop when getting binding key"
696                    )
697                    .into()));
698                    return Ok(());
699                };
700                let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
701                    let _ = done.send(Err(tor_error::bad_api_usage!(
702                        "Unknown circuit id {leg_id} when getting binding key"
703                    )
704                    .into()));
705                    return Ok(());
706                };
707                // Get the binding key from the mutable state and send it back.
708                let key = circuit.mutable().binding_key(hop_num);
709                let _ = done.send(Ok(key));
710
711                Ok(())
712            }
713            #[cfg(test)]
714            CtrlCmd::AddFakeHop {
715                relay_cell_format,
716                fwd_lasthop,
717                rev_lasthop,
718                peer_id,
719                params,
720                done,
721            } => {
722                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
723                    // Don't care if the receiver goes away
724                    let _ = done.send(Err(tor_error::bad_api_usage!(
725                        "cannot add fake hop to multipath tunnel"
726                    )
727                    .into()));
728
729                    return Ok(());
730                };
731
732                leg.handle_add_fake_hop(
733                    relay_cell_format,
734                    fwd_lasthop,
735                    rev_lasthop,
736                    peer_id,
737                    &params,
738                    done,
739                );
740
741                Ok(())
742            }
743            #[cfg(test)]
744            CtrlCmd::QuerySendWindow { hop, leg, done } => {
745                // Immediately invoked function means that errors will be sent to the channel.
746                let _ = done.send((|| {
747                    let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
748                        bad_api_usage!("cannot query send window of non-existent circuit")
749                    })?;
750
751                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
752                        "received QuerySendWindow for unknown hop {}",
753                        hop.display()
754                    ))?;
755
756                    Ok(hop.send_window_and_expected_tags())
757                })());
758
759                Ok(())
760            }
761            #[cfg(feature = "conflux")]
762            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
763                self.reactor.handle_shutdown_and_return_circuit(answer)
764            }
765        }
766    }
767}