1
//! Code to handle incoming cells on a circuit.
//!
//! ## On message validation
//!
//! There are three steps for validating an incoming message on a stream:
//!
//! 1. Is the message contextually appropriate? (e.g., no more than one
//!    `CONNECTED` message per stream.) This is handled by calling
//!    [`CmdChecker::check_msg`](crate::stream::CmdChecker::check_msg).
//! 2. Does the message comply with flow-control rules? (e.g., no more data than
//!    we've gotten SENDMEs for.) For open streams, the stream itself handles
//!    this; for half-closed streams, the reactor handles it using the
//!    `halfstream` module.
//! 3. Does the message have an acceptable command type, and is the message
//!    well-formed? For open streams, the streams themselves handle this check.
//!    For half-closed streams, the reactor handles it by calling
//!    `consume_checked_msg()`.
18

            
19
pub(super) mod circuit;
20
mod conflux;
21
mod control;
22
pub(super) mod syncview;
23

            
24
use crate::crypto::cell::HopNum;
25
use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
26
use crate::memquota::{CircuitAccount, StreamAccount};
27
use crate::stream::AnyCmdChecker;
28
#[cfg(feature = "hs-service")]
29
use crate::stream::{IncomingStreamRequest, IncomingStreamRequestFilter};
30
use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
31
use crate::tunnel::circuit::unique_id::UniqId;
32
use crate::tunnel::circuit::CircuitRxReceiver;
33
use crate::tunnel::{streammap, HopLocation, TargetHop, TunnelId, TunnelScopedCircId};
34
use crate::util::err::ReactorError;
35
use crate::util::skew::ClockSkew;
36
use crate::{Error, Result};
37
use circuit::{Circuit, CircuitCmd};
38
use conflux::ConfluxSet;
39
use control::ControlHandler;
40
use std::cmp::Ordering;
41
use std::collections::BinaryHeap;
42
use std::mem::size_of;
43
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
44
use tor_cell::relaycell::{AnyRelayMsgOuter, RelayCellFormat, StreamId, UnparsedRelayMsg};
45
use tor_error::{bad_api_usage, internal, into_bad_api_usage, Bug};
46
use tor_rtcompat::DynTimeProvider;
47

            
48
use futures::channel::mpsc;
49
use futures::StreamExt;
50
use futures::{select_biased, FutureExt as _};
51
use oneshot_fused_workaround as oneshot;
52

            
53
use std::result::Result as StdResult;
54
use std::sync::Arc;
55

            
56
use crate::channel::Channel;
57
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
58
use crate::tunnel::circuit::{StreamMpscReceiver, StreamMpscSender};
59
use derive_deftly::Deftly;
60
use derive_more::From;
61
use tor_cell::chancell::CircId;
62
use tor_llcrypto::pk;
63
use tor_memquota::derive_deftly_template_HasMemoryCost;
64
use tor_memquota::mq_queue::{self, MpscSpec};
65
use tracing::{info, trace, warn};
66

            
67
use super::circuit::{MutableState, TunnelMutableState};
68

            
69
#[cfg(feature = "conflux")]
70
use {crate::util::err::ConfluxHandshakeError, conflux::OooRelayMsg};
71

            
72
pub(super) use control::CtrlCmd;
73
pub(super) use control::CtrlMsg;
74

            
75
/// The type of a oneshot channel used to inform reactor of the result of an operation.
76
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
77

            
78
/// Contains a list of conflux handshake results.
#[cfg(feature = "conflux")]
pub(super) type ConfluxHandshakeResult = Vec<StdResult<(), ConfluxHandshakeError>>;
81

            
82
/// The type of oneshot channel used to inform reactor users of the outcome
/// of a client-side conflux handshake.
///
/// Contains a list of handshake results, one for each circuit that we were asked
/// to link in the tunnel.
#[cfg(feature = "conflux")]
pub(super) type ConfluxLinkResultChannel = ReactorResultChannel<ConfluxHandshakeResult>;
89

            
90
pub(crate) use circuit::{RECV_WINDOW_INIT, STREAM_READER_BUFFER};
91

            
92
/// Sending half of the MPSC queue that carries incoming stream requests
/// ([`StreamReqInfo`]).
#[cfg(feature = "hs-service")]
type StreamReqSender = mq_queue::Sender<StreamReqInfo, MpscSpec>;
95

            
96
/// A handshake type, to be used when creating circuit hops.
97
#[derive(Clone, Debug)]
98
pub(crate) enum CircuitHandshake {
99
    /// Use the CREATE_FAST handshake.
100
    CreateFast,
101
    /// Use the ntor handshake.
102
    Ntor {
103
        /// The public key of the relay.
104
        public_key: NtorPublicKey,
105
        /// The Ed25519 identity of the relay, which is verified against the
106
        /// identity held in the circuit's channel.
107
        ed_identity: pk::ed25519::Ed25519Identity,
108
    },
109
    /// Use the ntor-v3 handshake.
110
    NtorV3 {
111
        /// The public key of the relay.
112
        public_key: NtorV3PublicKey,
113
    },
114
}
115

            
116
/// A behavior to perform when closing a stream.
117
///
118
/// We don't use `Option<End>`` here, since the behavior of `SendNothing` is so surprising
119
/// that we shouldn't let it pass unremarked.
120
#[derive(Clone, Debug)]
121
pub(crate) enum CloseStreamBehavior {
122
    /// Send nothing at all, so that the other side will not realize we have
123
    /// closed the stream.
124
    ///
125
    /// We should only do this for incoming onion service streams when we
126
    /// want to black-hole the client's requests.
127
    SendNothing,
128
    /// Send an End cell, if we haven't already sent one.
129
    SendEnd(End),
130
}
131
impl Default for CloseStreamBehavior {
132
32
    fn default() -> Self {
133
32
        Self::SendEnd(End::new_misc())
134
32
    }
135
}
136

            
137
// TODO: the RunOnceCmd/RunOnceCmdInner/CircuitCmd/CircuitAction enum
// proliferation is a bit bothersome, but unavoidable with the current design.
//
// We should consider getting rid of some of these enums (if possible),
// and coming up with more intuitive names.
142

            
143
/// One or more [`RunOnceCmdInner`] to run inside [`Reactor::run_once`].
144
#[derive(From, Debug)]
145
enum RunOnceCmd {
146
    /// Run a single `RunOnceCmdInner` command.
147
    Single(RunOnceCmdInner),
148
    /// Run multiple `RunOnceCmdInner` commands.
149
    //
150
    // Note: this whole enum *could* be replaced with Vec<RunOnceCmdInner>,
151
    // but most of the time we're only going to have *one* RunOnceCmdInner
152
    // to run per run_once() loop. The enum enables us avoid the extra heap
153
    // allocation for the `RunOnceCmd::Single` case.
154
    Multiple(Vec<RunOnceCmdInner>),
155
}
156

            
157
/// Instructions for running something in the reactor loop.
158
///
159
/// Run at the end of [`Reactor::run_once`].
160
//
161
// TODO: many of the variants of this enum have an identical CtrlMsg counterpart.
162
// We should consider making each variant a tuple variant and deduplicating the fields.
163
#[derive(educe::Educe)]
164
#[educe(Debug)]
165
enum RunOnceCmdInner {
166
    /// Send a RELAY cell.
167
    Send {
168
        /// The leg the cell should be sent on.
169
        leg: UniqId,
170
        /// The cell to send.
171
        cell: SendRelayCell,
172
        /// A channel for sending completion notifications.
173
        done: Option<ReactorResultChannel<()>>,
174
    },
175
    /// Send a given control message on this circuit, and install a control-message handler to
176
    /// receive responses.
177
    #[cfg(feature = "send-control-msg")]
178
    SendMsgAndInstallHandler {
179
        /// The message to send, if any
180
        msg: Option<AnyRelayMsgOuter>,
181
        /// A message handler to install.
182
        ///
183
        /// If this is `None`, there must already be a message handler installed
184
        #[educe(Debug(ignore))]
185
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
186
        /// A sender that we use to tell the caller that the message was sent
187
        /// and the handler installed.
188
        done: oneshot::Sender<Result<()>>,
189
    },
190
    /// Handle a SENDME message.
191
    HandleSendMe {
192
        /// The leg the SENDME was received on.
193
        leg: UniqId,
194
        /// The hop number.
195
        hop: HopNum,
196
        /// The SENDME message to handle.
197
        sendme: Sendme,
198
    },
199
    /// Begin a stream with the provided hop in this circuit.
200
    ///
201
    /// Uses the provided stream ID, and sends the provided message to that hop.
202
    BeginStream {
203
        /// The cell to send.
204
        cell: Result<(SendRelayCell, StreamId)>,
205
        /// The location of the hop on the tunnel. We don't use this (and `Circuit`s shouldn't need
206
        /// to worry about legs anyways), but need it so that we can pass it back in `done` to the
207
        /// caller.
208
        hop: HopLocation,
209
        /// The circuit leg to begin the stream on.
210
        leg: UniqId,
211
        /// Oneshot channel to notify on completion, with the allocated stream ID.
212
        done: ReactorResultChannel<(StreamId, HopLocation, RelayCellFormat)>,
213
    },
214
    /// Close the specified stream.
215
    CloseStream {
216
        /// The hop number.
217
        hop: HopLocation,
218
        /// The ID of the stream to close.
219
        sid: StreamId,
220
        /// The stream-closing behavior.
221
        behav: CloseStreamBehavior,
222
        /// The reason for closing the stream.
223
        reason: streammap::TerminateReason,
224
        /// A channel for sending completion notifications.
225
        done: Option<ReactorResultChannel<()>>,
226
    },
227
    /// Get the clock skew claimed by the first hop of the circuit.
228
    FirstHopClockSkew {
229
        /// Oneshot channel to return the clock skew.
230
        answer: oneshot::Sender<StdResult<ClockSkew, Bug>>,
231
    },
232
    /// Remove a circuit leg from the conflux set.
233
    RemoveLeg {
234
        /// The circuit leg to remove.
235
        leg: UniqId,
236
        /// The reason for removal.
237
        ///
238
        /// This is only used for conflux circuits that get removed
239
        /// before the conflux handshake is complete.
240
        ///
241
        /// The [`RemoveLegReason`] is mapped by the reactor to a
242
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
243
        /// handshake to indicate the reason the handshake failed.
244
        reason: RemoveLegReason,
245
    },
246
    /// A circuit has completed the conflux handshake,
247
    /// and wants to send the specified cell.
248
    ///
249
    /// This is similar to [`RunOnceCmdInner::Send`],
250
    /// but needs to remain a separate variant,
251
    /// because in addition to instructing the reactor to send a cell,
252
    /// it also notifies it that the conflux handshake is complete on the specified `leg`.
253
    /// This enables the reactor to save the handshake result (`Ok(())`),
254
    /// and, if there are no other legs still in the handshake phase,
255
    /// send the result to the handshake initiator.
256
    #[cfg(feature = "conflux")]
257
    ConfluxHandshakeComplete {
258
        /// The circuit leg that has completed the handshake,
259
        /// This is the leg the cell should be sent on.
260
        leg: UniqId,
261
        /// The cell to send.
262
        cell: SendRelayCell,
263
    },
264
    /// Send a LINK cell on each of the unlinked circuit legs in the conflux set of this reactor.
265
    #[cfg(feature = "conflux")]
266
    Link {
267
        /// The circuits to link into the tunnel
268
        #[educe(Debug(ignore))]
269
        circuits: Vec<Circuit>,
270
        /// Oneshot channel for notifying of conflux handshake completion.
271
        answer: ConfluxLinkResultChannel,
272
    },
273
    /// Enqueue an out-of-order cell in ooo_msg.
274
    #[cfg(feature = "conflux")]
275
    Enqueue {
276
        /// The leg the entry originated from.
277
        leg: UniqId,
278
        /// The out-of-order message.
279
        msg: OooRelayMsg,
280
    },
281
    /// Perform a clean shutdown on this circuit.
282
    CleanShutdown,
283
}
284

            
285
impl RunOnceCmdInner {
286
    /// Create a [`RunOnceCmdInner`] out of a [`CircuitCmd`] and [`UniqId`].
287
4046
    fn from_circuit_cmd(leg: UniqId, cmd: CircuitCmd) -> Self {
288
4046
        match cmd {
289
3942
            CircuitCmd::Send(cell) => Self::Send {
290
3942
                leg,
291
3942
                cell,
292
3942
                done: None,
293
3942
            },
294
12
            CircuitCmd::HandleSendMe { hop, sendme } => Self::HandleSendMe { leg, hop, sendme },
295
            CircuitCmd::CloseStream {
296
32
                hop,
297
32
                sid,
298
32
                behav,
299
32
                reason,
300
32
            } => Self::CloseStream {
301
32
                hop: HopLocation::Hop((leg, hop)),
302
32
                sid,
303
32
                behav,
304
32
                reason,
305
32
                done: None,
306
32
            },
307
            #[cfg(feature = "conflux")]
308
16
            CircuitCmd::ConfluxRemove(reason) => Self::RemoveLeg { leg, reason },
309
            #[cfg(feature = "conflux")]
310
36
            CircuitCmd::ConfluxHandshakeComplete(cell) => {
311
36
                Self::ConfluxHandshakeComplete { leg, cell }
312
            }
313
            #[cfg(feature = "conflux")]
314
            CircuitCmd::Enqueue(msg) => Self::Enqueue { leg, msg },
315
8
            CircuitCmd::CleanShutdown => Self::CleanShutdown,
316
        }
317
4046
    }
318
}
319

            
320
/// Cmd for sending a relay cell.
321
///
322
/// The contents of this struct are passed to `send_relay_cell`
323
#[derive(educe::Educe)]
324
#[educe(Debug)]
325
pub(crate) struct SendRelayCell {
326
    /// The hop number.
327
    pub(crate) hop: HopNum,
328
    /// Whether to use a RELAY_EARLY cell.
329
    pub(crate) early: bool,
330
    /// The cell to send.
331
    pub(crate) cell: AnyRelayMsgOuter,
332
}
333

            
334
/// A command to execute at the end of [`Reactor::run_once`].
335
#[derive(From, Debug)]
336
#[allow(clippy::large_enum_variant)] // TODO #2003: should we resolve this?
337
enum CircuitAction {
338
    /// Run a single `CircuitCmd` command.
339
    RunCmd {
340
        /// The unique identifier of the circuit leg to run the command on
341
        leg: UniqId,
342
        /// The command to run.
343
        cmd: CircuitCmd,
344
    },
345
    /// Handle a control message
346
    HandleControl(CtrlMsg),
347
    /// Handle an input message.
348
    HandleCell {
349
        /// The unique identifier of the circuit leg the message was received on.
350
        leg: UniqId,
351
        /// The message to handle.
352
        cell: ClientCircChanMsg,
353
    },
354
    /// Remove the specified circuit leg from the conflux set.
355
    ///
356
    /// Returned whenever a single circuit leg needs to be be removed
357
    /// from the reactor's conflux set, without necessarily tearing down
358
    /// the whole set or shutting down the reactor.
359
    ///
360
    /// Note: this action *can* cause the reactor to shut down
361
    /// (and the conflux set to be closed).
362
    ///
363
    /// See the [`ConfluxSet::remove`] docs for more on the exact behavior of this command.
364
    RemoveLeg {
365
        /// The leg to remove.
366
        leg: UniqId,
367
        /// The reason for removal.
368
        ///
369
        /// This is only used for conflux circuits that get removed
370
        /// before the conflux handshake is complete.
371
        ///
372
        /// The [`RemoveLegReason`] is mapped by the reactor to a
373
        /// [`ConfluxHandshakeError`] that is sent to the initiator of the
374
        /// handshake to indicate the reason the handshake failed.
375
        reason: RemoveLegReason,
376
    },
377
}
378

            
379
/// The reason for removing a circuit leg from the conflux set.
380
#[derive(Debug, derive_more::Display)]
381
enum RemoveLegReason {
382
    /// The conflux handshake timed out.
383
    ///
384
    /// On the client-side, this means we didn't receive
385
    /// the CONFLUX_LINKED response in time.
386
    #[display("conflux handshake timed out")]
387
    ConfluxHandshakeTimeout,
388
    /// An error occurred during conflux handshake.
389
    #[display("{}", _0)]
390
    ConfluxHandshakeErr(Error),
391
    /// The channel was closed.
392
    #[display("channel closed")]
393
    ChannelClosed,
394
}
395

            
396
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
397
/// progress.
398
///
399
/// # Background
400
///
401
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
402
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
403
///
404
/// To get around this problem, the reactor can send some cells, and then make one of these
405
/// `MetaCellHandler` objects, which will be run when the reply arrives.
406
pub(crate) trait MetaCellHandler: Send {
407
    /// The hop we're expecting the message to come from. This is compared against the hop
408
    /// from which we actually receive messages, and an error is thrown if the two don't match.
409
    fn expected_hop(&self) -> HopNum;
410
    /// Called when the message we were waiting for arrives.
411
    ///
412
    /// Gets a copy of the `Reactor` in order to do anything it likes there.
413
    ///
414
    /// If this function returns an error, the reactor will shut down.
415
    fn handle_msg(
416
        &mut self,
417
        msg: UnparsedRelayMsg,
418
        reactor: &mut Circuit,
419
    ) -> Result<MetaCellDisposition>;
420
}
421

            
422
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "send-control-msg", visibility::make(pub))]
#[non_exhaustive]
pub(crate) enum MetaCellDisposition {
    /// The message was consumed; the handler should remain installed.
    #[cfg(feature = "send-control-msg")]
    Consumed,
    /// The message was consumed; the handler should be uninstalled.
    ConversationFinished,
    /// The message was consumed; the circuit should be closed.
    #[cfg(feature = "send-control-msg")]
    CloseCirc,
    // TODO: Eventually we might want the ability to have multiple handlers
    // installed, and to let them say "not for me, maybe for somebody else?".
    // But right now we don't need that.
}
439

            
440
/// Unwrap the specified [`Option`], returning a [`ReactorError::Shutdown`] if it is `None`.
///
/// Evaluates to `Ok(v)` for `Some(v)`. For `None`, it logs a trace-level
/// "reactor shutdown" event carrying `$self.tunnel_id` and `$reason`, and
/// evaluates to `Err(ReactorError::Shutdown)`.
///
/// This is a macro instead of a function to work around borrowck errors
/// in the select! from run_once().
macro_rules! unwrap_or_shutdown {
    ($self:expr, $res:expr, $reason:expr) => {{
        match $res {
            None => {
                trace!(
                    tunnel_id = %$self.tunnel_id,
                    reason = %$reason,
                    "reactor shutdown"
                );
                Err(ReactorError::Shutdown)
            }
            Some(v) => Ok(v),
        }
    }};
}
459

            
460
/// Object to handle incoming cells and background tasks on a circuit
461
///
462
/// This type is returned when you finish a circuit; you need to spawn a
463
/// new task that calls `run()` on it.
464
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
465
pub struct Reactor {
466
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
467
    ///
468
    /// This channel is polled in [`Reactor::run_once`], but only if the `chan_sender` sink
469
    /// is ready to accept cells.
470
    control: mpsc::UnboundedReceiver<CtrlMsg>,
471
    /// Receiver for command messages for this reactor, sent by `ClientCirc` objects.
472
    ///
473
    /// This channel is polled in [`Reactor::run_once`].
474
    ///
475
    /// NOTE: this is a separate channel from `control`, because some messages
476
    /// have higher priority and need to be handled even if the `chan_sender` is not
477
    /// ready (whereas `control` messages are not read until the `chan_sender` sink
478
    /// is ready to accept cells).
479
    command: mpsc::UnboundedReceiver<CtrlCmd>,
480
    /// A oneshot sender that is used to alert other tasks when this reactor is
481
    /// finally dropped.
482
    ///
483
    /// It is a sender for Void because we never actually want to send anything here;
484
    /// we only want to generate canceled events.
485
    #[allow(dead_code)] // the only purpose of this field is to be dropped.
486
    reactor_closed_tx: oneshot::Sender<void::Void>,
487
    /// A set of circuits that form a tunnel.
488
    ///
489
    /// Contains 1 or more circuits.
490
    ///
491
    /// Circuits may be added to this set throughout the lifetime of the reactor.
492
    ///
493
    /// Sometimes, the reactor will remove circuits from this set,
494
    /// for example if the `LINKED` message takes too long to arrive,
495
    /// or if congestion control negotiation fails.
496
    /// The reactor will continue running with the remaining circuits.
497
    /// It will shut down if *all* the circuits are removed.
498
    ///
499
    // TODO(conflux): document all the reasons why the reactor might
500
    // chose to tear down a circuit or tunnel (timeouts, protocol violations, etc.)
501
    circuits: ConfluxSet,
502
    /// An identifier for logging about this tunnel reactor.
503
    tunnel_id: TunnelId,
504
    /// Handlers, shared with `Circuit`.
505
    cell_handlers: CellHandlers,
506
    /// The time provider, used for conflux handshake timeouts.
507
    runtime: DynTimeProvider,
508
    /// The conflux handshake context, if there is an on-going handshake.
509
    ///
510
    /// Set to `None` if this is a single-path tunnel,
511
    /// or if none of the circuit legs from our conflux set
512
    /// are currently in the conflux handshake phase.
513
    #[cfg(feature = "conflux")]
514
    conflux_hs_ctx: Option<ConfluxHandshakeCtx>,
515
    /// A min-heap buffering all the out-of-order messages received so far.
516
    ///
517
    /// TODO(conflux): this becomes a DoS vector unless we impose a limit
518
    /// on its size. We should make this participate in the memquota memory
519
    /// tracking system, somehow.
520
    #[cfg(feature = "conflux")]
521
    ooo_msgs: BinaryHeap<ConfluxHeapEntry>,
522
}
523

            
524
/// The context for an on-going conflux handshake.
#[cfg(feature = "conflux")]
struct ConfluxHandshakeCtx {
    /// A channel for notifying the caller of the outcome of a CONFLUX_LINK request.
    answer: ConfluxLinkResultChannel,
    /// The number of legs that are currently doing the handshake.
    num_legs: usize,
    /// The handshake results we have collected so far.
    results: ConfluxHandshakeResult,
}
534

            
535
/// An out-of-order message buffered in [`Reactor::ooo_msgs`].
///
/// Ordering and equality of entries are determined by `msg` alone;
/// `leg_id` does not take part in comparisons.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxHeapEntry {
    /// The leg id this message came from.
    leg_id: UniqId,
    /// The out of order message
    msg: OooRelayMsg,
}
544

            
545
#[cfg(feature = "conflux")]
impl Ord for ConfluxHeapEntry {
    /// Compare two entries by their `msg`, in *reverse* order.
    ///
    /// `std::collections::BinaryHeap` is a max-heap, so reversing the
    /// comparison makes a heap of these entries yield the smallest `msg` first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.msg.cmp(&other.msg).reverse()
    }
}
551

            
552
#[cfg(feature = "conflux")]
impl PartialOrd for ConfluxHeapEntry {
    /// Delegates to [`Ord::cmp`], so the partial order always agrees with the total order.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
558

            
559
#[cfg(feature = "conflux")]
impl PartialEq for ConfluxHeapEntry {
    /// Two entries are equal when their `msg` fields are equal; `leg_id` is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.msg == other.msg
    }
}
565

            
566
// Marker impl: required by `Ord`.
#[cfg(feature = "conflux")]
impl Eq for ConfluxHeapEntry {}
568

            
569
/// Cell handlers, shared between the Reactor and its underlying `Circuit`s.
570
struct CellHandlers {
571
    /// A handler for a meta cell, together with a result channel to notify on completion.
572
    meta_handler: Option<Box<dyn MetaCellHandler + Send>>,
573
    /// A handler for incoming stream requests.
574
    #[cfg(feature = "hs-service")]
575
    incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
576
}
577

            
578
/// Information about an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(Debug, Deftly)]
#[derive_deftly(HasMemoryCost)]
pub(crate) struct StreamReqInfo {
    /// The [`IncomingStreamRequest`].
    pub(crate) req: IncomingStreamRequest,
    /// The ID of the stream being requested.
    pub(crate) stream_id: StreamId,
    /// The [`HopNum`].
    //
    // TODO: When we add support for exit relays, we need to turn this into an Option<HopNum>.
    // (For outbound messages (towards relays), there is only one hop that can send them: the client.)
    //
    // TODO: For onion services, we might be able to enforce the HopNum earlier: we would never accept an
    // incoming stream request from two separate hops.  (There is only one that's valid.)
    pub(crate) hop_num: HopNum,
    /// The [`UniqId`] of the circuit the request came on.
    pub(crate) leg: UniqId,
    /// The format which must be used with this stream to encode messages.
    #[deftly(has_memory_cost(indirect_size = "0"))]
    pub(crate) relay_cell_format: RelayCellFormat,
    /// A channel for receiving messages from this stream.
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate
    pub(crate) receiver: StreamMpscReceiver<UnparsedRelayMsg>,
    /// A channel for sending messages to be sent on this stream.
    #[deftly(has_memory_cost(indirect_size = "size_of::<AnyRelayMsg>()"))] // estimate
    pub(crate) msg_tx: StreamMpscSender<AnyRelayMsg>,
    /// The memory quota account to be used for this stream
    #[deftly(has_memory_cost(indirect_size = "0"))] // estimate (it contains an Arc)
    pub(crate) memquota: StreamAccount,
}
610

            
611
/// Data required for handling an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(educe::Educe)]
#[educe(Debug)]
struct IncomingStreamRequestHandler {
    /// A sender for sharing information about an incoming stream request.
    incoming_sender: StreamReqSender,
    /// A [`AnyCmdChecker`] for validating incoming stream requests.
    cmd_checker: AnyCmdChecker,
    /// The hop to expect incoming stream requests from.
    hop_num: HopNum,
    /// An [`IncomingStreamRequestFilter`] for checking whether the user wants
    /// this request, or wants to reject it immediately.
    #[educe(Debug(ignore))]
    filter: Box<dyn IncomingStreamRequestFilter>,
}
627

            
628
impl Reactor {
629
    /// Create a new circuit reactor.
630
    ///
631
    /// The reactor will send outbound messages on `channel`, receive incoming
632
    /// messages on `input`, and identify this circuit by the channel-local
633
    /// [`CircId`] provided.
634
    ///
635
    /// The internal unique identifier for this circuit will be `unique_id`.
636
    #[allow(clippy::type_complexity)] // TODO
637
256
    pub(super) fn new(
638
256
        channel: Arc<Channel>,
639
256
        channel_id: CircId,
640
256
        unique_id: UniqId,
641
256
        input: CircuitRxReceiver,
642
256
        runtime: DynTimeProvider,
643
256
        memquota: CircuitAccount,
644
256
    ) -> (
645
256
        Self,
646
256
        mpsc::UnboundedSender<CtrlMsg>,
647
256
        mpsc::UnboundedSender<CtrlCmd>,
648
256
        oneshot::Receiver<void::Void>,
649
256
        Arc<TunnelMutableState>,
650
256
    ) {
651
256
        let tunnel_id = TunnelId::next();
652
256
        let (control_tx, control_rx) = mpsc::unbounded();
653
256
        let (command_tx, command_rx) = mpsc::unbounded();
654
256
        let mutable = Arc::new(MutableState::default());
655
256

            
656
256
        let (reactor_closed_tx, reactor_closed_rx) = oneshot::channel();
657
256

            
658
256
        let cell_handlers = CellHandlers {
659
256
            meta_handler: None,
660
256
            #[cfg(feature = "hs-service")]
661
256
            incoming_stream_req_handler: None,
662
256
        };
663
256

            
664
256
        let unique_id = TunnelScopedCircId::new(tunnel_id, unique_id);
665
256
        let circuit_leg = Circuit::new(
666
256
            channel,
667
256
            channel_id,
668
256
            unique_id,
669
256
            input,
670
256
            memquota,
671
256
            Arc::clone(&mutable),
672
256
        );
673
256

            
674
256
        let (circuits, mutable) = ConfluxSet::new(tunnel_id, circuit_leg);
675
256

            
676
256
        let reactor = Reactor {
677
256
            circuits,
678
256
            control: control_rx,
679
256
            command: command_rx,
680
256
            reactor_closed_tx,
681
256
            tunnel_id,
682
256
            cell_handlers,
683
256
            runtime,
684
256
            #[cfg(feature = "conflux")]
685
256
            conflux_hs_ctx: None,
686
256
            #[cfg(feature = "conflux")]
687
256
            ooo_msgs: Default::default(),
688
256
        };
689
256

            
690
256
        (reactor, control_tx, command_tx, reactor_closed_rx, mutable)
691
256
    }
692

            
693
    /// Launch the reactor, and run until the circuit closes or we
694
    /// encounter an error.
695
    ///
696
    /// Once this method returns, the circuit is dead and cannot be
697
    /// used again.
698
384
    pub async fn run(mut self) -> Result<()> {
699
256
        trace!(tunnel_id = %self.tunnel_id, "Running tunnel reactor");
700
256
        let result: Result<()> = loop {
701
5286
            match self.run_once().await {
702
5030
                Ok(()) => (),
703
184
                Err(ReactorError::Shutdown) => break Ok(()),
704
72
                Err(ReactorError::Err(e)) => break Err(e),
705
            }
706
        };
707
256
        trace!(tunnel_id = %self.tunnel_id, "Tunnel reactor stopped: {:?}", result);
708
256
        result
709
256
    }
710

            
711
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
712
    /// processes one cell or control message.
713
7929
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
714
5286
        // If all the circuits are closed, shut down the reactor
715
5286
        if self.circuits.is_empty() {
716
            trace!(
717
                tunnel_id = %self.tunnel_id,
718
                "Tunnel reactor shutting down: all circuits have closed",
719
            );
720

            
721
            return Err(ReactorError::Shutdown);
722
5286
        }
723
5286

            
724
5286
        // If this is a single path circuit, we need to wait until the first hop
725
5286
        // is created before doing anything else
726
5286
        let single_path_with_hops = self
727
5286
            .circuits
728
5286
            .single_leg_mut()
729
5286
            .is_ok_and(|leg| !leg.has_hops());
730
5286
        if single_path_with_hops {
731
256
            self.wait_for_create().await?;
732

            
733
248
            return Ok(());
734
5030
        }
735
5030

            
736
5030
        // Prioritize the buffered messages.
737
5030
        //
738
5030
        // Note: if any of the messages are ready to be handled,
739
5030
        // this will block the reactor until we are done processing them
740
5030
        #[cfg(feature = "conflux")]
741
5030
        self.try_dequeue_ooo_msgs().await?;
742

            
743
5030
        let action = select_biased! {
744
5030
            res = self.command.next() => {
745
630
                let cmd = unwrap_or_shutdown!(self, res, "command channel drop")?;
746
520
                return ControlHandler::new(self).handle_cmd(cmd);
747
            },
748
            // Check whether we've got a control message pending.
749
            //
750
            // Note: unfortunately, reading from control here means we might start
751
            // handling control messages before our chan_senders are ready.
752
            // With the current design, this is inevitable: we can't know which circuit leg
753
            // a control message is meant for without first reading the control message from
754
            // the channel, and at that point, we can't know for sure whether that particular
755
            // circuit is ready for sending.
756
5030
            ret = self.control.next() => {
757
160
                let msg = unwrap_or_shutdown!(self, ret, "control drop")?;
758
156
                CircuitAction::HandleControl(msg)
759
            },
760
5030
            res = self.circuits.next_circ_action(&self.runtime).fuse() => res?,
761
        };
762

            
763
4396
        let cmd = match action {
764
3974
            CircuitAction::RunCmd { leg, cmd } => Some(RunOnceCmd::Single(
765
3974
                RunOnceCmdInner::from_circuit_cmd(leg, cmd),
766
3974
            )),
767
156
            CircuitAction::HandleControl(ctrl) => ControlHandler::new(self)
768
156
                .handle_msg(ctrl)?
769
156
                .map(RunOnceCmd::Single),
770
256
            CircuitAction::HandleCell { leg, cell } => {
771
256
                let circ = self
772
256
                    .circuits
773
256
                    .leg_mut(leg)
774
256
                    .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
775

            
776
256
                let circ_cmds = circ.handle_cell(&mut self.cell_handlers, leg, cell)?;
777
208
                if circ_cmds.is_empty() {
778
136
                    None
779
                } else {
780
                    // TODO: we return RunOnceCmd::Multiple even if there's a single command.
781
                    //
782
                    // See the TODO on `Circuit::handle_cell`.
783
72
                    let cmd = RunOnceCmd::Multiple(
784
72
                        circ_cmds
785
72
                            .into_iter()
786
72
                            .map(|cmd| RunOnceCmdInner::from_circuit_cmd(leg, cmd))
787
72
                            .collect(),
788
72
                    );
789
72

            
790
72
                    Some(cmd)
791
                }
792
            }
793
10
            CircuitAction::RemoveLeg { leg, reason } => {
794
10
                Some(RunOnceCmdInner::RemoveLeg { leg, reason }.into())
795
            }
796
        };
797

            
798
4348
        if let Some(cmd) = cmd {
799
4212
            self.handle_run_once_cmd(cmd).await?;
800
136
        }
801

            
802
4302
        Ok(())
803
5286
    }
