1
//! Code to handle incoming cells on a circuit.
2
//!
3
//! ## On message validation
4
//!
5
//! There are three steps for validating an incoming message on a stream:
6
//!
7
//! 1. Is the message contextually appropriate? (e.g., no more than one
8
//!    `CONNECTED` message per stream.) This is handled by calling
9
//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
10
//! 2. Does the message comply with flow-control rules? (e.g., no more data than
11
//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
12
//!    this; for half-closed streams, the reactor handles it using the
13
//!    `halfstream` module.
14
//! 3. Does the message have an acceptable command type, and is the message
15
//!    well-formed? For open streams, the streams themselves handle this check.
16
//!    For half-closed streams, the reactor handles it by calling
17
//!    `consume_checked_msg()`.
18

            
19
pub(super) mod circuit;
20
mod conflux;
21
mod control;
22
pub(super) mod syncview;
23

            
24
use crate::crypto::cell::HopNum;
25
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26
use crate::memquota::{CircuitAccount, StreamAccount};
27
use crate::stream::AnyCmdChecker;
28
#[cfg(feature = "hs-service")]
29
use crate::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
30
use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
31
use crate::tunnel::circuit::unique_id::UniqId;
32
use crate::tunnel::circuit::CircuitRxReceiver;
33
use crate::tunnel::{streammap, HopLocation, TargetHop};
34
use crate::util::err::ReactorError;
35
use crate::util::skew::ClockSkew;
36
use crate::{Error, Result};
37
use circuit::{Circuit, CircuitCmd};
38
use conflux::ConfluxSet;
39
use control::ControlHandler;
40
use std::cmp::Ordering;
41
use std::collections::BinaryHeap;
42
use std::mem::size_of;
43
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
44
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
45
use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
46
use tor_rtcompat::DynTimeProvider;
47

            
48
use futures::channel::mpsc;
49
use futures::StreamExt;
50
use futures::{select_biased, FutureExt as _};
51
use oneshot_fused_workaround as oneshot;
52

            
53
use std::result::Result as StdResult;
54
use std::sync::Arc;
55

            
56
use crate::channel::Channel;
57
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
58
use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
59
use derive_deftly::Deftly;
60
use derive_more::From;
61
use tor_cell::chancell::CircId;
62
use tor_llcrypto::pk;
63
use tor_memquota::mq_queue::{self, MpscSpec};
64
use tor_memquota::{derive_deftly_template_HasMemoryCost, memory_cost_structural_copy};
65
use tracing::trace;
66

            
67
use super::circuit::{MutableState, TunnelMutableState};
68

            
69
#[cfg(feature = "conflux")]
70
use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
71

            
72
pub(super) use control::CtrlCmd;
73
pub(super) use control::CtrlMsg;
74

            
75
/// The type of a oneshot channel used to inform reactor of the result of an operation.
76
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
77

            
78
/// Contains a list of conflux handshake results.
79
#[cfg(feature = "conflux")]
80
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
81

            
82
/// The type of oneshot channel used to inform reactor users of the outcome
83
/// of a client-side conflux handshake.
84
///
85
/// Contains a list of handshake results, one for each circuit that we were asked
86
/// to link in the tunnel.
87
#[cfg(feature = "conflux")]
88
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
89

            
90
pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
91

            
92
/// MPSC queue containing stream requests
93
#[cfg(feature = "hs-service")]
94
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
95

            
96
/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    ///
    /// Unlike the ntor variants below, this variant carries no key material.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}
