tor_proto/tunnel/reactor/
control.rs

1//! Module providing [`CtrlMsg`].
2
3use super::circuit::extender::CircuitExtender;
4use super::{
5    CircuitHandshake, CloseStreamBehavior, MetaCellHandler, Reactor, ReactorResultChannel,
6    RunOnceCmdInner, SendRelayCell,
7};
8#[cfg(test)]
9use crate::circuit::CircParameters;
10use crate::circuit::HopSettings;
11use crate::crypto::binding::CircuitBinding;
12use crate::crypto::cell::{HopNum, InboundClientLayer, OutboundClientLayer, Tor1RelayCrypto};
13use crate::crypto::handshake::ntor_v3::{NtorV3Client, NtorV3PublicKey};
14use crate::stream::AnyCmdChecker;
15use crate::tunnel::circuit::celltypes::CreateResponse;
16use crate::tunnel::circuit::path;
17use crate::tunnel::reactor::circuit::circ_extensions_from_settings;
18use crate::tunnel::reactor::{NtorClient, ReactorError};
19use crate::tunnel::{streammap, HopLocation, TargetHop};
20use crate::util::skew::ClockSkew;
21use crate::Result;
22use tor_cell::chancell::msg::HandshakeType;
23use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
24use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
25use tor_error::{bad_api_usage, into_bad_api_usage, Bug};
26use tracing::trace;
27#[cfg(feature = "hs-service")]
28use {
29    super::StreamReqSender, crate::stream::IncomingStreamRequestFilter,
30    crate::tunnel::reactor::IncomingStreamRequestHandler,
31};
32
33#[cfg(test)]
34use tor_cell::relaycell::msg::SendmeTag;
35
36#[cfg(feature = "conflux")]
37use super::{Circuit, ConfluxLinkResultChannel};
38
39use oneshot_fused_workaround as oneshot;
40
41use crate::crypto::handshake::ntor::NtorPublicKey;
42use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
43use tor_linkspec::{EncodedLinkSpec, OwnedChanTarget};
44
45use std::result::Result as StdResult;
46
47/// A message telling the reactor to do something.
48///
49/// For each `CtrlMsg`, the reactor will send a cell on the underlying channel.
50///
51/// The difference between this and [`CtrlCmd`] is that `CtrlMsg`s
52/// cause the reactor to send cells on the reactor's `chan_sender`,
53/// whereas `CtrlCmd` do not.
54#[derive(educe::Educe)]
55#[educe(Debug)]
56pub(crate) enum CtrlMsg {
57    /// Create the first hop of this circuit.
58    Create {
59        /// A oneshot channel on which we'll receive the creation response.
60        recv_created: oneshot::Receiver<CreateResponse>,
61        /// The handshake type to use for the first hop.
62        handshake: CircuitHandshake,
63        /// Other parameters relevant for circuit creation.
64        settings: HopSettings,
65        /// Oneshot channel to notify on completion.
66        done: ReactorResultChannel<()>,
67    },
68    /// Extend a circuit by one hop, using the ntor handshake.
69    ExtendNtor {
70        /// The peer that we're extending to.
71        ///
72        /// Used to extend our record of the circuit's path.
73        peer_id: OwnedChanTarget,
74        /// The handshake type to use for this hop.
75        public_key: NtorPublicKey,
76        /// Information about how to connect to the relay we're extending to.
77        linkspecs: Vec<EncodedLinkSpec>,
78        /// Other parameters we are negotiating.
79        settings: HopSettings,
80        /// Oneshot channel to notify on completion.
81        done: ReactorResultChannel<()>,
82    },
83    /// Extend a circuit by one hop, using the ntorv3 handshake.
84    ExtendNtorV3 {
85        /// The peer that we're extending to.
86        ///
87        /// Used to extend our record of the circuit's path.
88        peer_id: OwnedChanTarget,
89        /// The handshake type to use for this hop.
90        public_key: NtorV3PublicKey,
91        /// Information about how to connect to the relay we're extending to.
92        linkspecs: Vec<EncodedLinkSpec>,
93        /// Other parameters we are negotiating.
94        settings: HopSettings,
95        /// Oneshot channel to notify on completion.
96        done: ReactorResultChannel<()>,
97    },
98    /// Begin a stream with the provided hop in this circuit.
99    ///
100    /// Allocates a stream ID, and sends the provided message to that hop.
101    BeginStream {
102        /// The hop number to begin the stream with.
103        hop: TargetHop,
104        /// The message to send.
105        message: AnyRelayMsg,
106        /// A channel to send messages on this stream down.
107        ///
108        /// This sender shouldn't ever block, because we use congestion control and only send
109        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
110        /// can assume someone is trying to send us more cells than they should, and abort
111        /// the stream.
112        sender: StreamMpscSender<UnparsedRelayMsg>,
113        /// A channel to receive messages to send on this stream from.
114        rx: StreamMpscReceiver<AnyRelayMsg>,
115        /// Oneshot channel to notify on completion, with the allocated stream ID.
116        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
117        /// A `CmdChecker` to keep track of which message types are acceptable.
118        cmd_checker: AnyCmdChecker,
119    },
120    /// Close the specified pending incoming stream, sending the provided END message.
121    ///
122    /// A stream is said to be pending if the message for initiating the stream was received but
123    /// not has not been responded to yet.
124    ///
125    /// This should be used by responders for closing pending incoming streams initiated by the
126    /// other party on the circuit.
127    #[cfg(feature = "hs-service")]
128    ClosePendingStream {
129        /// The hop number the stream is on.
130        hop: HopLocation,
131        /// The stream ID to send the END for.
132        stream_id: StreamId,
133        /// The END message to send, if any.
134        message: CloseStreamBehavior,
135        /// Oneshot channel to notify on completion.
136        done: ReactorResultChannel<()>,
137    },
138    /// Send a given control message on this circuit.
139    #[cfg(feature = "send-control-msg")]
140    SendMsg {
141        /// The hop to receive this message.
142        hop_num: HopNum,
143        /// The message to send.
144        msg: AnyRelayMsg,
145        /// A sender that we use to tell the caller that the message was sent
146        /// and the handler installed.
147        sender: oneshot::Sender<Result<()>>,
148    },
149    /// Send a given control message on this circuit, and install a control-message handler to
150    /// receive responses.
151    #[cfg(feature = "send-control-msg")]
152    SendMsgAndInstallHandler {
153        /// The message to send, if any
154        msg: Option<AnyRelayMsgOuter>,
155        /// A message handler to install.
156        ///
157        /// If this is `None`, there must already be a message handler installed
158        #[educe(Debug(ignore))]
159        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
160        /// A sender that we use to tell the caller that the message was sent
161        /// and the handler installed.
162        sender: oneshot::Sender<Result<()>>,
163    },
164    /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
165    SendSendme {
166        /// The stream ID to send a SENDME for.
167        stream_id: StreamId,
168        /// The hop number the stream is on.
169        hop: HopLocation,
170        /// A sender that we use to tell the caller that the SENDME was sent.
171        sender: oneshot::Sender<Result<()>>,
172    },
173    /// Get the clock skew claimed by the first hop of the circuit.
174    FirstHopClockSkew {
175        /// Oneshot channel to return the clock skew.
176        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
177    },
178    /// Link the specified circuits into the current tunnel,
179    /// to form a multi-path tunnel.
180    #[cfg(feature = "conflux")]
181    #[allow(unused)] // TODO(conflux)
182    LinkCircuits {
183        /// The circuits to link into the tunnel,
184        #[educe(Debug(ignore))]
185        circuits: Vec<Circuit>,
186        /// Oneshot channel to notify sender when all the specified circuits have finished linking,
187        /// or have failed to link.
188        ///
189        /// A client circuit is said to be fully linked once the `RELAY_CONFLUX_LINKED_ACK` is sent
190        /// (see [set construction]).
191        ///
192        /// [set construction]: https://spec.torproject.org/proposals/329-traffic-splitting.html#set-construction
193        answer: ConfluxLinkResultChannel,
194    },
195}
196
197/// A message telling the reactor to do something.
198///
199/// The difference between this and [`CtrlMsg`] is that `CtrlCmd`s
200/// never cause cells to sent on the channel,
201/// while `CtrlMsg`s potentially do: `CtrlMsg`s are mapped to [`RunOnceCmdInner`] commands,
202/// some of which instruct the reactor to send cells down the channel.
203#[derive(educe::Educe)]
204#[educe(Debug)]
205pub(crate) enum CtrlCmd {
206    /// Shut down the reactor.
207    Shutdown,
208    /// Extend the circuit by one hop, in response to an out-of-band handshake.
209    ///
210    /// (This is used for onion services, where the negotiation takes place in
211    /// INTRODUCE and RENDEZVOUS messages.)
212    #[cfg(feature = "hs-common")]
213    ExtendVirtual {
214        /// Which relay cell format to use for this hop.
215        relay_cell_format: RelayCellFormat,
216        /// The cryptographic algorithms and keys to use when communicating with
217        /// the newly added hop.
218        #[educe(Debug(ignore))]
219        cell_crypto: (
220            Box<dyn OutboundClientLayer + Send>,
221            Box<dyn InboundClientLayer + Send>,
222            Option<CircuitBinding>,
223        ),
224        /// A set of parameters to negotiate with this hop.
225        settings: HopSettings,
226        /// Oneshot channel to notify on completion.
227        done: ReactorResultChannel<()>,
228    },
229    /// Begin accepting streams on this circuit.
230    #[cfg(feature = "hs-service")]
231    AwaitStreamRequest {
232        /// A channel for sending information about an incoming stream request.
233        incoming_sender: StreamReqSender,
234        /// A `CmdChecker` to keep track of which message types are acceptable.
235        cmd_checker: AnyCmdChecker,
236        /// Oneshot channel to notify on completion.
237        done: ReactorResultChannel<()>,
238        /// The hop that is allowed to create streams.
239        hop_num: HopNum,
240        /// A filter used to check requests before passing them on.
241        #[educe(Debug(ignore))]
242        #[cfg(feature = "hs-service")]
243        filter: Box<dyn IncomingStreamRequestFilter>,
244    },
245    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
246    #[cfg(test)]
247    AddFakeHop {
248        relay_cell_format: RelayCellFormat,
249        fwd_lasthop: bool,
250        rev_lasthop: bool,
251        peer_id: path::HopDetail,
252        params: CircParameters,
253        done: ReactorResultChannel<()>,
254    },
255    /// (tests only) Get the send window and expected tags for a given hop.
256    #[cfg(test)]
257    QuerySendWindow {
258        hop: HopNum,
259        done: ReactorResultChannel<(u32, Vec<SendmeTag>)>,
260    },
261    /// Shut down the reactor, and return the underlying [`Circuit`],
262    /// if the tunnel is not multi-path.
263    ///
264    /// Returns an error if called on a multi-path reactor.
265    #[cfg(feature = "conflux")]
266    #[allow(unused)] // TODO(conflux)
267    ShutdownAndReturnCircuit {
268        /// Oneshot channel to return the underlying [`Circuit`],
269        /// or an error if the reactor's tunnel is multi-path.
270        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
271    },
272}
273
274/// A control message handler object. Keep a reference to the Reactor tying its lifetime to it.
275///
276/// Its `handle_msg` and `handle_cmd` handlers decide how messages and commands,
277/// respectively, are handled.
278pub(crate) struct ControlHandler<'a> {
279    /// Reference to the reactor of this
280    reactor: &'a mut Reactor,
281}
282
283impl<'a> ControlHandler<'a> {
284    /// Constructor.
285    pub(crate) fn new(reactor: &'a mut Reactor) -> Self {
286        Self { reactor }
287    }
288
289    /// Handle a control message.
290    pub(super) fn handle_msg(&mut self, msg: CtrlMsg) -> Result<Option<RunOnceCmdInner>> {
291        trace!(
292            tunnel_id = %self.reactor.tunnel_id,
293            msg = ?msg,
294            "reactor received control message"
295        );
296
297        match msg {
298            // This is handled earlier, since it requires blocking.
299            CtrlMsg::Create { done, .. } => {
300                if self.reactor.circuits.len() == 1 {
301                    // This should've been handled in Reactor::run_once()
302                    // (ControlHandler::handle_msg() is never called before wait_for_create()).
303                    debug_assert!(self.reactor.circuits.single_leg()?.has_hops());
304                    // Don't care if the receiver goes away
305                    let _ = done.send(Err(tor_error::bad_api_usage!(
306                        "cannot create first hop twice"
307                    )
308                    .into()));
309                } else {
310                    // Don't care if the receiver goes away
311                    let _ = done.send(Err(tor_error::bad_api_usage!(
312                        "cannot create first hop on multipath tunnel"
313                    )
314                    .into()));
315                }
316
317                Ok(None)
318            }
319            CtrlMsg::ExtendNtor {
320                peer_id,
321                public_key,
322                linkspecs,
323                settings,
324                done,
325            } => {
326                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
327                    // Don't care if the receiver goes away
328                    let _ = done.send(Err(tor_error::bad_api_usage!(
329                        "cannot extend multipath tunnel"
330                    )
331                    .into()));
332
333                    return Ok(None);
334                };
335
336                let (extender, cell) = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
337                    RelayCellFormat::V0,
338                    peer_id,
339                    HandshakeType::NTOR,
340                    &public_key,
341                    linkspecs,
342                    settings,
343                    &(),
344                    circ,
345                    done,
346                )?;
347                self.reactor
348                    .cell_handlers
349                    .set_meta_handler(Box::new(extender))?;
350
351                Ok(Some(RunOnceCmdInner::Send {
352                    leg: circ.unique_id(),
353                    cell,
354                    done: None,
355                }))
356            }
357            CtrlMsg::ExtendNtorV3 {
358                peer_id,
359                public_key,
360                linkspecs,
361                settings,
362                done,
363            } => {
364                let Ok(circ) = self.reactor.circuits.single_leg_mut() else {
365                    // Don't care if the receiver goes away
366                    let _ = done.send(Err(tor_error::bad_api_usage!(
367                        "cannot extend multipath tunnel"
368                    )
369                    .into()));
370
371                    return Ok(None);
372                };
373
374                // TODO #1067, TODO #1947: support negotiating other formats.
375                let relay_cell_format = RelayCellFormat::V0;
376
377                let client_extensions = circ_extensions_from_settings(&settings)?;
378
379                let (extender, cell) =
380                    CircuitExtender::<NtorV3Client, Tor1RelayCrypto, _, _>::begin(
381                        relay_cell_format,
382                        peer_id,
383                        HandshakeType::NTOR_V3,
384                        &public_key,
385                        linkspecs,
386                        settings,
387                        &client_extensions,
388                        circ,
389                        done,
390                    )?;
391                self.reactor
392                    .cell_handlers
393                    .set_meta_handler(Box::new(extender))?;
394
395                Ok(Some(RunOnceCmdInner::Send {
396                    leg: circ.unique_id(),
397                    cell,
398                    done: None,
399                }))
400            }
401            CtrlMsg::BeginStream {
402                hop,
403                message,
404                sender,
405                rx,
406                done,
407                cmd_checker,
408            } => {
409                // If resolving the hop fails,
410                // we want to report an error back to the initiator and not shut down the reactor.
411                let hop_location = match self.reactor.resolve_target_hop(hop) {
412                    Ok(x) => x,
413                    Err(e) => {
414                        let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
415                        // don't care if receiver goes away
416                        let _ = done.send(Err(e.into()));
417                        return Ok(None);
418                    }
419                };
420                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop_location) {
421                    Ok(x) => x,
422                    Err(e) => {
423                        let e = into_bad_api_usage!("Could not resolve {hop_location:?}")(e);
424                        // don't care if receiver goes away
425                        let _ = done.send(Err(e.into()));
426                        return Ok(None);
427                    }
428                };
429                let circ = match self.reactor.circuits.leg_mut(leg_id) {
430                    Some(x) => x,
431                    None => {
432                        let e = bad_api_usage!("Circuit leg {leg_id:?} does not exist");
433                        // don't care if receiver goes away
434                        let _ = done.send(Err(e.into()));
435                        return Ok(None);
436                    }
437                };
438
439                let cell = circ.begin_stream(hop_num, message, sender, rx, cmd_checker)?;
440                Ok(Some(RunOnceCmdInner::BeginStream {
441                    leg: leg_id,
442                    cell,
443                    hop: hop_location,
444                    done,
445                }))
446            }
447            #[cfg(feature = "hs-service")]
448            CtrlMsg::ClosePendingStream {
449                hop,
450                stream_id,
451                message,
452                done,
453            } => Ok(Some(RunOnceCmdInner::CloseStream {
454                hop,
455                sid: stream_id,
456                behav: message,
457                reason: streammap::TerminateReason::ExplicitEnd,
458                done: Some(done),
459            })),
460            // TODO(#1860): remove stream-level sendme support
461            CtrlMsg::SendSendme {
462                stream_id,
463                hop,
464                sender,
465            } => {
466                // If resolving the hop fails,
467                // we want to report an error back to the initiator and not shut down the reactor.
468                let (leg_id, hop_num) = match self.reactor.resolve_hop_location(hop) {
469                    Ok(x) => x,
470                    Err(e) => {
471                        let e = into_bad_api_usage!("Could not resolve hop {hop:?}")(e);
472                        // don't care if receiver goes away
473                        let _ = sender.send(Err(e.into()));
474                        return Ok(None);
475                    }
476                };
477
478                // Congestion control decides if we can send stream level SENDMEs or not.
479                let sendme_required = match self.reactor.uses_stream_sendme(leg_id, hop_num) {
480                    Some(x) => x,
481                    None => {
482                        // don't care if receiver goes away
483                        let _ = sender.send(Err(bad_api_usage!(
484                            "Unknown hop {hop_num:?} on leg {leg_id:?}"
485                        )
486                        .into()));
487                        return Ok(None);
488                    }
489                };
490
491                if !sendme_required {
492                    // don't care if receiver goes away
493                    let _ = sender.send(Ok(()));
494                    return Ok(None);
495                }
496
497                let sendme = Sendme::new_empty();
498                let cell = AnyRelayMsgOuter::new(Some(stream_id), sendme.into());
499
500                let cell = SendRelayCell {
501                    hop: hop_num,
502                    early: false,
503                    cell,
504                };
505
506                Ok(Some(RunOnceCmdInner::Send {
507                    leg: leg_id,
508                    cell,
509                    done: Some(sender),
510                }))
511            }
512            // TODO(conflux): this should specify which leg to send the msg on
513            // (currently we send it down the primary leg).
514            //
515            // This will involve updating ClientCIrc::send_raw_msg() to take a
516            // leg id argument (which is a breaking change.
517            #[cfg(feature = "send-control-msg")]
518            CtrlMsg::SendMsg {
519                hop_num,
520                msg,
521                sender,
522            } => {
523                let cell = AnyRelayMsgOuter::new(None, msg);
524                let cell = SendRelayCell {
525                    hop: hop_num,
526                    early: false,
527                    cell,
528                };
529
530                let leg = self.reactor.circuits.primary_leg_id();
531
532                Ok(Some(RunOnceCmdInner::Send {
533                    leg,
534                    cell,
535                    done: Some(sender),
536                }))
537            }
538            // TODO(conflux): this should specify which leg to send the msg on
539            // (currently we send it down the primary leg)
540            #[cfg(feature = "send-control-msg")]
541            CtrlMsg::SendMsgAndInstallHandler {
542                msg,
543                handler,
544                sender,
545            } => Ok(Some(RunOnceCmdInner::SendMsgAndInstallHandler {
546                msg,
547                handler,
548                done: sender,
549            })),
550            CtrlMsg::FirstHopClockSkew { answer } => {
551                Ok(Some(RunOnceCmdInner::FirstHopClockSkew { answer }))
552            }
553            #[cfg(feature = "conflux")]
554            CtrlMsg::LinkCircuits { circuits, answer } => {
555                Ok(Some(RunOnceCmdInner::Link { circuits, answer }))
556            }
557        }
558    }
559
560    /// Handle a control command.
561    pub(super) fn handle_cmd(&mut self, msg: CtrlCmd) -> StdResult<(), ReactorError> {
562        trace!(
563            tunnel_id = %self.reactor.tunnel_id,
564            msg = ?msg,
565            "reactor received control command"
566        );
567
568        match msg {
569            CtrlCmd::Shutdown => self.reactor.handle_shutdown().map(|_| ()),
570            #[cfg(feature = "hs-common")]
571            #[allow(unreachable_code)]
572            CtrlCmd::ExtendVirtual {
573                relay_cell_format: format,
574                cell_crypto,
575                settings,
576                done,
577            } => {
578                let (outbound, inbound, binding) = cell_crypto;
579
580                // TODO HS: Perhaps this should describe the onion service, or
581                // describe why the virtual hop was added, or something?
582                let peer_id = path::HopDetail::Virtual;
583
584                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
585                    // Don't care if the receiver goes away
586                    let _ = done.send(Err(tor_error::bad_api_usage!(
587                        "cannot extend multipath tunnel"
588                    )
589                    .into()));
590
591                    return Ok(());
592                };
593
594                leg.add_hop(format, peer_id, outbound, inbound, binding, &settings)?;
595                let _ = done.send(Ok(()));
596
597                Ok(())
598            }
599            #[cfg(feature = "hs-service")]
600            CtrlCmd::AwaitStreamRequest {
601                cmd_checker,
602                incoming_sender,
603                hop_num,
604                done,
605                filter,
606            } => {
607                // TODO: At some point we might want to add a CtrlCmd for
608                // de-registering the handler.  See comments on `allow_stream_requests`.
609                let handler = IncomingStreamRequestHandler {
610                    incoming_sender,
611                    cmd_checker,
612                    hop_num,
613                    filter,
614                };
615
616                let ret = self
617                    .reactor
618                    .cell_handlers
619                    .set_incoming_stream_req_handler(handler);
620                let _ = done.send(ret); // don't care if the corresponding receiver goes away.
621
622                Ok(())
623            }
624            #[cfg(test)]
625            CtrlCmd::AddFakeHop {
626                relay_cell_format,
627                fwd_lasthop,
628                rev_lasthop,
629                peer_id,
630                params,
631                done,
632            } => {
633                let Ok(leg) = self.reactor.circuits.single_leg_mut() else {
634                    // Don't care if the receiver goes away
635                    let _ = done.send(Err(tor_error::bad_api_usage!(
636                        "cannot add fake hop to multipath tunnel"
637                    )
638                    .into()));
639
640                    return Ok(());
641                };
642
643                leg.handle_add_fake_hop(
644                    relay_cell_format,
645                    fwd_lasthop,
646                    rev_lasthop,
647                    peer_id,
648                    &params,
649                    done,
650                );
651
652                Ok(())
653            }
654            #[cfg(test)]
655            CtrlCmd::QuerySendWindow { hop, done } => {
656                // Immediately invoked function means that errors will be sent to the channel.
657                let _ = done.send((|| {
658                    let leg =
659                        self.reactor
660                            .circuits
661                            .single_leg_mut()
662                            .map_err(into_bad_api_usage!(
663                                "cannot query send window of multipath tunnel"
664                            ))?;
665
666                    let hop = leg.hop_mut(hop).ok_or(bad_api_usage!(
667                        "received QuerySendWindow for unknown hop {}",
668                        hop.display()
669                    ))?;
670
671                    Ok(hop.send_window_and_expected_tags())
672                })());
673
674                Ok(())
675            }
676            #[cfg(feature = "conflux")]
677            CtrlCmd::ShutdownAndReturnCircuit { answer } => {
678                self.reactor.handle_shutdown_and_return_circuit(answer)
679            }
680        }
681    }
682}