tor_proto/tunnel/reactor/control.rs
1//! Module providing [`CtrlMsg`].
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5 CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6 RunOnceCmdInner, SendRelayCell,
7};
8#[cfg(test)]
9use crate::circuit::CircParameters;
10use crate::circuit::HopSettings;
11use crate::crypto::binding::CircuitBinding;
12use crate::crypto::cell::{HopNum, InboundClientLayer, OutboundClientLayer, Tor1RelayCrypto};
13use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
14use crate::stream::AnyCmdChecker;
15use crate::tunnel::circuit::celltypes::CreateResponse;
16use crate::tunnel::circuit::path;
17use crate::tunnel::reactor::circuit::circ_extensions_from_settings;
18use crate::tunnel::reactor::{NtorClient, ReactorError};
19use crate::tunnel::{streammap, HopLocation, TargetHop};
20use crate::util::skew::ClockSkew;
21use crate::Result;
22use tor_cell::chancell::msg::HandshakeType;
23use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
24use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
25use tor_error::{bad_api_usage, into_bad_api_usage, Bug};
26use tracing::trace;
27#[cfg(feature = "hs-service")]
28use {
29 super::StreamReqSender, crate::stream::IncomingStreamRequestFilter,
30 crate::tunnel::reactor::IncomingStreamRequestHandler,
31};
32
33#[cfg(test)]
34use tor_cell::relaycell::msg::SendmeTag;
35
36#[cfg(feature = "conflux")]
37use super::{Circuit, ConfluxLinkResultChannel};
38
39use oneshot_fused_workaround as oneshot;
40
41use crate::crypto::handshake::ntor::NtorPublicKey;
42use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
43use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
44
45use std::result::Result as StdResult;
46
/// A message telling the reactor to do something.
///
/// Handling a `CtrlMsg` may cause the reactor to send a cell on the underlying channel.
///
/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
/// can cause the reactor to send cells on the reactor's `chan_sender`,
/// whereas `CtrlCmd`s do not.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlMsg {
    /// Create the first hop of this circuit.
    Create {
        /// A oneshot channel on which we'll receive the creation response.
        recv_created: oneshot::Receiver<CreateResponse>,
        /// The handshake type to use for the first hop.
        handshake: CircuitHandshake,
        /// Other parameters relevant for circuit creation.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntor handshake.
    ExtendNtor {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntor public key to use for the handshake with this hop.
        public_key: NtorPublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Extend a circuit by one hop, using the ntorv3 handshake.
    ExtendNtorV3 {
        /// The peer that we're extending to.
        ///
        /// Used to extend our record of the circuit's path.
        peer_id: OwnedChanTarget,
        /// The ntorv3 public key to use for the handshake with this hop.
        public_key: NtorV3PublicKey,
        /// Information about how to connect to the relay we're extending to.
        linkspecs: Vec<EncodedLinkSpec>,
        /// Other parameters we are negotiating.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Allocates a stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The hop to begin the stream with.
        hop: TargetHop,
        /// The message to send.
        message: AnyRelayMsg,
        /// A channel to send messages on this stream down.
        ///
        /// This sender shouldn't ever block, because we use congestion control and only send
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
        /// can assume someone is trying to send us more cells than they should, and abort
        /// the stream.
        sender: StreamMpscSender<UnparsedRelayMsg>,
        /// A channel to receive messages to send on this stream from.
        rx: StreamMpscReceiver<AnyRelayMsg>,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
    },
    /// Close the specified pending incoming stream, sending the provided END message.
    ///
    /// A stream is said to be pending if the message for initiating the stream was received but
    /// has not been responded to yet.
    ///
    /// This should be used by responders for closing pending incoming streams initiated by the
    /// other party on the circuit.
    #[cfg(feature = "hs-service")]
    ClosePendingStream {
        /// The hop the stream is on.
        hop: HopLocation,
        /// The stream ID to send the END for.
        stream_id: StreamId,
        /// The END message to send, if any.
        message: CloseStreamBehavior,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Send a given control message on this circuit.
    #[cfg(feature = "send-control-msg")]
    SendMsg {
        /// The hop to receive this message.
        hop_num: HopNum,
        /// The message to send.
        msg: AnyRelayMsg,
        /// A sender that we use to tell the caller that the message was sent.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any.
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed.
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
    SendSendme {
        /// The stream ID to send a SENDME for.
        stream_id: StreamId,
        /// The hop the stream is on.
        hop: HopLocation,
        /// A sender that we use to tell the caller that the SENDME was sent.
        sender: oneshot::Sender<Result<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Link the specified circuits into the current tunnel,
    /// to form a multi-path tunnel.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    LinkCircuits {
        /// The circuits to link into the tunnel.
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
        /// or have failed to link.
        ///
        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
        /// (see [set construction]).
        ///
        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
        answer: ConfluxLinkResultChannel,
    },
}
196
/// A command telling the reactor to do something.
///
/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
/// never cause cells to be sent on the channel,
/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
/// some of which instruct the reactor to send cells down the channel.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) enum CtrlCmd {
    /// Shut down the reactor.
    Shutdown,
    /// Extend the circuit by one hop, in response to an out-of-band handshake.
    ///
    /// (This is used for onion services, where the negotiation takes place in
    /// INTRODUCE and RENDEZVOUS messages.)
    #[cfg(feature = "hs-common")]
    ExtendVirtual {
        /// Which relay cell format to use for this hop.
        relay_cell_format: RelayCellFormat,
        /// The cryptographic algorithms and keys to use when communicating with
        /// the newly added hop.
        #[educe(Debug(ignore))]
        cell_crypto: (
            Box<dyn OutboundClientLayer + Send>,
            Box<dyn InboundClientLayer + Send>,
            Option<CircuitBinding>,
        ),
        /// A set of parameters to negotiate with this hop.
        settings: HopSettings,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// Begin accepting streams on this circuit.
    #[cfg(feature = "hs-service")]
    AwaitStreamRequest {
        /// A channel for sending information about an incoming stream request.
        incoming_sender: StreamReqSender,
        /// A `CmdChecker` to keep track of which message types are acceptable.
        cmd_checker: AnyCmdChecker,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
        /// The hop that is allowed to create streams.
        hop_num: HopNum,
        /// A filter used to check requests before passing them on.
        // NOTE(review): the `cfg` below is redundant, since the enclosing variant
        // is already gated on the `hs-service` feature.
        #[educe(Debug(ignore))]
        #[cfg(feature = "hs-service")]
        filter: Box<dyn IncomingStreamRequestFilter>,
    },
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
    #[cfg(test)]
    AddFakeHop {
        /// Which relay cell format to use for the fake hop.
        relay_cell_format: RelayCellFormat,
        /// Forwarded to `handle_add_fake_hop`; presumably whether the fake hop is
        /// treated as the last hop in the forward direction (TODO confirm).
        fwd_lasthop: bool,
        /// Forwarded to `handle_add_fake_hop`; presumably whether the fake hop is
        /// treated as the last hop in the reverse direction (TODO confirm).
        rev_lasthop: bool,
        /// The [`path::HopDetail`] describing the fake hop.
        peer_id: path::HopDetail,
        /// Circuit parameters forwarded to `handle_add_fake_hop`.
        params: CircParameters,
        /// Oneshot channel to notify on completion.
        done: ReactorResultChannel<()>,
    },
    /// (tests only) Get the send window and expected tags for a given hop.
    #[cfg(test)]
    QuerySendWindow {
        /// The hop whose send window is being queried.
        hop: HopNum,
        /// Oneshot channel to return the send window and the expected tags.
        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
    },
    /// Shut down the reactor, and return the underlying [`Circuit`],
    /// if the tunnel is not multi-path.
    ///
    /// Returns an error if called on a multi-path reactor.
    #[cfg(feature = "conflux")]
    #[allow(unused)] // TODO(conflux)
    ShutdownAndReturnCircuit {
        /// Oneshot channel to return the underlying [`Circuit`],
        /// or an error if the reactor's tunnel is multi-path.
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    },
}
273
/// A control message handler object.
///
/// Holds a mutable reference to the [`Reactor`], so a handler cannot outlive
/// the reactor it operates on.
///
/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
/// respectively, are handled.
pub(crate) struct ControlHandler<'a> {
    /// The reactor whose control messages and commands this handler processes.
    reactor: &'a mut Reactor,
}
282
283impl<'a> ControlHandler<'a> {
284 /// Constructor.
285 pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
286 Self { reactor }
287 }
288
289 /// Handle a control message.
290 pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
291 trace!(
292 tunnel_id = %self.reactor.tunnel_id,
293 msg = ?msg,
294 "reactor received control message"
295 );
296
297 match msg {
298 // This is handled earlier, since it requires blocking.
299 CtrlMsg::Create { done, .. } => {
300 if self.reactor.circuits.len() == 1 {
301 // This should've been handled in Reactor::run_once()
302 // (ControlHandler::handle_msg() is never called before wait_for_create()).
303 debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
304 // Don't care if the receiver goes away
305 let _ = done.send(Err(tor_error::bad_api_usage!(
306 "cannot create first hop twice"
307 )
308 .into()));
309 } else {
310 // Don't care if the receiver goes away
311 let _ = done.send(Err(tor_error::bad_api_usage!(
312 "cannot create first hop on multipath tunnel"
313 )
314 .into()));
315 }
316
317 Ok(None)
318 }
319 CtrlMsg::ExtendNtor {
320 peer_id,
321 public_key,
322 linkspecs,
323 settings,
324 done,
325 } => {
326 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
327 // Don't care if the receiver goes away
328 let _ = done.send(Err(tor_error::bad_api_usage!(
329 "cannot extend multipath tunnel"
330 )
331 .into()));
332
333 return Ok(None);
334 };
335
336 let (extender, cell) = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
337 RelayCellFormat::V0,
338 peer_id,
339 HandshakeType::NTOR,
340 &public_key,
341 linkspecs,
342 settings,
343 &(),
344 circ,
345 done,
346 )?;
347 self.reactor
348 .cell_handlers
349 .set_meta_handler(Box::new(extender))?;
350
351 Ok(Some(RunOnceCmdInner::Send {
352 leg: circ.unique_id(),
353 cell,
354 done: None,
355 }))
356 }
357 CtrlMsg::ExtendNtorV3 {
358 peer_id,
359 public_key,
360 linkspecs,
361 settings,
362 done,
363 } => {
364 let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
365 // Don't care if the receiver goes away
366 let _ = done.send(Err(tor_error::bad_api_usage!(
367 "cannot extend multipath tunnel"
368 )
369 .into()));
370
371 return Ok(None);
372 };
373
374 // TODO #1067, TODO #1947: support negotiating other formats.
375 let relay_cell_format = RelayCellFormat::V0;
376
377 let client_extensions = circ_extensions_from_settings(&settings)?;
378
379 let (extender, cell) =
380 CircuitExtender::<NtorV3Client, Tor1RelayCrypto, _, _>::begin(
381 relay_cell_format,
382 peer_id,
383 HandshakeType::NTOR_V3,
384 &public_key,
385 linkspecs,
386 settings,
387 &client_extensions,
388 circ,
389 done,
390 )?;
391 self.reactor
392 .cell_handlers
393 .set_meta_handler(Box::new(extender))?;
394
395 Ok(Some(RunOnceCmdInner::Send {
396 leg: circ.unique_id(),
397 cell,
398 done: None,
399 }))
400 }
401 CtrlMsg::BeginStream {
402 hop,
403 message,
404 sender,
405 rx,
406 done,
407 cmd_checker,
408 } => {
409 // If resolving the hop fails,
410 // we want to report an error back to the initiator and not shut down the reactor.
411 let hop_location = match self.reactor.resolve_target_hop(hop) {
412 Ok(x) => x,
413 Err(e) => {
414 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
415 // don't care if receiver goes away
416 let _ = done.send(Err(e.into()));
417 return Ok(None);
418 }
419 };
420 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
421 Ok(x) => x,
422 Err(e) => {
423 let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
424 // don't care if receiver goes away
425 let _ = done.send(Err(e.into()));
426 return Ok(None);
427 }
428 };
429 let circ = match self.reactor.circuits.leg_mut(leg_id) {
430 Some(x) => x,
431 None => {
432 let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
433 // don't care if receiver goes away
434 let _ = done.send(Err(e.into()));
435 return Ok(None);
436 }
437 };
438
439 let cell = circ.begin_stream(hop_num, message, sender, rx, cmd_checker)?;
440 Ok(Some(RunOnceCmdInner::BeginStream {
441 leg: leg_id,
442 cell,
443 hop: hop_location,
444 done,
445 }))
446 }
447 #[cfg(feature = "hs-service")]
448 CtrlMsg::ClosePendingStream {
449 hop,
450 stream_id,
451 message,
452 done,
453 } => Ok(Some(RunOnceCmdInner::CloseStream {
454 hop,
455 sid: stream_id,
456 behav: message,
457 reason: streammap::TerminateReason::ExplicitEnd,
458 done: Some(done),
459 })),
460 // TODO(#1860): remove stream-level sendme support
461 CtrlMsg::SendSendme {
462 stream_id,
463 hop,
464 sender,
465 } => {
466 // If resolving the hop fails,
467 // we want to report an error back to the initiator and not shut down the reactor.
468 let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
469 Ok(x) => x,
470 Err(e) => {
471 let e = into_bad_api_usage!("Could not resolve hop {hop:?}")(e);
472 // don't care if receiver goes away
473 let _ = sender.send(Err(e.into()));
474 return Ok(None);
475 }
476 };
477
478 // Congestion control decides if we can send stream level SENDMEs or not.
479 let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num) {
480 Some(x) => x,
481 None => {
482 // don't care if receiver goes away
483 let _ = sender.send(Err(bad_api_usage!(
484 "Unknown hop {hop_num:?} on leg {leg_id:?}"
485 )
486 .into()));
487 return Ok(None);
488 }
489 };
490
491 if !sendme_required {
492 // don't care if receiver goes away
493 let _ = sender.send(Ok(()));
494 return Ok(None);
495 }
496
497 let sendme = Sendme::new_empty();
498 let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
499
500 let cell = SendRelayCell {
501 hop: hop_num,
502 early: false,
503 cell,
504 };
505
506 Ok(Some(RunOnceCmdInner::Send {
507 leg: leg_id,
508 cell,
509 done: Some(sender),
510 }))
511 }
512 // TODO(conflux): this should specify which leg to send the msg on
513 // (currently we send it down the primary leg).
514 //
515 // This will involve updating ClientCIrc::send_raw_msg() to take a
516 // leg id argument (which is a breaking change.
517 #[cfg(feature = "send-control-msg")]
518 CtrlMsg::SendMsg {
519 hop_num,
520 msg,
521 sender,
522 } => {
523 let cell = AnyRelayMsgOuter::new(None, msg);
524 let cell = SendRelayCell {
525 hop: hop_num,
526 early: false,
527 cell,
528 };
529
530 let leg = self.reactor.circuits.primary_leg_id();
531
532 Ok(Some(RunOnceCmdInner::Send {
533 leg,
534 cell,
535 done: Some(sender),
536 }))
537 }
538 // TODO(conflux): this should specify which leg to send the msg on
539 // (currently we send it down the primary leg)
540 #[cfg(feature = "send-control-msg")]
541 CtrlMsg::SendMsgAndInstallHandler {
542 msg,
543 handler,
544 sender,
545 } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
546 msg,
547 handler,
548 done: sender,
549 })),
550 CtrlMsg::FirstHopClockSkew { answer } => {
551 Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
552 }
553 #[cfg(feature = "conflux")]
554 CtrlMsg::LinkCircuits { circuits, answer } => {
555 Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
556 }
557 }
558 }
559
560 /// Handle a control command.
561 pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
562 trace!(
563 tunnel_id = %self.reactor.tunnel_id,
564 msg = ?msg,
565 "reactor received control command"
566 );
567
568 match msg {
569 CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
570 #[cfg(feature = "hs-common")]
571 #[allow(unreachable_code)]
572 CtrlCmd::ExtendVirtual {
573 relay_cell_format: format,
574 cell_crypto,
575 settings,
576 done,
577 } => {
578 let (outbound, inbound, binding) = cell_crypto;
579
580 // TODO HS: Perhaps this should describe the onion service, or
581 // describe why the virtual hop was added, or something?
582 let peer_id = path::HopDetail::Virtual;
583
584 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
585 // Don't care if the receiver goes away
586 let _ = done.send(Err(tor_error::bad_api_usage!(
587 "cannot extend multipath tunnel"
588 )
589 .into()));
590
591 return Ok(());
592 };
593
594 leg.add_hop(format, peer_id, outbound, inbound, binding, &settings)?;
595 let _ = done.send(Ok(()));
596
597 Ok(())
598 }
599 #[cfg(feature = "hs-service")]
600 CtrlCmd::AwaitStreamRequest {
601 cmd_checker,
602 incoming_sender,
603 hop_num,
604 done,
605 filter,
606 } => {
607 // TODO: At some point we might want to add a CtrlCmd for
608 // de-registering the handler. See comments on `allow_stream_requests`.
609 let handler = IncomingStreamRequestHandler {
610 incoming_sender,
611 cmd_checker,
612 hop_num,
613 filter,
614 };
615
616 let ret = self
617 .reactor
618 .cell_handlers
619 .set_incoming_stream_req_handler(handler);
620 let _ = done.send(ret); // don't care if the corresponding receiver goes away.
621
622 Ok(())
623 }
624 #[cfg(test)]
625 CtrlCmd::AddFakeHop {
626 relay_cell_format,
627 fwd_lasthop,
628 rev_lasthop,
629 peer_id,
630 params,
631 done,
632 } => {
633 let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
634 // Don't care if the receiver goes away
635 let _ = done.send(Err(tor_error::bad_api_usage!(
636 "cannot add fake hop to multipath tunnel"
637 )
638 .into()));
639
640 return Ok(());
641 };
642
643 leg.handle_add_fake_hop(
644 relay_cell_format,
645 fwd_lasthop,
646 rev_lasthop,
647 peer_id,
648 ¶ms,
649 done,
650 );
651
652 Ok(())
653 }
654 #[cfg(test)]
655 CtrlCmd::QuerySendWindow { hop, done } => {
656 // Immediately invoked function means that errors will be sent to the channel.
657 let _ = done.send((|| {
658 let leg =
659 self.reactor
660 .circuits
661 .single_leg_mut()
662 .map_err(into_bad_api_usage!(
663 "cannot query send window of multipath tunnel"
664 ))?;
665
666 let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
667 "received QuerySendWindow for unknown hop {}",
668 hop.display()
669 ))?;
670
671 Ok(hop.send_window_and_expected_tags())
672 })());
673
674 Ok(())
675 }
676 #[cfg(feature = "conflux")]
677 CtrlCmd::ShutdownAndReturnCircuit { answer } => {
678 self.reactor.handle_shutdown_and_return_circuit(answer)
679 }
680 }
681 }
682}