115

            
116
/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send an End cell, if we haven't already sent one.
    SendEnd(End),
}
131
impl Default for CloseStreamBehavior {
132
32
    fn default() -> Self {
133
32
        Self::SendEnd(End::new_misc())
134
32
    }
135
}
136

            
137
// TODO(conflux): the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
138
// proliferation is a bit bothersome, but unavoidable with the current design.
139
//
140
// We should consider getting rid of some of these enums (if possible),
141
// and coming up with more intuitive names.
142

            
143
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}
156

            
157
/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: LegId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: LegId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: LegId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: LegId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in ooo_msg.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: LegId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
284

            
285
impl RunOnceCmdInner {
286
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`LegIdKey`].
287
2792
    fn from_circuit_cmd(leg: LegIdKey, cmd: CircuitCmd) -> Self {
288
2792
        match cmd {
289
2744
            CircuitCmd::Send(cell) => Self::Send {
290
2744
                leg: LegId(leg),
291
2744
                cell,
292
2744
                done: None,
293
2744
            },
294
8
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe {
295
8
                leg: LegId(leg),
296
8
                hop,
297
8
                sendme,
298
8
            },
299
            CircuitCmd::CloseStream {
300
32
                hop,
301
32
                sid,
302
32
                behav,
303
32
                reason,
304
32
            } => Self::CloseStream {
305
32
                hop: HopLocation::Hop((LegId(leg), hop)),
306
32
                sid,
307
32
                behav,
308
32
                reason,
309
32
                done: None,
310
32
            },
311
            #[cfg(feature = "conflux")]
312
            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg {
313
                leg: LegId(leg),
314
                reason,
315
            },
316
            #[cfg(feature = "conflux")]
317
            CircuitCmd::ConfluxHandshakeComplete(cell) => Self::ConfluxHandshakeComplete {
318
                leg: LegId(leg),
319
                cell,
320
            },
321
            #[cfg(feature = "conflux")]
322
            CircuitCmd::Enqueue(msg) => Self::Enqueue {
323
                leg: LegId(leg),
324
                msg,
325
            },
326
8
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
327
        }
328
2792
    }
329
}
330

            
331
/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`.
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number the cell is destined for.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}
344

            
345
/// A command to execute at the end of [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitAction {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: LegIdKey,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: LegIdKey,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this action *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: LegIdKey,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
}
389

            
390
/// The reason for removing a circuit leg from the conflux set.
///
/// Carried by [`CircuitAction::RemoveLeg`] and [`RunOnceCmdInner::RemoveLeg`].
#[derive(Debug)]
enum RemoveLegReason {
    /// The conflux handshake timed out.
    ///
    /// On the client-side, this means we didn't receive
    /// the CONFLUX_LINKED response in time.
    ConfluxHandshakeTimeout,
    /// An error occurred during conflux handshake.
    ConfluxHandshakeErr(Error),
    /// The channel was closed.
    ChannelClosed,
}
403

            
404
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is thrown if the two don't match.
    fn expected_hop(&self) -> HopNum;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets a mutable reference to the [`Circuit`] in order to do anything it likes there.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}
429

            
430
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
447

            
448
/// A unique identifier for a circuit leg.
///
/// After the circuit is torn down, its `LegId` becomes invalid.
/// The same `LegId` won't be reused for a future circuit.
//
// TODO(#1857): make this pub
#[allow(unused)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct LegId(pub(crate) LegIdKey);
458

            
459
// TODO(#1999): can we use `UniqId` as the key instead of this newtype?
//
// See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2996#note_3199069
slotmap_careful::new_key_type! {
    /// A key type for the circuit leg slotmap.
    ///
    /// See [`LegId`], the public-facing wrapper around this key.
    pub(crate) struct LegIdKey;
}
468

            
469
impl From<LegIdKey> for LegId {
470
340
    fn from(leg_id: LegIdKey) -> Self {
471
340
        LegId(leg_id)
472
340
    }
473
}
474

            
475
memory_cost_structural_copy!(LegIdKey);
476

            
477
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                // A `None` means the stream/channel has terminated: log why,
                // and ask the caller to shut the reactor down.
                trace!("{}: reactor shutdown due to {}", $self.unique_id, $reason);
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
492

            
493
/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this reactor's circuit.
    unique_id: UniqId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// (The reversed [`Ord`] impl on [`ConfluxHeapEntry`] is what makes the
    /// underlying max-heap behave as a min-heap.)
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}
556

            
557
/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    ///
    /// One entry is expected per handshaking leg (up to `num_legs`).
    results: ConfluxHandshakeResult,
}
567

            
568
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Ordering and equality are determined solely by `msg`
/// (see the `Ord`/`PartialEq` impls below); `leg_id` is not compared.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: LegId,
    /// The out of order message
    msg: OooRelayMsg,
}
577

            
578
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Compare two entries by their messages, in *reversed* order,
    /// so that the max-heap [`BinaryHeap`] in [`Reactor::ooo_msgs`]
    /// behaves as a min-heap.
    fn cmp(&self, other: &Self) -> Ordering {
        // Flipping the operands is equivalent to
        // `self.msg.cmp(&other.msg).reverse()`.
        other.msg.cmp(&self.msg)
    }
}
584

            
585
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Delegate to the total ordering defined by [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
591

            
592
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal iff their buffered messages are equal;
    /// `leg_id` is not compared.
    fn eq(&self, other: &Self) -> bool {
        self.msg == other.msg
    }
}
598

            
599
#[cfg(feature = "conflux")]
// Marker impl: equality is delegated to `msg` (see the `PartialEq` impl above).
impl Eq for ConfluxHeapEntry {}
601

            
602
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    ///
    /// `None` when no meta-cell conversation is in progress.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
610

            
611
/// Information about an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`] the request arrived from.
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// The [`LegId`] of the circuit the request came on.
    pub(crate) leg: LegId,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
