tor_proto/tunnel/reactor/control.rs
//! Module providing [`CtrlMsg`], [`CtrlCmd`], and the [`ControlHandler`] that processes them.
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5 CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6 RunOnceCmdInner, SendRelayCell,
7};
8use crate::circuit::HopSettings;
9use crate::crypto::binding::CircuitBinding;
10use crate::crypto::cell::{InboundClientLayer, OutboundClientLayer, Tor1RelayCrypto};
11use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
12use crate::stream::queue::StreamQueueSender;
13use crate::stream::{AnyCmdChecker, DrainRateRequest, StreamRateLimit};
14use crate::tunnel::circuit::celltypes::CreateResponse;
15use crate::tunnel::circuit::path;
16use crate::tunnel::reactor::circuit::circ_extensions_from_settings;
17use crate::tunnel::reactor::{NoJoinPointError, NtorClient, ReactorError};
18use crate::tunnel::{streammap, HopLocation, TargetHop};
19use crate::util::notify::NotifySender;
20use crate::util::skew::ClockSkew;
21use crate::Result;
22#[cfg(test)]
23use crate::{circuit::CircParameters, circuit::UniqId, crypto::cell::HopNum};
24use postage::watch;
25use tor_cell::chancell::msg::HandshakeType;
26use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
27use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
28use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId};
29use tor_error::{bad_api_usage, internal, into_bad_api_usage, warn_report, Bug};
30use tracing::{debug, trace};
31#[cfg(feature = "hs-service")]
32use {
33 super::StreamReqSender, crate::stream::IncomingStreamRequestFilter,
34 crate::tunnel::reactor::IncomingStreamRequestHandler,
35};
36
37#[cfg(test)]
38use tor_cell::relaycell::msg::SendmeTag;
39
40#[cfg(feature = "conflux")]
41use super::{Circuit, ConfluxLinkResultChannel};
42
43use oneshot_fused_workaround as oneshot;
44
45use crate::crypto::handshake::ntor::NtorPublicKey;
46use crate::tunnel::circuit::StreamMpscReceiver;
47use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
48
49use std::result::Result as StdResult;
50
/// A message telling the reactor to do something.
///
/// Handling a `CtrlMsg` may cause the reactor to send a cell on the underlying channel:
/// [`ControlHandler::handle_msg`] maps each message to an optional [`RunOnceCmdInner`]
/// for the reactor to run. (Some messages, or some error paths, produce no command.)
///
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
/// can cause the reactor to send cells on the reactor's `chan_sender`,
/// whereas `CtrlCmd`s do not.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlMsg {
    /// Create the first hop of this circuit.
    Create {
        /// A oneshot channel on which we'll receive the creation response.
        recv_created: oneshot::Receiver<CreateResponse>,
        /// The handshake type to use for the first hop.
        handshake: CircuitHandshake,
        /// Other parameters relevant for circuit creation.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntor handshake.
    ExtendNtor {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntor public key to use for the handshake with this hop.
        public_key: NtorPublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntorv3 handshake.
    ExtendNtorV3 {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntorv3 public key to use for the handshake with this hop.
        public_key: NtorV3PublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Allocates a stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The hop to begin the stream with.
        hop: TargetHop,
        /// The message to send.
        message: AnyRelayMsg,
        /// A channel to send messages on this stream down.
        ///
        /// This sender shouldn't ever block, because we use congestion control and only send
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
        /// can assume someone is trying to send us more cells than they should, and abort
        /// the stream.
        sender: StreamQueueSender,
        /// A channel to receive messages to send on this stream from.
        rx: StreamMpscReceiver<AnyRelayMsg>,
        /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
        rate_limit_notifier: watch::Sender<StreamRateLimit>,
        /// Notifies the stream reader when it should send a new drain rate.
        drain_rate_requester: NotifySender<DrainRateRequest>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
    },
    /// Close the specified pending incoming stream, sending the provided END message.
    ///
    /// A stream is said to be pending if the message for initiating the stream was received but
    /// has not been responded to yet.
    ///
    /// This should be used by responders for closing pending incoming streams initiated by the
    /// other party on the circuit.
    #[cfg(feature = "hs-service")]
    ClosePendingStream {
        /// The location of the hop the stream is on.
        hop: HopLocation,
        /// The stream ID to send the END for.
        stream_id: StreamId,
        /// The END message to send, if any.
        message: CloseStreamBehavior,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Send a given control message on this circuit.
    #[cfg(feature = "send-control-msg")]
    SendMsg {
        /// The hop to receive this message.
        hop: TargetHop,
        /// The message to send.
        msg: AnyRelayMsg,
        /// A sender that we use to report the outcome of sending the message
        /// back to the caller.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any.
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Inform the reactor that there's a flow control update for a given stream.
    ///
    /// The reactor will decide how to handle this update depending on the type of flow control and
    /// the current state of the stream.
    FlowCtrlUpdate {
        /// The type of flow control update, and any associated metadata.
        msg: FlowCtrlMsg,
        /// The stream ID that the update is for.
        stream_id: StreamId,
        /// The hop that the stream is on.
        hop: HopLocation,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Link the specified circuits into the current tunnel,
    /// to form a multi-path tunnel.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    LinkCircuits {
        /// The circuits to link into the tunnel.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
        /// or have failed to link.
        ///
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
        /// (see [set construction]).
        ///
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
        answer: ConfluxLinkResultChannel,
    },
}
207
/// A command telling the reactor to do something.
///
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
/// never cause cells to be sent on the channel,
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
/// some of which instruct the reactor to send cells down the channel.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlCmd {
    /// Shut down the reactor.
    Shutdown,
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
    ///
    /// (This is used for onion services, where the negotiation takes place in
    /// INTRODUCE and RENDEZVOUS messages.)
    #[cfg(feature = "hs-common")]
    ExtendVirtual {
        /// The cryptographic algorithms and keys to use when communicating with
        /// the newly added hop.
        ///
        /// The tuple is (outbound layer, inbound layer, optional circuit binding).
        #[educe(Debug(ignore))]
        cell_crypto: (
            Box<dyn OutboundClientLayer + Send>,
            Box<dyn InboundClientLayer + Send>,
            Option<CircuitBinding>,
        ),
        /// A set of parameters to negotiate with this hop.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Resolve a given [`TargetHop`] into a precise [`HopLocation`].
    ResolveTargetHop {
        /// The target hop to resolve.
        hop: TargetHop,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<HopLocation>,
    },
    /// Begin accepting streams on this circuit.
    #[cfg(feature = "hs-service")]
    AwaitStreamRequest {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        hop: TargetHop,
        /// A filter used to check requests before passing them on.
        #[educe(Debug(ignore))]
        #[cfg(feature = "hs-service")]
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// Request the binding key of a target hop.
    #[cfg(feature = "hs-service")]
    GetBindingKey {
        /// The hop for which we want the key.
        hop: TargetHop,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<Option<CircuitBinding>>,
    },
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
    #[cfg(test)]
    AddFakeHop {
        /// The relay cell format passed along for the fake hop.
        relay_cell_format: RelayCellFormat,
        /// Forwarded unchanged to `handle_add_fake_hop`.
        ///
        /// NOTE(review): presumably marks this hop as the last hop in the
        /// forward direction -- confirm against `handle_add_fake_hop`.
        fwd_lasthop: bool,
        /// Forwarded unchanged to `handle_add_fake_hop`.
        ///
        /// NOTE(review): presumably marks this hop as the last hop in the
        /// reverse direction -- confirm against `handle_add_fake_hop`.
        rev_lasthop: bool,
        /// The path detail to record for the fake hop.
        peer_id: path::HopDetail,
        /// Circuit parameters passed along for the fake hop.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// (tests only) Get the send window and expected tags for a given hop.
    #[cfg(test)]
    QuerySendWindow {
        /// The hop whose send window is being queried.
        hop: HopNum,
        /// The circuit leg that the hop is looked up on.
        leg: UniqId,
        /// Oneshot channel on which the send window and expected tags are returned.
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
    },
    /// Shut down the reactor, and return the underlying [`Circuit`],
    /// if the tunnel is not multi-path.
    ///
    /// Returns an error if called on a multi-path reactor.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    ShutdownAndReturnCircuit {
        /// Oneshot channel to return the underlying [`Circuit`],
        /// or an error if the reactor's tunnel is multi-path.
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    },
}
298
/// A flow control update message.
///
/// Carried by [`CtrlMsg::FlowCtrlUpdate`] together with the stream ID and hop
/// that the update applies to.
#[derive(Debug)]
pub(crate) enum FlowCtrlMsg {
    /// Send a SENDME message on this stream.
    ///
    /// The reactor drops this request if the target hop does not use
    /// stream-level SENDMEs, or if the hop no longer exists.
    Sendme,
    /// Send an XON message on this stream with the given rate.
    Xon(XonKbpsEwma),
}
307
/// A control message handler object.
///
/// It holds a mutable reference to the [`Reactor`], so its lifetime is tied to
/// that borrow.
///
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
/// respectively, are handled.
pub(crate) struct ControlHandler<'a> {
    /// Reference to the reactor that this handler operates on.
    reactor: &'a mut Reactor,
}
316
317impl<'a> ControlHandler<'a> {
318 /// Constructor.
319 pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
320 Self { reactor }
321 }
322
323 /// Handle a control message.
324 pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
325 trace!(
326 tunnel_id = %self.reactor.tunnel_id,
327 msg = ?msg,
328 "reactor received control message"
329 );
330
331 match msg {
332 // This is handled earlier, since it requires blocking.
333 CtrlMsg::Create { done, .. } => {
334 if self.reactor.circuits.len() == 1 {
335 // This should've been handled in Reactor::run_once()
336 // (ControlHandler::handle_msg() is never called before wait_for_create()).
337 debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
338 // Don't care if the receiver goes away
339 let _ = done.send(Err(tor_error::bad_api_usage!(
340 "cannot create first hop twice"
341 )
342 .into()));
343 } else {
344 // Don't care if the receiver goes away
345 let _ = done.send(Err(tor_error::bad_api_usage!(
346 "cannot create first hop on multipath tunnel"
347 )
348 .into()));
349 }
350
351 Ok(None)
352 }
353 CtrlMsg::ExtendNtor {
354 peer_id,
355 public_key,
356 linkspecs,
357 settings,
358 done,
359 } => {
360 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
361 // Don't care if the receiver goes away
362 let _ = done.send(Err(tor_error::bad_api_usage!(
363 "cannot extend multipath tunnel"
364 )
365 .into()));
366
367 return Ok(None);
368 };
369
370 let (extender, cell) = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
371 peer_id,
372 HandshakeType::NTOR,
373 &public_key,
374 linkspecs,
375 settings,
376 &(),
377 circ,
378 done,
379 )?;
380 self.reactor
381 .cell_handlers
382 .set_meta_handler(Box::new(extender))?;
383
384 Ok(Some(RunOnceCmdInner::Send {
385 leg: circ.unique_id(),
386 cell,
387 done: None,
388 }))
389 }
390 CtrlMsg::ExtendNtorV3 {
391 peer_id,
392 public_key,
393 linkspecs,
394 settings,
395 done,
396 } => {
397 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
398 // Don't care if the receiver goes away
399 let _ = done.send(Err(tor_error::bad_api_usage!(
400 "cannot extend multipath tunnel"
401 )
402 .into()));
403
404 return Ok(None);
405 };
406
407 let client_extensions = circ_extensions_from_settings(&settings)?;
408
409 let (extender, cell) =
410 CircuitExtender::<NtorV3Client, Tor1RelayCrypto, _, _>::begin(
411 peer_id,
412 HandshakeType::NTOR_V3,
413 &public_key,
414 linkspecs,
415 settings,
416 &client_extensions,
417 circ,
418 done,
419 )?;
420 self.reactor
421 .cell_handlers
422 .set_meta_handler(Box::new(extender))?;
423
424 Ok(Some(RunOnceCmdInner::Send {
425 leg: circ.unique_id(),
426 cell,
427 done: None,
428 }))
429 }
430 CtrlMsg::BeginStream {
431 hop,
432 message,
433 sender,
434 rx,
435 rate_limit_notifier,
436 drain_rate_requester,
437 done,
438 cmd_checker,
439 } => {
440 // If resolving the hop fails,
441 // we want to report an error back to the initiator and not shut down the reactor.
442 let hop_location = match self.reactor.resolve_target_hop(hop) {
443 Ok(x) => x,
444 Err(e) => {
445 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
446 // don't care if receiver goes away
447 let _ = done.send(Err(e.into()));
448 return Ok(None);
449 }
450 };
451 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
452 Ok(x) => x,
453 Err(e) => {
454 let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
455 // don't care if receiver goes away
456 let _ = done.send(Err(e.into()));
457 return Ok(None);
458 }
459 };
460 let circ = match self.reactor.circuits.leg_mut(leg_id) {
461 Some(x) => x,
462 None => {
463 let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
464 // don't care if receiver goes away
465 let _ = done.send(Err(e.into()));
466 return Ok(None);
467 }
468 };
469
470 let cell = circ.begin_stream(
471 hop_num,
472 message,
473 sender,
474 rx,
475 rate_limit_notifier,
476 drain_rate_requester,
477 cmd_checker,
478 )?;
479 Ok(Some(RunOnceCmdInner::BeginStream {
480 leg: leg_id,
481 cell,
482 hop: hop_location,
483 done,
484 }))
485 }
486 #[cfg(feature = "hs-service")]
487 CtrlMsg::ClosePendingStream {
488 hop,
489 stream_id,
490 message,
491 done,
492 } => Ok(Some(RunOnceCmdInner::CloseStream {
493 hop,
494 sid: stream_id,
495 behav: message,
496 reason: streammap::TerminateReason::ExplicitEnd,
497 done: Some(done),
498 })),
499 CtrlMsg::FlowCtrlUpdate {
500 msg,
501 stream_id,
502 hop,
503 } => {
504 match msg {
505 FlowCtrlMsg::Sendme => {
506 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
507 Ok(x) => x,
508 Err(NoJoinPointError) => {
509 // A stream tried to send a stream-level SENDME message to the join point of
510 // a tunnel that has never had a join point. Currently in arti, only a
511 // `StreamTarget` asks us to send a stream-level SENDME, and this tunnel
512 // originally created the `StreamTarget` to begin with. So this is a
513 // legitimate bug somewhere in the tunnel code.
514 let err = internal!(
515 "Could not send a stream-level SENDME to a join point on a tunnel without a join point",
516 );
517 // TODO: Rather than calling `warn_report` here, we should call
518 // `trace_report!` from `Reactor::run_once()`. Since this is an internal
519 // error, `trace_report!` should log it at "warn" level.
520 warn_report!(err, "Tunnel reactor error");
521 return Err(err.into());
522 }
523 };
524
525 // Congestion control decides if we can send stream level SENDMEs or not.
526 let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num)
527 {
528 Some(x) => x,
529 None => {
530 // The leg/hop has disappeared. This is fine since the stream may have ended
531 // and been cleaned up while this `CtrlMsg::SendSendme` message was queued.
532 // It is possible that is a bug and this is an incorrect leg/hop number, but
533 // it's not currently possible to differentiate between an incorrect leg/hop
534 // number and a circuit hop that has been closed.
535 debug!("Could not send a stream-level SENDME on a hop that does not exist. Ignoring.");
536 return Ok(None);
537 }
538 };
539
540 if !sendme_required {
541 // Nothing to do, so discard the SENDME.
542 return Ok(None);
543 }
544
545 let sendme = Sendme::new_empty();
546 let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
547
548 let cell = SendRelayCell {
549 hop: hop_num,
550 early: false,
551 cell,
552 };
553
554 Ok(Some(RunOnceCmdInner::Send {
555 leg: leg_id,
556 cell,
557 done: None,
558 }))
559 }
560 FlowCtrlMsg::Xon(rate) => Ok(Some(RunOnceCmdInner::MaybeSendXon {
561 rate,
562 hop,
563 stream_id,
564 })),
565 }
566 }
567 // TODO(conflux): this should specify which leg to send the msg on
568 // (currently we send it down the primary leg).
569 //
570 // This will involve updating ClientCIrc::send_raw_msg() to take a
571 // leg id argument (which is a breaking change.
572 #[cfg(feature = "send-control-msg")]
573 CtrlMsg::SendMsg { hop, msg, sender } => {
574 let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
575 // Don't care if receiver goes away
576 let _ = sender.send(Err(bad_api_usage!("Unknown {hop:?}").into()));
577 return Ok(None);
578 };
579
580 let cell = AnyRelayMsgOuter::new(None, msg);
581 let cell = SendRelayCell {
582 hop: hop_num,
583 early: false,
584 cell,
585 };
586
587 Ok(Some(RunOnceCmdInner::Send {
588 leg: leg_id,
589 cell,
590 done: Some(sender),
591 }))
592 }
593 // TODO(conflux): this should specify which leg to send the msg on
594 // (currently we send it down the primary leg)
595 #[cfg(feature = "send-control-msg")]
596 CtrlMsg::SendMsgAndInstallHandler {
597 msg,
598 handler,
599 sender,
600 } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
601 msg,
602 handler,
603 done: sender,
604 })),
605 CtrlMsg::FirstHopClockSkew { answer } => {
606 Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
607 }
608 #[cfg(feature = "conflux")]
609 CtrlMsg::LinkCircuits { circuits, answer } => {
610 Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
611 }
612 }
613 }
614
615 /// Handle a control command.
616 #[allow(clippy::needless_pass_by_value)] // Needed when conflux is enabled
617 pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
618 trace!(
619 tunnel_id = %self.reactor.tunnel_id,
620 msg = ?msg,
621 "reactor received control command"
622 );
623
624 match msg {
625 CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
626 #[cfg(feature = "hs-common")]
627 #[allow(unreachable_code)]
628 CtrlCmd::ExtendVirtual {
629 cell_crypto,
630 settings,
631 done,
632 } => {
633 let (outbound, inbound, binding) = cell_crypto;
634
635 // TODO HS: Perhaps this should describe the onion service, or
636 // describe why the virtual hop was added, or something?
637 let peer_id = path::HopDetail::Virtual;
638
639 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
640 // Don't care if the receiver goes away
641 let _ = done.send(Err(tor_error::bad_api_usage!(
642 "cannot extend multipath tunnel"
643 )
644 .into()));
645
646 return Ok(());
647 };
648
649 leg.add_hop(peer_id, outbound, inbound, binding, &settings)?;
650 let _ = done.send(Ok(()));
651
652 Ok(())
653 }
654 CtrlCmd::ResolveTargetHop { hop, done } => {
655 let _ = done.send(
656 self.reactor
657 .resolve_target_hop(hop)
658 .map_err(|_| crate::util::err::Error::NoSuchHop),
659 );
660 Ok(())
661 }
662 #[cfg(feature = "hs-service")]
663 CtrlCmd::AwaitStreamRequest {
664 cmd_checker,
665 incoming_sender,
666 hop,
667 done,
668 filter,
669 } => {
670 let Some((_, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
671 let _ = done.send(Err(crate::Error::NoSuchHop));
672 return Ok(());
673 };
674 // TODO: At some point we might want to add a CtrlCmd for
675 // de-registering the handler. See comments on `allow_stream_requests`.
676 let handler = IncomingStreamRequestHandler {
677 incoming_sender,
678 cmd_checker,
679 hop_num,
680 filter,
681 };
682
683 let ret = self
684 .reactor
685 .cell_handlers
686 .set_incoming_stream_req_handler(handler);
687 let _ = done.send(ret); // don't care if the corresponding receiver goes away.
688
689 Ok(())
690 }
691 #[cfg(feature = "hs-service")]
692 CtrlCmd::GetBindingKey { hop, done } => {
693 let Some((leg_id, hop_num)) = self.reactor.target_hop_to_hopnum_id(hop) else {
694 let _ = done.send(Err(tor_error::internal!(
695 "Unknown TargetHop when getting binding key"
696 )
697 .into()));
698 return Ok(());
699 };
700 let Some(circuit) = self.reactor.circuits.leg(leg_id) else {
701 let _ = done.send(Err(tor_error::bad_api_usage!(
702 "Unknown circuit id {leg_id} when getting binding key"
703 )
704 .into()));
705 return Ok(());
706 };
707 // Get the binding key from the mutable state and send it back.
708 let key = circuit.mutable().binding_key(hop_num);
709 let _ = done.send(Ok(key));
710
711 Ok(())
712 }
713 #[cfg(test)]
714 CtrlCmd::AddFakeHop {
715 relay_cell_format,
716 fwd_lasthop,
717 rev_lasthop,
718 peer_id,
719 params,
720 done,
721 } => {
722 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
723 // Don't care if the receiver goes away
724 let _ = done.send(Err(tor_error::bad_api_usage!(
725 "cannot add fake hop to multipath tunnel"
726 )
727 .into()));
728
729 return Ok(());
730 };
731
732 leg.handle_add_fake_hop(
733 relay_cell_format,
734 fwd_lasthop,
735 rev_lasthop,
736 peer_id,
737 ¶ms,
738 done,
739 );
740
741 Ok(())
742 }
743 #[cfg(test)]
744 CtrlCmd::QuerySendWindow { hop, leg, done } => {
745 // Immediately invoked function means that errors will be sent to the channel.
746 let _ = done.send((|| {
747 let leg = self.reactor.circuits.leg_mut(leg).ok_or_else(|| {
748 bad_api_usage!("cannot query send window of non-existent circuit")
749 })?;
750
751 let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
752 "received QuerySendWindow for unknown hop {}",
753 hop.display()
754 ))?;
755
756 Ok(hop.send_window_and_expected_tags())
757 })());
758
759 Ok(())
760 }
761 #[cfg(feature = "conflux")]
762 CtrlCmd::ShutdownAndReturnCircuit { answer } => {
763 self.reactor.handle_shutdown_and_return_circuit(answer)
764 }
765 }
766 }
767}