tor_proto/tunnel/reactor.rs
1//! Code to handle incoming cells on a circuit.
2//!
3//! ## On message validation
4//!
5//! There are three steps for validating an incoming message on a stream:
6//!
7//! 1. Is the message contextually appropriate? (e.g., no more than one
8//! `CONNECTED` message per stream.) This is handled by calling
9//! [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11//! we've gotten SENDMEs for.) For open streams, the stream itself handles
12//! this; for half-closed streams, the reactor handles it using the
13//! `halfstream` module.
14//! 3. Does the message have an acceptable command type, and is the message
15//! well-formed? For open streams, the streams themselves handle this check.
16//! For half-closed streams, the reactor handles it by calling
17//! `consume_checked_msg()`.
18
19pub(super) mod circuit;
20mod conflux;
21mod control;
22pub(super) mod syncview;
23
24use crate::crypto::cell::HopNum;
25use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26use crate::memquota::{CircuitAccount, StreamAccount};
27use crate::stream::AnyCmdChecker;
28#[cfg(feature = "hs-service")]
29use crate::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
30use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
31use crate::tunnel::circuit::unique_id::UniqId;
32use crate::tunnel::circuit::CircuitRxReceiver;
33use crate::tunnel::{streammap, HopLocation, TargetHop};
34use crate::util::err::ReactorError;
35use crate::util::skew::ClockSkew;
36use crate::{Error, Result};
37use circuit::{Circuit, CircuitCmd};
38use conflux::ConfluxSet;
39use control::ControlHandler;
40use std::cmp::Ordering;
41use std::collections::BinaryHeap;
42use std::mem::size_of;
43use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
44use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
45use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
46use tor_rtcompat::DynTimeProvider;
47
48use futures::channel::mpsc;
49use futures::StreamExt;
50use futures::{select_biased, FutureExt as _};
51use oneshot_fused_workaround as oneshot;
52
53use std::result::Result as StdResult;
54use std::sync::Arc;
55
56use crate::channel::Channel;
57use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
58use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
59use derive_deftly::Deftly;
60use derive_more::From;
61use tor_cell::chancell::CircId;
62use tor_llcrypto::pk;
63use tor_memquota::mq_queue::{self, MpscSpec};
64use tor_memquota::{derive_deftly_template_HasMemoryCost, memory_cost_structural_copy};
65use tracing::trace;
66
67use super::circuit::{MutableState, TunnelMutableState};
68
69#[cfg(feature = "conflux")]
70use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
71
72pub(super) use control::CtrlCmd;
73pub(super) use control::CtrlMsg;
74
/// The type of a oneshot channel on which the reactor reports the result of an operation.
///
/// The reactor holds this sending half and uses it to deliver a [`Result`]
/// to whoever is waiting on the matching receiver.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
77
/// A list of conflux handshake results.
///
/// Each entry is either `Ok(())` or the [`ConfluxHandshakeError`]
/// describing why that handshake failed.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
81
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Carries a [`ConfluxHandshakeResult`]: a list of handshake results,
/// one for each circuit that we were asked to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
89
90pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
91
/// The sending half of the MPSC queue that carries incoming stream requests
/// (as [`StreamReqInfo`] values).
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
95
/// A handshake type, to be used when creating circuit hops.
///
/// Each variant carries the key material (if any) that the
/// corresponding handshake needs.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// This variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115
/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send an End cell, if we haven't already sent one.
    ///
    /// The wrapped [`End`] is the message to send.
    SendEnd(End),
}
131impl Default for CloseStreamBehavior {
132 fn default() -> Self {
133 Self::SendEnd(End::new_misc())
134 }
135}
136
137// TODO(conflux): the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
138// proliferation is a bit bothersome, but unavoidable with the current design.
139//
140// We should consider getting rid of some of these enums (if possible),
141// and coming up with more intuitive names.
142
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
///
/// The `From` derive lets a single `RunOnceCmdInner`, or a `Vec` of them,
/// be converted into this type with `.into()`.
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
156
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        ///
        /// If present, the reactor sends the outcome of the send on it.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: LegId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send, paired with the stream ID to use,
        /// or an error if they could not be produced.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: LegId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Close the specified stream.
    CloseStream {
        /// The location of the hop the stream is associated with.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: LegId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: LegId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
284
285impl RunOnceCmdInner {
286 /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`LegIdKey`].
287 fn from_circuit_cmd(leg: LegIdKey, cmd: CircuitCmd) -> Self {
288 match cmd {
289 CircuitCmd::Send(cell) => Self::Send {
290 leg: LegId(leg),
291 cell,
292 done: None,
293 },
294 CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe {
295 leg: LegId(leg),
296 hop,
297 sendme,
298 },
299 CircuitCmd::CloseStream {
300 hop,
301 sid,
302 behav,
303 reason,
304 } => Self::CloseStream {
305 hop: HopLocation::Hop((LegId(leg), hop)),
306 sid,
307 behav,
308 reason,
309 done: None,
310 },
311 #[cfg(feature = "conflux")]
312 CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg {
313 leg: LegId(leg),
314 reason,
315 },
316 #[cfg(feature = "conflux")]
317 CircuitCmd::ConfluxHandshakeComplete(cell) => Self::ConfluxHandshakeComplete {
318 leg: LegId(leg),
319 cell,
320 },
321 #[cfg(feature = "conflux")]
322 CircuitCmd::Enqueue(msg) => Self::Enqueue {
323 leg: LegId(leg),
324 msg,
325 },
326 CircuitCmd::CleanShutdown => Self::CleanShutdown,
327 }
328 }
329}
330
/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    ///
    /// Despite the field name, this is the relay *message*
    /// ([`AnyRelayMsgOuter`]) to be sent.
    pub(crate) cell: AnyRelayMsgOuter,
}
344
/// A command to execute at the end of [`Reactor::run_once`].
///
/// `run_once` obtains one of these per iteration and turns it into
/// an optional [`RunOnceCmd`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitAction {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: LegIdKey,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: LegIdKey,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this action *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: LegIdKey,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
}
389
/// The reason for removing a circuit leg from the conflux set.
#[derive(Debug)]
enum RemoveLegReason {
    /// The conflux handshake timed out.
    ///
    /// On the client-side, this means we didn't receive
    /// the CONFLUX_LINKED response in time.
    ConfluxHandshakeTimeout,
    /// An error occurred during conflux handshake.
    ///
    /// Carries the [`Error`] that was encountered.
    ConfluxHandshakeErr(Error),
    /// The channel was closed.
    ChannelClosed,
}
403
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopNum;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets mutable access to a [`Circuit`] (the parameter is still named `reactor`)
    /// in order to do anything it likes there.
    ///
    /// On success, the returned [`MetaCellDisposition`] says what should happen next.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