643

            
644
/// Data required for handling an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// A [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
660

            
661
impl Reactor {
662
    /// Create a new circuit reactor.
    ///
    /// The reactor will send outbound messages on `channel`, receive incoming
    /// messages on `input`, and identify this circuit by the channel-local
    /// [`CircId`] provided.
    ///
    /// The internal unique identifier for this circuit will be `unique_id`.
    ///
    /// Returns the reactor itself, the control-message and control-command
    /// senders used to talk to it, a receiver that is dropped when the reactor
    /// shuts down (it can never yield a value, since its type is `void::Void`),
    /// and the shared mutable state of the tunnel.
    #[allow(clippy::type_complexity)] // TODO
    pub(super) fn new(
        channel: Arc<Channel>,
        channel_id: CircId,
        unique_id: UniqId,
        input: CircuitRxReceiver,
        runtime: DynTimeProvider,
        memquota: CircuitAccount,
    ) -> (
        Self,
        mpsc::UnboundedSender<CtrlMsg>,
        mpsc::UnboundedSender<CtrlCmd>,
        oneshot::Receiver<void::Void>,
        Arc<TunnelMutableState>,
    ) {
        // Unbounded channels for control messages and commands sent to the reactor.
        let (control_tx, control_rx) = mpsc::unbounded();
        let (command_tx, command_rx) = mpsc::unbounded();
        let mutable = Arc::new(MutableState::default());

        // `reactor_closed_rx` resolves (with an error) only when the reactor drops its end.
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();

        // No meta-cell or incoming-stream handlers are installed at creation time.
        let cell_handlers = CellHandlers {
            meta_handler: None,
            #[cfg(feature = "hs-service")]
            incoming_stream_req_handler: None,
        };

        let circuit_leg = Circuit::new(
            channel,
            channel_id,
            unique_id,
            input,
            memquota,
            Arc::clone(&mutable),
        );

        // Note: `mutable` is shadowed here; ConfluxSet::new returns the
        // tunnel-level mutable state that we hand back to the caller.
        let (circuits, mutable) = ConfluxSet::new(circuit_leg);

        let reactor = Reactor {
            circuits,
            control: control_rx,
            command: command_rx,
            reactor_closed_tx,
            unique_id,
            cell_handlers,
            runtime,
            #[cfg(feature = "conflux")]
            conflux_hs_ctx: None,
            #[cfg(feature = "conflux")]
            ooo_msgs: Default::default(),
        };

        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
    }
723

            
724
    /// Launch the reactor, and run until the circuit closes or we
725
    /// encounter an error.
726
    ///
727
    /// Once this method returns, the circuit is dead and cannot be
728
    /// used again.
729
240
    pub async fn run(mut self) -> Result<()> {
730
160
        trace!("{}: Running circuit reactor", self.unique_id);
731
160
        let result: Result<()> = loop {
732
3622
            match self.run_once().await {
733
3462
                Ok(()) => (),
734
122
                Err(ReactorError::Shutdown) => break Ok(()),
735
38
                Err(ReactorError::Err(e)) => break Err(e),
736
            }
737
        };
738
160
        trace!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
739
160
        result
740
160
    }
741

            
742
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
    /// processes one cell or control message.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        // If all the circuits are closed, shut down the reactor
        //
        // TODO(conflux): we might need to rethink this behavior
        if self.circuits.is_empty() {
            trace!(
                "{}: Circuit reactor shutting down: all circuits have closed",
                self.unique_id
            );

            return Err(ReactorError::Shutdown);
        }