804

            
805
    /// Handle the buffered, previously-out-of-order messages that have become deliverable.
    ///
    /// Keeps dequeuing for as long as the next buffered message is in order;
    /// each dequeued message is handled immediately, along with any command it produces.
    #[cfg(feature = "conflux")]
    async fn try_dequeue_ooo_msgs(&mut self) -> StdResult<(), ReactorError> {
        loop {
            // Stop once the buffer is empty, or its next message is still out of order.
            let Some(head) = self.ooo_msgs.peek() else {
                break;
            };
            if !self.circuits.is_seqno_in_order(head.msg.seqno) {
                break;
            }

            let entry = self.ooo_msgs.pop().expect("item just disappeared?!");
            let leg_id = entry.leg_id;
            let msg = entry.msg;

            let circ = self
                .circuits
                .leg_mut(leg_id)
                .ok_or_else(|| internal!("the circuit leg we just had disappeared?!"))?;
            let maybe_circ_cmd = circ.handle_in_order_relay_msg(
                &mut self.cell_handlers,
                msg.hopnum,
                leg_id,
                msg.cell_counts_towards_windows,
                msg.streamid,
                msg.msg,
            )?;

            if let Some(circ_cmd) = maybe_circ_cmd {
                let cmd = RunOnceCmd::Single(RunOnceCmdInner::from_circuit_cmd(leg_id, circ_cmd));
                self.handle_run_once_cmd(cmd).await?;
            }
        }

        Ok(())
    }

    /// Handle a [`RunOnceCmd`].
