tor_proto/channel/
reactor.rs

//! Code to handle incoming cells on a channel.
//!
//! The role of this code is to run in a separate asynchronous task,
//! and route cells to the right circuits.
//!
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
//! or in the error handling behavior.
8
9use super::circmap::{CircEnt, CircMap};
10use super::OpenChanCellS2C;
11use crate::channel::OpenChanMsgS2C;
12use crate::tunnel::circuit::halfcirc::HalfCirc;
13use crate::util::err::ReactorError;
14use crate::util::oneshot_broadcast;
15use crate::{Error, Result};
16use tor_async_utils::SinkPrepareExt as _;
17use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18use tor_cell::chancell::ChanMsg;
19use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20use tor_memquota::mq_queue;
21use tor_rtcompat::{SleepProvider, StreamOps};
22
23use futures::channel::mpsc;
24use oneshot_fused_workaround as oneshot;
25
26use futures::sink::SinkExt;
27use futures::stream::Stream;
28use futures::Sink;
29use futures::StreamExt as _;
30use futures::{select, select_biased};
31use tor_error::{error_report, internal};
32
33use std::fmt;
34use std::pin::Pin;
35use std::sync::Arc;
36
37use crate::channel::{
38    codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
39};
40use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
41use tracing::{debug, trace};
42
/// A boxed trait object that can provide `ChanCell`s.
///
/// Each item is either a decoded [`OpenChanCellS2C`] or the [`CodecError`]
/// encountered while producing it.
pub(super) type BoxedChannelStream = Box<
    dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
>;
/// A boxed trait object that can sink `ChanCell`s.
///
/// It accepts [`AnyChanCell`]s and reports failures as [`CodecError`]s.
pub(super) type BoxedChannelSink =
    Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