        // If this is a single path circuit, we need to wait until the first hop
        // is created before doing anything else
        //
        // NOTE: despite its name, this is true when the tunnel has exactly one
        // leg and that leg has *no* hops built yet.
        let single_path_with_hops = self
            .circuits
            .single_leg_mut()
            .is_ok_and(|(_id, leg)| !leg.has_hops());
        if single_path_with_hops {
            self.wait_for_create().await?;

            return Ok(());
        }

        // Prioritize the buffered messages.
        //
        // Note: if any of the messages are ready to be handled,
        // this will block the reactor until we are done processing them
        #[cfg(feature = "conflux")]
        self.try_dequeue_ooo_msgs().await?;

        // Wait (biased, in declaration order) for the next thing to do:
        // a command, a control message, or an event from one of the circuit legs.
        let action = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
                return ControlHandler::new(self).handle_cmd(cmd);
            },
            // Check whether we've got a control message pending.
            //
            // Note: unfortunately, reading from control here means we might start
            // handling control messages before our chan_senders are ready.
            // With the current design, this is inevitable: we can't know which circuit leg
            // a control message is meant for without first reading the control message from
            // the channel, and at that point, we can't know for sure whether that particular
            // circuit is ready for sending.
            ret = self.control.next() => {
                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
                CircuitAction::HandleControl(msg)
            },
            res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
        };

        // Translate the action into zero or more `RunOnceCmdInner`s to execute.
        let cmd = match action {
            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
            )),
            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
                .handle_msg(ctrl)?
                .map(RunOnceCmd::Single),
            CircuitAction::HandleCell { leg, cell } => {
                let circ = self
                    .circuits
                    .leg_mut(LegId(leg))
                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;

                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg.into(), cell)?;
                if circ_cmds.is_empty() {
                    None
                } else {
                    // TODO(conflux): we return RunOnceCmd::Multiple even if there's a single command.
                    //
                    // See the TODO(conflux) on `Circuit::handle_cell`.
                    let cmd = RunOnceCmd::Multiple(
                        circ_cmds
                            .into_iter()
                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
                            .collect(),
                    );

                    Some(cmd)
                }
            }
            CircuitAction::RemoveLeg { leg, reason } => Some(
                RunOnceCmdInner::RemoveLeg {
                    leg: LegId(leg),
                    reason,
                }
                .into(),
            ),
        };

        if let Some(cmd) = cmd {
            self.handle_run_once_cmd(cmd).await?;
        }

        // Check if it's time to switch our primary leg.
        //
        // TODO(conflux): this should only be done just before sending a cell.
        //
        // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2946#note_3192013
        #[cfg(feature = "conflux")]
        if let Some(switch_cell) = self.circuits.maybe_update_primary_leg()? {
            self.circuits
                .primary_leg_mut()?
                .send_relay_cell(switch_cell)
                .await?;
        }

        Ok(())
    }
