Skip to main content

tor_proto/channel/
reactor.rs

1//! Code to handle incoming cells on a channel.
2//!
//! The role of this code is to run in a separate asynchronous task,
//! and to route cells to the right circuits.
5//!
6//! TODO: I have zero confidence in the close-and-cleanup behavior here,
7//! or in the error handling behavior.
8
9use super::circmap::{CircEnt, CircMap};
10use crate::circuit::CircuitRxSender;
11use crate::client::circuit::halfcirc::HalfCirc;
12use crate::client::circuit::padding::{
13    PaddingController, PaddingEvent, PaddingEventStream, SendPadding, StartBlocking,
14};
15use crate::util::err::ReactorError;
16use crate::util::oneshot_broadcast;
17use crate::{Error, HopNum, Result};
18use tor_async_utils::SinkPrepareExt as _;
19use tor_cell::chancell::ChanMsg;
20use tor_cell::chancell::msg::{Destroy, DestroyReason, Padding, PaddingNegotiate};
21use tor_cell::chancell::{AnyChanCell, CircId, msg::AnyChanMsg};
22use tor_error::debug_report;
23use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
24
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_error::error_report;
27#[cfg_attr(not(target_os = "linux"), allow(unused))]
28use tor_rtcompat::StreamOps;
29
30use futures::channel::mpsc;
31use oneshot_fused_workaround as oneshot;
32
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::sink::SinkExt;
36use futures::stream::Stream;
37use futures::{select, select_biased};
38use tor_error::internal;
39
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43
44use crate::channel::{ChannelDetails, CloseInfo, kist::KistParams, padding, params::*, unique_id};
45use crate::circuit::celltypes::CreateResponse;
46use tracing::{debug, instrument, trace};
47
48/// A boxed trait object that can provide `ChanCell`s.
49pub(super) type BoxedChannelStream =
50    Box<dyn Stream<Item = std::result::Result<AnyChanCell, Error>> + Send + Unpin + 'static>;
51/// A boxed trait object that can sink `ChanCell`s.
52pub(super) type BoxedChannelSink =
53    Box<dyn Sink<AnyChanCell, Error = Error> + Send + Unpin + 'static>;
54/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
55pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
56/// The type of a oneshot channel used to inform reactor users of the result of an operation.
57pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
58
59cfg_if::cfg_if! {
60    if #[cfg(feature = "circ-padding")] {
61        use crate::util::sink_blocker::{SinkBlocker, CountingPolicy};
62        /// Type used by a channel reactor to send cells to the network.
63        pub(super) type ChannelOutputSink = SinkBlocker<BoxedChannelSink, CountingPolicy>;
64    } else {
65        /// Type used by a channel reactor to send cells to the network.
66        pub(super) type ChannelOutputSink = BoxedChannelSink;
67    }
68}
69
70/// A message telling the channel reactor to do something.
71#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72#[derive(Debug)]
73#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
74#[allow(clippy::exhaustive_enums, private_interfaces)]
75pub enum CtrlMsg {
76    /// Shut down the reactor.
77    Shutdown,
78    /// Tell the reactor that a given circuit has gone away.
79    CloseCircuit(CircId),
80    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
81    /// and registering senders for messages received for the circuit.
82    AllocateCircuit {
83        /// Channel to send the circuit's `CreateResponse` down.
84        created_sender: oneshot::Sender<CreateResponse>,
85        /// Channel to send other messages from this circuit down.
86        sender: CircuitRxSender,
87        /// Oneshot channel to send the new circuit's identifiers down.
88        tx: ReactorResultChannel<(
89            CircId,
90            crate::circuit::UniqId,
91            PaddingController,
92            PaddingEventStream,
93        )>,
94    },
95    /// Enable/disable/reconfigure channel padding
96    ///
97    /// The sender of these messages is responsible for the optimisation of
98    /// ensuring that "no-change" messages are elided.
99    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
100    ///
101    /// These updates are done via a control message to avoid adding additional branches to the
102    /// main reactor `select!`.
103    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
104    /// Enable/disable/reconfigure KIST.
105    ///
106    /// Like in the case of `ConfigUpdate`,
107    /// the sender of these messages is responsible for the optimisation of
108    /// ensuring that "no-change" messages are elided.
109    KistConfigUpdate(KistParams),
110    /// Change the current padding implementation to the one provided.
111    #[cfg(feature = "circ-padding-manual")]
112    SetChannelPadder {
113        /// The padder to install, or None to remove any existing padder.
114        padder: Option<crate::client::CircuitPadder>,
115        /// A oneshot channel to use in reporting the outcome.
116        sender: oneshot::Sender<Result<()>>,
117    },
118}
119
120/// Object to handle incoming cells and background tasks on a channel.
121///
122/// This type is returned when you finish a channel; you need to spawn a
123/// new task that calls `run()` on it.
124#[must_use = "If you don't call run() on a reactor, the channel won't work."]
125pub struct Reactor<S: SleepProvider + CoarseTimeProvider> {
126    /// Underlying runtime we use for generating sleep futures and telling time.
127    pub(super) runtime: S,
128    /// A receiver for control messages from `Channel` objects.
129    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
130    /// A oneshot sender that is used to alert other tasks when this reactor is
131    /// finally dropped.
132    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
133    /// A receiver for cells to be sent on this reactor's sink.
134    ///
135    /// `Channel` objects have a sender that can send cells here.
136    pub(super) cells: super::CellRx,
137    /// A Stream from which we can read `ChanCell`s.
138    ///
139    /// This should be backed by a TLS connection if you want it to be secure.
140    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
141    /// A Sink to which we can write `ChanCell`s.
142    ///
143    /// This should also be backed by a TLS connection if you want it to be secure.
144    pub(super) output: ChannelOutputSink,
145    /// A handler for setting stream options on the underlying stream.
146    #[cfg_attr(not(target_os = "linux"), allow(unused))]
147    pub(super) streamops: BoxedChannelStreamOps,
148    /// Timer tracking when to generate channel padding.
149    ///
150    /// Note that this is _distinct_ from the experimental maybenot-based padding
151    /// implemented with padding_ctrl and padding_stream.
152    /// This is the existing per-channel padding
153    /// in the tor protocol used to resist netflow attacks.
154    pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
155    /// Outgoing cells introduced at the channel reactor
156    pub(super) special_outgoing: SpecialOutgoing,
157    /// A map from circuit ID to Sinks on which we can deliver cells.
158    pub(super) circs: CircMap,
159    /// A unique identifier for this channel.
160    pub(super) unique_id: super::UniqId,
161    /// Information shared with the frontend
162    pub(super) details: Arc<ChannelDetails>,
163    /// Context for allocating unique circuit log identifiers.
164    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
165    /// A padding controller to which padding-related events should be reported.
166    ///
167    /// (This is used for experimental maybenot-based padding.)
168    //
169    // TODO: It would be good to use S here instead of DynTimeProvider,
170    // but we still need the latter for the clones of padding_ctrl that we hand out
171    // inside ChannelSender.
172    pub(super) padding_ctrl: PaddingController<DynTimeProvider>,
173    /// An event stream telling us about padding-related events.
174    ///
175    /// (This is used for experimental maybenot-based padding.)
176    pub(super) padding_event_stream: PaddingEventStream<DynTimeProvider>,
177    /// If present, the current rules for blocking the output based on the padding framework.
178    pub(super) padding_blocker: Option<StartBlocking>,
179    /// What link protocol is the channel using?
180    #[allow(dead_code)] // We don't support protocols where this would matter
181    pub(super) link_protocol: u16,
182}
183
184/// Outgoing cells introduced at the channel reactor
185#[derive(Default, Debug, Clone)]
186pub(super) struct SpecialOutgoing {
187    /// If we must send a `PaddingNegotiate`, this is present.
188    padding_negotiate: Option<PaddingNegotiate>,
189    /// A number of pending PADDING cells that we have to send, once there is space.
190    n_padding: u16,
191}
192
193impl SpecialOutgoing {
194    /// Do we have a special cell to send?
195    ///
196    /// Called by the reactor before looking for cells from the reactor's clients.
197    /// The returned message *must* be sent by the caller, not dropped!
198    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
199    fn next(&mut self) -> Option<AnyChanCell> {
200        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
201        // enum, so that we can fast-path the usual case of "no special message to send".
202        if let Some(p) = self.padding_negotiate.take() {
203            return Some(p.into());
204        }
205        if self.n_padding > 0 {
206            self.n_padding -= 1;
207            return Some(Padding::new().into());
208        }
209        None
210    }
211
212    /// Try to queue a padding cell to be sent.
213    fn queue_padding_cell(&mut self) {
214        self.n_padding = self.n_padding.saturating_add(1);
215    }
216}
217
218/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
219///
220/// There is no risk of confusion because no-one would try to print a
221/// Reactor for some other reason.
222impl<S: SleepProvider + CoarseTimeProvider> fmt::Display for Reactor<S> {
223    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
224        fmt::Debug::fmt(&self.unique_id, f)
225    }
226}
227
228impl<S: SleepProvider + CoarseTimeProvider> Reactor<S> {
229    /// Launch the reactor, and run until the channel closes or we
230    /// encounter an error.
231    ///
232    /// Once this function returns, the channel is dead, and can't be
233    /// used again.
234    #[instrument(level = "trace", skip_all)]
235    pub async fn run(mut self) -> Result<()> {
236        trace!(channel_id = %self, "Running reactor");
237        let result: Result<()> = loop {
238            match self.run_once().await {
239                Ok(()) => (),
240                Err(ReactorError::Shutdown) => break Ok(()),
241                Err(ReactorError::Err(e)) => break Err(e),
242            }
243        };
244
245        // Log that the reactor stopped, possibly with the associated error as a report.
246        // May log at a higher level depending on the error kind.
247        const MSG: &str = "Reactor stopped";
248        match &result {
249            Ok(()) => debug!(channel_id = %self, "{MSG}"),
250            Err(e) => debug_report!(e, channel_id = %self, "{MSG}"),
251        }
252
253        // Inform any waiters that the channel has closed.
254        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
255        self.reactor_closed_tx.send(close_msg);
256        result
257    }
258
259    /// Helper for run(): handles only one action.
260    #[instrument(level = "trace", skip_all)]
261    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
262        select! {
263
264            // See if the output sink can have cells written to it yet.
265            // If so, see if we have to-be-transmitted cells.
266            ret = self.output.prepare_send_from(async {
267                // This runs if we will be able to write, so try to obtain a cell:
268
269                if let Some(l) = self.special_outgoing.next() {
270                    // See reasoning below.
271                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
272                    self.padding_timer.as_mut().note_cell_sent();
273                    return Some((l, None));
274                }
275
276                select_biased! {
277                    n = self.cells.next() => {
278                        // Note transmission on *input* to the reactor, not ultimate
279                        // transmission.  Ideally we would tap into the TCP stream at the far
280                        // end of our TLS or perhaps during encoding on entry to the TLS, but
281                        // both of those would involve quite some plumbing.  Doing it here in
282                        // the reactor avoids additional inter-task communication, mutexes,
283                        // etc.  (And there is no real difference between doing it here on
284                        // input, to just below, on enquieing into the `sendable`.)
285                        //
286                        // Padding is sent when the output channel is idle, and the effect of
287                        // buffering is just that we might sent it a little early because we
288                        // measure idleness when we last put something into the output layers.
289                        //
290                        // We can revisit this if measurement shows it to be bad in practice.
291                        //
292                        // (We in any case need padding that we generate when idle to make it
293                        // through to the output promptly, or it will be late and ineffective.)
294                        self.padding_timer.as_mut().note_cell_sent();
295                        n
296                    },
297                    p = self.padding_timer.as_mut().next() => {
298                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
299
300                        // Note that we treat padding from the padding_timer as a normal cell,
301                        // since it doesn't have a padding machine.
302                        self.padding_ctrl.queued_data(HopNum::from(0));
303
304                        self.padding_timer.as_mut().note_cell_sent();
305                        Some((p.into(), None))
306                    },
307                }
308            }) => {
309                self.padding_ctrl.flushed_channel_cell();
310                let (queued, sendable) = ret?;
311                let (msg, cell_padding_info) = queued.ok_or(ReactorError::Shutdown)?;
312                // Tell the relevant circuit padder that this cell is getting flushed.
313                // Note that, technically, it won't go onto the network for a while longer:
314                // it has to go through the TLS buffer, and the kernel TCP buffer.
315                // We've got to live with that.
316                // TODO: conceivably we could defer this even longer, but it would take
317                // some tricky hacking!
318                if let (Some(cell_padding_info), Some(circid)) = (cell_padding_info, msg.circid()) {
319                    self.circs.note_cell_flushed(circid, cell_padding_info);
320                }
321                sendable.send(msg)?;
322            }
323
324            ret = self.control.next() => {
325                let ctrl = match ret {
326                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
327                    Some(x) => x,
328                };
329                self.handle_control(ctrl).await?;
330            }
331
332            ret = self.padding_event_stream.next() => {
333                let event = ret.ok_or_else(|| Error::from(internal!("Padding event stream was exhausted")))?;
334                self.handle_padding_event(event).await?;
335            }
336
337            ret = self.input.next() => {
338                let item = ret
339                    .ok_or(ReactorError::Shutdown)??;
340                crate::note_incoming_traffic();
341                self.handle_cell(item).await?;
342            }
343
344        }
345        Ok(()) // Run again.
346    }
347
348    /// Handle a CtrlMsg other than Shutdown.
349    #[instrument(level = "trace", skip(self))] // Intentionally omitting skip_all, msg is useful and not sensitive
350    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
351        trace!(
352            channel_id = %self,
353            msg = ?msg,
354            "reactor received control message"
355        );
356
357        match msg {
358            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
359            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
360            CtrlMsg::AllocateCircuit {
361                created_sender,
362                sender,
363                tx,
364            } => {
365                let mut rng = rand::rng();
366                let my_unique_id = self.unique_id;
367                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
368                // NOTE: This is a very weird place to be calling new_padding, but:
369                //  - we need to do it here or earlier, so we can add it as part of the CircEnt to
370                //    our map.
371                //  - We need to do it at some point where we have a runtime, which implies in a
372                //    reactor.
373                //
374                // TODO circpad: We might want to lazy-allocate this somehow, or try harder to make
375                // it a no-op when we aren't padding on a particular circuit.
376                let (padding_ctrl, padding_stream) = crate::client::circuit::padding::new_padding(
377                    // TODO: avoid using DynTimeProvider at some point, and re-parameterize for efficiency.
378                    DynTimeProvider::new(self.runtime.clone()),
379                );
380                let ret: Result<_> = self
381                    .circs
382                    .add_ent(&mut rng, created_sender, sender, padding_ctrl.clone())
383                    .map(|id| (id, circ_unique_id, padding_ctrl, padding_stream));
384                let _ = tx.send(ret); // don't care about other side going away
385                self.update_disused_since();
386            }
387            CtrlMsg::ConfigUpdate(updates) => {
388                if self.link_protocol == 4 {
389                    // Link protocol 4 does not permit sending, or negotiating, link padding.
390                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
391                    // keep doing padding things.
392                    return Ok(());
393                }
394
395                let ChannelPaddingInstructionsUpdates {
396                    // List all the fields explicitly; that way the compiler will warn us
397                    // if one is added and we fail to handle it here.
398                    padding_enable,
399                    padding_parameters,
400                    padding_negotiate,
401                } = &*updates;
402                if let Some(parameters) = padding_parameters {
403                    self.padding_timer.as_mut().reconfigure(parameters)?;
404                }
405                if let Some(enable) = padding_enable {
406                    if *enable {
407                        self.padding_timer.as_mut().enable();
408                    } else {
409                        self.padding_timer.as_mut().disable();
410                    }
411                }
412                if let Some(padding_negotiate) = padding_negotiate {
413                    // This replaces any previous PADDING_NEGOTIATE cell that we were
414                    // told to send, but which we didn't manage to send yet.
415                    // It doesn't make sense to queue them up.
416                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
417                }
418            }
419            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
420            #[cfg(feature = "circ-padding-manual")]
421            CtrlMsg::SetChannelPadder { padder, sender } => {
422                self.padding_ctrl
423                    .install_padder_padding_at_hop(HopNum::from(0), padder);
424                let _ignore = sender.send(Ok(()));
425            }
426        }
427        Ok(())
428    }
429
430    /// Take the padding action described in `action`.
431    ///
432    /// (With circuit padding disabled, PaddingEvent can't be constructed.)
433    #[cfg(not(feature = "circ-padding"))]
434    #[allow(clippy::unused_async)] // for symmetry with the version below
435    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
436        void::unreachable(action.0)
437    }
438
439    /// Take the padding action described in `action`.
440    #[cfg(feature = "circ-padding")]
441    async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
442        use PaddingEvent as PE;
443        match action {
444            PE::SendPadding(send_padding) => {
445                self.handle_send_padding(send_padding).await?;
446            }
447            PE::StartBlocking(start_blocking) => {
448                if self.output.is_unlimited() {
449                    self.output.set_blocked();
450                }
451                self.padding_blocker = Some(start_blocking);
452            }
453            PE::StopBlocking => {
454                self.output.set_unlimited();
455            }
456        }
457        Ok(())
458    }
459
460    /// Send the padding described in `padding`.
461    #[cfg(feature = "circ-padding")]
462    async fn handle_send_padding(&mut self, padding: SendPadding) -> Result<()> {
463        // TODO circpad: This is somewhat duplicative of the logic in `Circuit::send_padding` and
464        // `Circuit::padding_disposition`.  It might be good to unify them at some point.
465        // For now (Oct 2025), though, they have slightly different inputs and behaviors.
466
467        use crate::client::circuit::padding::{Bypass::*, Replace::*};
468        // multihop padding belongs in circuit padders, not here.
469        let hop = HopNum::from(0);
470        assert_eq!(padding.hop, hop);
471
472        // If true, there is blocking, but we are allowed to bypass it.
473        let blocking_bypassed = matches!(
474            (&self.padding_blocker, padding.may_bypass_block()),
475            (
476                Some(StartBlocking {
477                    is_bypassable: true
478                }),
479                BypassBlocking
480            )
481        );
482        // If true, there is blocking, and we can't bypass it.
483        let this_padding_blocked = self.padding_blocker.is_some() && !blocking_bypassed;
484
485        if padding.may_replace_with_data() == Replaceable {
486            if self.output_is_full().await? {
487                // When the output buffer is full,
488                // we _always_ treat it as satisfying our replaceable padding.
489                //
490                // TODO circpad: It would be better to check whether
491                // the output has any bytes at all, but futures_codec doesn't seem to give us a
492                // way to check that.  If we manage to do so in the future, we should change the
493                // logic in this function.
494                self.padding_ctrl
495                    .replaceable_padding_already_queued(hop, padding);
496                return Ok(());
497            } else if self.cells.approx_count() > 0 {
498                // We can replace the padding with outbound cells!
499                if this_padding_blocked {
500                    // In the blocked case, we just declare that the pending data _is_ the queued padding.
501                    self.padding_ctrl
502                        .replaceable_padding_already_queued(hop, padding);
503                } else {
504                    // Otherwise we report that queued data _became_ padding,
505                    // and we allow it to pass any blocking that's present.
506                    self.padding_ctrl.queued_data_as_padding(hop, padding);
507                    if blocking_bypassed {
508                        self.output.allow_n_additional_items(1);
509                    }
510                }
511                return Ok(());
512            } else {
513                // There's nothing to replace this with, so fall through.
514            }
515        }
516
517        // There's no replacement, so we queue unconditionally.
518        self.special_outgoing.queue_padding_cell();
519        self.padding_ctrl.queued_padding(hop, padding);
520        if blocking_bypassed {
521            self.output.allow_n_additional_items(1);
522        }
523
524        Ok(())
525    }
526
527    /// Return true if the output stream is full.
528    ///
529    /// We use this in circuit padding to implement replaceable padding.
530    //
531    // TODO circpad: We'd rather check whether there is any data at all queued in self.output,
532    // but futures_codec doesn't give us a way to do that.
533    #[cfg(feature = "circ-padding")]
534    async fn output_is_full(&mut self) -> Result<bool> {
535        use futures::future::poll_fn;
536        use std::task::Poll;
537        // We use poll_fn to get a cx that we can pass to poll_ready_unpin.
538        poll_fn(|cx| {
539            Poll::Ready(match self.output.poll_ready_unpin(cx) {
540                // If if's ready to send, it isn't full.
541                Poll::Ready(Ok(())) => Ok(false),
542                // If it isn't ready to send, it's full.
543                Poll::Pending => Ok(true),
544                // Propagate errors:
545                Poll::Ready(Err(e)) => Err(e),
546            })
547        })
548        .await
549    }
550
551    /// Helper: process a cell on a channel.  Most cell types get ignored
552    /// or rejected; a few get delivered to circuits.
553    #[instrument(level = "trace", skip_all)]
554    async fn handle_cell(&mut self, cell: AnyChanCell) -> Result<()> {
555        let (circid, msg) = cell.into_circid_and_msg();
556        use AnyChanMsg::*;
557
558        match msg {
559            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
560            _ => trace!(
561                channel_id = %self,
562                "received {} for {}",
563                msg.cmd(),
564                CircId::get_or_zero(circid)
565            ),
566        }
567
568        // Report the message to the padding controller.
569        match msg {
570            Padding(_) | Vpadding(_) => {
571                // We always accept channel padding, even if we haven't negotiated any.
572                let _always_acceptable = self.padding_ctrl.decrypted_padding(HopNum::from(0));
573            }
574            _ => self.padding_ctrl.decrypted_data(HopNum::from(0)),
575        }
576
577        match msg {
578            // These are allowed, and need to be handled.
579            Relay(_) => self.deliver_relay(circid, msg).await,
580
581            Destroy(_) => self.deliver_destroy(circid, msg).await,
582
583            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg),
584
585            // These are always ignored.
586            Padding(_) | Vpadding(_) => Ok(()),
587            _ => Err(Error::ChanProto(format!("Unexpected cell: {msg:?}"))),
588        }
589    }
590
591    /// Give the RELAY cell `msg` to the appropriate circuit.
592    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
593        let Some(circid) = circid else {
594            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
595        };
596
597        let mut ent = self
598            .circs
599            .get_mut(circid)
600            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
601
602        match &mut *ent {
603            CircEnt::Open { cell_sender: s, .. } => {
604                // There's an open circuit; we can give it the RELAY cell.
605                if s.send(msg).await.is_err() {
606                    drop(ent);
607                    // The circuit's receiver went away, so we should destroy the circuit.
608                    self.outbound_destroy_circ(circid).await?;
609                }
610                Ok(())
611            }
612            CircEnt::Opening { .. } => Err(Error::ChanProto(
613                "Relay cell on pending circuit before CREATED* received".into(),
614            )),
615            CircEnt::DestroySent(hs) => hs.receive_cell(),
616        }
617    }
618
619    /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
620    /// circuit, if that circuit is waiting for one.
621    fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
622        let Some(circid) = circid else {
623            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
624        };
625
626        let target = self.circs.advance_from_opening(circid)?;
627        let created = msg.try_into()?;
628        // TODO(nickm) I think that this one actually means the other side
629        // is closed. See arti#269.
630        target.send(created).map_err(|_| {
631            Error::from(internal!(
632                "Circuit queue rejected created message. Is it closing?"
633            ))
634        })
635    }
636
637    /// Handle a DESTROY cell by removing the corresponding circuit
638    /// from the map, and passing the destroy cell onward to the circuit.
639    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
640        let Some(circid) = circid else {
641            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
642        };
643
644        // Remove the circuit from the map: nothing more can be done with it.
645        let entry = self.circs.remove(circid);
646        self.update_disused_since();
647        match entry {
648            // If the circuit is waiting for CREATED, tell it that it
649            // won't get one.
650            Some(CircEnt::Opening {
651                create_response_sender,
652                ..
653            }) => {
654                trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
655                create_response_sender
656                    .send(msg.try_into()?)
657                    // TODO(nickm) I think that this one actually means the other side
658                    // is closed. See arti#269.
659                    .map_err(|_| {
660                        internal!("pending circuit wasn't interested in destroy cell?").into()
661                    })
662            }
663            // It's an open circuit: tell it that it got a DESTROY cell.
664            Some(CircEnt::Open {
665                mut cell_sender, ..
666            }) => {
667                trace!(channel_id = %self, "Passing destroy to open circuit {}", circid);
668                cell_sender
669                    .send(msg)
670                    .await
671                    // TODO(nickm) I think that this one actually means the other side
672                    // is closed. See arti#269.
673                    .map_err(|_| {
674                        internal!("open circuit wasn't interested in destroy cell?").into()
675                    })
676            }
677            // We've sent a destroy; we can leave this circuit removed.
678            Some(CircEnt::DestroySent(_)) => Ok(()),
679            // Got a DESTROY cell for a circuit we don't have.
680            None => {
681                trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
682                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
683            }
684        }
685    }
686
687    /// Helper: send a cell on the outbound sink.
688    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
689        self.output.send(cell).await?;
690        Ok(())
691    }
692
693    /// Called when a circuit goes away: sends a DESTROY cell and removes
694    /// the circuit.
695    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
696        trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
697        // Remove the circuit's entry from the map: nothing more
698        // can be done with it.
699        // TODO: It would be great to have a tighter upper bound for
700        // the number of relay cells we'll receive.
701        self.circs.destroy_sent(id, HalfCirc::new(3000));
702        self.update_disused_since();
703        let destroy = Destroy::new(DestroyReason::NONE).into();
704        let cell = AnyChanCell::new(Some(id), destroy);
705        self.send_cell(cell).await?;
706
707        Ok(())
708    }
709
710    /// Update disused timestamp with current time if this channel is no longer used
711    fn update_disused_since(&self) {
712        if self.circs.open_ent_count() == 0 {
713            // Update disused_since if it still indicates that the channel is in use
714            self.details.unused_since.update_if_none();
715        } else {
716            // Mark this channel as in use
717            self.details.unused_since.clear();
718        }
719    }
720
721    /// Use the new KIST parameters.
722    #[cfg(target_os = "linux")]
723    fn apply_kist_params(&self, params: &KistParams) {
724        use super::kist::KistMode;
725
726        let set_tcp_notsent_lowat = |v: u32| {
727            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
728                // This is bad, but not fatal: not setting the KIST options
729                // comes with a performance penalty, but we don't have to crash.
730                error_report!(e, "Failed to set KIST socket options");
731            }
732        };
733
734        match params.kist_enabled() {
735            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
736            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
737        }
738    }
739
740    /// Use the new KIST parameters.
741    #[cfg(not(target_os = "linux"))]
742    fn apply_kist_params(&self, params: &KistParams) {
743        use super::kist::KistMode;
744
745        if params.kist_enabled() != KistMode::Disabled {
746            tracing::warn!("KIST not currently supported on non-linux platforms");
747        }
748    }
749}
750
751#[cfg(test)]
752pub(crate) mod test {
753    #![allow(clippy::unwrap_used)]
754    use super::*;
755    use crate::channel::{Canonicity, ChannelType, ClosedUnexpectedly, UniqId};
756    use crate::client::circuit::CircParameters;
757    use crate::client::circuit::padding::new_padding;
758    use crate::fake_mpsc;
759    use crate::peer::PeerInfo;
760    use crate::util::{DummyTimeoutEstimator, fake_mq};
761    use futures::sink::SinkExt;
762    use futures::stream::StreamExt;
763    use tor_cell::chancell::msg;
764    use tor_linkspec::OwnedChanTarget;
765    use tor_rtcompat::SpawnExt;
766    use tor_rtcompat::{DynTimeProvider, NoOpStreamOpsHandle, Runtime};
767
    /// Item type of the fake inbound stream used by these tests: either a
    /// decoded cell or an error.
    pub(crate) type CodecResult = std::result::Result<AnyChanCell, Error>;
769
770    pub(crate) fn new_reactor<R: Runtime>(
771        runtime: R,
772    ) -> (
773        Arc<crate::channel::Channel>,
774        Reactor<R>,
775        mpsc::Receiver<AnyChanCell>,
776        mpsc::Sender<CodecResult>,
777    ) {
778        let link_protocol = 4;
779        let (send1, recv1) = mpsc::channel(32);
780        let (send2, recv2) = mpsc::channel(32);
781        let unique_id = UniqId::new();
782        let dummy_target = OwnedChanTarget::builder()
783            .ed_identity([6; 32].into())
784            .rsa_identity([10; 20].into())
785            .build()
786            .unwrap();
787        let send1 = send1.sink_map_err(|e| {
788            trace!("got sink error: {:?}", e);
789            Error::CellDecodeErr {
790                object: "reactor test",
791                err: tor_cell::Error::ChanProto("dummy message".into()),
792            }
793        });
794        let stream_ops = NoOpStreamOpsHandle::default();
795        let (chan, reactor) = crate::channel::Channel::new(
796            ChannelType::ClientInitiator,
797            link_protocol,
798            Box::new(send1),
799            Box::new(recv2),
800            Box::new(stream_ops),
801            unique_id,
802            dummy_target,
803            safelog::MaybeSensitive::not_sensitive(PeerInfo::EMPTY),
804            crate::ClockSkew::None,
805            runtime,
806            fake_mq(),
807            Canonicity::new_canonical(),
808        )
809        .expect("channel create failed");
810        (chan, reactor, recv1, send2)
811    }
812
813    // Try shutdown from inside run_once..
814    #[test]
815    fn shutdown() {
816        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
817            let (chan, mut reactor, _output, _input) = new_reactor(rt);
818
819            chan.terminate();
820            let r = reactor.run_once().await;
821            assert!(matches!(r, Err(ReactorError::Shutdown)));
822        });
823    }
824
825    // Try shutdown while reactor is running.
826    #[test]
827    fn shutdown2() {
828        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
829            // TODO: Ask a rust person if this is how to do this.
830
831            use futures::future::FutureExt;
832            use futures::join;
833
834            let (chan, reactor, _output, _input) = new_reactor(rt);
835            // Let's get the reactor running...
836            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
837
838            let rr = run_reactor.clone();
839
840            let exit_then_check = async {
841                assert!(rr.peek().is_none());
842                // ... and terminate the channel while that's happening.
843                chan.terminate();
844            };
845
846            let (rr_s, _) = join!(run_reactor, exit_then_check);
847
848            // Now let's see. The reactor should not _still_ be running.
849            assert!(rr_s);
850        });
851    }
852
853    #[test]
854    fn new_circ_closed() {
855        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
856            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
857            assert!(chan.duration_unused().is_some()); // unused yet
858
859            let (ret, reac) = futures::join!(
860                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
861                reactor.run_once()
862            );
863            let (pending, circr) = ret.unwrap();
864            rt.spawn(async {
865                let _ignore = circr.run().await;
866            })
867            .unwrap();
868            assert!(reac.is_ok());
869
870            let id = pending.peek_circid();
871
872            let ent = reactor.circs.get_mut(id);
873            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
874            assert!(chan.duration_unused().is_none()); // in use
875
876            // Now drop the circuit; this should tell the reactor to remove
877            // the circuit from the map.
878            drop(pending);
879
880            reactor.run_once().await.unwrap();
881            let ent = reactor.circs.get_mut(id);
882            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
883            let cell = output.next().await.unwrap();
884            assert_eq!(cell.circid(), Some(id));
885            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
886            assert!(chan.duration_unused().is_some()); // unused again
887        });
888    }
889
890    // Test proper delivery of a created cell that doesn't make a channel
891    #[test]
892    #[ignore] // See bug #244: re-enable this test once it passes reliably.
893    fn new_circ_create_failure() {
894        use std::time::Duration;
895        use tor_rtcompat::SleepProvider;
896
897        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
898            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
899
900            let (ret, reac) = futures::join!(
901                chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
902                reactor.run_once()
903            );
904            let (pending, circr) = ret.unwrap();
905            rt.spawn(async {
906                let _ignore = circr.run().await;
907            })
908            .unwrap();
909            assert!(reac.is_ok());
910
911            let circparams = CircParameters::default();
912
913            let id = pending.peek_circid();
914
915            let ent = reactor.circs.get_mut(id);
916            assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
917
918            #[allow(clippy::clone_on_copy)]
919            let rtc = rt.clone();
920            let send_response = async {
921                rtc.sleep(Duration::from_millis(100)).await;
922                trace!("sending createdfast");
923                // We'll get a bad handshake result from this createdfast cell.
924                let created_cell = AnyChanCell::new(Some(id), msg::CreatedFast::new(*b"x").into());
925                input.send(Ok(created_cell)).await.unwrap();
926                reactor.run_once().await.unwrap();
927            };
928
929            let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
930            // Make sure statuses are as expected.
931            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
932
933            reactor.run_once().await.unwrap();
934
935            // Make sure that the createfast cell got sent
936            let cell_sent = output.next().await.unwrap();
937            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
938
939            // But the next run if the reactor will make the circuit get closed.
940            let ent = reactor.circs.get_mut(id);
941            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
942        });
943    }
944
945    // Try incoming cells that shouldn't arrive on channels.
946    #[test]
947    fn bad_cells() {
948        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
949            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
950
951            // shouldn't get created2 cells for nonexistent circuits
952            let created2_cell = msg::Created2::new(*b"hihi").into();
953            input
954                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
955                .await
956                .unwrap();
957
958            let e = reactor.run_once().await.unwrap_err().unwrap_err();
959            assert_eq!(
960                format!("{}", e),
961                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
962            );
963
964            // Can't get a relay cell on a circuit we've never heard of.
965            let relay_cell = msg::Relay::new(b"abc").into();
966            input
967                .send(Ok(AnyChanCell::new(CircId::new(4), relay_cell)))
968                .await
969                .unwrap();
970            let e = reactor.run_once().await.unwrap_err().unwrap_err();
971            assert_eq!(
972                format!("{}", e),
973                "Channel protocol violation: Relay cell on nonexistent circuit"
974            );
975
976            // There used to be tests here for other types, but now that we only
977            // accept OpenClientChanCell, we know that the codec can't even try
978            // to give us e.g. VERSIONS or CREATE.
979        });
980    }
981
982    #[test]
983    fn deliver_relay() {
984        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
985            use oneshot_fused_workaround as oneshot;
986
987            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
988
989            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
990
991            let (_circ_stream_7, mut circ_stream_13) = {
992                let (snd1, _rcv1) = oneshot::channel();
993                let (snd2, rcv2) = fake_mpsc(64);
994                reactor.circs.put_unchecked(
995                    CircId::new(7).unwrap(),
996                    CircEnt::Opening {
997                        create_response_sender: snd1,
998                        cell_sender: snd2,
999                        padding_ctrl: padding_ctrl.clone(),
1000                    },
1001                );
1002
1003                let (snd3, rcv3) = fake_mpsc(64);
1004                reactor.circs.put_unchecked(
1005                    CircId::new(13).unwrap(),
1006                    CircEnt::Open {
1007                        cell_sender: snd3,
1008                        padding_ctrl,
1009                    },
1010                );
1011
1012                reactor.circs.put_unchecked(
1013                    CircId::new(23).unwrap(),
1014                    CircEnt::DestroySent(HalfCirc::new(25)),
1015                );
1016                (rcv2, rcv3)
1017            };
1018
1019            // If a relay cell is sent on an open channel, the correct circuit
1020            // should get it.
1021            let relaycell: AnyChanMsg = msg::Relay::new(b"do you suppose").into();
1022            input
1023                .send(Ok(AnyChanCell::new(CircId::new(13), relaycell.clone())))
1024                .await
1025                .unwrap();
1026            reactor.run_once().await.unwrap();
1027            let got = circ_stream_13.next().await.unwrap();
1028            assert!(matches!(got, AnyChanMsg::Relay(_)));
1029
1030            // If a relay cell is sent on an opening channel, that's an error.
1031            input
1032                .send(Ok(AnyChanCell::new(CircId::new(7), relaycell.clone())))
1033                .await
1034                .unwrap();
1035            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1036            assert_eq!(
1037                format!("{}", e),
1038                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
1039            );
1040
1041            // If a relay cell is sent on a non-existent channel, that's an error.
1042            input
1043                .send(Ok(AnyChanCell::new(CircId::new(101), relaycell.clone())))
1044                .await
1045                .unwrap();
1046            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1047            assert_eq!(
1048                format!("{}", e),
1049                "Channel protocol violation: Relay cell on nonexistent circuit"
1050            );
1051
1052            // It's fine to get a relay cell on a DestroySent channel: that happens
1053            // when the other side hasn't noticed the Destroy yet.
1054
1055            // We can do this 25 more times according to our setup:
1056            for _ in 0..25 {
1057                input
1058                    .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1059                    .await
1060                    .unwrap();
1061                reactor.run_once().await.unwrap(); // should be fine.
1062            }
1063
1064            // This one will fail.
1065            input
1066                .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1067                .await
1068                .unwrap();
1069            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1070            assert_eq!(
1071                format!("{}", e),
1072                "Channel protocol violation: Too many cells received on destroyed circuit"
1073            );
1074        });
1075    }
1076
1077    #[test]
1078    fn deliver_destroy() {
1079        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1080            use crate::circuit::celltypes::*;
1081            use oneshot_fused_workaround as oneshot;
1082
1083            let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1084
1085            let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1086
1087            let (circ_oneshot_7, mut circ_stream_13) = {
1088                let (snd1, rcv1) = oneshot::channel();
1089                let (snd2, _rcv2) = fake_mpsc(64);
1090                reactor.circs.put_unchecked(
1091                    CircId::new(7).unwrap(),
1092                    CircEnt::Opening {
1093                        create_response_sender: snd1,
1094                        cell_sender: snd2,
1095                        padding_ctrl: padding_ctrl.clone(),
1096                    },
1097                );
1098
1099                let (snd3, rcv3) = fake_mpsc(64);
1100                reactor.circs.put_unchecked(
1101                    CircId::new(13).unwrap(),
1102                    CircEnt::Open {
1103                        cell_sender: snd3,
1104                        padding_ctrl: padding_ctrl.clone(),
1105                    },
1106                );
1107
1108                reactor.circs.put_unchecked(
1109                    CircId::new(23).unwrap(),
1110                    CircEnt::DestroySent(HalfCirc::new(25)),
1111                );
1112                (rcv1, rcv3)
1113            };
1114
1115            // Destroying an opening circuit is fine.
1116            let destroycell: AnyChanMsg = msg::Destroy::new(0.into()).into();
1117            input
1118                .send(Ok(AnyChanCell::new(CircId::new(7), destroycell.clone())))
1119                .await
1120                .unwrap();
1121            reactor.run_once().await.unwrap();
1122            let msg = circ_oneshot_7.await;
1123            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
1124
1125            // Destroying an open circuit is fine.
1126            input
1127                .send(Ok(AnyChanCell::new(CircId::new(13), destroycell.clone())))
1128                .await
1129                .unwrap();
1130            reactor.run_once().await.unwrap();
1131            let msg = circ_stream_13.next().await.unwrap();
1132            assert!(matches!(msg, AnyChanMsg::Destroy(_)));
1133
1134            // Destroying a DestroySent circuit is fine.
1135            input
1136                .send(Ok(AnyChanCell::new(CircId::new(23), destroycell.clone())))
1137                .await
1138                .unwrap();
1139            reactor.run_once().await.unwrap();
1140
1141            // Destroying a nonexistent circuit is an error.
1142            input
1143                .send(Ok(AnyChanCell::new(CircId::new(101), destroycell.clone())))
1144                .await
1145                .unwrap();
1146            let e = reactor.run_once().await.unwrap_err().unwrap_err();
1147            assert_eq!(
1148                format!("{}", e),
1149                "Channel protocol violation: Destroy for nonexistent circuit"
1150            );
1151        });
1152    }
1153
1154    #[test]
1155    fn closing_if_reactor_dropped() {
1156        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1157            let (chan, reactor, _output, _input) = new_reactor(rt);
1158
1159            assert!(!chan.is_closing());
1160            drop(reactor);
1161            assert!(chan.is_closing());
1162
1163            assert!(matches!(
1164                chan.wait_for_close().await,
1165                Err(ClosedUnexpectedly::ReactorDropped),
1166            ));
1167        });
1168    }
1169
1170    #[test]
1171    fn closing_if_reactor_shutdown() {
1172        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1173            let (chan, reactor, _output, _input) = new_reactor(rt);
1174
1175            assert!(!chan.is_closing());
1176            chan.terminate();
1177            assert!(!chan.is_closing());
1178
1179            let r = reactor.run().await;
1180            assert!(r.is_ok());
1181            assert!(chan.is_closing());
1182
1183            assert!(chan.wait_for_close().await.is_ok());
1184        });
1185    }
1186
1187    #[test]
1188    fn reactor_error_wait_for_close() {
1189        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1190            let (chan, reactor, _output, mut input) = new_reactor(rt);
1191
1192            // force an error by sending created2 cell for nonexistent circuit
1193            let created2_cell = msg::Created2::new(*b"hihi").into();
1194            input
1195                .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1196                .await
1197                .unwrap();
1198
1199            // `reactor.run()` should return an error
1200            let run_error = reactor.run().await.unwrap_err();
1201
1202            // `chan.wait_for_close()` should return the same error
1203            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
1204            else {
1205                panic!("Expected a 'ReactorError'");
1206            };
1207
1208            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
1209            assert_eq!(run_error.to_string(), wait_error.to_string());
1210        });
1211    }
1212}