tor_proto/tunnel/reactor/
control.rs

//! Module providing [`CtrlMsg`], [`CtrlCmd`], and the [`ControlHandler`] that processes them.
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6    RunOnceCmdInner, SendRelayCell,
7};
8use crate::crypto::binding::CircuitBinding;
9use crate::crypto::cell::{HopNum, InboundClientLayer, OutboundClientLayer, Tor1RelayCrypto};
10use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
11use crate::stream::AnyCmdChecker;
12use crate::tunnel::circuit::celltypes::CreateResponse;
13use crate::tunnel::circuit::{path, CircParameters};
14use crate::tunnel::reactor::{NtorClient, ReactorError};
15use crate::tunnel::{streammap, HopLocation, TargetHop};
16use crate::util::skew::ClockSkew;
17use crate::Result;
18use tor_cell::chancell::msg::HandshakeType;
19use tor_cell::relaycell::extend::{CcRequest, CircRequestExt};
20use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
21use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
22use tor_error::{bad_api_usage, into_bad_api_usage, Bug};
23use tracing::trace;
24#[cfg(feature = "hs-service")]
25use {
26    super::StreamReqSender, crate::stream::IncomingStreamRequestFilter,
27    crate::tunnel::reactor::IncomingStreamRequestHandler,
28};
29
30#[cfg(test)]
31use tor_cell::relaycell::msg::SendmeTag;
32
33#[cfg(feature = "conflux")]
34use super::{Circuit, ConfluxLinkResultChannel};
35
36use oneshot_fused_workaround as oneshot;
37
38use crate::crypto::handshake::ntor::NtorPublicKey;
39use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
40use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
41
42use std::result::Result as StdResult;
43
/// A message telling the reactor to do something.
///
/// Handling a `CtrlMsg` may cause the reactor to send a cell on the underlying channel.
///
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
/// can cause the reactor to send cells on the reactor's `chan_sender`,
/// whereas `CtrlCmd`s do not.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlMsg {
    /// Create the first hop of this circuit.
    Create {
        /// A oneshot channel on which we'll receive the creation response.
        recv_created: oneshot::Receiver<CreateResponse>,
        /// The handshake type to use for the first hop.
        handshake: CircuitHandshake,
        /// Other parameters relevant for circuit creation.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntor handshake.
    ExtendNtor {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntor public key to use for the handshake with this hop.
        public_key: NtorPublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters relevant for circuit extension.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntorv3 handshake.
    ExtendNtorV3 {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntorv3 public key to use for the handshake with this hop.
        public_key: NtorV3PublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters relevant for circuit extension.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Allocates a stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The hop to begin the stream with.
        hop: TargetHop,
        /// The message to send.
        message: AnyRelayMsg,
        /// A channel to send messages on this stream down.
        ///
        /// This sender shouldn't ever block, because we use congestion control and only send
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
        /// can assume someone is trying to send us more cells than they should, and abort
        /// the stream.
        sender: StreamMpscSender<UnparsedRelayMsg>,
        /// A channel to receive messages to send on this stream from.
        rx: StreamMpscReceiver<AnyRelayMsg>,
        /// Oneshot channel to notify on completion, with the allocated stream ID,
        /// the location of the hop the stream is on, and the relay cell format.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
    },
    /// Close the specified pending incoming stream, sending the provided END message.
    ///
    /// A stream is said to be pending if the message for initiating the stream was received but
    /// has not been responded to yet.
    ///
    /// This should be used by responders for closing pending incoming streams initiated by the
    /// other party on the circuit.
    #[cfg(feature = "hs-service")]
    ClosePendingStream {
        /// The location of the hop the stream is on.
        hop: HopLocation,
        /// The stream ID to send the END for.
        stream_id: StreamId,
        /// The END message to send, if any.
        message: CloseStreamBehavior,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Send a given control message on this circuit.
    #[cfg(feature = "send-control-msg")]
    SendMsg {
        /// The hop to receive this message.
        hop_num: HopNum,
        /// The message to send.
        msg: AnyRelayMsg,
        /// A sender that we use to tell the caller that the message was sent.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any.
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
    SendSendme {
        /// The stream ID to send a SENDME for.
        stream_id: StreamId,
        /// The location of the hop the stream is on.
        hop: HopLocation,
        /// A sender that we use to tell the caller that the SENDME was sent.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Link the specified circuits into the current tunnel,
    /// to form a multi-path tunnel.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    LinkCircuits {
        /// The circuits to link into the tunnel.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
        /// or have failed to link.
        ///
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
        /// (see [set construction]).
        ///
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
        answer: ConfluxLinkResultChannel,
    },
}
193
/// A message telling the reactor to do something.
///
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
/// never cause cells to be sent on the channel,
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
/// some of which instruct the reactor to send cells down the channel.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlCmd {
    /// Shut down the reactor.
    Shutdown,
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
    ///
    /// (This is used for onion services, where the negotiation takes place in
    /// INTRODUCE and RENDEZVOUS messages.)
    #[cfg(feature = "hs-common")]
    ExtendVirtual {
        /// Which relay cell format to use for this hop.
        relay_cell_format: RelayCellFormat,
        /// The cryptographic algorithms and keys to use when communicating with
        /// the newly added hop.
        ///
        /// The tuple holds the outbound layer, the inbound layer,
        /// and an optional circuit binding, in that order.
        #[educe(Debug(ignore))]
        cell_crypto: (
            Box<dyn OutboundClientLayer + Send>,
            Box<dyn InboundClientLayer + Send>,
            Option<CircuitBinding>,
        ),
        /// A set of parameters used to configure this hop.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin accepting streams on this circuit.
    #[cfg(feature = "hs-service")]
    AwaitStreamRequest {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        hop_num: HopNum,
        /// A filter used to check requests before passing them on.
        #[educe(Debug(ignore))]
        // NOTE: this `cfg` repeats the one already applied to the whole variant.
        #[cfg(feature = "hs-service")]
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
    #[cfg(test)]
    AddFakeHop {
        /// Which relay cell format to use for the fake hop.
        relay_cell_format: RelayCellFormat,
        /// Forwarded to the leg's `handle_add_fake_hop`.
        ///
        /// NOTE(review): presumably marks the fake hop as the last hop in the
        /// forward direction -- confirm against `handle_add_fake_hop`.
        fwd_lasthop: bool,
        /// Forwarded to the leg's `handle_add_fake_hop`.
        ///
        /// NOTE(review): presumably marks the fake hop as the last hop in the
        /// reverse direction -- confirm against `handle_add_fake_hop`.
        rev_lasthop: bool,
        /// A set of parameters used to configure the fake hop.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// (tests only) Get the send window and expected tags for a given hop.
    #[cfg(test)]
    QuerySendWindow {
        /// The hop whose send window is being queried.
        hop: HopNum,
        /// Oneshot channel to return the send window and the expected tags.
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
    },
    /// Shut down the reactor, and return the underlying [`Circuit`],
    /// if the tunnel is not multi-path.
    ///
    /// Returns an error if called on a multi-path reactor.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    ShutdownAndReturnCircuit {
        /// Oneshot channel to return the underlying [`Circuit`],
        /// or an error if the reactor's tunnel is multi-path.
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    },
}
269
/// A control message handler object.
///
/// It holds a mutable reference to the [`Reactor`], so it cannot outlive that borrow.
///
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
/// respectively, are handled.
pub(crate) struct ControlHandler<'a> {
    /// Mutable reference to the reactor on whose behalf messages and commands are handled.
    reactor: &'a mut Reactor,
}
278
279impl<'a> ControlHandler<'a> {
280    /// Constructor.
281    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
282        Self { reactor }
283    }
284
    /// Handle a control message.
    ///
    /// Translates `msg` into the [`RunOnceCmdInner`] command the reactor should run next.
    ///
    /// Returns:
    ///  * `Ok(Some(cmd))` if there is a follow-up command for the reactor to run;
    ///  * `Ok(None)` if the message was fully dealt with here. This includes the
    ///    cases where the request was rejected and the error was already reported
    ///    to the requester over the message's own result channel;
    ///  * `Err(_)` if an error was propagated with `?` instead of being reported
    ///    to the requester.
    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
        trace!("{}: reactor received {:?}", self.reactor.unique_id, msg);
        match msg {
            // This is handled earlier, since it requires blocking.
            //
            // Reaching this arm is therefore an API usage error,
            // which we report over `done`.
            CtrlMsg::Create { done, .. } => {
                if self.reactor.circuits.len() == 1 {
                    // This should've been handled in Reactor::run_once()
                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
                    //
                    // Note: `debug_assert!` only evaluates its argument (including the `?`)
                    // when debug assertions are enabled.
                    debug_assert!(self.reactor.circuits.single_leg()?.1.has_hops());
                    // Don't care if the receiver goes away
                    let _ = done.send(Err(tor_error::bad_api_usage!(
                        "cannot create first hop twice"
                    )
                    .into()));
                } else {
                    // Don't care if the receiver goes away
                    let _ = done.send(Err(tor_error::bad_api_usage!(
                        "cannot create first hop on multipath tunnel"
                    )
                    .into()));
                }

                Ok(None)
            }
            CtrlMsg::ExtendNtor {
                peer_id,
                public_key,
                linkspecs,
                params,
                done,
            } => {
                // Extending is only supported when the tunnel has a single leg.
                let Ok((leg_id, circ)) = self.reactor.circuits.single_leg_mut() else {
                    // Don't care if the receiver goes away
                    let _ = done.send(Err(tor_error::bad_api_usage!(
                        "cannot extend multipath tunnel"
                    )
                    .into()));

                    return Ok(None);
                };

                // `begin` takes ownership of `done` and gives us the cell to send,
                // plus the extender, which we install as the meta handler.
                let (extender, cell) = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
                    RelayCellFormat::V0,
                    peer_id,
                    HandshakeType::NTOR,
                    &public_key,
                    linkspecs,
                    params,
                    &(),
                    circ,
                    done,
                )?;
                self.reactor
                    .cell_handlers
                    .set_meta_handler(Box::new(extender))?;

                Ok(Some(RunOnceCmdInner::Send {
                    leg: leg_id,
                    cell,
                    done: None,
                }))
            }
            CtrlMsg::ExtendNtorV3 {
                peer_id,
                public_key,
                linkspecs,
                params,
                done,
            } => {
                // Extending is only supported when the tunnel has a single leg.
                let Ok((leg_id, circ)) = self.reactor.circuits.single_leg_mut() else {
                    // Don't care if the receiver goes away
                    let _ = done.send(Err(tor_error::bad_api_usage!(
                        "cannot extend multipath tunnel"
                    )
                    .into()));

                    return Ok(None);
                };

                // TODO #1067, TODO #1947: support negotiating other formats.
                let relay_cell_format = RelayCellFormat::V0;

                // Set the client extensions.
                // allow 'unused_mut' because of the combinations of `cfg` conditions below
                #[allow(unused_mut)]
                let mut client_extensions = Vec::new();

                if params.ccontrol.is_enabled() {
                    cfg_if::cfg_if! {
                        if #[cfg(feature = "flowctl-cc")] {
                            // TODO(arti#88): We have an `if false` in `exit_circparams_from_netparams`
                            // which should prevent the above `is_enabled()` from ever being true,
                            // even with the "flowctl-cc" feature enabled:
                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
                            // The panic here is so that CI tests will hopefully catch if congestion
                            // control is unexpectedly enabled.
                            // We should remove this panic once xon/xoff flow is supported.
                            #[cfg(not(test))]
                            panic!("Congestion control is enabled on this circuit, but we don't yet support congestion control");

                            // Only reachable in test builds, where the panic above is compiled out.
                            #[allow(unreachable_code)]
                            client_extensions.push(CircRequestExt::CcRequest(CcRequest::default()));
                        } else {
                            return Err(
                                tor_error::internal!(
                                    "Congestion control is enabled on this circuit, but 'flowctl-cc' feature is not enabled"
                                )
                                .into()
                            );
                        }
                    }
                }

                // `begin` takes ownership of `done` and gives us the cell to send,
                // plus the extender, which we install as the meta handler.
                let (extender, cell) =
                    CircuitExtender::<NtorV3Client, Tor1RelayCrypto, _, _>::begin(
                        relay_cell_format,
                        peer_id,
                        HandshakeType::NTOR_V3,
                        &public_key,
                        linkspecs,
                        params,
                        &client_extensions,
                        circ,
                        done,
                    )?;
                self.reactor
                    .cell_handlers
                    .set_meta_handler(Box::new(extender))?;

                Ok(Some(RunOnceCmdInner::Send {
                    leg: leg_id,
                    cell,
                    done: None,
                }))
            }
            CtrlMsg::BeginStream {
                hop,
                message,
                sender,
                rx,
                done,
                cmd_checker,
            } => {
                // If resolving the hop fails,
                // we want to report an error back to the initiator and not shut down the reactor.
                let hop_location = match self.reactor.resolve_target_hop(hop) {
                    Ok(x) => x,
                    Err(e) => {
                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                        // don't care if receiver goes away
                        let _ = done.send(Err(e.into()));
                        return Ok(None);
                    }
                };
                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
                    Ok(x) => x,
                    Err(e) => {
                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
                        // don't care if receiver goes away
                        let _ = done.send(Err(e.into()));
                        return Ok(None);
                    }
                };
                let circ = match self.reactor.circuits.leg_mut(leg_id) {
                    Some(x) => x,
                    None => {
                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
                        // don't care if receiver goes away
                        let _ = done.send(Err(e.into()));
                        return Ok(None);
                    }
                };

                // Unlike the resolution failures above, an error from `begin_stream`
                // is propagated with `?` rather than being reported over `done`.
                let cell = circ.begin_stream(hop_num, message, sender, rx, cmd_checker)?;
                Ok(Some(RunOnceCmdInner::BeginStream {
                    leg: leg_id,
                    cell,
                    hop: hop_location,
                    done,
                }))
            }
            #[cfg(feature = "hs-service")]
            CtrlMsg::ClosePendingStream {
                hop,
                stream_id,
                message,
                done,
            } => Ok(Some(RunOnceCmdInner::CloseStream {
                hop,
                sid: stream_id,
                behav: message,
                reason: streammap::TerminateReason::ExplicitEnd,
                done: Some(done),
            })),
            // TODO(#1860): remove stream-level sendme support
            CtrlMsg::SendSendme {
                stream_id,
                hop,
                sender,
            } => {
                // If resolving the hop fails,
                // we want to report an error back to the initiator and not shut down the reactor.
                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(e) => {
                        let e = into_bad_api_usage!("Could not resolve hop {hop:?}")(e);
                        // don't care if receiver goes away
                        let _ = sender.send(Err(e.into()));
                        return Ok(None);
                    }
                };

                // Congestion control decides if we can send stream level SENDMEs or not.
                let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num) {
                    Some(x) => x,
                    None => {
                        // don't care if receiver goes away
                        let _ = sender.send(Err(bad_api_usage!(
                            "Unknown hop {hop_num:?} on leg {leg_id:?}"
                        )
                        .into()));
                        return Ok(None);
                    }
                };

                // No stream-level SENDME is needed: tell the caller we are done,
                // without sending anything.
                if !sendme_required {
                    // don't care if receiver goes away
                    let _ = sender.send(Ok(()));
                    return Ok(None);
                }

                let sendme = Sendme::new_empty();
                let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());

                let cell = SendRelayCell {
                    hop: hop_num,
                    early: false,
                    cell,
                };

                Ok(Some(RunOnceCmdInner::Send {
                    leg: leg_id,
                    cell,
                    done: Some(sender),
                }))
            }
            // TODO(conflux): this should specify which leg to send the msg on
            // (currently we send it down the primary leg).
            //
            // This will involve updating ClientCirc::send_raw_msg() to take a
            // leg id argument (which is a breaking change).
            #[cfg(feature = "send-control-msg")]
            CtrlMsg::SendMsg {
                hop_num,
                msg,
                sender,
            } => {
                // The message is wrapped without a stream ID.
                let cell = AnyRelayMsgOuter::new(None, msg);
                let cell = SendRelayCell {
                    hop: hop_num,
                    early: false,
                    cell,
                };

                let leg = self.reactor.circuits.primary_leg_id();

                Ok(Some(RunOnceCmdInner::Send {
                    leg,
                    cell,
                    done: Some(sender),
                }))
            }
            // TODO(conflux): this should specify which leg to send the msg on
            // (currently we send it down the primary leg)
            #[cfg(feature = "send-control-msg")]
            CtrlMsg::SendMsgAndInstallHandler {
                msg,
                handler,
                sender,
            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
                msg,
                handler,
                done: sender,
            })),
            CtrlMsg::FirstHopClockSkew { answer } => {
                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
            }
            #[cfg(feature = "conflux")]
            CtrlMsg::LinkCircuits { circuits, answer } => {
                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
            }
        }
    }