854

            
855
    /// Try to process the previously-out-of-order messages we might have buffered.
    ///
    /// Pops entries off the `ooo_msgs` heap for as long as the head entry's
    /// sequence number is now in order, handling each one; stops at the first
    /// entry that is still out of order.
    #[cfg(feature = "conflux")]
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
        // Check if we're ready to dequeue any of the previously out-of-order cells.
        while let Some(entry) = self.ooo_msgs.peek() {
            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);

            if !should_pop {
                break;
            }

            // We just peeked this entry, so popping it cannot fail.
            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");

            let circ = self
                .circuits
                .leg_mut(entry.leg_id)
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
            let handlers = &mut self.cell_handlers;
            let cmd = circ
                .handle_in_order_relay_msg(
                    handlers,
                    entry.msg.hopnum,
                    entry.leg_id,
                    entry.msg.cell_counts_towards_windows,
                    entry.msg.streamid,
                    entry.msg.msg,
                )?
                .map(|cmd| {
                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id.0, cmd))
                });

            if let Some(cmd) = cmd {
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }
893

            
894
    /// Handle a [`RunOnceCmd`].
895
4383
    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
896
2922
        match cmd {
897
2906
            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
898
16
            RunOnceCmd::Multiple(cmds) => {
899
                // While we know `sendable` is ready to accept *one* cell,
900
                // we can't be certain it will be able to accept *all* of the cells
901
                // that need to be sent here. This means we *may* end up buffering
902
                // in its underlying SometimesUnboundedSink! That is OK, because
903
                // RunOnceCmd::Multiple is only used for handling packed cells.
904
20
                for cmd in cmds {
905
16
                    self.handle_single_run_once_cmd(cmd).await?;
906
                }
907
            }
908
        }
909

            
910
4
        Ok(())
911
2922
    }
