1use super::circmap::{CircEnt, CircMap};
10use crate::circuit::CircuitRxSender;
11use crate::client::circuit::halfcirc::HalfCirc;
12use crate::client::circuit::padding::{
13 PaddingController, PaddingEvent, PaddingEventStream, SendPadding, StartBlocking,
14};
15use crate::util::err::ReactorError;
16use crate::util::oneshot_broadcast;
17use crate::{Error, HopNum, Result};
18use tor_async_utils::SinkPrepareExt as _;
19use tor_cell::chancell::ChanMsg;
20use tor_cell::chancell::msg::{Destroy, DestroyReason, Padding, PaddingNegotiate};
21use tor_cell::chancell::{AnyChanCell, CircId, msg::AnyChanMsg};
22use tor_error::debug_report;
23use tor_rtcompat::{CoarseTimeProvider, DynTimeProvider, SleepProvider};
24
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_error::error_report;
27#[cfg_attr(not(target_os = "linux"), allow(unused))]
28use tor_rtcompat::StreamOps;
29
30use futures::channel::mpsc;
31use oneshot_fused_workaround as oneshot;
32
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::sink::SinkExt;
36use futures::stream::Stream;
37use futures::{select, select_biased};
38use tor_error::internal;
39
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43
44use crate::channel::{ChannelDetails, CloseInfo, kist::KistParams, padding, params::*, unique_id};
45use crate::circuit::celltypes::CreateResponse;
46use tracing::{debug, instrument, trace};
47
48pub(super) type BoxedChannelStream =
50 Box<dyn Stream<Item = std::result::Result<AnyChanCell, Error>> + Send + Unpin + 'static>;
51pub(super) type BoxedChannelSink =
53 Box<dyn Sink<AnyChanCell, Error = Error> + Send + Unpin + 'static>;
54pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
56pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
58
59cfg_if::cfg_if! {
60 if #[cfg(feature = "circ-padding")] {
61 use crate::util::sink_blocker::{SinkBlocker, CountingPolicy};
62 pub(super) type ChannelOutputSink = SinkBlocker<BoxedChannelSink, CountingPolicy>;
64 } else {
65 pub(super) type ChannelOutputSink = BoxedChannelSink;
67 }
68}
69
70#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72#[derive(Debug)]
73#[allow(unreachable_pub)] #[allow(clippy::exhaustive_enums, private_interfaces)]
75pub enum CtrlMsg {
76 Shutdown,
78 CloseCircuit(CircId),
80 AllocateCircuit {
83 created_sender: oneshot::Sender<CreateResponse>,
85 sender: CircuitRxSender,
87 tx: ReactorResultChannel<(
89 CircId,
90 crate::circuit::UniqId,
91 PaddingController,
92 PaddingEventStream,
93 )>,
94 },
95 ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
104 KistConfigUpdate(KistParams),
110 #[cfg(feature = "circ-padding-manual")]
112 SetChannelPadder {
113 padder: Option<crate::client::CircuitPadder>,
115 sender: oneshot::Sender<Result<()>>,
117 },
118}
119
120#[must_use = "If you don't call run() on a reactor, the channel won't work."]
125pub struct Reactor<S: SleepProvider + CoarseTimeProvider> {
126 pub(super) runtime: S,
128 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
130 pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
133 pub(super) cells: super::CellRx,
137 pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
141 pub(super) output: ChannelOutputSink,
145 #[cfg_attr(not(target_os = "linux"), allow(unused))]
147 pub(super) streamops: BoxedChannelStreamOps,
148 pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
155 pub(super) special_outgoing: SpecialOutgoing,
157 pub(super) circs: CircMap,
159 pub(super) unique_id: super::UniqId,
161 pub(super) details: Arc<ChannelDetails>,
163 pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
165 pub(super) padding_ctrl: PaddingController<DynTimeProvider>,
173 pub(super) padding_event_stream: PaddingEventStream<DynTimeProvider>,
177 pub(super) padding_blocker: Option<StartBlocking>,
179 #[allow(dead_code)] pub(super) link_protocol: u16,
182}
183
184#[derive(Default, Debug, Clone)]
186pub(super) struct SpecialOutgoing {
187 padding_negotiate: Option<PaddingNegotiate>,
189 n_padding: u16,
191}
192
193impl SpecialOutgoing {
194 #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
199 fn next(&mut self) -> Option<AnyChanCell> {
200 if let Some(p) = self.padding_negotiate.take() {
203 return Some(p.into());
204 }
205 if self.n_padding > 0 {
206 self.n_padding -= 1;
207 return Some(Padding::new().into());
208 }
209 None
210 }
211
212 fn queue_padding_cell(&mut self) {
214 self.n_padding = self.n_padding.saturating_add(1);
215 }
216}
217
218impl<S: SleepProvider + CoarseTimeProvider> fmt::Display for Reactor<S> {
223 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
224 fmt::Debug::fmt(&self.unique_id, f)
225 }
226}
227
228impl<S: SleepProvider + CoarseTimeProvider> Reactor<S> {
229 #[instrument(level = "trace", skip_all)]
235 pub async fn run(mut self) -> Result<()> {
236 trace!(channel_id = %self, "Running reactor");
237 let result: Result<()> = loop {
238 match self.run_once().await {
239 Ok(()) => (),
240 Err(ReactorError::Shutdown) => break Ok(()),
241 Err(ReactorError::Err(e)) => break Err(e),
242 }
243 };
244
245 const MSG: &str = "Reactor stopped";
248 match &result {
249 Ok(()) => debug!(channel_id = %self, "{MSG}"),
250 Err(e) => debug_report!(e, channel_id = %self, "{MSG}"),
251 }
252
253 let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
255 self.reactor_closed_tx.send(close_msg);
256 result
257 }
258
259 #[instrument(level = "trace", skip_all)]
261 async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
262 select! {
263
264 ret = self.output.prepare_send_from(async {
267 if let Some(l) = self.special_outgoing.next() {
270 self.padding_timer.as_mut().note_cell_sent();
273 return Some((l, None));
274 }
275
276 select_biased! {
277 n = self.cells.next() => {
278 self.padding_timer.as_mut().note_cell_sent();
295 n
296 },
297 p = self.padding_timer.as_mut().next() => {
298 self.padding_ctrl.queued_data(HopNum::from(0));
303
304 self.padding_timer.as_mut().note_cell_sent();
305 Some((p.into(), None))
306 },
307 }
308 }) => {
309 self.padding_ctrl.flushed_channel_cell();
310 let (queued, sendable) = ret?;
311 let (msg, cell_padding_info) = queued.ok_or(ReactorError::Shutdown)?;
312 if let (Some(cell_padding_info), Some(circid)) = (cell_padding_info, msg.circid()) {
319 self.circs.note_cell_flushed(circid, cell_padding_info);
320 }
321 sendable.send(msg)?;
322 }
323
324 ret = self.control.next() => {
325 let ctrl = match ret {
326 None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
327 Some(x) => x,
328 };
329 self.handle_control(ctrl).await?;
330 }
331
332 ret = self.padding_event_stream.next() => {
333 let event = ret.ok_or_else(|| Error::from(internal!("Padding event stream was exhausted")))?;
334 self.handle_padding_event(event).await?;
335 }
336
337 ret = self.input.next() => {
338 let item = ret
339 .ok_or(ReactorError::Shutdown)??;
340 crate::note_incoming_traffic();
341 self.handle_cell(item).await?;
342 }
343
344 }
345 Ok(()) }
347
348 #[instrument(level = "trace", skip(self))] async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
351 trace!(
352 channel_id = %self,
353 msg = ?msg,
354 "reactor received control message"
355 );
356
357 match msg {
358 CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
360 CtrlMsg::AllocateCircuit {
361 created_sender,
362 sender,
363 tx,
364 } => {
365 let mut rng = rand::rng();
366 let my_unique_id = self.unique_id;
367 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
368 let (padding_ctrl, padding_stream) = crate::client::circuit::padding::new_padding(
377 DynTimeProvider::new(self.runtime.clone()),
379 );
380 let ret: Result<_> = self
381 .circs
382 .add_ent(&mut rng, created_sender, sender, padding_ctrl.clone())
383 .map(|id| (id, circ_unique_id, padding_ctrl, padding_stream));
384 let _ = tx.send(ret); self.update_disused_since();
386 }
387 CtrlMsg::ConfigUpdate(updates) => {
388 if self.link_protocol == 4 {
389 return Ok(());
393 }
394
395 let ChannelPaddingInstructionsUpdates {
396 padding_enable,
399 padding_parameters,
400 padding_negotiate,
401 } = &*updates;
402 if let Some(parameters) = padding_parameters {
403 self.padding_timer.as_mut().reconfigure(parameters)?;
404 }
405 if let Some(enable) = padding_enable {
406 if *enable {
407 self.padding_timer.as_mut().enable();
408 } else {
409 self.padding_timer.as_mut().disable();
410 }
411 }
412 if let Some(padding_negotiate) = padding_negotiate {
413 self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
417 }
418 }
419 CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
420 #[cfg(feature = "circ-padding-manual")]
421 CtrlMsg::SetChannelPadder { padder, sender } => {
422 self.padding_ctrl
423 .install_padder_padding_at_hop(HopNum::from(0), padder);
424 let _ignore = sender.send(Ok(()));
425 }
426 }
427 Ok(())
428 }
429
430 #[cfg(not(feature = "circ-padding"))]
434 #[allow(clippy::unused_async)] async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
436 void::unreachable(action.0)
437 }
438
439 #[cfg(feature = "circ-padding")]
441 async fn handle_padding_event(&mut self, action: PaddingEvent) -> Result<()> {
442 use PaddingEvent as PE;
443 match action {
444 PE::SendPadding(send_padding) => {
445 self.handle_send_padding(send_padding).await?;
446 }
447 PE::StartBlocking(start_blocking) => {
448 if self.output.is_unlimited() {
449 self.output.set_blocked();
450 }
451 self.padding_blocker = Some(start_blocking);
452 }
453 PE::StopBlocking => {
454 self.output.set_unlimited();
455 }
456 }
457 Ok(())
458 }
459
460 #[cfg(feature = "circ-padding")]
462 async fn handle_send_padding(&mut self, padding: SendPadding) -> Result<()> {
463 use crate::client::circuit::padding::{Bypass::*, Replace::*};
468 let hop = HopNum::from(0);
470 assert_eq!(padding.hop, hop);
471
472 let blocking_bypassed = matches!(
474 (&self.padding_blocker, padding.may_bypass_block()),
475 (
476 Some(StartBlocking {
477 is_bypassable: true
478 }),
479 BypassBlocking
480 )
481 );
482 let this_padding_blocked = self.padding_blocker.is_some() && !blocking_bypassed;
484
485 if padding.may_replace_with_data() == Replaceable {
486 if self.output_is_full().await? {
487 self.padding_ctrl
495 .replaceable_padding_already_queued(hop, padding);
496 return Ok(());
497 } else if self.cells.approx_count() > 0 {
498 if this_padding_blocked {
500 self.padding_ctrl
502 .replaceable_padding_already_queued(hop, padding);
503 } else {
504 self.padding_ctrl.queued_data_as_padding(hop, padding);
507 if blocking_bypassed {
508 self.output.allow_n_additional_items(1);
509 }
510 }
511 return Ok(());
512 } else {
513 }
515 }
516
517 self.special_outgoing.queue_padding_cell();
519 self.padding_ctrl.queued_padding(hop, padding);
520 if blocking_bypassed {
521 self.output.allow_n_additional_items(1);
522 }
523
524 Ok(())
525 }
526
527 #[cfg(feature = "circ-padding")]
534 async fn output_is_full(&mut self) -> Result<bool> {
535 use futures::future::poll_fn;
536 use std::task::Poll;
537 poll_fn(|cx| {
539 Poll::Ready(match self.output.poll_ready_unpin(cx) {
540 Poll::Ready(Ok(())) => Ok(false),
542 Poll::Pending => Ok(true),
544 Poll::Ready(Err(e)) => Err(e),
546 })
547 })
548 .await
549 }
550
551 #[instrument(level = "trace", skip_all)]
554 async fn handle_cell(&mut self, cell: AnyChanCell) -> Result<()> {
555 let (circid, msg) = cell.into_circid_and_msg();
556 use AnyChanMsg::*;
557
558 match msg {
559 Relay(_) | Padding(_) | Vpadding(_) => {} _ => trace!(
561 channel_id = %self,
562 "received {} for {}",
563 msg.cmd(),
564 CircId::get_or_zero(circid)
565 ),
566 }
567
568 match msg {
570 Padding(_) | Vpadding(_) => {
571 let _always_acceptable = self.padding_ctrl.decrypted_padding(HopNum::from(0));
573 }
574 _ => self.padding_ctrl.decrypted_data(HopNum::from(0)),
575 }
576
577 match msg {
578 Relay(_) => self.deliver_relay(circid, msg).await,
580
581 Destroy(_) => self.deliver_destroy(circid, msg).await,
582
583 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg),
584
585 Padding(_) | Vpadding(_) => Ok(()),
587 _ => Err(Error::ChanProto(format!("Unexpected cell: {msg:?}"))),
588 }
589 }
590
591 async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
593 let Some(circid) = circid else {
594 return Err(Error::ChanProto("Relay cell without circuit ID".into()));
595 };
596
597 let mut ent = self
598 .circs
599 .get_mut(circid)
600 .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
601
602 match &mut *ent {
603 CircEnt::Open { cell_sender: s, .. } => {
604 if s.send(msg).await.is_err() {
606 drop(ent);
607 self.outbound_destroy_circ(circid).await?;
609 }
610 Ok(())
611 }
612 CircEnt::Opening { .. } => Err(Error::ChanProto(
613 "Relay cell on pending circuit before CREATED* received".into(),
614 )),
615 CircEnt::DestroySent(hs) => hs.receive_cell(),
616 }
617 }
618
619 fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
622 let Some(circid) = circid else {
623 return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
624 };
625
626 let target = self.circs.advance_from_opening(circid)?;
627 let created = msg.try_into()?;
628 target.send(created).map_err(|_| {
631 Error::from(internal!(
632 "Circuit queue rejected created message. Is it closing?"
633 ))
634 })
635 }
636
637 async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
640 let Some(circid) = circid else {
641 return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
642 };
643
644 let entry = self.circs.remove(circid);
646 self.update_disused_since();
647 match entry {
648 Some(CircEnt::Opening {
651 create_response_sender,
652 ..
653 }) => {
654 trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
655 create_response_sender
656 .send(msg.try_into()?)
657 .map_err(|_| {
660 internal!("pending circuit wasn't interested in destroy cell?").into()
661 })
662 }
663 Some(CircEnt::Open {
665 mut cell_sender, ..
666 }) => {
667 trace!(channel_id = %self, "Passing destroy to open circuit {}", circid);
668 cell_sender
669 .send(msg)
670 .await
671 .map_err(|_| {
674 internal!("open circuit wasn't interested in destroy cell?").into()
675 })
676 }
677 Some(CircEnt::DestroySent(_)) => Ok(()),
679 None => {
681 trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
682 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
683 }
684 }
685 }
686
687 async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
689 self.output.send(cell).await?;
690 Ok(())
691 }
692
693 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
696 trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
697 self.circs.destroy_sent(id, HalfCirc::new(3000));
702 self.update_disused_since();
703 let destroy = Destroy::new(DestroyReason::NONE).into();
704 let cell = AnyChanCell::new(Some(id), destroy);
705 self.send_cell(cell).await?;
706
707 Ok(())
708 }
709
710 fn update_disused_since(&self) {
712 if self.circs.open_ent_count() == 0 {
713 self.details.unused_since.update_if_none();
715 } else {
716 self.details.unused_since.clear();
718 }
719 }
720
721 #[cfg(target_os = "linux")]
723 fn apply_kist_params(&self, params: &KistParams) {
724 use super::kist::KistMode;
725
726 let set_tcp_notsent_lowat = |v: u32| {
727 if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
728 error_report!(e, "Failed to set KIST socket options");
731 }
732 };
733
734 match params.kist_enabled() {
735 KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
736 KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
737 }
738 }
739
740 #[cfg(not(target_os = "linux"))]
742 fn apply_kist_params(&self, params: &KistParams) {
743 use super::kist::KistMode;
744
745 if params.kist_enabled() != KistMode::Disabled {
746 tracing::warn!("KIST not currently supported on non-linux platforms");
747 }
748 }
749}
750
751#[cfg(test)]
752pub(crate) mod test {
753 #![allow(clippy::unwrap_used)]
754 use super::*;
755 use crate::channel::{Canonicity, ChannelType, ClosedUnexpectedly, UniqId};
756 use crate::client::circuit::CircParameters;
757 use crate::client::circuit::padding::new_padding;
758 use crate::fake_mpsc;
759 use crate::peer::PeerInfo;
760 use crate::util::{DummyTimeoutEstimator, fake_mq};
761 use futures::sink::SinkExt;
762 use futures::stream::StreamExt;
763 use tor_cell::chancell::msg;
764 use tor_linkspec::OwnedChanTarget;
765 use tor_rtcompat::SpawnExt;
766 use tor_rtcompat::{DynTimeProvider, NoOpStreamOpsHandle, Runtime};
767
768 pub(crate) type CodecResult = std::result::Result<AnyChanCell, Error>;
769
770 pub(crate) fn new_reactor<R: Runtime>(
771 runtime: R,
772 ) -> (
773 Arc<crate::channel::Channel>,
774 Reactor<R>,
775 mpsc::Receiver<AnyChanCell>,
776 mpsc::Sender<CodecResult>,
777 ) {
778 let link_protocol = 4;
779 let (send1, recv1) = mpsc::channel(32);
780 let (send2, recv2) = mpsc::channel(32);
781 let unique_id = UniqId::new();
782 let dummy_target = OwnedChanTarget::builder()
783 .ed_identity([6; 32].into())
784 .rsa_identity([10; 20].into())
785 .build()
786 .unwrap();
787 let send1 = send1.sink_map_err(|e| {
788 trace!("got sink error: {:?}", e);
789 Error::CellDecodeErr {
790 object: "reactor test",
791 err: tor_cell::Error::ChanProto("dummy message".into()),
792 }
793 });
794 let stream_ops = NoOpStreamOpsHandle::default();
795 let (chan, reactor) = crate::channel::Channel::new(
796 ChannelType::ClientInitiator,
797 link_protocol,
798 Box::new(send1),
799 Box::new(recv2),
800 Box::new(stream_ops),
801 unique_id,
802 dummy_target,
803 safelog::MaybeSensitive::not_sensitive(PeerInfo::EMPTY),
804 crate::ClockSkew::None,
805 runtime,
806 fake_mq(),
807 Canonicity::new_canonical(),
808 )
809 .expect("channel create failed");
810 (chan, reactor, recv1, send2)
811 }
812
813 #[test]
815 fn shutdown() {
816 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
817 let (chan, mut reactor, _output, _input) = new_reactor(rt);
818
819 chan.terminate();
820 let r = reactor.run_once().await;
821 assert!(matches!(r, Err(ReactorError::Shutdown)));
822 });
823 }
824
825 #[test]
827 fn shutdown2() {
828 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
829 use futures::future::FutureExt;
832 use futures::join;
833
834 let (chan, reactor, _output, _input) = new_reactor(rt);
835 let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
837
838 let rr = run_reactor.clone();
839
840 let exit_then_check = async {
841 assert!(rr.peek().is_none());
842 chan.terminate();
844 };
845
846 let (rr_s, _) = join!(run_reactor, exit_then_check);
847
848 assert!(rr_s);
850 });
851 }
852
853 #[test]
854 fn new_circ_closed() {
855 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
856 let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
857 assert!(chan.duration_unused().is_some()); let (ret, reac) = futures::join!(
860 chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
861 reactor.run_once()
862 );
863 let (pending, circr) = ret.unwrap();
864 rt.spawn(async {
865 let _ignore = circr.run().await;
866 })
867 .unwrap();
868 assert!(reac.is_ok());
869
870 let id = pending.peek_circid();
871
872 let ent = reactor.circs.get_mut(id);
873 assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
874 assert!(chan.duration_unused().is_none()); drop(pending);
879
880 reactor.run_once().await.unwrap();
881 let ent = reactor.circs.get_mut(id);
882 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
883 let cell = output.next().await.unwrap();
884 assert_eq!(cell.circid(), Some(id));
885 assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
886 assert!(chan.duration_unused().is_some()); });
888 }
889
890 #[test]
892 #[ignore] fn new_circ_create_failure() {
894 use std::time::Duration;
895 use tor_rtcompat::SleepProvider;
896
897 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
898 let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
899
900 let (ret, reac) = futures::join!(
901 chan.new_tunnel(Arc::new(DummyTimeoutEstimator)),
902 reactor.run_once()
903 );
904 let (pending, circr) = ret.unwrap();
905 rt.spawn(async {
906 let _ignore = circr.run().await;
907 })
908 .unwrap();
909 assert!(reac.is_ok());
910
911 let circparams = CircParameters::default();
912
913 let id = pending.peek_circid();
914
915 let ent = reactor.circs.get_mut(id);
916 assert!(matches!(*ent.unwrap(), CircEnt::Opening { .. }));
917
918 #[allow(clippy::clone_on_copy)]
919 let rtc = rt.clone();
920 let send_response = async {
921 rtc.sleep(Duration::from_millis(100)).await;
922 trace!("sending createdfast");
923 let created_cell = AnyChanCell::new(Some(id), msg::CreatedFast::new(*b"x").into());
925 input.send(Ok(created_cell)).await.unwrap();
926 reactor.run_once().await.unwrap();
927 };
928
929 let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
930 assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
932
933 reactor.run_once().await.unwrap();
934
935 let cell_sent = output.next().await.unwrap();
937 assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
938
939 let ent = reactor.circs.get_mut(id);
941 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
942 });
943 }
944
945 #[test]
947 fn bad_cells() {
948 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
949 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
950
951 let created2_cell = msg::Created2::new(*b"hihi").into();
953 input
954 .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
955 .await
956 .unwrap();
957
958 let e = reactor.run_once().await.unwrap_err().unwrap_err();
959 assert_eq!(
960 format!("{}", e),
961 "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
962 );
963
964 let relay_cell = msg::Relay::new(b"abc").into();
966 input
967 .send(Ok(AnyChanCell::new(CircId::new(4), relay_cell)))
968 .await
969 .unwrap();
970 let e = reactor.run_once().await.unwrap_err().unwrap_err();
971 assert_eq!(
972 format!("{}", e),
973 "Channel protocol violation: Relay cell on nonexistent circuit"
974 );
975
976 });
980 }
981
982 #[test]
983 fn deliver_relay() {
984 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
985 use oneshot_fused_workaround as oneshot;
986
987 let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
988
989 let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
990
991 let (_circ_stream_7, mut circ_stream_13) = {
992 let (snd1, _rcv1) = oneshot::channel();
993 let (snd2, rcv2) = fake_mpsc(64);
994 reactor.circs.put_unchecked(
995 CircId::new(7).unwrap(),
996 CircEnt::Opening {
997 create_response_sender: snd1,
998 cell_sender: snd2,
999 padding_ctrl: padding_ctrl.clone(),
1000 },
1001 );
1002
1003 let (snd3, rcv3) = fake_mpsc(64);
1004 reactor.circs.put_unchecked(
1005 CircId::new(13).unwrap(),
1006 CircEnt::Open {
1007 cell_sender: snd3,
1008 padding_ctrl,
1009 },
1010 );
1011
1012 reactor.circs.put_unchecked(
1013 CircId::new(23).unwrap(),
1014 CircEnt::DestroySent(HalfCirc::new(25)),
1015 );
1016 (rcv2, rcv3)
1017 };
1018
1019 let relaycell: AnyChanMsg = msg::Relay::new(b"do you suppose").into();
1022 input
1023 .send(Ok(AnyChanCell::new(CircId::new(13), relaycell.clone())))
1024 .await
1025 .unwrap();
1026 reactor.run_once().await.unwrap();
1027 let got = circ_stream_13.next().await.unwrap();
1028 assert!(matches!(got, AnyChanMsg::Relay(_)));
1029
1030 input
1032 .send(Ok(AnyChanCell::new(CircId::new(7), relaycell.clone())))
1033 .await
1034 .unwrap();
1035 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1036 assert_eq!(
1037 format!("{}", e),
1038 "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
1039 );
1040
1041 input
1043 .send(Ok(AnyChanCell::new(CircId::new(101), relaycell.clone())))
1044 .await
1045 .unwrap();
1046 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1047 assert_eq!(
1048 format!("{}", e),
1049 "Channel protocol violation: Relay cell on nonexistent circuit"
1050 );
1051
1052 for _ in 0..25 {
1057 input
1058 .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1059 .await
1060 .unwrap();
1061 reactor.run_once().await.unwrap(); }
1063
1064 input
1066 .send(Ok(AnyChanCell::new(CircId::new(23), relaycell.clone())))
1067 .await
1068 .unwrap();
1069 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1070 assert_eq!(
1071 format!("{}", e),
1072 "Channel protocol violation: Too many cells received on destroyed circuit"
1073 );
1074 });
1075 }
1076
1077 #[test]
1078 fn deliver_destroy() {
1079 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1080 use crate::circuit::celltypes::*;
1081 use oneshot_fused_workaround as oneshot;
1082
1083 let (_chan, mut reactor, _output, mut input) = new_reactor(rt.clone());
1084
1085 let (padding_ctrl, _padding_stream) = new_padding(DynTimeProvider::new(rt));
1086
1087 let (circ_oneshot_7, mut circ_stream_13) = {
1088 let (snd1, rcv1) = oneshot::channel();
1089 let (snd2, _rcv2) = fake_mpsc(64);
1090 reactor.circs.put_unchecked(
1091 CircId::new(7).unwrap(),
1092 CircEnt::Opening {
1093 create_response_sender: snd1,
1094 cell_sender: snd2,
1095 padding_ctrl: padding_ctrl.clone(),
1096 },
1097 );
1098
1099 let (snd3, rcv3) = fake_mpsc(64);
1100 reactor.circs.put_unchecked(
1101 CircId::new(13).unwrap(),
1102 CircEnt::Open {
1103 cell_sender: snd3,
1104 padding_ctrl: padding_ctrl.clone(),
1105 },
1106 );
1107
1108 reactor.circs.put_unchecked(
1109 CircId::new(23).unwrap(),
1110 CircEnt::DestroySent(HalfCirc::new(25)),
1111 );
1112 (rcv1, rcv3)
1113 };
1114
1115 let destroycell: AnyChanMsg = msg::Destroy::new(0.into()).into();
1117 input
1118 .send(Ok(AnyChanCell::new(CircId::new(7), destroycell.clone())))
1119 .await
1120 .unwrap();
1121 reactor.run_once().await.unwrap();
1122 let msg = circ_oneshot_7.await;
1123 assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
1124
1125 input
1127 .send(Ok(AnyChanCell::new(CircId::new(13), destroycell.clone())))
1128 .await
1129 .unwrap();
1130 reactor.run_once().await.unwrap();
1131 let msg = circ_stream_13.next().await.unwrap();
1132 assert!(matches!(msg, AnyChanMsg::Destroy(_)));
1133
1134 input
1136 .send(Ok(AnyChanCell::new(CircId::new(23), destroycell.clone())))
1137 .await
1138 .unwrap();
1139 reactor.run_once().await.unwrap();
1140
1141 input
1143 .send(Ok(AnyChanCell::new(CircId::new(101), destroycell.clone())))
1144 .await
1145 .unwrap();
1146 let e = reactor.run_once().await.unwrap_err().unwrap_err();
1147 assert_eq!(
1148 format!("{}", e),
1149 "Channel protocol violation: Destroy for nonexistent circuit"
1150 );
1151 });
1152 }
1153
1154 #[test]
1155 fn closing_if_reactor_dropped() {
1156 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1157 let (chan, reactor, _output, _input) = new_reactor(rt);
1158
1159 assert!(!chan.is_closing());
1160 drop(reactor);
1161 assert!(chan.is_closing());
1162
1163 assert!(matches!(
1164 chan.wait_for_close().await,
1165 Err(ClosedUnexpectedly::ReactorDropped),
1166 ));
1167 });
1168 }
1169
1170 #[test]
1171 fn closing_if_reactor_shutdown() {
1172 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1173 let (chan, reactor, _output, _input) = new_reactor(rt);
1174
1175 assert!(!chan.is_closing());
1176 chan.terminate();
1177 assert!(!chan.is_closing());
1178
1179 let r = reactor.run().await;
1180 assert!(r.is_ok());
1181 assert!(chan.is_closing());
1182
1183 assert!(chan.wait_for_close().await.is_ok());
1184 });
1185 }
1186
1187 #[test]
1188 fn reactor_error_wait_for_close() {
1189 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1190 let (chan, reactor, _output, mut input) = new_reactor(rt);
1191
1192 let created2_cell = msg::Created2::new(*b"hihi").into();
1194 input
1195 .send(Ok(AnyChanCell::new(CircId::new(7), created2_cell)))
1196 .await
1197 .unwrap();
1198
1199 let run_error = reactor.run().await.unwrap_err();
1201
1202 let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
1204 else {
1205 panic!("Expected a 'ReactorError'");
1206 };
1207
1208 assert_eq!(run_error.to_string(), wait_error.to_string());
1210 });
1211 }
1212}