845
6318
    async fn handle_run_once_cmd(&mut self, cmd: RunOnceCmd) -> StdResult<(), ReactorError> {
846
4212
        match cmd {
847
4140
            RunOnceCmd::Single(cmd) => return self.handle_single_run_once_cmd(cmd).await,
848
72
            RunOnceCmd::Multiple(cmds) => {
849
                // While we know `sendable` is ready to accept *one* cell,
850
                // we can't be certain it will be able to accept *all* of the cells
851
                // that need to be sent here. This means we *may* end up buffering
852
                // in its underlying SometimesUnboundedSink! That is OK, because
853
                // RunOnceCmd::Multiple is only used for handling packed cells.
854
112
                for cmd in cmds {
855
72
                    self.handle_single_run_once_cmd(cmd).await?;
856
                }
857
            }
858
        }
859

            
860
40
        Ok(())
861
4212
    }
862

            
863
    /// Handle a [`RunOnceCmd`].
864
4212
    async fn handle_single_run_once_cmd(
865
4212
        &mut self,
866
4212
        cmd: RunOnceCmdInner,
867
6318
    ) -> StdResult<(), ReactorError> {
868
4212
        match cmd {
869
3990
            RunOnceCmdInner::Send { leg, cell, done } => {
870
                // TODO: check the cc window
871
3990
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
872
3990
                if let Some(done) = done {
873
                    // Don't care if the receiver goes away
874
                    let _ = done.send(res.clone());
875
3990
                }
876
3990
                res?;
877
            }
878
            #[cfg(feature = "send-control-msg")]
879
            RunOnceCmdInner::SendMsgAndInstallHandler { msg, handler, done } => {
880
                let cell: Result<Option<SendRelayCell>> =
881
                    self.prepare_msg_and_install_handler(msg, handler);
882

            
883
                match cell {
884
                    Ok(Some(cell)) => {
885
                        // TODO(conflux): let the RunOnceCmdInner specify which leg to send the cell on
886
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, None).await;
887
                        // don't care if receiver goes away.
888
                        let _ = done.send(outcome.clone());
889
                        outcome?;
890
                    }
891
                    Ok(None) => {
892
                        // don't care if receiver goes away.
893
                        let _ = done.send(Ok(()));
894
                    }
895
                    Err(e) => {
896
                        // don't care if receiver goes away.
897
                        let _ = done.send(Err(e.clone()));
898
                        return Err(e.into());
899
                    }
900
                }
901
            }
902
            RunOnceCmdInner::BeginStream {
903
60
                leg,
904
60
                cell,
905
60
                hop,
906
60
                done,
907
60
            } => {
908
60
                match cell {
909
60
                    Ok((cell, stream_id)) => {
910
60
                        let circ = self
911
60
                            .circuits
912
60
                            .leg_mut(leg)
913
60
                            .ok_or_else(|| internal!("leg disappeared?!"))?;
914
60
                        let cell_hop = cell.hop;
915
60
                        let relay_format = circ
916
60
                            .hop_mut(cell_hop)
917
60
                            // TODO: Is this the right error type here? Or should there be a "HopDisappeared"?
918
60
                            .ok_or(Error::NoSuchHop)?
919
60
                            .relay_cell_format();
920

            
921
60
                        let outcome = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
922
                        // don't care if receiver goes away.
923
60
                        let _ = done.send(outcome.clone().map(|_| (stream_id, hop, relay_format)));
924
60
                        outcome?;
925
                    }
926
                    Err(e) => {
927
                        // don't care if receiver goes away.
928
                        let _ = done.send(Err(e.clone()));
929
                        return Err(e.into());
930
                    }
931
                }
932
            }
933
            RunOnceCmdInner::CloseStream {
934
40
                hop,
935
40
                sid,
936
40
                behav,
937
40
                reason,
938
40
                done,
939
40
            } => {
940
40
                let result = (move || {
941
40
                    // this is needed to force the closure to be FnOnce rather than FnMut :(
942
40
                    let self_ = self;
943
40
                    let (leg_id, hop_num) = self_
944
40
                        .resolve_hop_location(hop)
945
40
                        .map_err(into_bad_api_usage!("Could not resolve {hop:?}"))?;
946
40
                    let leg = self_
947
40
                        .circuits
948
40
                        .leg_mut(leg_id)
949
40
                        .ok_or(bad_api_usage!("No leg for id {:?}", leg_id))?;
950
40
                    Ok::<_, Bug>((leg, hop_num))
951
40
                })();
952

            
953
40
                let (leg, hop_num) = match result {
954
40
                    Ok(x) => x,
955
                    Err(e) => {
956
                        if let Some(done) = done {
957
                            // don't care if the sender goes away
958
                            let e = into_bad_api_usage!("Could not resolve {hop:?}")(e);
959
                            let _ = done.send(Err(e.into()));
960
                        }
961
                        return Ok(());
962
                    }
963
                };
964

            
965
40
                let res: Result<()> = leg.close_stream(hop_num, sid, behav, reason).await;
966

            
967
40
                if let Some(done) = done {
968
8
                    // don't care if the sender goes away
969
8
                    let _ = done.send(res);
970
32
                }
971
            }
972
12
            RunOnceCmdInner::HandleSendMe { leg, hop, sendme } => {
973
12
                let leg = self
974
12
                    .circuits
975
12
                    .leg_mut(leg)
976
12
                    .ok_or_else(|| internal!("leg disappeared?!"))?;
977
                // NOTE: it's okay to await. We are only awaiting on the congestion_signals
978
                // future which *should* resolve immediately
979
12
                let signals = leg.congestion_signals().await;
980
12
                leg.handle_sendme(hop, sendme, signals)?;
981
            }
982
            RunOnceCmdInner::FirstHopClockSkew { answer } => {
983
                let res = self.circuits.single_leg_mut().map(|leg| leg.clock_skew());
984

            
985
                // don't care if the sender goes away
986
                let _ = answer.send(res.map_err(Into::into));
987
            }
988
            RunOnceCmdInner::CleanShutdown => {
989
8
                trace!(tunnel_id = %self.tunnel_id, "reactor shutdown due to handled cell");
990
8
                return Err(ReactorError::Shutdown);
991
            }
992
26
            RunOnceCmdInner::RemoveLeg { leg, reason } => {
993
26
                warn!(tunnel_id = %self.tunnel_id, reason = %reason, "removing circuit leg");
994

            
995
26
                let circ = self.circuits.remove(leg)?;
996
16
                let is_conflux_pending = circ.is_conflux_pending();
997
16

            
998
16
                // Drop the removed leg. This will cause it to close if it's not already closed.
999
16
                drop(circ);
16

            
16
                // If we reach this point, it means we have more than one leg
16
                // (otherwise the .remove() would've returned a Shutdown error),
16
                // so we expect there to be a ConfluxHandshakeContext installed.
16

            
16
                #[cfg(feature = "conflux")]
16
                if is_conflux_pending {
16
                    let (error, proto_violation): (_, Option<Error>) = match &reason {
                        RemoveLegReason::ConfluxHandshakeTimeout => {
4
                            (ConfluxHandshakeError::Timeout, None)
                        }
12
                        RemoveLegReason::ConfluxHandshakeErr(e) => {
12
                            (ConfluxHandshakeError::Link(e.clone()), Some(e.clone()))
                        }
                        RemoveLegReason::ChannelClosed => {
                            (ConfluxHandshakeError::ChannelClosed, None)
                        }
                    };
16
                    self.note_conflux_handshake_result(Err(error), proto_violation.is_some())?;
16
                    if let Some(e) = proto_violation {
                        // TODO: make warn_report support structured logging
12
                        tor_error::warn_report!(
                            e,
                            "{}: Malformed conflux handshake, tearing down tunnel",
                            self.tunnel_id
                        );
12
                        return Err(e.into());
4
                    }
                }
            }
            #[cfg(feature = "conflux")]
36
            RunOnceCmdInner::ConfluxHandshakeComplete { leg, cell } => {
36
                // Note: on the client-side, the handshake is considered complete once the
36
                // RELAY_CONFLUX_LINKED_ACK is sent (roughly upon receipt of the LINKED cell).
36
                //
36
                // We're optimistic here, and declare the handshake a success *before*
36
                // sending the LINKED_ACK response. I think this is OK though,
36
                // because if the send_relay_cell() below fails, the reactor will shut
36
                // down anyway. OTOH, marking the handshake as complete slightly early
36
                // means that on the happy path, the circuit is marked as usable sooner,
36
                // instead of blocking on the sending of the LINKED_ACK.
36
                self.note_conflux_handshake_result(Ok(()), false)?;
32
                let res = self.circuits.send_relay_cell_on_leg(cell, Some(leg)).await;
32
                res?;
            }
            #[cfg(feature = "conflux")]
40
            RunOnceCmdInner::Link { circuits, answer } => {
40
                // Add the specified circuits to our conflux set,
40
                // and send a LINK cell down each unlinked leg.
40
                //
40
                // NOTE: this will block the reactor until all the cells are sent.
40
                self.handle_link_circuits(circuits, answer).await?;
            }
            #[cfg(feature = "conflux")]
            RunOnceCmdInner::Enqueue { leg, msg } => {
                let entry = ConfluxHeapEntry { leg_id: leg, msg };
                self.ooo_msgs.push(entry);
            }
        }