912

            
913
    /// Handle a single [`RunOnceCmdInner`].
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmdInner::Send { leg, cell, done } => {
                // TODO: check the cc window
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                let res = leg.send_relay_cell(cell).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away
                    let _ = done.send(res.clone());
                }
                res?;
            }
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.primary_leg_mut()?.send_relay_cell(cell).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::BeginStream {
                leg,
                cell,
                hop,
                done,
            } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        let leg = self
                            .circuits
                            .leg_mut(leg)
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
                        let cell_hop = cell.hop;
                        let relay_format = leg
                            .hop_mut(cell_hop)
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
                            .ok_or(Error::NoSuchHop)?
                            .relay_cell_format();
                        let outcome = leg.send_relay_cell(cell).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::CloseStream {
                hop,
                sid,
                behav,
                reason,
                done,
            } => {
                // Resolve the hop location to a concrete (leg, hop) pair.
                // Resolution errors are reported on `done` rather than
                // crashing the reactor.
                let result = (move || {
                    // this is needed to force the closure to be FnOnce rather than FnMut :(
                    let self_ = self;
                    let (leg_id, hop_num) = self_
                        .resolve_hop_location(hop)
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                    let leg = self_
                        .circuits
                        .leg_mut(leg_id)
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
                    Ok::<_, Bug>((leg, hop_num))
                })();
                let (leg, hop_num) = match result {
                    Ok(x) => x,
                    Err(e) => {
                        if let Some(done) = done {
                            // don't care if the sender goes away
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                            let _ = done.send(Err(e.into()));
                        }
                        return Ok(());
                    }
                };
                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                // Clock skew is only meaningful on a single-leg tunnel; otherwise
                // the error from single_leg_mut() is forwarded to the caller.
                let res = self
                    .circuits
                    .single_leg_mut()
                    .map(|(_id, leg)| leg.clock_skew());
                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!("{}: reactor shutdown due to handled cell", self.unique_id);
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                let circ = self.circuits.remove(leg.0)?;
                let is_conflux_pending = circ.is_conflux_pending();
                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);
                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeContext installed.
                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    let error = match reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => ConfluxHandshakeError::Timeout,
                        RemoveLegReason::ConfluxHandshakeErr(e) => ConfluxHandshakeError::Link(e),
                        RemoveLegReason::ChannelClosed => ConfluxHandshakeError::ChannelClosed,
                    };
                    self.note_conflux_handshake_result(Err(error))?;
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()))?;
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                let res = leg.send_relay_cell(cell).await;
                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                // Buffer an out-of-order message for later processing
                // (see `try_dequeue_ooo_msgs`).
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
        }
        Ok(())
    }
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        // Wait (biased) for either a control command or a control message.
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    // Test-only: install a fake hop instead of performing a real handshake.
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        params,
                        done,
                    } => {
                        let (_id, leg) = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, &params, done);
                        return Ok(())
                    },
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };
        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                mut params,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let (_id, leg) = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, &mut params, done)
                    .await
            }
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }
    /// Add the specified handshake result to our `ConfluxHandshakeContext`.
    ///
    /// If all the circuits we were waiting on have finished the conflux handshake,
    /// the `ConfluxHandshakeContext` is consumed, and the results we have collected
    /// are sent to the handshake initiator.
    ///
    /// Returns an internal error if no handshake context is installed.
    #[cfg(feature = "conflux")]
    fn note_conflux_handshake_result(
        &mut self,
        res: StdResult<(), ConfluxHandshakeError>,
    ) -> StdResult<(), ReactorError> {
        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
            Some(conflux_ctx) => {
                conflux_ctx.results.push(res);
                // Whether all the legs have finished linking:
                conflux_ctx.results.len() == conflux_ctx.num_legs
            }
            None => {
                return Err(internal!("no conflux handshake context").into());
            }
        };
        if tunnel_complete {
            // Time to remove the conflux handshake context
            // and extract the results we have collected
            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");
            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
            // We don't expect to receive any more handshake results,
            // at least not until we get another LinkCircuits control message,
            // which will install a new ConfluxHandshakeCtx with a channel
            // for us to send updates on
        }
        Ok(())
    }
    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
    ///
    /// If `msg` is `None`, no cell is prepared and `Ok(None)` is returned.
    /// Fails with an internal error if a message is given but neither `handler`
    /// nor an already-installed meta handler can supply the target hop.
    fn prepare_msg_and_install_handler(
        &mut self,
        msg: Option<AnyRelayMsgOuter>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<Option<SendRelayCell>> {
        let prepared = match msg {
            None => None,
            Some(msg) => {
                let handlers = &mut self.cell_handlers;
                // Prefer the freshly-supplied handler; fall back to the installed one.
                let hop_source = handler
                    .as_ref()
                    .or(handlers.meta_handler.as_ref())
                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
                Some(SendRelayCell {
                    hop: hop_source.expected_hop(),
                    early: false,
                    cell: msg,
                })
            }
        };
        // Install the new meta handler (errors if one is already present).
        if let Some(handler) = handler {
            self.cell_handlers.set_meta_handler(handler)?;
        }
        Ok(prepared)
    }
    /// Handle an explicit request to shut the reactor down.
    ///
    /// Always returns `Err(ReactorError::Shutdown)`.
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
        trace!("{}: reactor shutdown due to explicit request", self.unique_id);
        Err(ReactorError::Shutdown)
    }
    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
    ///
    /// Returns an error over the `answer` channel if the reactor has no circuits,
    /// or more than one circuit. The reactor will shut down regardless.
    #[cfg(feature = "conflux")]
    fn handle_shutdown_and_return_circuit(
        &mut self,
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    ) -> StdResult<(), ReactorError> {
        let circuit = self.circuits.take_single_leg().map_err(Into::into);
        // Don't care if the receiver goes away
        let _ = answer.send(circuit);
        self.handle_shutdown().map(|_| ())
    }
    /// Resolves a [`TargetHop`] to a [`HopLocation`].
    ///
    /// After resolving a `TargetHop::LastHop`,
    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
    ///
    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
    /// if you need a fixed hop position in the tunnel.
    /// For example if we open a stream to `TargetHop::LastHop`,
    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
    /// as we don't want the stream position to change as the tunnel is extended or truncated.
    ///
    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
    /// and the tunnel has no hops
    /// (either has no legs, or has legs which contain no hops).
