//! Code to handle incoming cells on a circuit.
//!
//! ## On message validation
//!
//! There are three steps for validating an incoming message on a stream
//! (see the sketch below):
//!
//! 1. Is the message contextually appropriate? (e.g., no more than one
//!    `CONNECTED` message per stream.) This is handled by calling
//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
//! 2. Does the message comply with flow-control rules? (e.g., no more data than
//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
//!    this; for half-closed streams, the reactor handles it using the
//!    `halfstream` module.
//! 3. Does the message have an acceptable command type, and is the message
//!    well-formed? For open streams, the streams themselves handle this check.
//!    For half-closed streams, the reactor handles it by calling
//!    `consume_checked_msg()`.
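//!
//! As a rough sketch (the variable names here are hypothetical; only
//! `check_msg` and `consume_checked_msg()` correspond to items named above),
//! the checks for a message on a half-closed stream run in this order:
//!
//! ```ignore
//! // 1. Contextual check, via the stream's `CmdChecker`.
//! cmd_checker.check_msg(&msg)?;
//! // 2. Flow-control accounting, via the `halfstream` module.
//! half_stream.handle_msg(&msg)?;
//! // 3. Command-type / well-formedness check.
//! consume_checked_msg(msg)?;
//! ```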

pub(super) mod circuit;
mod conflux;
mod control;
pub(super) mod syncview;

use crate::crypto::cell::HopNum;
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
use crate::memquota::{CircuitAccount, StreamAccount};
use crate::stream::queue::StreamQueueReceiver;
use crate::stream::{AnyCmdChecker, StreamRateLimit};
#[cfg(feature = "hs-service")]
use crate::stream::{DrainRateRequest, IncomingStreamRequest, IncomingStreamRequestFilter};
use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
use crate::tunnel::circuit::unique_id::UniqId;
use crate::tunnel::circuit::CircuitRxReceiver;
use crate::tunnel::{streammap, HopLocation, TargetHop, TunnelId, TunnelScopedCircId};
use crate::util::err::ReactorError;
use crate::util::notify::NotifyReceiver;
use crate::util::skew::ClockSkew;
use crate::{Error, Result};
use circuit::{Circuit, CircuitCmd};
use conflux::ConfluxSet;
use control::ControlHandler;
use postage::watch;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::mem::size_of;
use tor_cell::relaycell::flow_ctrl::XonKbpsEwma;
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
use tor_error::{bad_api_usage, internal, into_bad_api_usage, warn_report, Bug};
use tor_rtcompat::DynTimeProvider;

use futures::channel::mpsc;
use futures::StreamExt;
use futures::{select_biased, FutureExt as _};
use oneshot_fused_workaround as oneshot;

use std::result::Result as StdResult;
use std::sync::Arc;

use crate::channel::Channel;
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
use crate::tunnel::circuit::StreamMpscSender;
use derive_deftly::Deftly;
use derive_more::From;
use tor_cell::chancell::CircId;
use tor_llcrypto::pk;
use tor_memquota::derive_deftly_template_HasMemoryCost;
use tor_memquota::mq_queue::{self, MpscSpec};
use tracing::{debug, info, trace, warn};

use super::circuit::{MutableState, TunnelMutableState};

#[cfg(feature = "conflux")]
use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};

pub(super) use control::{CtrlCmd, CtrlMsg, FlowCtrlMsg};

/// The type of a oneshot channel used to inform the reactor of the result of an operation.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;

/// Contains a list of conflux handshake results.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;

/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;

pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};

/// MPSC queue containing stream requests
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;

/// A handshake type, to be used when creating circuit hops.
#[derive(Clone, Debug)]
pub(crate) enum CircuitHandshake {
    /// Use the CREATE_FAST handshake.
    CreateFast,
    /// Use the ntor handshake.
    Ntor {
        /// The public key of the relay.
        public_key: NtorPublicKey,
        /// The Ed25519 identity of the relay, which is verified against the
        /// identity held in the circuit's channel.
        ed_identity: pk::ed25519::Ed25519Identity,
    },
    /// Use the ntor-v3 handshake.
    NtorV3 {
        /// The public key of the relay.
        public_key: NtorV3PublicKey,
    },
}

/// A behavior to perform when closing a stream.
///
/// We don't use `Option<End>` here, since the behavior of `SendNothing` is so surprising
/// that we shouldn't let it pass unremarked.
#[derive(Clone, Debug)]
pub(crate) enum CloseStreamBehavior {
    /// Send nothing at all, so that the other side will not realize we have
    /// closed the stream.
    ///
    /// We should only do this for incoming onion service streams when we
    /// want to black-hole the client's requests.
    SendNothing,
    /// Send an End cell, if we haven't already sent one.
    SendEnd(End),
}
impl Default for CloseStreamBehavior {
    fn default() -> Self {
        Self::SendEnd(End::new_misc())
    }
}

// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
// proliferation is a bit bothersome, but unavoidable with the current design.
//
// We should consider getting rid of some of these enums (if possible),
// and coming up with more intuitive names.

/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
#[derive(From, Debug)]
enum RunOnceCmd {
    /// Run a single `RunOnceCmdInner` command.
    Single(RunOnceCmdInner),
    /// Run multiple `RunOnceCmdInner` commands.
    //
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
    // but most of the time we're only going to have *one* RunOnceCmdInner
    // to run per run_once() loop. The enum enables us to avoid the extra heap
    // allocation for the `RunOnceCmd::Single` case.
    Multiple(Vec<RunOnceCmdInner>),
}

/// Instructions for running something in the reactor loop.
///
/// Run at the end of [`Reactor::run_once`].
//
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
// We should consider making each variant a tuple variant and deduplicating the fields.
#[derive(educe::Educe)]
#[educe(Debug)]
enum RunOnceCmdInner {
    /// Send a RELAY cell.
    Send {
        /// The leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Send a given control message on this circuit, and install a control-message handler to
    /// receive responses.
    #[cfg(feature = "send-control-msg")]
    SendMsgAndInstallHandler {
        /// The message to send, if any
        msg: Option<AnyRelayMsgOuter>,
        /// A message handler to install.
        ///
        /// If this is `None`, there must already be a message handler installed
        #[educe(Debug(ignore))]
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
        /// A sender that we use to tell the caller that the message was sent
        /// and the handler installed.
        done: oneshot::Sender<Result<()>>,
    },
    /// Handle a SENDME message.
    HandleSendMe {
        /// The leg the SENDME was received on.
        leg: UniqId,
        /// The hop number.
        hop: HopNum,
        /// The SENDME message to handle.
        sendme: Sendme,
    },
    /// Begin a stream with the provided hop in this circuit.
    ///
    /// Uses the provided stream ID, and sends the provided message to that hop.
    BeginStream {
        /// The cell to send.
        cell: Result<(SendRelayCell, StreamId)>,
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
        /// caller.
        hop: HopLocation,
        /// The circuit leg to begin the stream on.
        leg: UniqId,
        /// Oneshot channel to notify on completion, with the allocated stream ID.
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
    },
    /// Consider sending an XON message with the given `rate`.
    MaybeSendXon {
        /// The drain rate to advertise in the XON message.
        rate: XonKbpsEwma,
        /// The ID of the stream to send the message on.
        stream_id: StreamId,
        /// The location of the hop on the tunnel.
        hop: HopLocation,
    },
    /// Close the specified stream.
    CloseStream {
        /// The hop number.
        hop: HopLocation,
        /// The ID of the stream to close.
        sid: StreamId,
        /// The stream-closing behavior.
        behav: CloseStreamBehavior,
        /// The reason for closing the stream.
        reason: streammap::TerminateReason,
        /// A channel for sending completion notifications.
        done: Option<ReactorResultChannel<()>>,
    },
    /// Get the clock skew claimed by the first hop of the circuit.
    FirstHopClockSkew {
        /// Oneshot channel to return the clock skew.
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
    },
    /// Remove a circuit leg from the conflux set.
    RemoveLeg {
        /// The circuit leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
    /// A circuit has completed the conflux handshake,
    /// and wants to send the specified cell.
    ///
    /// This is similar to [`RunOnceCmdInner::Send`],
    /// but needs to remain a separate variant,
    /// because in addition to instructing the reactor to send a cell,
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
    /// This enables the reactor to save the handshake result (`Ok(())`),
    /// and, if there are no other legs still in the handshake phase,
    /// send the result to the handshake initiator.
    #[cfg(feature = "conflux")]
    ConfluxHandshakeComplete {
        /// The circuit leg that has completed the handshake.
        /// This is the leg the cell should be sent on.
        leg: UniqId,
        /// The cell to send.
        cell: SendRelayCell,
    },
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
    #[cfg(feature = "conflux")]
    Link {
        /// The circuits to link into the tunnel
        #[educe(Debug(ignore))]
        circuits: Vec<Circuit>,
        /// Oneshot channel for notifying of conflux handshake completion.
        answer: ConfluxLinkResultChannel,
    },
    /// Enqueue an out-of-order cell in `ooo_msgs`.
    #[cfg(feature = "conflux")]
    Enqueue {
        /// The leg the entry originated from.
        leg: UniqId,
        /// The out-of-order message.
        msg: OooRelayMsg,
    },
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}

impl RunOnceCmdInner {
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
        match cmd {
            CircuitCmd::Send(cell) => Self::Send {
                leg,
                cell,
                done: None,
            },
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
            CircuitCmd::CloseStream {
                hop,
                sid,
                behav,
                reason,
            } => Self::CloseStream {
                hop: HopLocation::Hop((leg, hop)),
                sid,
                behav,
                reason,
                done: None,
            },
            #[cfg(feature = "conflux")]
            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg { leg, reason },
            #[cfg(feature = "conflux")]
            CircuitCmd::ConfluxHandshakeComplete(cell) => {
                Self::ConfluxHandshakeComplete { leg, cell }
            }
            #[cfg(feature = "conflux")]
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
        }
    }
}

/// Cmd for sending a relay cell.
///
/// The contents of this struct are passed to `send_relay_cell`
#[derive(educe::Educe)]
#[educe(Debug)]
pub(crate) struct SendRelayCell {
    /// The hop number.
    pub(crate) hop: HopNum,
    /// Whether to use a RELAY_EARLY cell.
    pub(crate) early: bool,
    /// The cell to send.
    pub(crate) cell: AnyRelayMsgOuter,
}

/// A command to execute at the end of [`Reactor::run_once`].
#[derive(From, Debug)]
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
enum CircuitAction {
    /// Run a single `CircuitCmd` command.
    RunCmd {
        /// The unique identifier of the circuit leg to run the command on
        leg: UniqId,
        /// The command to run.
        cmd: CircuitCmd,
    },
    /// Handle a control message
    HandleControl(CtrlMsg),
    /// Handle an input message.
    HandleCell {
        /// The unique identifier of the circuit leg the message was received on.
        leg: UniqId,
        /// The message to handle.
        cell: ClientCircChanMsg,
    },
    /// Remove the specified circuit leg from the conflux set.
    ///
    /// Returned whenever a single circuit leg needs to be removed
    /// from the reactor's conflux set, without necessarily tearing down
    /// the whole set or shutting down the reactor.
    ///
    /// Note: this action *can* cause the reactor to shut down
    /// (and the conflux set to be closed).
    ///
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
    RemoveLeg {
        /// The leg to remove.
        leg: UniqId,
        /// The reason for removal.
        ///
        /// This is only used for conflux circuits that get removed
        /// before the conflux handshake is complete.
        ///
        /// The [`RemoveLegReason`] is mapped by the reactor to a
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
        /// handshake to indicate the reason the handshake failed.
        reason: RemoveLegReason,
    },
}

/// The reason for removing a circuit leg from the conflux set.
#[derive(Debug, derive_more::Display)]
enum RemoveLegReason {
    /// The conflux handshake timed out.
    ///
    /// On the client-side, this means we didn't receive
    /// the CONFLUX_LINKED response in time.
    #[display("conflux handshake timed out")]
    ConfluxHandshakeTimeout,
    /// An error occurred during the conflux handshake.
    #[display("{}", _0)]
    ConfluxHandshakeErr(Error),
    /// The channel was closed.
    #[display("channel closed")]
    ChannelClosed,
}

/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
/// progress.
///
/// # Background
///
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
///
/// To get around this problem, the reactor can send some cells, and then make one of these
/// `MetaCellHandler` objects, which will be run when the reply arrives.
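///
/// # Example
///
/// An illustrative sketch of an implementor (the `WaitForReply` type and its
/// field are hypothetical):
///
/// ```ignore
/// struct WaitForReply {
///     /// The hop we expect the reply to come from.
///     hop: HopLocation,
/// }
///
/// impl MetaCellHandler for WaitForReply {
///     fn expected_hop(&self) -> HopLocation {
///         self.hop
///     }
///
///     fn handle_msg(
///         &mut self,
///         msg: UnparsedRelayMsg,
///         _circuit: &mut Circuit,
///     ) -> Result<MetaCellDisposition> {
///         // Inspect `msg` here; once done, ask for the handler to be uninstalled.
///         Ok(MetaCellDisposition::ConversationFinished)
///     }
/// }
/// ```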
pub(crate) trait MetaCellHandler: Send {
    /// The hop we're expecting the message to come from. This is compared against the hop
    /// from which we actually receive messages, and an error is returned if the two don't match.
    fn expected_hop(&self) -> HopLocation;
    /// Called when the message we were waiting for arrives.
    ///
    /// Gets a copy of the `Reactor` in order to do anything it likes there.
    ///
    /// If this function returns an error, the reactor will shut down.
    fn handle_msg(
        &mut self,
        msg: UnparsedRelayMsg,
        reactor: &mut Circuit,
    ) -> Result<MetaCellDisposition>;
}

/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}

/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
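///
/// # Example
///
/// A minimal usage sketch, mirroring how the reactor uses this macro in
/// [`Reactor::run_once`] (`ret` stands for an `Option` read from the control
/// channel):
///
/// ```ignore
/// let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
/// ```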
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}

/// Object to handle incoming cells and background tasks on a circuit
///
/// This type is returned when you finish a circuit; you need to spawn a
/// new task that calls `run()` on it.
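///
/// # Example
///
/// A rough sketch of the intended usage (here `spawn`, `channel`, `channel_id`,
/// `unique_id`, `input`, `runtime`, and `memquota` are assumed to be provided by
/// the caller):
///
/// ```ignore
/// let (reactor, control_tx, command_tx, closed_rx, mutable) =
///     Reactor::new(channel, channel_id, unique_id, input, runtime, memquota);
/// spawn(async move {
///     // Once `run()` returns, the circuit is dead and cannot be used again.
///     let _ = reactor.run().await;
/// });
/// ```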
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
pub struct Reactor {
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
    /// is ready to accept cells.
    control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
    ///
    /// This channel is polled in [`Reactor::run_once`].
    ///
    /// NOTE: this is a separate channel from `control`, because some messages
    /// have higher priority and need to be handled even if the `chan_sender` is not
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
    /// is ready to accept cells).
    command: mpsc::UnboundedReceiver<CtrlCmd>,
    /// A oneshot sender that is used to alert other tasks when this reactor is
    /// finally dropped.
    ///
    /// It is a sender for Void because we never actually want to send anything here;
    /// we only want to generate canceled events.
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
    reactor_closed_tx: oneshot::Sender<void::Void>,
    /// A set of circuits that form a tunnel.
    ///
    /// Contains 1 or more circuits.
    ///
    /// Circuits may be added to this set throughout the lifetime of the reactor.
    ///
    /// Sometimes, the reactor will remove circuits from this set,
    /// for example if the `LINKED` message takes too long to arrive,
    /// or if congestion control negotiation fails.
    /// The reactor will continue running with the remaining circuits.
    /// It will shut down if *all* the circuits are removed.
    ///
    // TODO(conflux): document all the reasons why the reactor might
    // choose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
    circuits: ConfluxSet,
    /// An identifier for logging about this tunnel reactor.
    tunnel_id: TunnelId,
    /// Handlers, shared with `Circuit`.
    cell_handlers: CellHandlers,
    /// The time provider, used for conflux handshake timeouts.
    runtime: DynTimeProvider,
    /// The conflux handshake context, if there is an on-going handshake.
    ///
    /// Set to `None` if this is a single-path tunnel,
    /// or if none of the circuit legs from our conflux set
    /// are currently in the conflux handshake phase.
    #[cfg(feature = "conflux")]
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
    /// A min-heap buffering all the out-of-order messages received so far.
    ///
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
    /// on its size. We should make this participate in the memquota memory
    /// tracking system, somehow.
    #[cfg(feature = "conflux")]
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
}

/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}

/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out-of-order message.
    msg: OooRelayMsg,
}

#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.msg.cmp(&other.msg)
    }
}

#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    fn eq(&self, other: &Self) -> bool {
        self.msg == other.msg
    }
}

#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}

/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
struct CellHandlers {
    /// A handler for a meta cell, together with a result channel to notify on completion.
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
    /// A handler for incoming stream requests.
    #[cfg(feature = "hs-service")]
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}

/// Information about an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop: HopLocation,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamQueueReceiver,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// A [`Stream`](futures::Stream) that provides updates to the rate limit for sending data.
    // TODO(arti#2068): we should consider making this an `Option`
    // the `watch::Sender` owns the indirect data
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) rate_limit_stream: watch::Receiver<StreamRateLimit>,
    /// A [`Stream`](futures::Stream) that provides notifications when a new drain rate is
    /// requested.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) drain_rate_request_stream: NotifyReceiver<DrainRateRequest>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}

/// Data required for handling an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// An [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}

impl Reactor {
    /// Create a new circuit reactor.
    ///
    /// The reactor will send outbound messages on `channel`, receive incoming
    /// messages on `input`, and identify this circuit by the channel-local
    /// [`CircId`] provided.
    ///
    /// The internal unique identifier for this circuit will be `unique_id`.
    #[allow(clippy::type_complexity)] // TODO
    pub(super) fn new(
        channel: Arc<Channel>,
        channel_id: CircId,
        unique_id: UniqId,
        input: CircuitRxReceiver,
        runtime: DynTimeProvider,
        memquota: CircuitAccount,
    ) -> (
        Self,
        mpsc::UnboundedSender<CtrlMsg>,
        mpsc::UnboundedSender<CtrlCmd>,
        oneshot::Receiver<void::Void>,
        Arc<TunnelMutableState>,
    ) {
        let tunnel_id = TunnelId::next();
        let (control_tx, control_rx) = mpsc::unbounded();
        let (command_tx, command_rx) = mpsc::unbounded();
        let mutable = Arc::new(MutableState::default());

        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();

        let cell_handlers = CellHandlers {
            meta_handler: None,
            #[cfg(feature = "hs-service")]
            incoming_stream_req_handler: None,
        };

        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
        let circuit_leg = Circuit::new(
            runtime.clone(),
            channel,
            channel_id,
            unique_id,
            input,
            memquota,
            Arc::clone(&mutable),
        );

        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);

        let reactor = Reactor {
            circuits,
            control: control_rx,
            command: command_rx,
            reactor_closed_tx,
            tunnel_id,
            cell_handlers,
            runtime,
            #[cfg(feature = "conflux")]
            conflux_hs_ctx: None,
            #[cfg(feature = "conflux")]
            ooo_msgs: Default::default(),
        };

        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
    }

    /// Launch the reactor, and run until the circuit closes or we
    /// encounter an error.
    ///
    /// Once this method returns, the circuit is dead and cannot be
    /// used again.
    pub async fn run(mut self) -> Result<()> {
        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
        let result: Result<()> = loop {
            match self.run_once().await {
                Ok(()) => (),
                Err(ReactorError::Shutdown) => break Ok(()),
                Err(ReactorError::Err(e)) => break Err(e),
            }
        };
        trace!(tunnel_id = %self.tunnel_id, "Tunnel reactor stopped: {:?}", result);
        result
    }

    /// Helper for run: doesn't mark the circuit closed on finish.  Only
    /// processes one cell or control message.
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        // If all the circuits are closed, shut down the reactor
        if self.circuits.is_empty() {
            trace!(
                tunnel_id = %self.tunnel_id,
                "Tunnel reactor shutting down: all circuits have closed",
            );

            return Err(ReactorError::Shutdown);
        }

        // If this is a single path circuit, we need to wait until the first hop
        // is created before doing anything else
        let single_path_with_hops = self
            .circuits
            .single_leg_mut()
            .is_ok_and(|leg| !leg.has_hops());
        if single_path_with_hops {
            self.wait_for_create().await?;

            return Ok(());
        }

        // Prioritize the buffered messages.
        //
        // Note: if any of the messages are ready to be handled,
        // this will block the reactor until we are done processing them
        #[cfg(feature = "conflux")]
        self.try_dequeue_ooo_msgs().await?;

        let action = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
                return ControlHandler::new(self).handle_cmd(cmd);
            },
            // Check whether we've got a control message pending.
            //
            // Note: unfortunately, reading from control here means we might start
            // handling control messages before our chan_senders are ready.
            // With the current design, this is inevitable: we can't know which circuit leg
            // a control message is meant for without first reading the control message from
            // the channel, and at that point, we can't know for sure whether that particular
            // circuit is ready for sending.
            ret = self.control.next() => {
                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
                CircuitAction::HandleControl(msg)
            },
            res = self.circuits.next_circ_action(&self.runtime)?.fuse() => res?,
        };

        let cmd = match action {
            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
            )),
            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
                .handle_msg(ctrl)?
                .map(RunOnceCmd::Single),
            CircuitAction::HandleCell { leg, cell } => {
                let circ = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;

                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
                if circ_cmds.is_empty() {
                    None
                } else {
                    // TODO: we return RunOnceCmd::Multiple even if there's a single command.
                    //
                    // See the TODO on `Circuit::handle_cell`.
                    let cmd = RunOnceCmd::Multiple(
                        circ_cmds
                            .into_iter()
                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
                            .collect(),
                    );

                    Some(cmd)
                }
            }
            CircuitAction::RemoveLeg { leg, reason } => {
                Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
            }
        };

        if let Some(cmd) = cmd {
            self.handle_run_once_cmd(cmd).await?;
        }

        Ok(())
    }

    /// Try to process the previously-out-of-order messages we might have buffered.
    #[cfg(feature = "conflux")]
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
        // Check if we're ready to dequeue any of the previously out-of-order cells.
        while let Some(entry) = self.ooo_msgs.peek() {
            let should_pop = self.circuits.is_seqno_in_order(entry.msg.seqno);

            if !should_pop {
                break;
            }

            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");

            let circ = self
                .circuits
                .leg_mut(entry.leg_id)
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
            let handlers = &mut self.cell_handlers;
            let cmd = circ
                .handle_in_order_relay_msg(
                    handlers,
                    entry.msg.hopnum,
                    entry.leg_id,
                    entry.msg.cell_counts_towards_windows,
                    entry.msg.streamid,
                    entry.msg.msg,
                )?
                .map(|cmd| {
                    RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(entry.leg_id, cmd))
                });

            if let Some(cmd) = cmd {
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }

    /// Handle a [`RunOnceCmd`].
    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
            RunOnceCmd::Multiple(cmds) => {
                // While we know `sendable` is ready to accept *one* cell,
                // we can't be certain it will be able to accept *all* of the cells
                // that need to be sent here. This means we *may* end up buffering
                // in its underlying SometimesUnboundedSink! That is OK, because
                // RunOnceCmd::Multiple is only used for handling packed cells.
                for cmd in cmds {
                    self.handle_single_run_once_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

    /// Handle a single [`RunOnceCmdInner`].
    async fn handle_single_run_once_cmd(
        &mut self,
        cmd: RunOnceCmdInner,
    ) -> StdResult<(), ReactorError> {
        match cmd {
            RunOnceCmdInner::Send { leg, cell, done } => {
                // TODO: check the cc window
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                if let Some(done) = done {
                    // Don't care if the receiver goes away
                    let _ = done.send(res.clone());
                }
                res?;
            }
            #[cfg(feature = "send-control-msg")]
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
                let cell: Result<Option<SendRelayCell>> =
                    self.prepare_msg_and_install_handler(msg, handler);

                match cell {
                    Ok(Some(cell)) => {
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone());
                        outcome?;
                    }
                    Ok(None) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Ok(()));
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::BeginStream {
                leg,
                cell,
                hop,
                done,
            } => {
                match cell {
                    Ok((cell, stream_id)) => {
                        let circ = self
                            .circuits
                            .leg_mut(leg)
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
                        let cell_hop = cell.hop;
                        let relay_format = circ
                            .hop_mut(cell_hop)
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
                            .ok_or(Error::NoSuchHop)?
                            .relay_cell_format();

                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                        // don't care if receiver goes away.
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
                        outcome?;
                    }
                    Err(e) => {
                        // don't care if receiver goes away.
                        let _ = done.send(Err(e.clone()));
                        return Err(e.into());
                    }
                }
            }
            RunOnceCmdInner::CloseStream {
                hop,
                sid,
                behav,
                reason,
                done,
            } => {
                let result = (move || {
                    // this is needed to force the closure to be FnOnce rather than FnMut :(
                    let self_ = self;
                    let (leg_id, hop_num) = self_
                        .resolve_hop_location(hop)
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
                    let leg = self_
                        .circuits
                        .leg_mut(leg_id)
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
                    Ok::<_, Bug>((leg, hop_num))
                })();

                let (leg, hop_num) = match result {
                    Ok(x) => x,
                    Err(e) => {
                        if let Some(done) = done {
                            // don't care if the sender goes away
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
                            let _ = done.send(Err(e.into()));
                        }
                        return Ok(());
                    }
                };

                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;

                if let Some(done) = done {
                    // don't care if the sender goes away
                    let _ = done.send(res);
                }
            }
            RunOnceCmdInner::MaybeSendXon {
                rate,
                stream_id,
                hop,
            } => {
                let (leg_id, hop_num) = match self.resolve_hop_location(hop) {
                    Ok(x) => x,
                    Err(NoJoinPointError) => {
                        // A stream tried to send an XON message to the join point of
                        // a tunnel that has never had a join point. Currently in arti, only a
                        // `StreamTarget` asks us to send an XON message, and this tunnel
                        // originally created the `StreamTarget` to begin with. So this is a
                        // legitimate bug somewhere in the tunnel code.
                        let err = internal!(
                            "Could not send an XON message to a join point on a tunnel without a join point",
                        );
                        // TODO: Rather than calling `warn_report` here, we should call
                        // `trace_report!` from `Reactor::run_once()`. Since this is an internal
                        // error, `trace_report!` should log it at "warn" level.
                        warn_report!(err, "Tunnel reactor error");
                        return Err(err.into());
                    }
                };
                let Some(leg) = self.circuits.leg_mut(leg_id) else {
                    // The leg has disappeared. This is fine since the stream may have ended and
                    // been cleaned up while this `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and an incorrect leg number, but
                    // it's not currently possible to differentiate between an incorrect leg
                    // number and a tunnel leg that has been closed.
                    debug!("Could not send an XON message on a leg that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(hop) = leg.hop_mut(hop_num) else {
                    // The hop has disappeared. This is fine since the circuit may have
                    // been truncated while the `CtrlMsg::MaybeSendXon` message was queued.
                    // It is possible that this is a bug and an incorrect hop number, but
                    // it's not currently possible to differentiate between an incorrect hop
                    // number and a circuit hop that has been removed.
                    debug!("Could not send an XON message on a hop that does not exist. Ignoring.");
                    return Ok(());
                };
                let Some(msg) = hop.maybe_send_xon(rate, stream_id)? else {
                    // Nothing to do.
                    return Ok(());
                };
                let cell = AnyRelayMsgOuter::new(Some(stream_id), msg.into());
                let cell = SendRelayCell {
                    hop: hop_num,
                    early: false,
                    cell,
                };
                leg.send_relay_cell(cell).await?;
            }
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
                let leg = self
                    .circuits
                    .leg_mut(leg)
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
                // future which *should* resolve immediately
                let signals = leg.congestion_signals().await;
                leg.handle_sendme(hop, sendme, signals)?;
            }
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
                // don't care if the sender goes away
                let _ = answer.send(res.map_err(Into::into));
            }
            RunOnceCmdInner::CleanShutdown => {
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
                return Err(ReactorError::Shutdown);
            }
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
                let circ = self.circuits.remove(leg)?;
                let is_conflux_pending = circ.is_conflux_pending();

                // Drop the removed leg. This will cause it to close if it's not already closed.
                drop(circ);

                // If we reach this point, it means we have more than one leg
                // (otherwise the .remove() would've returned a Shutdown error),
                // so we expect there to be a ConfluxHandshakeCtx installed.

                #[cfg(feature = "conflux")]
                if is_conflux_pending {
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
                            (ConfluxHandshakeError::Timeout, None)
                        }
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };
                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
                    if let Some(e) = proto_violation {
                        // TODO: make warn_report support structured logging
                        tor_error::warn_report!(
                            e,
                            "{}: Malformed conflux handshake, tearing down tunnel",
                            self.tunnel_id
                        );
                        return Err(e.into());
                    }
                }
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
                // Note: on the client-side, the handshake is considered complete once the
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
                //
                // We're optimistic here, and declare the handshake a success *before*
                // sending the LINKED_ACK response. I think this is OK though,
                // because if the send_relay_cell() below fails, the reactor will shut
                // down anyway. OTOH, marking the handshake as complete slightly early
                // means that on the happy path, the circuit is marked as usable sooner,
                // instead of blocking on the sending of the LINKED_ACK.
                self.note_conflux_handshake_result(Ok(()), false)?;
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
                res?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Link { circuits, answer } => {
                // Add the specified circuits to our conflux set,
                // and send a LINK cell down each unlinked leg.
                //
                // NOTE: this will block the reactor until all the cells are sent.
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
        }
        Ok(())
    }

    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
        let msg = select_biased! {
            res = self.command.next() => {
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
                        relay_cell_format: format,
                        fwd_lasthop,
                        rev_lasthop,
                        peer_id,
                        params,
                        done,
                    } => {
                        let leg = self.circuits.single_leg_mut()?;
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
                        return Ok(())
                    },
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };
        match msg {
            CtrlMsg::Create {
                recv_created,
                handshake,
                settings,
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
                let leg = self.circuits.single_leg_mut()?;
                leg.handle_create(recv_created, handshake, settings, done)
                    .await
            }
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
    }

    /// Add the specified handshake result to our `ConfluxHandshakeCtx`.
    ///
    /// If all the circuits we were waiting on have finished the conflux handshake,
    /// the `ConfluxHandshakeCtx` is consumed, and the results we have collected
    /// are sent to the handshake initiator.
    #[cfg(feature = "conflux")]
    fn note_conflux_handshake_result(
        &mut self,
        res: StdResult<(), ConfluxHandshakeError>,
        reactor_is_closing: bool,
    ) -> StdResult<(), ReactorError> {
        let tunnel_complete = match self.conflux_hs_ctx.as_mut() {
            Some(conflux_ctx) => {
                conflux_ctx.results.push(res);
                // Whether all the legs have finished linking:
                conflux_ctx.results.len() == conflux_ctx.num_legs
            }
            None => {
                return Err(internal!("no conflux handshake context").into());
            }
        };
        if tunnel_complete || reactor_is_closing {
            // Time to remove the conflux handshake context
            // and extract the results we have collected
            let conflux_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");

            let success_count = conflux_ctx.results.iter().filter(|res| res.is_ok()).count();
            let leg_count = conflux_ctx.results.len();

            info!(
                tunnel_id = %self.tunnel_id,
                "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
            );
            send_conflux_outcome(conflux_ctx.answer, Ok(conflux_ctx.results))?;
            // We don't expect to receive any more handshake results,
            // at least not until we get another LinkCircuits control message,
            // which will install a new ConfluxHandshakeCtx with a channel
            // for us to send updates on
        }
        Ok(())
    }

    /// Prepare a `SendRelayCell` request, and install the given meta-cell handler.
    fn prepare_msg_and_install_handler(
        &mut self,
        msg: Option<AnyRelayMsgOuter>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<Option<SendRelayCell>> {
        let msg = msg
            .map(|msg| {
                let handlers = &mut self.cell_handlers;
                let handler = handler
                    .as_ref()
                    .or(handlers.meta_handler.as_ref())
                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?;
                // We should always have a precise HopLocation here, so this should never fail;
                // but in case we get a ::JoinPoint, we'll notice.
                let hop = handler.expected_hop().hop_num().ok_or(bad_api_usage!(
                    "MsgHandler doesn't have a precise HopLocation"
                ))?;
                Ok::<_, crate::Error>(SendRelayCell {
                    hop,
                    early: false,
                    cell: msg,
                })
            })
            .transpose()?;
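        // If a new handler was supplied, install it as this circuit's meta-cell handler.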
        if let Some(handler) = handler {
            self.cell_handlers.set_meta_handler(handler)?;
        }
        Ok(msg)
    }

    /// Handle a shutdown request.
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
        trace!(
            tunnel_id = %self.tunnel_id,
            "reactor shutdown due to explicit request",
        );
        Err(ReactorError::Shutdown)
    }

    /// Handle a request to shutdown the reactor and return the only [`Circuit`] in this tunnel.
    ///
    /// Returns an error over the `answer` channel if the reactor has no circuits,
    /// or more than one circuit. The reactor will shut down regardless.
    #[cfg(feature = "conflux")]
    fn handle_shutdown_and_return_circuit(
        &mut self,
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    ) -> StdResult<(), ReactorError> {
        // Don't care if the receiver goes away
        let _ = answer.send(self.circuits.take_single_leg().map_err(Into::into));
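        // Shut the reactor down regardless of whether extracting the circuit succeeded.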
        self.handle_shutdown().map(|_| ())
    }

    /// Resolves a [`TargetHop`] to a [`HopLocation`].
    ///
    /// After resolving a `TargetHop::LastHop`,
    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
    ///
    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
    /// if you need a fixed hop position in the tunnel.
    /// For example if we open a stream to `TargetHop::LastHop`,
    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
    /// as we don't want the stream position to change as the tunnel is extended or truncated.
    ///
    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
    /// and the tunnel has no hops
    /// (either has no legs, or has legs which contain no hops).
    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
        match hop {
            TargetHop::Hop(hop) => Ok(hop),
            TargetHop::LastHop => {
                if let Ok(leg) = self.circuits.single_leg() {
                    let leg_id = leg.unique_id();
                    // single-path tunnel
                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
                    Ok(HopLocation::Hop((leg_id, hop)))
                } else if !self.circuits.is_empty() {
                    // multi-path tunnel
                    Ok(HopLocation::JoinPoint)
                } else {
                    // no legs
                    Err(NoHopsBuiltError)
                }
            }
        }
    }

    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
    ///
    /// After resolving a `HopLocation::JoinPoint`,
    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
    ///
    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
    /// need them,
    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
    /// iterations as the primary leg may change from one iteration to the next.
    ///
    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
    /// but it does not have a join point.
    fn resolve_hop_location(
        &self,
        hop: HopLocation,
    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
        match hop {
            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
            HopLocation::JoinPoint => {
                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
                    Ok((leg_id, hop_num))
                } else {
                    // Attempted to get the join point of a non-multipath tunnel.
                    Err(NoJoinPointError)
                }
            }
        }
    }

    /// Resolve a [`TargetHop`] directly into a [`UniqId`] and [`HopNum`].
    ///
    /// This is a helper function that calls `resolve_target_hop` and
    /// `resolve_hop_location` back to back.
    ///
    /// It returns `None` on failure to resolve; if you need a more detailed error explaining why
    /// the resolution failed, call `resolve_target_hop()` and `resolve_hop_location()` directly.
    pub(crate) fn target_hop_to_hopnum_id(&self, hop: TargetHop) -> Option<(UniqId, HopNum)> {
        self.resolve_target_hop(hop)
            .ok()
            .and_then(|resolved| self.resolve_hop_location(resolved).ok())
    }

    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// Returns `None` if either the `leg` or `hop` don't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.circuits.uses_stream_sendme(leg, hop)
    }

    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated, and if they do not have the same length,
    /// or if they do not all have the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;
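        // Refuse to start a new link handshake while one is already in progress.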
        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            send_conflux_outcome(answer, Err(err.into()))?;
            return Ok(());
        }

        let unlinked_legs = self.circuits.num_unlinked();

        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that complete the conflux handshake
        // get removed from the set.
        let num_legs = circuits.len() + unlinked_legs;
        // Note: add_legs validates `circuits`
        let res = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;
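        // On failure, report the error to the handshake initiator right away;
        // otherwise, save the context so we can report the results
        // once all the legs have finished linking.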
        if let Err(e) = res {
            warn_report!(e, "Failed to link conflux circuits");
            send_conflux_outcome(answer, Err(e))?;
        } else {
            // Save the channel, to notify the user of completion.
            self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                answer,
                num_legs,
                results: Default::default(),
            });
        }
        Ok(())
    }
}

/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns an error if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
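    // If the initiator has gone away, there is no one left to receive the
    // handshake outcome, so ask the reactor to shut down.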
    if tx.send(res).is_err() {
        tracing::warn!("conflux initiator went away before handshake completed?");
        return Err(ReactorError::Shutdown);
    }

    Ok(())
}

/// The tunnel does not have any hops.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;

/// The tunnel does not have a join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;

impl CellHandlers {
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
        if self.meta_handler.is_none() {
            self.meta_handler = Some(handler);
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
    }

    /// Try to install a given cell handler on this circuit.
    #[cfg(feature = "hs-service")]
    fn set_incoming_stream_req_handler(
        &mut self,
        handler: IncomingStreamRequestHandler,
    ) -> Result<()> {
        if self.incoming_stream_req_handler.is_none() {
            self.incoming_stream_req_handler = Some(handler);
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a BEGIN cell handler before the old one was gone."
            )))
        }
    }
}

#[cfg(test)]
mod test {
    // Tested in [`crate::tunnel::circuit::test`].
}