579
580    /// Handle a control command.
581    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
582        trace!("{}: reactor received {:?}", self.reactor.unique_id, msg);
583        match msg {
584            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
585            #[cfg(feature = "hs-common")]
586            #[allow(unreachable_code)]
587            CtrlCmd::ExtendVirtual {
588                relay_cell_format: format,
589                cell_crypto,
590                params,
591                done,
592            } => {
593                let (outbound, inbound, binding) = cell_crypto;
594
595                // TODO HS: Perhaps this should describe the onion service, or
596                // describe why the virtual hop was added, or something?
597                let peer_id = path::HopDetail::Virtual;
598
599                let Ok((_id, leg)) = self.reactor.circuits.single_leg_mut() else {
600                    // Don't care if the receiver goes away
601                    let _ = done.send(Err(tor_error::bad_api_usage!(
602                        "cannot extend multipath tunnel"
603                    )
604                    .into()));
605
606                    return Ok(());
607                };
608
609                leg.add_hop(format, peer_id, outbound, inbound, binding, &params)?;
610                let _ = done.send(Ok(()));
611
612                Ok(())
613            }
614            #[cfg(feature = "hs-service")]
615            CtrlCmd::AwaitStreamRequest {
616                cmd_checker,
617                incoming_sender,
618                hop_num,
619                done,
620                filter,
621            } => {
622                // TODO: At some point we might want to add a CtrlCmd for
623                // de-registering the handler.  See comments on `allow_stream_requests`.
624                let handler = IncomingStreamRequestHandler {
625                    incoming_sender,
626                    cmd_checker,
627                    hop_num,
628                    filter,
629                };
630
631                let ret = self
632                    .reactor
633                    .cell_handlers
634                    .set_incoming_stream_req_handler(handler);
635                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
636
637                Ok(())
638            }
639            #[cfg(test)]
640            CtrlCmd::AddFakeHop {
641                relay_cell_format,
642                fwd_lasthop,
643                rev_lasthop,
644                params,
645                done,
646            } => {
647                let Ok((_id, leg)) = self.reactor.circuits.single_leg_mut() else {
648                    // Don't care if the receiver goes away
649                    let _ = done.send(Err(tor_error::bad_api_usage!(
650                        "cannot add fake hop to multipath tunnel"
651                    )
652                    .into()));
653
654                    return Ok(());
655                };
656
657                leg.handle_add_fake_hop(relay_cell_format, fwd_lasthop, rev_lasthop, &params, done);
658
659                Ok(())
660            }
661            #[cfg(test)]
662            CtrlCmd::QuerySendWindow { hop, done } => {
663                // Immediately invoked function means that errors will be sent to the channel.
664                let _ = done.send((|| {
665                    let (_id, leg) =
666                        self.reactor
667                            .circuits
668                            .single_leg_mut()
669                            .map_err(into_bad_api_usage!(
670                                "cannot query send window of multipath tunnel"
671                            ))?;
672
673                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
674                        "received QuerySendWindow for unknown hop {}",
675                        hop.display()
676                    ))?;
677
678                    Ok(hop.send_window_and_expected_tags())
679                })());
680
681                Ok(())
682            }
683            #[cfg(feature = "conflux")]
684            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
685                self.reactor.handle_shutdown_and_return_circuit(answer)
686            }
687        }
688    }
689}