56
    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
56
        match hop {
            TargetHop::Hop(hop) => Ok(hop),
            TargetHop::LastHop => {
56
                if let Ok((leg_id, leg)) = self.circuits.single_leg() {
                    // single-path tunnel
56
                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
56
                    Ok(HopLocation::Hop((leg_id, hop)))
                } else if !self.circuits.is_empty() {
                    // multi-path tunnel
                    return Ok(HopLocation::JoinPoint);
                } else {
                    // no legs
                    Err(NoHopsBuiltError)
                }
            }
        }
56
    }
    /// Resolves a [`HopLocation`] to a [`LegId`] and [`HopNum`].
    ///
    /// After resolving a `HopLocation::JoinPoint`,
    /// the [`LegId`] and [`HopNum`] can become stale if the primary leg changes.
    ///
    /// You should try to only resolve to a specific [`LegId`] and [`HopNum`] immediately before you
    /// need them,
    /// and you should not hold on to the resolved [`LegId`] and [`HopNum`] between reactor
    /// iterations as the primary leg may change from one iteration to the next.
    ///
    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
    /// but it does not have a join point.
96
    fn resolve_hop_location(
96
        &self,
96
        hop: HopLocation,
96
    ) -> StdResult<(LegId, HopNum), NoJoinPointError> {
96
        match hop {
96
            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
            HopLocation::JoinPoint => {
                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
                    Ok((leg_id, hop_num))
                } else {
                    // Attempted to get the join point of a non-multipath tunnel.
                    Err(NoJoinPointError)
                }
            }
        }
96
    }
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: LegId, hop: HopNum) -> Option<bool> {
        // Thin delegation: the conflux set owns the per-leg hop state.
        self.circuits.uses_stream_sendme(leg, hop)
    }
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        // Only one link operation may be in flight at a time.
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;
            return Ok(());
        }
        let num_legs = circuits.len();
        // Note: add_legs validates `circuits`
        //
        // The async block scopes the `?` operator so that any error is
        // captured in `res` and reported over `answer` below.
        let res = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            // TODO(conflux): check if we negotiated prop324 cc on *all* circuits,
            // returning an error if not?
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;
        if let Err(e) = res {
            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }
        Ok(())
    }
}
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    match tx.send(res) {
        Ok(()) => Ok(()),
        Err(_) => {
            // Nobody is listening for the result; shut the reactor down.
            tracing::warn!("conflux initiator went away before handshake completed?");
            Err(ReactorError::Shutdown)
        }
    }
}
/// The tunnel does not have any hops.
///
/// Returned by [`Reactor::resolve_target_hop`] when resolving
/// `TargetHop::LastHop` on a tunnel with no legs, or whose legs have no hops.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
/// The tunnel does not have a join point.
///
/// Returned by [`Reactor::resolve_hop_location`] when resolving
/// `HopLocation::JoinPoint` on a tunnel without one.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
impl CellHandlers {
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
48
    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
48
        if self.meta_handler.is_none() {
48
            self.meta_handler = Some(handler);
48
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
48
    }
    /// Try to install a given cell handler on this circuit.
    #[cfg(feature = "hs-service")]
40
    fn set_incoming_stream_req_handler(
40
        &mut self,
40
        handler: IncomingStreamRequestHandler,
40
    ) -> Result<()> {
40
        if self.incoming_stream_req_handler.is_none() {
32
            self.incoming_stream_req_handler = Some(handler);
32
            Ok(())
        } else {
8
            Err(Error::from(internal!(
8
                "Tried to install a BEGIN cell handler before the old one was gone."
8
            )))
        }
40
    }
}
#[cfg(test)]
mod test {
    // No unit tests here: the reactor is exercised indirectly,
    // in [`crate::tunnel::circuit::test`].
}