/// A boxed trait object that can provide additional `StreamOps` on a `BoxedChannelStream`.
pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
///
/// The value sent is a crate-level [`Result`] wrapping `T`.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
54
55/// Convert `err` to an Error, under the assumption that it's happening on an
56/// open channel.
57fn codec_err_to_chan(err: CodecError) -> Error {
58    match err {
59        CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
60        CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
61        CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
62    }
63}
64
/// A message telling the channel reactor to do something.
///
/// These messages arrive on the reactor's `control` receiver.
#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
#[derive(Debug)]
#[allow(unreachable_pub)] // Only `pub` with feature `testing`; otherwise, visible in crate
#[allow(clippy::exhaustive_enums)]
pub enum CtrlMsg {
    /// Shut down the reactor.
    ///
    /// The reactor loop handles this itself; it is never passed on to the
    /// control-message handler.
    Shutdown,
    /// Tell the reactor that a given circuit has gone away.
    ///
    /// The reactor reacts by sending a DESTROY cell for that circuit.
    CloseCircuit(CircId),
    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
    /// and registering senders for messages received for the circuit.
    AllocateCircuit {
        /// Channel to send the circuit's `CreateResponse` down.
        created_sender: oneshot::Sender<CreateResponse>,
        /// Channel to send other messages from this circuit down.
        sender: CircuitRxSender,
        /// Oneshot channel to send the new circuit's identifiers down.
        tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
    },
    /// Enable/disable/reconfigure channel padding
    ///
    /// The sender of these messages is responsible for the optimisation of
    /// ensuring that "no-change" messages are elided.
    /// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
    ///
    /// These updates are done via a control message to avoid adding additional branches to the
    /// main reactor `select!`.
    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
    /// Enable/disable/reconfigure KIST.
    ///
    /// Like in the case of `ConfigUpdate`,
    /// the sender of these messages is responsible for the optimisation of
    /// ensuring that "no-change" messages are elided.
    KistConfigUpdate(KistParams),
}
101
102/// Object to handle incoming cells and background tasks on a channel.
103///
104/// This type is returned when you finish a channel; you need to spawn a
105/// new task that calls `run()` on it.
106#[must_use = "If you don't call run() on a reactor, the channel won't work."]
107pub struct Reactor<S: SleepProvider> {
108    /// A receiver for control messages from `Channel` objects.
109    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
110    /// A oneshot sender that is used to alert other tasks when this reactor is
111    /// finally dropped.
112    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
113    /// A receiver for cells to be sent on this reactor's sink.
114    ///
115    /// `Channel` objects have a sender that can send cells here.
116    pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
117    /// A Stream from which we can read `ChanCell`s.
118    ///
119    /// This should be backed by a TLS connection if you want it to be secure.
120    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
121    /// A Sink to which we can write `ChanCell`s.
122    ///
123    /// This should also be backed by a TLS connection if you want it to be secure.
124    pub(super) output: BoxedChannelSink,
125    /// A handler for setting stream options on the underlying stream.
126    pub(super) streamops: BoxedChannelStreamOps,
127    /// Timer tracking when to generate channel padding
128    pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
129    /// Outgoing cells introduced at the channel reactor
130    pub(super) special_outgoing: SpecialOutgoing,
131    /// A map from circuit ID to Sinks on which we can deliver cells.
132    pub(super) circs: CircMap,
133    /// A unique identifier for this channel.
134    pub(super) unique_id: super::UniqId,
135    /// Information shared with the frontend
136    pub(super) details: Arc<ChannelDetails>,
137    /// Context for allocating unique circuit log identifiers.
138    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
139    /// What link protocol is the channel using?
140    #[allow(dead_code)] // We don't support protocols where this would matter
141    pub(super) link_protocol: u16,
142}
143
/// Outgoing cells introduced at the channel reactor
///
/// These are cells that the reactor itself originates, as opposed to cells
/// queued by the reactor's clients.
#[derive(Default, Debug, Clone)]
pub(super) struct SpecialOutgoing {
    /// If we must send a `PaddingNegotiate`
    ///
    /// `None` means there is no pending `PaddingNegotiate` to send.
    pub(super) padding_negotiate: Option<PaddingNegotiate>,
}
150
151impl SpecialOutgoing {
152    /// Do we have a special cell to send?
153    ///
154    /// Called by the reactor before looking for cells from the reactor's clients.
155    /// The returned message *must* be sent by the caller, not dropped!
156    #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
157    pub(super) fn next(&mut self) -> Option<AnyChanCell> {
158        // If this gets more cases, consider making SpecialOutgoing into a #[repr(C)]
159        // enum, so that we can fast-path the usual case of "no special message to send".
160        if let Some(p) = self.padding_negotiate.take() {
161            return Some(p.into());
162        }
163        None
164    }
165}
166
167/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
168///
169/// There is no risk of confusion because no-one would try to print a
170/// Reactor for some other reason.
171impl<S: SleepProvider> fmt::Display for Reactor<S> {
172    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173        fmt::Debug::fmt(&self.unique_id, f)
174    }
175}
176
177impl<S: SleepProvider> Reactor<S> {
178    /// Launch the reactor, and run until the channel closes or we
179    /// encounter an error.
180    ///
181    /// Once this function returns, the channel is dead, and can't be
182    /// used again.
183    pub async fn run(mut self) -> Result<()> {
184        trace!("{}: Running reactor", &self);
185        let result: Result<()> = loop {
186            match self.run_once().await {
187                Ok(()) => (),
188                Err(ReactorError::Shutdown) => break Ok(()),
189                Err(ReactorError::Err(e)) => break Err(e),
190            }
191        };
192        debug!("{}: Reactor stopped: {:?}", &self, result);
193        // Inform any waiters that the channel has closed.
194        let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
195        self.reactor_closed_tx.send(close_msg);
196        result
197    }
198
199    /// Helper for run(): handles only one action.
200    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
201        select! {
202
203            // See if the output sink can have cells written to it yet.
204            // If so, see if we have to-be-transmitted cells.
205            ret = self.output.prepare_send_from(async {
206                // This runs if we will be able to write, so try to obtain a cell:
207
208                if let Some(l) = self.special_outgoing.next() {
209                    // See reasoning below.
210                    // eprintln!("PADDING - SENDING NEOGIATION: {:?}", &l);
211                    self.padding_timer.as_mut().note_cell_sent();
212                    return Some(l)
213                }
214
215                select_biased! {
216                    n = self.cells.next() => {
217                        // Note transmission on *input* to the reactor, not ultimate
218                        // transmission.  Ideally we would tap into the TCP stream at the far
219                        // end of our TLS or perhaps during encoding on entry to the TLS, but
220                        // both of those would involve quite some plumbing.  Doing it here in
221                        // the reactor avoids additional inter-task communication, mutexes,
222                        // etc.  (And there is no real difference between doing it here on
223                        // input, to just below, on enquieing into the `sendable`.)
224                        //
225                        // Padding is sent when the output channel is idle, and the effect of
226                        // buffering is just that we might sent it a little early because we
227                        // measure idleness when we last put something into the output layers.
228                        //
229                        // We can revisit this if measurement shows it to be bad in practice.
230                        //
231                        // (We in any case need padding that we generate when idle to make it
232                        // through to the output promptly, or it will be late and ineffective.)
233                        self.padding_timer.as_mut().note_cell_sent();
234                        n
235                    },
236                    p = self.padding_timer.as_mut().next() => {
237                        // eprintln!("PADDING - SENDING PADDING: {:?}", &p);
238                        Some(p.into())
239                    },
240                }
241            }) => {
242                let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
243                let msg = msg.ok_or(ReactorError::Shutdown)?;
244                sendable.send(msg).map_err(codec_err_to_chan)?;
245            }
246
247            ret = self.control.next() => {
248                let ctrl = match ret {
249                    None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
250                    Some(x) => x,
251                };
252                self.handle_control(ctrl).await?;
253            }
254
255            ret = self.input.next() => {
256                let item = ret
257                    .ok_or(ReactorError::Shutdown)?
258                    .map_err(codec_err_to_chan)?;
259                crate::note_incoming_traffic();
260                self.handle_cell(item).await?;
261            }
262
263        }
264        Ok(()) // Run again.
265    }
266
267    /// Handle a CtrlMsg other than Shutdown.
268    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
269        trace!("{}: reactor received {:?}", &self, msg);
270        match msg {
271            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
272            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
273            CtrlMsg::AllocateCircuit {
274                created_sender,
275                sender,
276                tx,
277            } => {
278                let mut rng = rand::thread_rng();
279                let my_unique_id = self.unique_id;
280                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
281                let ret: Result<_> = self
282                    .circs
283                    .add_ent(&mut rng, created_sender, sender)
284                    .map(|id| (id, circ_unique_id));
285                let _ = tx.send(ret); // don't care about other side going away
286                self.update_disused_since();
287            }
288            CtrlMsg::ConfigUpdate(updates) => {
289                if self.link_protocol == 4 {
290                    // Link protocol 4 does not permit sending, or negotiating, link padding.
291                    // We test for == 4 so that future updates to handshake.rs LINK_PROTOCOLS
292                    // keep doing padding things.
293                    return Ok(());
294                }
295
296                let ChannelPaddingInstructionsUpdates {
297                    // List all the fields explicitly; that way the compiler will warn us
298                    // if one is added and we fail to handle it here.
299                    padding_enable,
300                    padding_parameters,
301                    padding_negotiate,
302                } = &*updates;
303                if let Some(parameters) = padding_parameters {
304                    self.padding_timer.as_mut().reconfigure(parameters);
305                }
306                if let Some(enable) = padding_enable {
307                    if *enable {
308                        self.padding_timer.as_mut().enable();
309                    } else {
310                        self.padding_timer.as_mut().disable();
311                    }
312                }
313                if let Some(padding_negotiate) = padding_negotiate {
314                    // This replaces any previous PADDING_NEGOTIATE cell that we were
315                    // told to send, but which we didn't manage to send yet.
316                    // It doesn't make sense to queue them up.
317                    self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
318                }
319            }
320            CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
321        }
322        Ok(())
323    }
324
325    /// Helper: process a cell on a channel.  Most cell types get ignored
326    /// or rejected; a few get delivered to circuits.
327    async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
328        let (circid, msg) = cell.into_circid_and_msg();
329        use OpenChanMsgS2C::*;
330
331        match msg {
332            Relay(_) | Padding(_) | Vpadding(_) => {} // too frequent to log.
333            _ => trace!(
334                "{}: received {} for {}",
335                &self,
336                msg.cmd(),
337                CircId::get_or_zero(circid)
338            ),
339        }
340
341        match msg {
342            // These are allowed, and need to be handled.
343            Relay(_) => self.deliver_relay(circid, msg.into()).await,
344
345            Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
346
347            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
348
349            // These are always ignored.
350            Padding(_) | Vpadding(_) => Ok(()),
351        }
352    }
353
354    /// Give the RELAY cell `msg` to the appropriate circuit.
355    async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
356        let Some(circid) = circid else {
357            return Err(Error::ChanProto("Relay cell without circuit ID".into()));
358        };
359
360        let mut ent = self
361            .circs
362            .get_mut(circid)
363            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
364
365        match &mut *ent {
366            CircEnt::Open(s) => {
367                // There's an open circuit; we can give it the RELAY cell.
368                if s.send(msg.try_into()?).await.is_err() {
369                    drop(ent);
370                    // The circuit's receiver went away, so we should destroy the circuit.
371                    self.outbound_destroy_circ(circid).await?;
372                }
373                Ok(())
374            }
375            CircEnt::Opening(_, _) => Err(Error::ChanProto(
376                "Relay cell on pending circuit before CREATED* received".into(),
377            )),
378            CircEnt::DestroySent(hs) => hs.receive_cell(),
379        }
380    }
381
382    /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
383    /// circuit, if that circuit is waiting for one.
384    async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
385        let Some(circid) = circid else {
386            return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
387        };
388
389        let target = self.circs.advance_from_opening(circid)?;
390        let created = msg.try_into()?;
391        // TODO(nickm) I think that this one actually means the other side
392        // is closed. See arti#269.
393        target.send(created).map_err(|_| {
394            Error::from(internal!(
395                "Circuit queue rejected created message. Is it closing?"
396            ))
397        })
398    }
399
400    /// Handle a DESTROY cell by removing the corresponding circuit
401    /// from the map, and passing the destroy cell onward to the circuit.
402    async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
403        let Some(circid) = circid else {
404            return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
405        };
406
407        // Remove the circuit from the map: nothing more can be done with it.
408        let entry = self.circs.remove(circid);
409        self.update_disused_since();
410        match entry {
411            // If the circuit is waiting for CREATED, tell it that it
412            // won't get one.
413            Some(CircEnt::Opening(oneshot, _)) => {
414                trace!("{}: Passing destroy to pending circuit {}", &self, circid);
415                oneshot
416                    .send(msg.try_into()?)
417                    // TODO(nickm) I think that this one actually means the other side
418                    // is closed. See arti#269.
419                    .map_err(|_| {
420                        internal!("pending circuit wasn't interested in destroy cell?").into()
421                    })
422            }
423            // It's an open circuit: tell it that it got a DESTROY cell.
424            Some(CircEnt::Open(mut sink)) => {
425                trace!("{}: Passing destroy to open circuit {}", &self, circid);
426                sink.send(msg.try_into()?)
427                    .await
428                    // TODO(nickm) I think that this one actually means the other side
429                    // is closed. See arti#269.
430                    .map_err(|_| {
431                        internal!("open circuit wasn't interested in destroy cell?").into()
432                    })
433            }
434            // We've sent a destroy; we can leave this circuit removed.
435            Some(CircEnt::DestroySent(_)) => Ok(()),
436            // Got a DESTROY cell for a circuit we don't have.
437            None => {
438                trace!("{}: Destroy for nonexistent circuit {}", &self, circid);
439                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
440            }
441        }
442    }
443
444    /// Helper: send a cell on the outbound sink.
445    async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
446        self.output.send(cell).await.map_err(codec_err_to_chan)?;
447        Ok(())
448    }
449
450    /// Called when a circuit goes away: sends a DESTROY cell and removes
451    /// the circuit.
452    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
453        trace!("{}: Circuit {} is gone; sending DESTROY", &self, id);
454        // Remove the circuit's entry from the map: nothing more
455        // can be done with it.
456        // TODO: It would be great to have a tighter upper bound for
457        // the number of relay cells we'll receive.
458        self.circs.destroy_sent(id, HalfCirc::new(3000));
459        self.update_disused_since();
460        let destroy = Destroy::new(DestroyReason::NONE).into();
461        let cell = AnyChanCell::new(Some(id), destroy);
462        self.send_cell(cell).await?;
463
464        Ok(())
465    }
466
467    /// Update disused timestamp with current time if this channel is no longer used
468    fn update_disused_since(&self) {
469        if self.circs.open_ent_count() == 0 {
470            // Update disused_since if it still indicates that the channel is in use
471            self.details.unused_since.update_if_none();
472        } else {
473            // Mark this channel as in use
474            self.details.unused_since.clear();
475        }
476    }
477
478    /// Use the new KIST parameters.
479    #[cfg(target_os = "linux")]
480    fn apply_kist_params(&self, params: &KistParams) {
481        use super::kist::KistMode;
482
483        let set_tcp_notsent_lowat = |v: u32| {
484            if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
485                // This is bad, but not fatal: not setting the KIST options
486                // comes with a performance penalty, but we don't have to crash.
487                error_report!(e, "Failed to set KIST socket options");
488            }
489        };
490
491        match params.kist_enabled() {
492            KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
493            KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
494        }
495    }
496
497    #[cfg(not(target_os = "linux"))]
498    fn apply_kist_params(&self, params: &KistParams) {
499        use super::kist::KistMode;
500
501        if params.kist_enabled() != KistMode::Disabled {
502            tracing::warn!("KIST not currently supported on non-linux platforms");
503        }
504    }
505}
506
507#[cfg(test)]
508pub(crate) mod test {
509    #![allow(clippy::unwrap_used)]
510    use super::*;
511    use crate::channel::{ClosedUnexpectedly, UniqId};
512    use crate::fake_mpsc;
513    use crate::tunnel::circuit::CircParameters;
514    use crate::util::fake_mq;
515    use futures::sink::SinkExt;
516    use futures::stream::StreamExt;
517    use futures::task::SpawnExt;
518    use tor_cell::chancell::msg;
519    use tor_linkspec::OwnedChanTarget;
520    use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
521
522    type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
523
524    pub(crate) fn new_reactor<R: Runtime>(
525        runtime: R,
526    ) -> (
527        Arc<crate::channel::Channel>,
528        Reactor<R>,
529        mpsc::Receiver<AnyChanCell>,
530        mpsc::Sender<CodecResult>,
531    ) {
532        let link_protocol = 4;
533        let (send1, recv1) = mpsc::channel(32);
534        let (send2, recv2) = mpsc::channel(32);
535        let unique_id = UniqId::new();
536        let dummy_target = OwnedChanTarget::builder()
537            .ed_identity([6; 32].into())
538            .rsa_identity([10; 20].into())
539            .build()
540            .unwrap();
541        let send1 = send1.sink_map_err(|e| {
542            trace!("got sink error: {:?}", e);
543            CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
544        });
545        let stream_ops = NoOpStreamOpsHandle::default();
546        let (chan, reactor) = crate::channel::Channel::new(
547            link_protocol,
548            Box::new(send1),
549            Box::new(recv2),
550            Box::new(stream_ops),
551            unique_id,
552            dummy_target,
553            crate::ClockSkew::None,
554            runtime,
555            fake_mq(),
556        )
557        .expect("channel create failed");
558        (chan, reactor, recv1, send2)
559    }
560
561    // Try shutdown from inside run_once..
562    #[test]
563    fn shutdown() {
564        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
565            let (chan, mut reactor, _output, _input) = new_reactor(rt);
566
567            chan.terminate();
568            let r = reactor.run_once().await;
569            assert!(matches!(r, Err(ReactorError::Shutdown)));
570        });
571    }
572
573    // Try shutdown while reactor is running.
574    #[test]
575    fn shutdown2() {
576        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
577            // TODO: Ask a rust person if this is how to do this.
578
579            use futures::future::FutureExt;
580            use futures::join;
581
582            let (chan, reactor, _output, _input) = new_reactor(rt);
583            // Let's get the reactor running...
584            let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
585
586            let rr = run_reactor.clone();
587
588            let exit_then_check = async {
589                assert!(rr.peek().is_none());
590                // ... and terminate the channel while that's happening.
591                chan.terminate();
592            };
593
594            let (rr_s, _) = join!(run_reactor, exit_then_check);
595
596            // Now let's see. The reactor should not _still_ be running.
597            assert!(rr_s);
598        });
599    }
600
601    #[test]
602    fn new_circ_closed() {
603        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
604            let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
605            assert!(chan.duration_unused().is_some()); // unused yet
606
607            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
608            let (pending, circr) = ret.unwrap();
609            rt.spawn(async {
610                let _ignore = circr.run().await;
611            })
612            .unwrap();
613            assert!(reac.is_ok());
614
615            let id = pending.peek_circid();
616
617            let ent = reactor.circs.get_mut(id);
618            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
619            assert!(chan.duration_unused().is_none()); // in use
620
621            // Now drop the circuit; this should tell the reactor to remove
622            // the circuit from the map.
623            drop(pending);
624
625            reactor.run_once().await.unwrap();
626            let ent = reactor.circs.get_mut(id);
627            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
628            let cell = output.next().await.unwrap();
629            assert_eq!(cell.circid(), Some(id));
630            assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
631            assert!(chan.duration_unused().is_some()); // unused again
632        });
633    }
634
635    // Test proper delivery of a created cell that doesn't make a channel
636    #[test]
637    #[ignore] // See bug #244: re-enable this test once it passes reliably.
638    fn new_circ_create_failure() {
639        use std::time::Duration;
640        use tor_rtcompat::SleepProvider;
641
642        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
643            let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
644
645            let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
646            let (pending, circr) = ret.unwrap();
647            rt.spawn(async {
648                let _ignore = circr.run().await;
649            })
650            .unwrap();
651            assert!(reac.is_ok());
652
653            let circparams = CircParameters::default();
654
655            let id = pending.peek_circid();
656
657            let ent = reactor.circs.get_mut(id);
658            assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
659
660            #[allow(clippy::clone_on_copy)]
661            let rtc = rt.clone();
662            let send_response = async {
663                rtc.sleep(Duration::from_millis(100)).await;
664                trace!("sending createdfast");
665                // We'll get a bad handshake result from this createdfast cell.
666                let created_cell =
667                    OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
668                input.send(Ok(created_cell)).await.unwrap();
669                reactor.run_once().await.unwrap();
670            };
671
672            let (circ, _) =
673                futures::join!(pending.create_firsthop_fast(&circparams), send_response);
674            // Make sure statuses are as expected.
675            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
676
677            reactor.run_once().await.unwrap();
678
679            // Make sure that the createfast cell got sent
680            let cell_sent = output.next().await.unwrap();
681            assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
682
683            // But the next run if the reactor will make the circuit get closed.
684            let ent = reactor.circs.get_mut(id);
685            assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
686        });
687    }
688
689    // Try incoming cells that shouldn't arrive on channels.
690    #[test]
691    fn bad_cells() {
692        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
693            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
694
695            // shouldn't get created2 cells for nonexistent circuits
696            let created2_cell = msg::Created2::new(*b"hihi").into();
697            input
698                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
699                .await
700                .unwrap();
701
702            let e = reactor.run_once().await.unwrap_err().unwrap_err();
703            assert_eq!(
704                format!("{}", e),
705                "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
706            );
707
708            // Can't get a relay cell on a circuit we've never heard of.
709            let relay_cell = msg::Relay::new(b"abc").into();
710            input
711                .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
712                .await
713                .unwrap();
714            let e = reactor.run_once().await.unwrap_err().unwrap_err();
715            assert_eq!(
716                format!("{}", e),
717                "Channel protocol violation: Relay cell on nonexistent circuit"
718            );
719
720            // There used to be tests here for other types, but now that we only
721            // accept OpenClientChanCell, we know that the codec can't even try
722            // to give us e.g. VERSIONS or CREATE.
723        });
724    }
725
726    #[test]
727    fn deliver_relay() {
728        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
729            use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
730            use oneshot_fused_workaround as oneshot;
731
732            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
733
734            let (_circ_stream_7, mut circ_stream_13) = {
735                let (snd1, _rcv1) = oneshot::channel();
736                let (snd2, rcv2) = fake_mpsc(64);
737                reactor
738                    .circs
739                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
740
741                let (snd3, rcv3) = fake_mpsc(64);
742                reactor
743                    .circs
744                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
745
746                reactor.circs.put_unchecked(
747                    CircId::new(23).unwrap(),
748                    CircEnt::DestroySent(HalfCirc::new(25)),
749                );
750                (rcv2, rcv3)
751            };
752
753            // If a relay cell is sent on an open channel, the correct circuit
754            // should get it.
755            let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
756            input
757                .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
758                .await
759                .unwrap();
760            reactor.run_once().await.unwrap();
761            let got = circ_stream_13.next().await.unwrap();
762            assert!(matches!(got, ClientCircChanMsg::Relay(_)));
763
764            // If a relay cell is sent on an opening channel, that's an error.
765            input
766                .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
767                .await
768                .unwrap();
769            let e = reactor.run_once().await.unwrap_err().unwrap_err();
770            assert_eq!(
771                format!("{}", e),
772                "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
773            );
774
775            // If a relay cell is sent on a non-existent channel, that's an error.
776            input
777                .send(Ok(OpenChanCellS2C::new(
778                    CircId::new(101),
779                    relaycell.clone(),
780                )))
781                .await
782                .unwrap();
783            let e = reactor.run_once().await.unwrap_err().unwrap_err();
784            assert_eq!(
785                format!("{}", e),
786                "Channel protocol violation: Relay cell on nonexistent circuit"
787            );
788
789            // It's fine to get a relay cell on a DestroySent channel: that happens
790            // when the other side hasn't noticed the Destroy yet.
791
792            // We can do this 25 more times according to our setup:
793            for _ in 0..25 {
794                input
795                    .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
796                    .await
797                    .unwrap();
798                reactor.run_once().await.unwrap(); // should be fine.
799            }
800
801            // This one will fail.
802            input
803                .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
804                .await
805                .unwrap();
806            let e = reactor.run_once().await.unwrap_err().unwrap_err();
807            assert_eq!(
808                format!("{}", e),
809                "Channel protocol violation: Too many cells received on destroyed circuit"
810            );
811        });
812    }
813
814    #[test]
815    fn deliver_destroy() {
816        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
817            use crate::tunnel::circuit::celltypes::*;
818            use oneshot_fused_workaround as oneshot;
819
820            let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
821
822            let (circ_oneshot_7, mut circ_stream_13) = {
823                let (snd1, rcv1) = oneshot::channel();
824                let (snd2, _rcv2) = fake_mpsc(64);
825                reactor
826                    .circs
827                    .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
828
829                let (snd3, rcv3) = fake_mpsc(64);
830                reactor
831                    .circs
832                    .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
833
834                reactor.circs.put_unchecked(
835                    CircId::new(23).unwrap(),
836                    CircEnt::DestroySent(HalfCirc::new(25)),
837                );
838                (rcv1, rcv3)
839            };
840
841            // Destroying an opening circuit is fine.
842            let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
843            input
844                .send(Ok(OpenChanCellS2C::new(
845                    CircId::new(7),
846                    destroycell.clone(),
847                )))
848                .await
849                .unwrap();
850            reactor.run_once().await.unwrap();
851            let msg = circ_oneshot_7.await;
852            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
853
854            // Destroying an open circuit is fine.
855            input
856                .send(Ok(OpenChanCellS2C::new(
857                    CircId::new(13),
858                    destroycell.clone(),
859                )))
860                .await
861                .unwrap();
862            reactor.run_once().await.unwrap();
863            let msg = circ_stream_13.next().await.unwrap();
864            assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
865
866            // Destroying a DestroySent circuit is fine.
867            input
868                .send(Ok(OpenChanCellS2C::new(
869                    CircId::new(23),
870                    destroycell.clone(),
871                )))
872                .await
873                .unwrap();
874            reactor.run_once().await.unwrap();
875
876            // Destroying a nonexistent circuit is an error.
877            input
878                .send(Ok(OpenChanCellS2C::new(
879                    CircId::new(101),
880                    destroycell.clone(),
881                )))
882                .await
883                .unwrap();
884            let e = reactor.run_once().await.unwrap_err().unwrap_err();
885            assert_eq!(
886                format!("{}", e),
887                "Channel protocol violation: Destroy for nonexistent circuit"
888            );
889        });
890    }
891
892    #[test]
893    fn closing_if_reactor_dropped() {
894        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
895            let (chan, reactor, _output, _input) = new_reactor(rt);
896
897            assert!(!chan.is_closing());
898            drop(reactor);
899            assert!(chan.is_closing());
900
901            assert!(matches!(
902                chan.wait_for_close().await,
903                Err(ClosedUnexpectedly::ReactorDropped),
904            ));
905        });
906    }
907
908    #[test]
909    fn closing_if_reactor_shutdown() {
910        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
911            let (chan, reactor, _output, _input) = new_reactor(rt);
912
913            assert!(!chan.is_closing());
914            chan.terminate();
915            assert!(!chan.is_closing());
916
917            let r = reactor.run().await;
918            assert!(r.is_ok());
919            assert!(chan.is_closing());
920
921            assert!(chan.wait_for_close().await.is_ok());
922        });
923    }
924
925    #[test]
926    fn reactor_error_wait_for_close() {
927        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
928            let (chan, reactor, _output, mut input) = new_reactor(rt);
929
930            // force an error by sending created2 cell for nonexistent circuit
931            let created2_cell = msg::Created2::new(*b"hihi").into();
932            input
933                .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
934                .await
935                .unwrap();
936
937            // `reactor.run()` should return an error
938            let run_error = reactor.run().await.unwrap_err();
939
940            // `chan.wait_for_close()` should return the same error
941            let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
942            else {
943                panic!("Expected a 'ReactorError'");
944            };
945
946            // `Error` doesn't implement `PartialEq`, so best we can do is to compare the strings
947            assert_eq!(run_error.to_string(), wait_error.to_string());
948        });
949    }
950}