4166
        Ok(())
4212
    }
    /// Wait for a [`CtrlMsg::Create`] to come along to set up the circuit.
    ///
    /// Returns an error if an unexpected `CtrlMsg` is received.
384
    async fn wait_for_create(&mut self) -> StdResult<(), ReactorError> {
256
        let msg = select_biased! {
256
            res = self.command.next() => {
224
                let cmd = unwrap_or_shutdown!(self, res, "shutdown channel drop")?;
216
                match cmd {
                    CtrlCmd::Shutdown => return self.handle_shutdown().map(|_| ()),
                    #[cfg(test)]
                    CtrlCmd::AddFakeHop {
216
                        relay_cell_format: format,
216
                        fwd_lasthop,
216
                        rev_lasthop,
216
                        peer_id,
216
                        params,
216
                        done,
                    } => {
216
                        let leg = self.circuits.single_leg_mut()?;
216
                        leg.handle_add_fake_hop(format, fwd_lasthop, rev_lasthop, peer_id, &params, done);
216
                        return Ok(())
                    },
                    _ => {
                        trace!("reactor shutdown due to unexpected command: {:?}", cmd);
                        return Err(Error::CircProto(format!("Unexpected control {cmd:?} on client circuit")).into());
                    }
                }
            },
256
            res = self.control.next() => unwrap_or_shutdown!(self, res, "control drop")?,
        };
32
        match msg {
            CtrlMsg::Create {
32
                recv_created,
32
                handshake,
32
                settings,
32
                done,
            } => {
                // TODO(conflux): instead of crashing the reactor, it might be better
                // to send the error via the done channel instead
32
                let leg = self.circuits.single_leg_mut()?;
32
                leg.handle_create(recv_created, handshake, settings, done)
32
                    .await
            }
            _ => {
                trace!("reactor shutdown due to unexpected cell: {:?}", msg);
                Err(Error::CircProto(format!("Unexpected {msg:?} cell on client circuit")).into())
            }
        }
256
    }
    /// Record one leg's handshake result in our `ConfluxHandshakeContext`.
    ///
    /// The context is consumed, and the results collected so far are sent to the
    /// handshake initiator, once either of these holds:
    ///
    ///   * every circuit we were waiting on has reported a result, or
    ///   * `reactor_is_closing` is set.
    ///
    /// Returns an internal error if no handshake context is installed.
    #[cfg(feature = "conflux")]
    fn note_conflux_handshake_result(
        &mut self,
        res: StdResult<(), ConfluxHandshakeError>,
        reactor_is_closing: bool,
    ) -> StdResult<(), ReactorError> {
        let Some(pending_ctx) = self.conflux_hs_ctx.as_mut() else {
            return Err(internal!("no conflux handshake context").into());
        };

        pending_ctx.results.push(res);
        // Whether all the legs have finished linking:
        let tunnel_complete = pending_ctx.results.len() == pending_ctx.num_legs;

        if !tunnel_complete && !reactor_is_closing {
            // Still waiting for more legs to report back.
            return Ok(());
        }

        // Time to remove the conflux handshake context
        // and extract the results we have collected
        let finished_ctx = self.conflux_hs_ctx.take().expect("context disappeared?!");

        let leg_count = finished_ctx.results.len();
        let success_count = finished_ctx
            .results
            .iter()
            .filter(|outcome| outcome.is_ok())
            .count();

        info!(
            tunnel_id = %self.tunnel_id,
            "conflux tunnel ready ({success_count}/{leg_count} circuits successfully linked)",
        );

        // We don't expect to receive any more handshake results,
        // at least not until we get another LinkCircuits control message,
        // which will install a new ConfluxHandshakeCtx with a channel
        // for us to send updates on
        send_conflux_outcome(finished_ctx.answer, Ok(finished_ctx.results))
    }
    /// Build the `SendRelayCell` request for `msg` (if any), then install `handler` (if any)
    /// as the meta-cell handler.
    ///
    /// The cell is addressed to the hop expected by `handler`, falling back to the
    /// currently installed meta-cell handler when `handler` is `None`.
    ///
    /// Returns an error if there is a message to send but no handler to take the hop from,
    /// or if installing `handler` fails.
    fn prepare_msg_and_install_handler(
        &mut self,
        msg: Option<AnyRelayMsgOuter>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<Option<SendRelayCell>> {
        let request = match msg {
            None => None,
            Some(cell) => {
                let hop = handler
                    .as_ref()
                    .or(self.cell_handlers.meta_handler.as_ref())
                    .ok_or_else(|| internal!("tried to use an ended Conversation"))?
                    .expected_hop();

                Some(SendRelayCell {
                    hop,
                    early: false,
                    cell,
                })
            }
        };

        // Only install the new handler once the request has been built successfully.
        if let Some(new_handler) = handler {
            self.cell_handlers.set_meta_handler(new_handler)?;
        }

        Ok(request)
    }
    /// Handle a shutdown request.
40
    fn handle_shutdown(&self) -> StdResult<Option<RunOnceCmdInner>, ReactorError> {
40
        trace!(
            tunnel_id = %self.tunnel_id,
            "reactor shutdown due to explicit request",
        );
40
        Err(ReactorError::Shutdown)
40
    }
    /// Shut down the reactor, handing the tunnel's only [`Circuit`] back to the requester.
    ///
    /// An error is sent over the `answer` channel if the reactor has no circuits,
    /// or more than one circuit. The reactor will shut down regardless.
    #[cfg(feature = "conflux")]
    fn handle_shutdown_and_return_circuit(
        &mut self,
        answer: oneshot::Sender<StdResult<Circuit, Bug>>,
    ) -> StdResult<(), ReactorError> {
        let only_leg: StdResult<Circuit, Bug> =
            self.circuits.take_single_leg().map_err(Into::into);

        // Don't care if the receiver goes away
        let _ = answer.send(only_leg);

        self.handle_shutdown().map(|_| ())
    }
    /// Resolves a [`TargetHop`] to a [`HopLocation`].
    ///
    /// After resolving a `TargetHop::LastHop`,
    /// the `HopLocation` can become stale if a single-path circuit is later extended or truncated.
    /// This means that the `HopLocation` can become stale from one reactor iteration to the next.
    ///
    /// It's generally okay to hold on to a (possibly stale) `HopLocation`
    /// if you need a fixed hop position in the tunnel.
    /// For example if we open a stream to `TargetHop::LastHop`,
    /// we would want to store the stream position as a `HopLocation` and not a `TargetHop::LastHop`
    /// as we don't want the stream position to change as the tunnel is extended or truncated.
    ///
    /// Returns [`NoHopsBuiltError`] if trying to resolve `TargetHop::LastHop`
    /// and the tunnel has no hops
    /// (either has no legs, or has legs which contain no hops).
60
    fn resolve_target_hop(&self, hop: TargetHop) -> StdResult<HopLocation, NoHopsBuiltError> {
60
        match hop {
            TargetHop::Hop(hop) => Ok(hop),
            TargetHop::LastHop => {
60
                if let Ok(leg) = self.circuits.single_leg() {
56
                    let leg_id = leg.unique_id();
                    // single-path tunnel
56
                    let hop = leg.last_hop_num().ok_or(NoHopsBuiltError)?;
56
                    Ok(HopLocation::Hop((leg_id, hop)))
4
                } else if !self.circuits.is_empty() {
                    // multi-path tunnel
4
                    return Ok(HopLocation::JoinPoint);
                } else {
                    // no legs
                    Err(NoHopsBuiltError)
                }
            }
        }
60
    }
    /// Resolves a [`HopLocation`] to a [`UniqId`] and [`HopNum`].
    ///
    /// After resolving a `HopLocation::JoinPoint`,
    /// the [`UniqId`] and [`HopNum`] can become stale if the primary leg changes.
    ///
    /// You should try to only resolve to a specific [`UniqId`] and [`HopNum`] immediately before you
    /// need them,
    /// and you should not hold on to the resolved [`UniqId`] and [`HopNum`] between reactor
    /// iterations as the primary leg may change from one iteration to the next.
    ///
    /// Returns [`NoJoinPointError`] if trying to resolve `HopLocation::JoinPoint`
    /// but it does not have a join point.
100
    fn resolve_hop_location(
100
        &self,
100
        hop: HopLocation,
100
    ) -> StdResult<(UniqId, HopNum), NoJoinPointError> {
100
        match hop {
96
            HopLocation::Hop((leg_id, hop_num)) => Ok((leg_id, hop_num)),
            HopLocation::JoinPoint => {
4
                if let Some((leg_id, hop_num)) = self.circuits.primary_join_point() {
4
                    Ok((leg_id, hop_num))
                } else {
                    // Attempted to get the join point of a non-multipath tunnel.
                    Err(NoJoinPointError)
                }
            }
        }
100
    }
    /// Does congestion control use stream SENDMEs for the given hop?
    ///
    /// This is a thin wrapper: the question is forwarded unchanged to the
    /// reactor's circuit set (`self.circuits`).
    ///
    /// Returns `None` if either the `leg` or the `hop` doesn't exist.
    fn uses_stream_sendme(&self, leg: UniqId, hop: HopNum) -> Option<bool> {
        self.circuits.uses_stream_sendme(leg, hop)
    }
    /// Handle a request to link some extra circuits in the reactor's conflux set.
    ///
    /// The circuits are validated first. If they do not all have the same length,
    /// or do not all share the same last hop, an error is returned on
    /// the `answer` channel, and the conflux handshake is *not* initiated.
    ///
    /// If validation succeeds, the circuits are added to this reactor's conflux set,
    /// and the conflux handshake is initiated (by sending a LINK cell on each leg).
    ///
    /// If a linking attempt is already in progress, an internal error is sent on `answer`.
    ///
    /// NOTE: this blocks the reactor main loop until all the cells are sent.
    #[cfg(feature = "conflux")]
    async fn handle_link_circuits(
        &mut self,
        circuits: Vec<Circuit>,
        answer: ConfluxLinkResultChannel,
    ) -> StdResult<(), ReactorError> {
        use tor_error::warn_report;

        if self.conflux_hs_ctx.is_some() {
            let err = internal!("conflux linking already in progress");
            return send_conflux_outcome(answer, Err(err.into()));
        }

        // We need to send the LINK cell on each of the new circuits
        // and on each of the existing, unlinked legs from self.circuits.
        //
        // In reality, there can only be one such circuit
        // (the "initial" one from the previously single-path tunnel),
        // because any circuits that fail to complete the conflux handshake
        // get removed from the set.
        let unlinked_legs = self.circuits.num_unlinked();
        let num_legs = circuits.len() + unlinked_legs;

        // Note: add_legs validates `circuits`
        let link_outcome = async {
            self.circuits.add_legs(circuits, &self.runtime)?;
            self.circuits.link_circuits(&self.runtime).await
        }
        .await;

        match link_outcome {
            Err(e) => {
                warn_report!(e, "Failed to link conflux circuits");
                send_conflux_outcome(answer, Err(e))?;
            }
            Ok(_) => {
                // Save the channel, to notify the user of completion.
                self.conflux_hs_ctx = Some(ConfluxHandshakeCtx {
                    answer,
                    num_legs,
                    results: Default::default(),
                });
            }
        }

        Ok(())
    }
}
/// Notify the conflux handshake initiator of the handshake outcome.
///
/// Returns [`ReactorError::Shutdown`] (after logging a warning)
/// if the initiator has gone away.
#[cfg(feature = "conflux")]
fn send_conflux_outcome(
    tx: ConfluxLinkResultChannel,
    res: Result<ConfluxHandshakeResult>,
) -> StdResult<(), ReactorError> {
    tx.send(res).map_err(|_| {
        tracing::warn!("conflux initiator went away before handshake completed?");
        ReactorError::Shutdown
    })
}
/// The tunnel does not have any hops.
///
/// Returned by `resolve_target_hop` when asked to resolve `TargetHop::LastHop`
/// on a tunnel that has no legs, or whose single leg has no hops yet.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("no hops have been built for this tunnel")]
pub(crate) struct NoHopsBuiltError;
/// The tunnel does not have a join point.
///
/// Returned by `resolve_hop_location` when asked to resolve
/// `HopLocation::JoinPoint` and the circuit set reports no primary join point.
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
#[error("the tunnel does not have a join point")]
pub(crate) struct NoJoinPointError;
impl CellHandlers {
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
48
    fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler + Send>) -> Result<()> {
48
        if self.meta_handler.is_none() {
48
            self.meta_handler = Some(handler);
48
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
48
    }
    /// Try to install a given cell handler on this circuit.
    #[cfg(feature = "hs-service")]
40
    fn set_incoming_stream_req_handler(
40
        &mut self,
40
        handler: IncomingStreamRequestHandler,
40
    ) -> Result<()> {
40
        if self.incoming_stream_req_handler.is_none() {
32
            self.incoming_stream_req_handler = Some(handler);
32
            Ok(())
        } else {
8
            Err(Error::from(internal!(
8
                "Tried to install a BEGIN cell handler before the old one was gone."
8
            )))
        }
40
    }
}
#[cfg(test)]
mod test {
    // Intentionally empty: the reactor is tested in [`crate::tunnel::circuit::test`].
}