429
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// This is also the success type returned by [`MetaCellHandler::handle_msg`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
447
/// A unique identifier for a circuit leg.
///
/// This is a thin newtype around a [`LegIdKey`].
///
/// After the circuit is torn down, its `LegId` becomes invalid.
/// The same `LegId` won't be reused for a future circuit.
//
// TODO(#1857): make this pub
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct LegId(pub(crate) LegIdKey);
458
// TODO(conflux): can we use `UniqId` as the key instead of this newtype?
//
// See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2996#note_3199069
slotmap_careful::new_key_type! {
    /// A key type for the circuit leg slotmap
    ///
    /// This is the raw key that [`LegId`] wraps.
    ///
    /// See [`LegId`].
    pub(crate) struct LegIdKey;
}
468
469impl From<LegIdKey> for LegId {
470 fn from(leg_id: LegIdKey) -> Self {
471 LegId(leg_id)
472 }
473}
474
// NOTE(review): presumably this supplies the memory-cost accounting that the
// `HasMemoryCost` derive on `LegId` needs for its `LegIdKey` field — confirm
// against the `tor_memquota` documentation.
memory_cost_structural_copy!(LegIdKey);
476
/// Turn an [`Option`] into a `Result`, mapping `None` to [`ReactorError::Shutdown`].
///
/// When the value is `None`, a trace message naming `$reason` is logged
/// (prefixed with `$self.unique_id`) before the error is produced.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        if let Some(v) = $res {
            Ok(v)
        } else {
            trace!("{}: reactor shutdown due to {}", $self.unique_id, $reason);
            Err(ReactorError::Shutdown)
        }
    }};
}
492
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE(review): this doc used to say the channel is only polled if the
    /// `chan_sender` sink is ready to accept cells, but `run_once` polls it
    /// unconditionally (and notes that control messages may be handled before
    /// the senders are ready) — confirm which behavior is intended.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], ahead of `control`.
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready.
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Handlers, shared with `Circuit`.
    ///
    /// A mutable reference to these is handed to the circuit leg
    /// whenever it processes an incoming message.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
