tor_proto/channel/
reactor.rs

//! Code to handle incoming cells on a channel.
//!
//! The role of this code is to run in a separate asynchronous task,
//! and route cells to the right circuits.
//!
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
//! or in the error handling behavior.
8
9use super::circmap::{CircEnt, CircMap};
10use super::OpenChanCellS2C;
11use crate::channel::OpenChanMsgS2C;
12use crate::tunnel::circuit::halfcirc::HalfCirc;
13use crate::util::err::ReactorError;
14use crate::util::oneshot_broadcast;
15use crate::{Error, Result};
16use tor_async_utils::SinkPrepareExt as _;
17use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18use tor_cell::chancell::ChanMsg;
19use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20use tor_memquota::mq_queue;
21use tor_rtcompat::SleepProvider;
22
23#[cfg_attr(not(target_os = "linux"), allow(unused))]
24use tor_error::error_report;
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_rtcompat::StreamOps;
27
28use futures::channel::mpsc;
29use oneshot_fused_workaround as oneshot;
30
31use futures::sink::SinkExt;
32use futures::stream::Stream;
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::{select, select_biased};
36use tor_error::internal;
37
38use std::fmt;
39use std::pin::Pin;
40use std::sync::Arc;
41
42use crate::channel::{
43    codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
44};
45use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
46use tracing::{debug, trace};
47
48/// A boxed trait object that can provide `ChanCell`s.
49pub(super) type BoxedChannelStream = Box<
50    dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
51>;
52/// A boxed trait object that can sink `ChanCell`s.
53pub(super) type BoxedChannelSink =
54    Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
55/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
56pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
57/// The type of a oneshot channel used to inform reactor users of the result of an operation.
58pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
59
60/// Convert `err` to an Error, under the assumption that it's happening on an
61/// open channel.
62fn codec_err_to_chan(err: CodecError) -> Error {
63    match err {
64        CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
65        CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
66        CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
67    }
68}
69
70/// A message telling the channel reactor to do something.
71#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72#[derive(Debug)]
73#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
74#[allow(clippy::exhaustive_enums)]
75pub enum CtrlMsg {
76    /// Shut down the reactor.
77    Shutdown,
78    /// Tell the reactor that a given circuit has gone away.
79    CloseCircuit(CircId),
80    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
81    /// and registering senders for messages received for the circuit.
82    AllocateCircuit {
83        /// Channel to send the circuit's `CreateResponse` down.
84        created_sender: oneshot::Sender<CreateResponse>,
85        /// Channel to send other messages from this circuit down.
86        sender: CircuitRxSender,
87        /// Oneshot channel to send the new circuit's identifiers down.
88        tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
89    },
90    /// Enable/disable/reconfigure channel padding
91    ///
92    /// The sender of these messages is responsible for the optimisation of
93    /// ensuring that "no-change" messages are elided.
94    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
95    ///
96    /// These updates are done via a control message to avoid adding additional branches to the
97    /// main reactor `select!`.
98    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
99    /// Enable/disable/reconfigure KIST.
100    ///
101    /// Like in the case of `ConfigUpdate`,
102    /// the sender of these messages is responsible for the optimisation of
103    /// ensuring that "no-change" messages are elided.
104    KistConfigUpdate(KistParams),
105}
106
107/// Object to handle incoming cells and background tasks on a channel.
108///
109/// This type is returned when you finish a channel; you need to spawn a
110/// new task that calls `run()` on it.
111#[must_use = "If you don't call run() on a reactor, the channel won't work."]
112pub struct Reactor<S: SleepProvider> {
113    /// A receiver for control messages from `Channel` objects.
114    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
115    /// A oneshot sender that is used to alert other tasks when this reactor is
116    /// finally dropped.
117    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
118    /// A receiver for cells to be sent on this reactor's sink.
119    ///
120    /// `Channel` objects have a sender that can send cells here.
121    pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
122    /// A Stream from which we can read `ChanCell`s.
123    ///
124    /// This should be backed by a TLS connection if you want it to be secure.
125    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
126    /// A Sink to which we can write `ChanCell`s.
127    ///
128    /// This should also be backed by a TLS connection if you want it to be secure.
129    pub(super) output: BoxedChannelSink,
130    /// A handler for setting stream options on the underlying stream.
131    #[cfg_attr(not(target_os = "linux"), allow(unused))]
132    pub(super) streamops: BoxedChannelStreamOps,
133    /// Timer tracking when to generate channel padding
134    pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
135    /// Outgoing cells introduced at the channel reactor
136    pub(super) special_outgoing: SpecialOutgoing,
137    /// A map from circuit ID to Sinks on which we can deliver cells.
138    pub(super) circs: CircMap,
139    /// A unique identifier for this channel.
140    pub(super) unique_id: super::UniqId,
141    /// Information shared with the frontend
142    pub(super) details: Arc<ChannelDetails>,
143    /// Context for allocating unique circuit log identifiers.
144    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
145    /// What link protocol is the channel using?
146    #[allow(dead_code)] // We don't support protocols where this would matter
147    pub(super) link_protocol: u16,
148}
149
150/// Outgoing cells introduced at the channel reactor
151#[derive(Default, Debug, Clone)]
152pub(super) struct SpecialOutgoing {
153    /// If we must send a `PaddingNegotiate`
154    pub(super) padding_negotiate: Option<PaddingNegotiate>,
155}
156
157impl SpecialOutgoing {
158    /// Do we have a special cell to send?
159    ///
160    /// Called by the reactor before looking for cells from the reactor's clients.
161    /// The returned message *must* be sent by the caller, not dropped!
162    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
163    pub(super) fn next(&mut self) -> Option<AnyChanCell> {
164        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
165        // enum, so that we can fast-path the usual case of "no special message to send".
166        if let Some(p) = self.padding_negotiate.take() {
167            return Some(p.into());
168        }
169        None
170    }
171}
172
173/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
174///
175/// There is no risk of confusion because no-one would try to print a
176/// Reactor for some other reason.
177impl<S: SleepProvider> fmt::Display for Reactor<S> {
178    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179        fmt::Debug::fmt(&self.unique_id, f)
180    }
181}
182
183impl<S: SleepProvider> Reactor<S> {
184    /// Launch the reactor, and run until the channel closes or we
185    /// encounter an error.
186    ///
187    /// Once this function returns, the channel is dead, and can't be
188    /// used again.
189    pub async fn run(mut self) -> Result<()> {
190        trace!(channel_id = %self, "Running reactor");
191        let result: Result<()> = loop {
192            match self.run_once().await {
193                Ok(()) => (),
194                Err(ReactorError::Shutdown) => break Ok(()),
195                Err(ReactorError::Err(e)) => break Err(e),
196            }
197        };
198        debug!(channel_id = %self, "Reactor stopped");
199        // Inform any waiters that the channel has closed.
200        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
201        self.reactor_closed_tx.send(close_msg);
202        result
203    }
204
205    /// Helper for run(): handles only one action.
206    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
207        select! {
208
209            // See if the output sink can have cells written to it yet.
210            // If so, see if we have to-be-transmitted cells.
211            ret = self.output.prepare_send_from(async {
212                // This runs if we will be able to write, so try to obtain a cell:
213
214                if let Some(l) = self.special_outgoing.next() {
215                    // See reasoning below.
216                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
217                    self.padding_timer.as_mut().note_cell_sent();
218                    return Some(l)
219                }
220
221                select_biased! {
222                    n = self.cells.next() => {
223                        // Note transmission on *input* to the reactor, not ultimate
224                        // transmission.  Ideally we would tap into the TCP stream at the far
225                        // end of our TLS or perhaps during encoding on entry to the TLS, but
226                        // both of those would involve quite some plumbing.  Doing it here in
227                        // the reactor avoids additional inter-task communication, mutexes,
228                        // etc.  (And there is no real difference between doing it here on
229                        // input, to just below, on enquieing into the `sendable`.)
230                        //
231                        // Padding is sent when the output channel is idle, and the effect of
232                        // buffering is just that we might sent it a little early because we
233                        // measure idleness when we last put something into the output layers.
234                        //
235                        // We can revisit this if measurement shows it to be bad in practice.
236                        //
237                        // (We in any case need padding that we generate when idle to make it
238                        // through to the output promptly, or it will be late and ineffective.)
239                        self.padding_timer.as_mut().note_cell_sent();
240                        n
241                    },
242                    p = self.padding_timer.as_mut().next() => {
243                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
244                        Some(p.into())
245                    },
246                }
247            }) => {
248                let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
249                let msg = msg.ok_or(ReactorError::Shutdown)?;
250                sendable.send(msg).map_err(codec_err_to_chan)?;
251            }
252
253            ret = self.control.next() => {
254                let ctrl = match ret {
255                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
256                    Some(x) => x,
257                };
258                self.handle_control(ctrl).await?;
259            }
260
261            ret = self.input.next() => {
262                let item = ret
263                    .ok_or(ReactorError::Shutdown)?
264                    .map_err(codec_err_to_chan)?;
265                crate::note_incoming_traffic();
266                self.handle_cell(item).await?;
267            }
268
269        }
270        Ok(()) // Run again.
271    }
272
273    /// Handle a CtrlMsg other than Shutdown.
274    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
275        trace!(
276            channel_id = %self,
277            msg = ?msg,
278            "reactor received control message"
279        );
280
281        match msg {
282            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
283            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
284            CtrlMsg::AllocateCircuit {
285                created_sender,
286                sender,
287                tx,
288            } => {
289                let mut rng = rand::rng();
290                let my_unique_id = self.unique_id;
291                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
292                let ret: Result<_> = self
293                    .circs
294                    .add_ent(&mut rng, created_sender, sender)
295                    .map(|id| (id, circ_unique_id));
296                let _ = tx.send(ret); // don't care about other side going away
297                self.update_disused_since();
298            }
299            CtrlMsg::ConfigUpdate(updates) => {
300                if self.link_protocol == 4 {
301                    // Link protocol 4 does not permit sending, or negotiating, link padding.
302                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
303                    // keep doing padding things.
304                    return Ok(());
305                }
306
307                let ChannelPaddingInstructionsUpdates {
308                    // List all the fields explicitly; that way the compiler will warn us
309                    // if one is added and we fail to handle it here.
310                    padding_enable,
311                    padding_parameters,
312                    padding_negotiate,
313                } = &*updates;
314                if let Some(parameters) = padding_parameters {
315                    self.padding_timer.as_mut().reconfigure(parameters)?;
316                }
317                if let Some(enable) = padding_enable {
318                    if *enable {
319                        self.padding_timer.as_mut().enable();
320                    } else {
321                        self.padding_timer.as_mut().disable();
322                    }
323                }
324                if let Some(padding_negotiate) = padding_negotiate {
325                    // This replaces any previous PADDING_NEGOTIATE cell that we were
326                    // told to send, but which we didn't manage to send yet.
327                    // It doesn't make sense to queue them up.
328                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
329                }
330            }
331            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
332        }
333        Ok(())
334    }
335
336    /// Helper: process a cell on a channel.  Most cell types get ignored
337    /// or rejected; a few get delivered to circuits.
338    async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
339        let (circid, msg) = cell.into_circid_and_msg();
340        use OpenChanMsgS2C::*;
341
342        match msg {
343            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
344            _ => trace!(
345                channel_id = %self,
346                "received {} for {}",
347                msg.cmd(),
348                CircId::get_or_zero(circid)
349            ),
350        }
351
352        match msg {
353            // These are allowed, and need to be handled.
354            Relay(_) => self.deliver_relay(circid, msg.into()).await,
355
356            Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
357
358            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
359
360            // These are always ignored.
361            Padding(_) | Vpadding(_) => Ok(()),
362        }
363    }
364
365    /// Give the RELAY cell `msg` to the appropriate circuit.
366    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
367        let Some(circid) = circid else {
368            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
369        };
370
371        let mut ent = self
372            .circs
373            .get_mut(circid)
374            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
375
376        match &mut *ent {
377            CircEnt::Open(s) => {
378                // There's an open circuit; we can give it the RELAY cell.
379                if s.send(msg.try_into()?).await.is_err() {
380                    drop(ent);
381                    // The circuit's receiver went away, so we should destroy the circuit.
382                    self.outbound_destroy_circ(circid).await?;
383                }
384                Ok(())
385            }
386            CircEnt::Opening(_, _) => Err(Error::ChanProto(
387                "Relay cell on pending circuit before CREATED* received".into(),
388            )),
389            CircEnt::DestroySent(hs) => hs.receive_cell(),
390        }
391    }
392
393    /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
394    /// circuit, if that circuit is waiting for one.
395    async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
396        let Some(circid) = circid else {
397            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
398        };
399
400        let target = self.circs.advance_from_opening(circid)?;
401        let created = msg.try_into()?;
402        // TODO(nickm) I think that this one actually means the other side
403        // is closed. See arti#269.
404        target.send(created).map_err(|_| {
405            Error::from(internal!(
406                "Circuit queue rejected created message. Is it closing?"
407            ))
408        })
409    }
410
411    /// Handle a DESTROY cell by removing the corresponding circuit
412    /// from the map, and passing the destroy cell onward to the circuit.
413    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
414        let Some(circid) = circid else {
415            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
416        };
417
418        // Remove the circuit from the map: nothing more can be done with it.
419        let entry = self.circs.remove(circid);
420        self.update_disused_since();
421        match entry {
422            // If the circuit is waiting for CREATED, tell it that it
423            // won't get one.
424            Some(CircEnt::Opening(oneshot, _)) => {
425                trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
426                oneshot
427                    .send(msg.try_into()?)
428                    // TODO(nickm) I think that this one actually means the other side
429                    // is closed. See arti#269.
430                    .map_err(|_| {
431                        internal!("pending circuit wasn't interested in destroy cell?").into()
432                    })
433            }
434            // It's an open circuit: tell it that it got a DESTROY cell.
435            Some(CircEnt::Open(mut sink)) => {
436                trace!(channel_id = %self, "Passing destroy to open circuit {}", circid);
437                sink.send(msg.try_into()?)
438                    .await
439                    // TODO(nickm) I think that this one actually means the other side
440                    // is closed. See arti#269.
441                    .map_err(|_| {
442                        internal!("open circuit wasn't interested in destroy cell?").into()
443                    })
444            }
445            // We've sent a destroy; we can leave this circuit removed.
446            Some(CircEnt::DestroySent(_)) => Ok(()),
447            // Got a DESTROY cell for a circuit we don't have.
448            None => {
449                trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
450                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
451            }
452        }
453    }
454
455    /// Helper: send a cell on the outbound sink.
456    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
457        self.output.send(cell).await.map_err(codec_err_to_chan)?;
458        Ok(())
459    }
460
461    /// Called when a circuit goes away: sends a DESTROY cell and removes
462    /// the circuit.
463    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
464        trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
465        // Remove the circuit's entry from the map: nothing more
466        // can be done with it.
467        // TODO: It would be great to have a tighter upper bound for
468        // the number of relay cells we'll receive.
469        self.circs.destroy_sent(id, HalfCirc::new(3000));
470        self.update_disused_since();
471        let destroy = Destroy::new(DestroyReason::NONE).into();
472        let cell = AnyChanCell::new(Some(id), destroy);
473        self.send_cell(cell).await?;
474
475        Ok(())
476    }
477
478    /// Update disused timestamp with current time if this channel is no longer used
479    fn update_disused_since(&self) {
480        if self.circs.open_ent_count() == 0 {
481            // Update disused_since if it still indicates that the channel is in use
482            self.details.unused_since.update_if_none();
483        } else {
484            // Mark this channel as in use
485            self.details.unused_since.clear();
486        }
487    }
488
489    /// Use the new KIST parameters.
490    #[cfg(target_os = "linux")]
491    fn apply_kist_params(&self, params: &KistParams) {
492        use super::kist::KistMode;
493
494        let set_tcp_notsent_lowat = |v: u32| {
495            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
496                // This is bad, but not fatal: not setting the KIST options
497                // comes with a performance penalty, but we don't have to crash.
498                error_report!(e, "Failed to set KIST socket options");
499            }
500        };
501
502        match params.kist_enabled() {
503            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
504            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
505        }
506    }
507
508    #[cfg(not(target_os = "linux"))]
509    fn apply_kist_params(&self, params: &KistParams) {
510        use super::kist::KistMode;
511
512        if params.kist_enabled() != KistMode::Disabled {
513            tracing::warn!("KIST not currently supported on non-linux platforms");
514        }
515    }
516}
517
518#[cfg(test)]
519pub(crate) mod test {
520    #![allow(clippy::unwrap_used)]
521    use super::*;
522    use crate::channel::{ClosedUnexpectedly, UniqId};
523    use crate::fake_mpsc;
524    use crate::tunnel::circuit::CircParameters;
525    use crate::util::fake_mq;
526    use futures::sink::SinkExt;
527    use futures::stream::StreamExt;
528    use futures::task::SpawnExt;
529    use tor_cell::chancell::msg;
530    use tor_linkspec::OwnedChanTarget;
531    use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
532
533    type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
534
535    pub(crate) fn new_reactor<R: Runtime>(
536        runtime: R,
537    ) -> (
538        Arc<crate::channel::Channel>,
539        Reactor<R>,
540        mpsc::Receiver<AnyChanCell>,
541        mpsc::Sender<CodecResult>,
542    ) {
543        let link_protocol = 4;
544        let (send1, recv1) = mpsc::channel(32);
545        let (send2, recv2) = mpsc::channel(32);
546        let unique_id = UniqId::new();
547        let dummy_target = OwnedChanTarget::builder()
548            .ed_identity([6; 32].into())
549            .rsa_identity([10; 20].into())
550            .build()
551            .unwrap();
552        let send1 = send1.sink_map_err(|e| {
553            trace!("got sink error: {:?}", e);
554            CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
555        });
556        let stream_ops = NoOpStreamOpsHandle::default();
557        let (chan, reactor) = crate::channel::Channel::new(
558            link_protocol,
559            Box::new(send1),
560            Box::new(recv2),
561            Box::new(stream_ops),
562            unique_id,
563            dummy_target,
564            crate::ClockSkew::None,
565            runtime,
566            fake_mq(),
567        )
568        .expect("channel create failed");
569        (chan, reactor, recv1, send2)
570    }
571
572    // Try shutdown from inside run_once..
573    #[test]
574    fn shutdown() {
575        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
576            let (chan, mut reactor, _output, _input) = new_reactor(rt);
577
578            chan.terminate();
579            let r = reactor.run_once().await;
580            assert!(matches!(r, Err(ReactorError::Shutdown)));
581        });
582    }
583
584    // Try shutdown while reactor is running.
585    #[test]
586    fn shutdown2() {
587        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
588            // TODO: Ask a rust person if this is how to do this.
589
590            use futures::future::FutureExt;
591            use futures::join;
592
593            let (chan, reactor, _output, _input) = new_reactor(rt);
594            // Let's get the reactor running...
595            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
596
597            let rr = run_reactor.clone();
598
599            let exit_then_check = async {
600                assert!(rr.peek().is_none());
601                // ... and terminate the channel while that's happening.
602                chan.terminate();
603            };
604
605            let (rr_s, _) = join!(run_reactor, exit_then_check);
606
607            // Now let's see. The reactor should not _still_ be running.
608            assert!(rr_s);
609        });
610    }
611
612    #[test]
613    fn new_circ_closed() {
614        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
615            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
616            assert!(chan.duration_unused().is_some()); // unused yet
617
618            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
619            let (pending, circr) = ret.unwrap();
620            rt.spawn(async {
621                let _ignore = circr.run().await;
622            })
623            .unwrap();
624            assert!(reac.is_ok());
625
626            let id = pending.peek_circid();
627
628            let ent = reactor.circs.get_mut(id);
629            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
630            assert!(chan.duration_unused().is_none()); // in use
631
632            // Now drop the circuit; this should tell the reactor to remove
633            // the circuit from the map.
634            drop(pending);
635
636            reactor.run_once().await.unwrap();
637            let ent = reactor.circs.get_mut(id);
638            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
639            let cell = output.next().await.unwrap();
640            assert_eq!(cell.circid(), Some(id));
641            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
642            assert!(chan.duration_unused().is_some()); // unused again
643        });
644    }
645
646    // Test proper delivery of a created cell that doesn't make a channel
647    #[test]
648    #[ignore] // See bug #244: re-enable this test once it passes reliably.
649    fn new_circ_create_failure() {
650        use std::time::Duration;
651        use tor_rtcompat::SleepProvider;
652
653        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
654            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
655
656            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
657            let (pending, circr) = ret.unwrap();
658            rt.spawn(async {
659                let _ignore = circr.run().await;
660            })
661            .unwrap();
662            assert!(reac.is_ok());
663
664            let circparams = CircParameters::default();
665
666            let id = pending.peek_circid();
667
668            let ent = reactor.circs.get_mut(id);
669            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
670
671            #[allow(clippy::clone_on_copy)]
672            let rtc = rt.clone();
673            let send_response = async {
674                rtc.sleep(Duration::from_millis(100)).await;
675                trace!("sending createdfast");
676                // We'll get a bad handshake result from this createdfast cell.
677                let created_cell =
678                    OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
679                input.send(Ok(created_cell)).await.unwrap();
680                reactor.run_once().await.unwrap();
681            };
682
683            let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
684            // Make sure statuses are as expected.
685            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
686
687            reactor.run_once().await.unwrap();
688
689            // Make sure that the createfast cell got sent
690            let cell_sent = output.next().await.unwrap();
691            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
692
693            // But the next run if the reactor will make the circuit get closed.
694            let ent = reactor.circs.get_mut(id);
695            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
696        });
697    }
698
699    // Try incoming cells that shouldn't arrive on channels.
700    #[test]
701    fn bad_cells() {
702        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
703            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
704
705            // shouldn't get created2 cells for nonexistent circuits
706            let created2_cell = msg::Created2::new(*b"hihi").into();
707            input
708                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
709                .await
710                .unwrap();
711
712            let e = reactor.run_once().await.unwrap_err().unwrap_err();
713            assert_eq!(
714                format!("{}", e),
715                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
716            );
717
718            // Can't get a relay cell on a circuit we've never heard of.
719            let relay_cell = msg::Relay::new(b"abc").into();
720            input
721                .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
722                .await
723                .unwrap();
724            let e = reactor.run_once().await.unwrap_err().unwrap_err();
725            assert_eq!(
726                format!("{}", e),
727                "Channel protocol violation: Relay cell on nonexistent circuit"
728            );
729
730            // There used to be tests here for other types, but now that we only
731            // accept OpenClientChanCell, we know that the codec can't even try
732            // to give us e.g. VERSIONS or CREATE.
733        });
734    }
735
736    #[test]
737    fn deliver_relay() {
738        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
739            use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
740            use oneshot_fused_workaround as oneshot;
741
742            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
743
744            let (_circ_stream_7, mut circ_stream_13) = {
745                let (snd1, _rcv1) = oneshot::channel();
746                let (snd2, rcv2) = fake_mpsc(64);
747                reactor
748                    .circs
749                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
750
751                let (snd3, rcv3) = fake_mpsc(64);
752                reactor
753                    .circs
754                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
755
756                reactor.circs.put_unchecked(
757                    CircId::new(23).unwrap(),
758                    CircEnt::DestroySent(HalfCirc::new(25)),
759                );
760                (rcv2, rcv3)
761            };
762
763            // If a relay cell is sent on an open channel, the correct circuit
764            // should get it.
765            let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
766            input
767                .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
768                .await
769                .unwrap();
770            reactor.run_once().await.unwrap();
771            let got = circ_stream_13.next().await.unwrap();
772            assert!(matches!(got, ClientCircChanMsg::Relay(_)));
773
774            // If a relay cell is sent on an opening channel, that's an error.
775            input
776                .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
777                .await
778                .unwrap();
779            let e = reactor.run_once().await.unwrap_err().unwrap_err();
780            assert_eq!(
781                format!("{}", e),
782                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
783            );
784
785            // If a relay cell is sent on a non-existent channel, that's an error.
786            input
787                .send(Ok(OpenChanCellS2C::new(
788                    CircId::new(101),
789                    relaycell.clone(),
790                )))
791                .await
792                .unwrap();
793            let e = reactor.run_once().await.unwrap_err().unwrap_err();
794            assert_eq!(
795                format!("{}", e),
796                "Channel protocol violation: Relay cell on nonexistent circuit"
797            );
798
799            // It's fine to get a relay cell on a DestroySent channel: that happens
800            // when the other side hasn't noticed the Destroy yet.
801
802            // We can do this 25 more times according to our setup:
803            for _ in 0..25 {
804                input
805                    .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
806                    .await
807                    .unwrap();
808                reactor.run_once().await.unwrap(); // should be fine.
809            }
810
811            // This one will fail.
812            input
813                .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
814                .await
815                .unwrap();
816            let e = reactor.run_once().await.unwrap_err().unwrap_err();
817            assert_eq!(
818                format!("{}", e),
819                "Channel protocol violation: Too many cells received on destroyed circuit"
820            );
821        });
822    }
823
824    #[test]
825    fn deliver_destroy() {
826        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
827            use crate::tunnel::circuit::celltypes::*;
828            use oneshot_fused_workaround as oneshot;
829
830            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
831
832            let (circ_oneshot_7, mut circ_stream_13) = {
833                let (snd1, rcv1) = oneshot::channel();
834                let (snd2, _rcv2) = fake_mpsc(64);
835                reactor
836                    .circs
837                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
838
839                let (snd3, rcv3) = fake_mpsc(64);
840                reactor
841                    .circs
842                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
843
844                reactor.circs.put_unchecked(
845                    CircId::new(23).unwrap(),
846                    CircEnt::DestroySent(HalfCirc::new(25)),
847                );
848                (rcv1, rcv3)
849            };
850
851            // Destroying an opening circuit is fine.
852            let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
853            input
854                .send(Ok(OpenChanCellS2C::new(
855                    CircId::new(7),
856                    destroycell.clone(),
857                )))
858                .await
859                .unwrap();
860            reactor.run_once().await.unwrap();
861            let msg = circ_oneshot_7.await;
862            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
863
864            // Destroying an open circuit is fine.
865            input
866                .send(Ok(OpenChanCellS2C::new(
867                    CircId::new(13),
868                    destroycell.clone(),
869                )))
870                .await
871                .unwrap();
872            reactor.run_once().await.unwrap();
873            let msg = circ_stream_13.next().await.unwrap();
874            assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
875
876            // Destroying a DestroySent circuit is fine.
877            input
878                .send(Ok(OpenChanCellS2C::new(
879                    CircId::new(23),
880                    destroycell.clone(),
881                )))
882                .await
883                .unwrap();
884            reactor.run_once().await.unwrap();
885
886            // Destroying a nonexistent circuit is an error.
887            input
888                .send(Ok(OpenChanCellS2C::new(
889                    CircId::new(101),
890                    destroycell.clone(),
891                )))
892                .await
893                .unwrap();
894            let e = reactor.run_once().await.unwrap_err().unwrap_err();
895            assert_eq!(
896                format!("{}", e),
897                "Channel protocol violation: Destroy for nonexistent circuit"
898            );
899        });
900    }
901
902    #[test]
903    fn closing_if_reactor_dropped() {
904        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
905            let (chan, reactor, _output, _input) = new_reactor(rt);
906
907            assert!(!chan.is_closing());
908            drop(reactor);
909            assert!(chan.is_closing());
910
911            assert!(matches!(
912                chan.wait_for_close().await,
913                Err(ClosedUnexpectedly::ReactorDropped),
914            ));
915        });
916    }
917
918    #[test]
919    fn closing_if_reactor_shutdown() {
920        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
921            let (chan, reactor, _output, _input) = new_reactor(rt);
922
923            assert!(!chan.is_closing());
924            chan.terminate();
925            assert!(!chan.is_closing());
926
927            let r = reactor.run().await;
928            assert!(r.is_ok());
929            assert!(chan.is_closing());
930
931            assert!(chan.wait_for_close().await.is_ok());
932        });
933    }
934
935    #[test]
936    fn reactor_error_wait_for_close() {
937        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
938            let (chan, reactor, _output, mut input) = new_reactor(rt);
939
940            // force an error by sending created2 cell for nonexistent circuit
941            let created2_cell = msg::Created2::new(*b"hihi").into();
942            input
943                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
944                .await
945                .unwrap();
946
947            // `reactor.run()` should return an error
948            let run_error = reactor.run().await.unwrap_err();
949
950            // `chan.wait_for_close()` should return the same error
951            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
952            else {
953                panic!("Expected a 'ReactorError'");
954            };
955
956            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
957            assert_eq!(run_error.to_string(), wait_error.to_string());
958        });
959    }
960}