556
/// The context for an on-going conflux handshake.
///
/// Stored in [`Reactor::conflux_hs_ctx`] while a handshake is in progress.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
567
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Entries are compared solely by `msg`; `leg_id` plays no part in
/// ordering or equality.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: LegId,
    /// The out of order message
    msg: OooRelayMsg,
}
577
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Order entries by their message, inverted.
    ///
    /// `BinaryHeap` is a max-heap, so flipping the natural ordering of `msg`
    /// makes the heap hand back the smallest message first.
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = Ord::cmp(&self.msg, &other.msg);
        natural.reverse()
    }
}
584
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Delegates to the [`Ord`] impl, so the two orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
591
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their messages are equal;
    /// the leg they arrived on is not considered.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.msg, &other.msg)
    }
}
598
// Marker impl: `Ord` requires `Eq`. Equality itself comes from the
// `PartialEq` impl for `ConfluxHeapEntry`.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
601
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, if one is currently installed.
    ///
    /// NOTE(review): this doc used to mention "a result channel to notify on
    /// completion", but the field holds only the handler — confirm the
    /// channel now lives elsewhere.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
610
/// Information about an incoming stream request.
///
/// This is the item type carried by the stream request queue
/// (see `StreamReqSender`).
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops. (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// The [`LegId`] of the circuit the request came on.
    pub(crate) leg: LegId,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
643
/// Data required for handling an incoming stream request.
///
/// Stored in `CellHandlers::incoming_stream_req_handler` when installed.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// An [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
660
661impl Reactor {
662 /// Create a new circuit reactor.
663 ///
664 /// The reactor will send outbound messages on `channel`, receive incoming
665 /// messages on `input`, and identify this circuit by the channel-local
666 /// [`CircId`] provided.
667 ///
668 /// The internal unique identifier for this circuit will be `unique_id`.
669 #[allow(clippy::type_complexity)] // TODO
670 pub(super) fn new(
671 channel: Arc<Channel>,
672 channel_id: CircId,
673 unique_id: UniqId,
674 input: CircuitRxReceiver,
675 runtime: DynTimeProvider,
676 memquota: CircuitAccount,
677 ) -> (
678 Self,
679 mpsc::UnboundedSender<CtrlMsg>,
680 mpsc::UnboundedSender<CtrlCmd>,
681 oneshot::Receiver<void::Void>,
682 Arc<TunnelMutableState>,
683 ) {
684 let (control_tx, control_rx) = mpsc::unbounded();
685 let (command_tx, command_rx) = mpsc::unbounded();
686 let mutable = Arc::new(MutableState::default());
687
688 let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
689
690 let cell_handlers = CellHandlers {
691 meta_handler: None,
692 #[cfg(feature = "hs-service")]
693 incoming_stream_req_handler: None,
694 };
695
696 let circuit_leg = Circuit::new(
697 channel,
698 channel_id,
699 unique_id,
700 input,
701 memquota,
702 Arc::clone(&mutable),
703 );
704
705 let (circuits, mutable) = ConfluxSet::new(circuit_leg);
706
707 let reactor = Reactor {
708 circuits,
709 control: control_rx,
710 command: command_rx,
711 reactor_closed_tx,
712 unique_id,
713 cell_handlers,
714 runtime,
715 #[cfg(feature = "conflux")]
716 conflux_hs_ctx: None,
717 #[cfg(feature = "conflux")]
718 ooo_msgs: Default::default(),
719 };
720
721 (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
722 }
723
724 /// Launch the reactor, and run until the circuit closes or we
725 /// encounter an error.
726 ///
727 /// Once this method returns, the circuit is dead and cannot be
728 /// used again.
729 pub async fn run(mut self) -> Result<()> {
730 trace!("{}: Running circuit reactor", self.unique_id);
731 let result: Result<()> = loop {
732 match self.run_once().await {
733 Ok(()) => (),
734 Err(ReactorError::Shutdown) => break Ok(()),
735 Err(ReactorError::Err(e)) => break Err(e),
736 }
737 };
738 trace!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
739 result
740 }
741
742 /// Helper for run: doesn't mark the circuit closed on finish. Only
743 /// processes one cell or control message.
744 async fn run_once(&mut self) -> StdResult<(), ReactorError> {
745 // If all the circuits are closed, shut down the reactor
746 //
747 // TODO(conflux): we might need to rethink this behavior
748 if self.circuits.is_empty() {
749 trace!(
750 "{}: Circuit reactor shutting down: all circuits have closed",
751 self.unique_id
752 );
753
754 return Err(ReactorError::Shutdown);
755 }
756
757 // If this is a single path circuit, we need to wait until the first hop
758 // is created before doing anything else
759 let single_path_with_hops = self
760 .circuits
761 .single_leg_mut()
762 .is_ok_and(|(_id, leg)| !leg.has_hops());
763 if single_path_with_hops {
764 self.wait_for_create().await?;
765
766 return Ok(());
767 }
768
769 // Prioritize the buffered messages.
770 //
771 // Note: if any of the messages are ready to be handled,
772 // this will block the reactor until we are done processing them
773 #[cfg(feature = "conflux")]
774 self.try_dequeue_ooo_msgs().await?;
775
776 let action = select_biased! {
777 res = self.command.next() => {
778 let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
779 return ControlHandler::new(self).handle_cmd(cmd);
780 },
781 // Check whether we've got a control message pending.
782 //
783 // Note: unfortunately, reading from control here means we might start
784 // handling control messages before our chan_senders are ready.
785 // With the current design, this is inevitable: we can't know which circuit leg
786 // a control message is meant for without first reading the control message from
787 // the channel, and at that point, we can't know for sure whether that particular
788 // circuit is ready for sending.
789 ret = self.control.next() => {
790 let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
791 CircuitAction::HandleControl(msg)
792 },
793 res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
794 };
795
796 let cmd = match action {
797 CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
798 RunOnceCmdInner::from_circuit_cmd(leg, cmd),
799 )),
800 CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
801 .handle_msg(ctrl)?
802 .map(RunOnceCmd::Single),
803 CircuitAction::HandleCell { leg, cell } => {
804 let circ = self
805 .circuits
806 .leg_mut(LegId(leg))
807 .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
808
809 let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg.into(), cell)?;
810 if circ_cmds.is_empty() {
811 None
812 } else {
813 // TODO(conflux): we return RunOnceCmd::Multiple even if there's a single command.
814 //
815 // See the TODO(conflux) on `Circuit::handle_cell`.
816 let cmd = RunOnceCmd::Multiple(
817 circ_cmds
818 .into_iter()
819 .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
820 .collect(),
821 );
822
823 Some(cmd)
824 }
825 }
826 CircuitAction::RemoveLeg { leg, reason } => Some(
827 RunOnceCmdInner::RemoveLeg {
828 leg: LegId(leg),
829 reason,
830 }
831 .into(),
832 ),
833 };
834
835 if let Some(cmd) = cmd {
836 self.handle_run_once_cmd(cmd).await?;
837 }
838
839 // Check if it's time to switch our primary leg.
840 //
841 // TODO(conflux): this only be done just before sending a cell.
842 //
843 // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2946#note_3192013
844 #[cfg(feature = "conflux")]
845 if let Some(switch_cell) = self.circuits.maybe_update_primary_leg()? {
846 self.circuits
847 .primary_leg_mut()?
848 .send_relay_cell(switch_cell)
849 .await?;
850 }
851
852 Ok(())
853 }
854
/// Try to process the previously-out-of-order messages we might have buffered.
///
/// Repeatedly looks at the entry at the top of `ooo_msgs`; while its sequence
/// number is reported as in-order by the conflux set, the entry is removed,
/// handled on the leg it arrived on, and any resulting command is run.
/// Stops at the first entry that is still out of order, or when the heap is empty.
///
/// # Errors
///
/// Returns an internal error if the leg an entry came from no longer exists
/// (or if the entry vanishes between `peek` and `pop`), and propagates any
/// error from handling the message or running the resulting command.
#[cfg(feature = "conflux")]
async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
    // Check if we're ready to dequeue any of the previously out-of-order cells.
    while let Some(head) = self.ooo_msgs.peek() {
        if !self.circuits.is_seqno_in_order(head.msg.seqno) {
            break;
        }

        // `peek` just returned `Some`, so `pop` cannot fail; report a bug
        // instead of panicking, like the other "disappeared?!" checks here.
        let entry = self
            .ooo_msgs
            .pop()
            .ok_or_else(|| internal!("item just disappeared?!"))?;

        let circ = self
            .circuits
            .leg_mut(entry.leg_id)
            .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
        let handlers = &mut self.cell_handlers;
        let cmd = circ
            .handle_in_order_relay_msg(
                handlers,
                entry.msg.hopnum,
                entry.leg_id,
                entry.msg.cell_counts_towards_windows,
                entry.msg.streamid,
                entry.msg.msg,
            )?
            .map(|cmd| {
                RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id.0, cmd))
            });

        if let Some(cmd) = cmd {
            self.handle_run_once_cmd(cmd).await?;
        }
    }

    Ok(())
}
893
894 /// Handle a [`RunOnceCmd`].
895 async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
896 match cmd {
897 RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
898 RunOnceCmd::Multiple(cmds) => {
899 // While we know `sendable` is ready to accept *one* cell,
900 // we can't be certain it will be able to accept *all* of the cells
901 // that need to be sent here. This means we *may* end up buffering
902 // in its underlying SometimesUnboundedSink! That is OK, because
903 // RunOnceCmd::Multiple is only used for handling packed cells.
904 for cmd in cmds {
905 self.handle_single_run_once_cmd(cmd).await?;
906 }
907 }
908 }
909
910 Ok(())
911 }
912
913 /// Handle a [`RunOnceCmd`].
914 async fn handle_single_run_once_cmd(
915 &mut self,
916 cmd: RunOnceCmdInner,
917 ) -> StdResult<(), ReactorError> {
918 match cmd {
919 RunOnceCmdInner::Send { leg, cell, done } => {
920 // TODO: check the cc window
921 let leg = self
922 .circuits
923 .leg_mut(leg)
924 .ok_or_else(|| internal!("leg disappeared?!"))?;
925 let res = leg.send_relay_cell(cell).await;
926 if let Some(done) = done {
927 // Don't care if the receiver goes away
928 let _ = done.send(res.clone());
929 }
930 res?;
931 }
932 #[cfg(feature = "send-control-msg")]
933 RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
934 let cell: Result<Option<SendRelayCell>> =
935 self.prepare_msg_and_install_handler(msg, handler);
936
937 match cell {
938 Ok(Some(cell)) => {
939 // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
940 let outcome = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
941 // don't care if receiver goes away.
942 let _ = done.send(outcome.clone());
943 outcome?;
944 }
945 Ok(None) => {
946 // don't care if receiver goes away.
947 let _ = done.send(Ok(()));
948 }
949 Err(e) => {
950 // don't care if receiver goes away.
951 let _ = done.send(Err(e.clone()));
952 return Err(e.into());
953 }
954 }
955 }
956 RunOnceCmdInner::BeginStream {
957 leg,
958 cell,
959 hop,
960 done,
961 } => {
962 match cell {
963 Ok((cell, stream_id)) => {
964 let leg = self
965 .circuits
966 .leg_mut(leg)
967 .ok_or_else(|| internal!("leg disappeared?!"))?;
968 let cell_hop = cell.hop;
969 let relay_format = leg
970 .hop_mut(cell_hop)
971 // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
972 .ok_or(Error::NoSuchHop)?
973 .relay_cell_format();
974 let outcome = leg.send_relay_cell(cell).await;
975 // don't care if receiver goes away.
976 let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
977 outcome?;
978 }
979 Err(e) => {
980 // don't care if receiver goes away.
981 let _ = done.send(Err(e.clone()));
982 return Err(e.into());
983 }
984 }
985 }
986 RunOnceCmdInner::CloseStream {
987 hop,
988 sid,
989 behav,
990 reason,
991 done,
992 } => {
993 let result = (move || {
994 // this is needed to force the closure to be FnOnce rather than FnMut :(
995 let self_ = self;
996 let (leg_id, hop_num) = self_
997 .resolve_hop_location(hop)
998 .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
999 let leg = self_
1000 .circuits
1001 .leg_mut(leg_id)
1002 .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
1003 Ok::<_, Bug>((leg, hop_num))
1004 })();
1005
1006 let (leg, hop_num) = match result {
1007 Ok(x) => x,
1008 Err(e) => {
1009 if let Some(done) = done {
1010 // don't care if the sender goes away
1011 let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
1012 let _ = done.send(Err(e.into()));
1013 }
1014 return Ok(());
1015 }
1016 };
1017
1018 let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
1019
1020 if let Some(done) = done {
1021 // don't care if the sender goes away
1022 let _ = done.send(res);
1023 }
1024 }
1025 RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
1026 let leg = self
1027 .circuits
1028 .leg_mut(leg)
1029 .ok_or_else(|| internal!("leg disappeared?!"))?;
1030 // NOTE: it's okay to await. We are only awaiting on the congestion_signals
1031 // future which *should* resolve immediately
1032 let signals = leg.congestion_signals().await;
1033 leg.handle_sendme(hop, sendme, signals)?;
1034 }
1035 RunOnceCmdInner::FirstHopClockSkew { answer } => {
1036 let res = self
1037 .circuits
1038 .single_leg_mut()
1039 .map(|(_id, leg)| leg.clock_skew());
1040
1041 // don't care if the sender goes away
1042 let _ = answer.send(res.map_err(Into::into));
1043 }
1044 RunOnceCmdInner::CleanShutdown => {
1045 trace!("{}: reactor shutdown due to handled cell", self.unique_id);
1046 return Err(ReactorError::Shutdown);
1047 }
1048 RunOnceCmdInner::RemoveLeg { leg, reason } => {
1049 let circ = self.circuits.remove(leg.0)?;
1050 let is_conflux_pending = circ.is_conflux_pending();
1051
1052 // Drop the removed leg. This will cause it to close if it's not already closed.
1053 drop(circ);
1054
1055 // If we reach this point, it means we have more than one leg
1056 // (otherwise the .remove() would've returned a Shutdown error),
1057 // so we expect there to be a ConfluxHandshakeContext installed.
1058
1059 #[cfg(feature = "conflux")]
1060 if is_conflux_pending {
1061 let error = match reason {
1062 RemoveLegReason::ConfluxHandshakeTimeout => ConfluxHandshakeError::Timeout,
1063 RemoveLegReason::ConfluxHandshakeErr(e) => ConfluxHandshakeError::Link(e),
1064 RemoveLegReason::ChannelClosed => ConfluxHandshakeError::ChannelClosed,
1065 };
1066
1067 self.note_conflux_handshake_result(Err(error))?;
1068 }
1069 }
1070 #[cfg(feature = "conflux")]
1071 RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
1072 // Note: on the client-side, the handshake is considered complete once the
1073 // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
1074 //
1075 // We're optimistic here, and declare the handshake a success *before*
1076 // sending the LINKED_ACK response. I think this is OK though,
1077 // because if the send_relay_cell() below fails, the reactor will shut
1078 // down anyway. OTOH, marking the handshake as complete slightly early
1079 // means that on the happy path, the circuit is marked as usable sooner,
1080 // instead of blocking on the sending of the LINKED_ACK.
1081 self.note_conflux_handshake_result(Ok(()))?;
1082
1083 let leg = self
1084 .circuits
1085 .leg_mut(leg)
1086 .ok_or_else(|| internal!("leg disappeared?!"))?;
1087 let res = leg.send_relay_cell(cell).await;
1088
1089 res?;
1090 }
1091 #[cfg(feature = "conflux")]
1092 RunOnceCmdInner::Link { circuits, answer } => {
1093 // Add the specified circuits to our conflux set,
1094 // and send a LINK cell down each unlinked leg.
1095 //
1096 // NOTE: this will block the reactor until all the cells are sent.
1097 self.handle_link_circuits(circuits, answer).await?;
1098 }
1099 #[cfg(feature = "conflux")]
1100 RunOnceCmdInner::Enqueue { leg, msg } => {
1101 let entry = ConfluxHeapEntry { leg_id: leg, msg };
1102 self.ooo_msgs.push(entry);
1103 }
1104 }
1105
1106 Ok(())
1107 }
1108
1109 /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
1110 ///
1111 /// Returns an error if an unexpected `CtrlMsg` is received.
1112 async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
1113 let msg = select_biased! {
1114 res = self.command.next() => {
1115 let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
1116 match cmd {
1117 CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
1118 #[cfg(test)]
1119 CtrlCmd::AddFakeHop {
1120 relay_cell_format: format,
1121 fwd_lasthop,
1122 rev_lasthop,
1123 params,
1124 done,
1125 } => {
1126 let (_id, leg) = self.circuits.single_leg_mut()?;
1127 leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, ¶ms, done);
1128 return Ok(())
1129 },
1130 _ => {
1131 trace!("reactor shutdown due to unexpected command: {:?}", cmd);
1132 return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
1133 }
1134 }
1135 },
1136 res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
1137 };
1138
1139 match msg {
1140 CtrlMsg::Create {
1141 recv_created,
1142 handshake,
1143 mut params,
1144 done,
1145 } => {
1146 // TODO(conflux): instead of crashing the reactor, it might be better
1147 // to send the error via the done channel instead
1148 let (_id, leg) = self.circuits.single_leg_mut()?;
1149 leg.handle_create(recv_created, handshake, &mut params, done)
1150 .await
1151 }
1152 _ => {
1153 trace!("reactor shutdown due to unexpected cell: {:?}", msg);
1154
1155 Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
1156 }
1157 }
1158 }
1159
/// Record one leg's handshake outcome in the pending `ConfluxHandshakeCtx`.
///
/// Once as many results have been collected as there are legs being linked,
/// the context is removed from the reactor and the collected results are
/// delivered to the handshake initiator.
///
/// # Errors
///
/// Returns an internal error if no handshake context is installed, and
/// whatever `send_conflux_outcome` returns if the results cannot be delivered.
#[cfg(feature = "conflux")]
fn note_conflux_handshake_result(
    &mut self,
    res: StdResult<(), ConfluxHandshakeError>,
) -> StdResult<(), ReactorError> {
    let pending = self
        .conflux_hs_ctx
        .as_mut()
        .ok_or_else(|| internal!("no conflux handshake context"))?;
    pending.results.push(res);

    // Some legs have not finished linking yet: keep the context around.
    if pending.results.len() != pending.num_legs {
        return Ok(());
    }

    // Every leg has reported in, so the context has served its purpose.
    //
    // We don't expect to receive any more handshake results,
    // at least not until we get another LinkCircuits control message,
    // which will install a new ConfluxHandshakeCtx with a channel
    // for us to send updates on
    let finished = self.conflux_hs_ctx.take().expect("context disappeared?!");

    send_conflux_outcome(finished.answer, Ok(finished.results))
}
1196
1197 /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
1198 fn prepare_msg_and_install_handler(
1199 &mut self,
1200 msg: Option<AnyRelayMsgOuter>,
1201 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1202 ) -> Result<Option<SendRelayCell>> {
1203 let msg = msg
1204 .map(|msg| {
1205 let handlers = &mut self.cell_handlers;
1206 let handler = handler
1207 .as_ref()
1208 .or(handlers.meta_handler.as_ref())
1209 .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
1210 Ok::<_, crate::Error>(SendRelayCell {
1211 hop: handler.expected_hop(),
1212 early: false,
1213 cell: msg,
1214 })
1215 })
1216 .transpose()?;
1217
1218 if let Some(handler) = handler {
1219 self.cell_handlers.set_meta_handler(handler)?;
1220 }
1221
1222 Ok(msg)
1223 }
1224
1225 /// Handle a shutdown request.
1226 fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
1227 trace!(
1228 "{}: reactor shutdown due to explicit request",
1229 self.unique_id
1230 );
1231
1232 Err(ReactorError::Shutdown)
1233 }
1234
/// Shut the reactor down, handing its only [`Circuit`] back over `answer`.
///
/// If the single leg cannot be taken out of the tunnel, the error is sent on
/// `answer` instead. Either way, the reactor is then shut down.
#[cfg(feature = "conflux")]
fn handle_shutdown_and_return_circuit(
    &mut self,
    answer: oneshot::Sender<StdResult<Circuit, Bug>>,
) -> StdResult<(), ReactorError> {
    let only_leg: StdResult<Circuit, Bug> =
        self.circuits.take_single_leg().map_err(Into::into);

    // Don't care if the receiver goes away
    let _ = answer.send(only_leg);

    self.handle_shutdown()?;
    Ok(())
}
1248
1249 /// Resolves a [`TargetHop`] to a [`HopLocation`].
1250 ///
1251 /// After resolving a `TargetHop::LastHop`,
1252 /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
1253 /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
1254 ///
1255 /// It's generally okay to hold on to a (possibly stale) `HopLocation`
1256 /// if you need a fixed hop position in the tunnel.
1257 /// For example if we open a stream to `TargetHop::LastHop`,
1258 /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
1259 /// as we don't want the stream position to change as the tunnel is extended or truncated.
1260 ///
1261 /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
1262 /// and the tunnel has no hops
1263 /// (either has no legs, or has legs which contain no hops).
1264 fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
1265 match hop {
1266 TargetHop::Hop(hop) => Ok(hop),
1267 TargetHop::LastHop => {
1268 if let Ok((leg_id, leg)) = self.circuits.single_leg() {
1269 // single-path tunnel
1270 let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
1271 Ok(HopLocation::Hop((leg_id, hop)))
1272 } else if !self.circuits.is_empty() {
1273 // multi-path tunnel
1274 return Ok(HopLocation::JoinPoint);
1275 } else {
1276 // no legs
1277 Err(NoHopsBuiltError)
1278 }
1279 }
1280 }
1281 }
1282
1283 /// Resolves a [`HopLocation`] to a [`LegId`] and [`HopNum`].
1284 ///
1285 /// After resolving a `HopLocation::JoinPoint`,
1286 /// the [`LegId`] and [`HopNum`] can become stale if the primary leg changes.
1287 ///
1288 /// You should try to only resolve to a specific [`LegId`] and [`HopNum`] immediately before you
1289 /// need them,
1290 /// and you should not hold on to the resolved [`LegId`] and [`HopNum`] between reactor
1291 /// iterations as the primary leg may change from one iteration to the next.
1292 ///
1293 /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
1294 /// but it does not have a join point.
1295 fn resolve_hop_location(
1296 &self,
1297 hop: HopLocation,
1298 ) -> StdResult<(LegId, HopNum), NoJoinPointError> {
1299 match hop {
1300 HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
1301 HopLocation::JoinPoint => {
1302 if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
1303 Ok((leg_id, hop_num))
1304 } else {
1305 // Attempted to get the join point of a non-multipath tunnel.
1306 Err(NoJoinPointError)
1307 }
1308 }
1309 }
1310 }
1311
1312 /// Does congestion control use stream SENDMEs for the given hop?
1313 ///
1314 /// Returns `None` if either the `leg` or `hop` don't exist.
1315 fn uses_stream_sendme(&self, leg: LegId, hop: HopNum) -> Option<bool> {
1316 self.circuits.uses_stream_sendme(leg, hop)
1317 }
1318
/// Link additional circuits into this reactor's conflux set.
///
/// The circuits are validated when they are added: if they do not all have
/// the same length, or do not share the same last hop, an error is sent on
/// the `answer` channel and the conflux handshake is *not* started.
/// The same goes for a request that arrives while another linking attempt
/// is still in progress.
///
/// On success the circuits become legs of this reactor's conflux set, the
/// handshake is started (by sending a LINK cell on each leg), and `answer`
/// is stored so the initiator can be told once the handshake finishes.
///
/// NOTE: this blocks the reactor main loop until all the cells are sent.
#[cfg(feature = "conflux")]
async fn handle_link_circuits(
    &mut self,
    circuits: Vec<Circuit>,
    answer: ConfluxLinkResultChannel,
) -> StdResult<(), ReactorError> {
    // Only one handshake may be pending at any given time.
    if self.conflux_hs_ctx.is_some() {
        let err = internal!("conflux linking already in progress");
        return send_conflux_outcome(answer, Err(err.into()));
    }

    let num_legs = circuits.len();

    let link_outcome = async {
        // Note: add_legs validates `circuits`
        self.circuits.add_legs(circuits, &self.runtime)?;

        // TODO(conflux): check if we negotiated prop324 cc on *all* circuits,
        // returning an error if not?

        self.circuits.link_circuits(&self.runtime).await
    }
    .await;

    match link_outcome {
        Ok(_) => {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });

            Ok(())
        }
        Err(e) => send_conflux_outcome(answer, Err(e)),
    }
}
1368}
1369
/// Tell the initiator of a conflux handshake how the handshake went.
///
/// Returns [`ReactorError::Shutdown`] if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    let delivered = tx.send(res).is_ok();
    if delivered {
        return Ok(());
    }

    // Nobody is listening for the outcome any more.
    tracing::warn!("conflux initiator went away before handshake completed?");
    Err(ReactorError::Shutdown)
}
1385
1386/// The tunnel does not have any hops.
1387#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1388#[non_exhaustive]
1389#[error("no hops have been built for this tunnel")]
1390pub(crate) struct NoHopsBuiltError;
1391
1392/// The tunnel does not have a join point.
1393#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
1394#[non_exhaustive]
1395#[error("the tunnel does not have a join point")]
1396pub(crate) struct NoJoinPointError;
1397
1398impl CellHandlers {
1399 /// Try to install a given meta-cell handler to receive any unusual cells on
1400 /// this circuit, along with a result channel to notify on completion.
1401 fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
1402 if self.meta_handler.is_none() {
1403 self.meta_handler = Some(handler);
1404 Ok(())
1405 } else {
1406 Err(Error::from(internal!(
1407 "Tried to install a meta-cell handler before the old one was gone."
1408 )))
1409 }
1410 }
1411
1412 /// Try to install a given cell handler on this circuit.
1413 #[cfg(feature = "hs-service")]
1414 fn set_incoming_stream_req_handler(
1415 &mut self,
1416 handler: IncomingStreamRequestHandler,
1417 ) -> Result<()> {
1418 if self.incoming_stream_req_handler.is_none() {
1419 self.incoming_stream_req_handler = Some(handler);
1420 Ok(())
1421 } else {
1422 Err(Error::from(internal!(
1423 "Tried to install a BEGIN cell handler before the old one was gone."
1424 )))
1425 }
1426 }
1427}
1428
#[cfg(test)]
mod test {
    // Intentionally empty: this module is tested in [`crate::tunnel::circuit::test`].
}