1use super::circmap::{CircEnt, CircMap};
10use super::OpenChanCellS2C;
11use crate::channel::OpenChanMsgS2C;
12use crate::tunnel::circuit::halfcirc::HalfCirc;
13use crate::util::err::ReactorError;
14use crate::util::oneshot_broadcast;
15use crate::{Error, Result};
16use tor_async_utils::SinkPrepareExt as _;
17use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18use tor_cell::chancell::ChanMsg;
19use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20use tor_memquota::mq_queue;
21use tor_rtcompat::{SleepProvider, StreamOps};
22
23use futures::channel::mpsc;
24use oneshot_fused_workaround as oneshot;
25
26use futures::sink::SinkExt;
27use futures::stream::Stream;
28use futures::Sink;
29use futures::StreamExt as _;
30use futures::{select, select_biased};
31use tor_error::{error_report, internal};
32
33use std::fmt;
34use std::pin::Pin;
35use std::sync::Arc;
36
37use crate::channel::{
38 codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
39};
40use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
41use tracing::{debug, trace};
42
/// Boxed, type-erased stream of incoming channel cells.
///
/// Each item is either an [`OpenChanCellS2C`] or the [`CodecError`] hit while
/// producing it. The reactor reads from one of these as its `input`.
pub(super) type BoxedChannelStream = Box<
    dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
>;
/// Boxed, type-erased sink accepting outgoing [`AnyChanCell`]s.
///
/// Failures are reported as [`CodecError`]. The reactor writes to one of
/// these as its `output`.
pub(super) type BoxedChannelSink =
    Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
/// Boxed, type-erased [`StreamOps`] handle.
///
/// The reactor uses it to set socket-level options (see `apply_kist_params`).
pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
/// One-shot sender on which the reactor reports the `Result` of a request.
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
54
55fn codec_err_to_chan(err: CodecError) -> Error {
58 match err {
59 CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
60 CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
61 CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
62 }
63}
64
/// A control message sent to the channel [`Reactor`] over its `control`
/// queue.
#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
#[derive(Debug)]
#[allow(unreachable_pub)]
#[allow(clippy::exhaustive_enums)]
pub enum CtrlMsg {
    /// Stop the reactor: `run_once` reports `ReactorError::Shutdown` when it
    /// receives this.
    Shutdown,
    /// Mark the given circuit as destroyed and send a DESTROY cell for it.
    CloseCircuit(CircId),
    /// Add a new circuit entry to the reactor's circuit map.
    AllocateCircuit {
        /// One-shot channel on which the circuit receives its
        /// [`CreateResponse`] (a CREATED* or DESTROY message).
        created_sender: oneshot::Sender<CreateResponse>,
        /// Queue stored alongside the new entry.
        // NOTE(review): presumably the queue on which the circuit later
        // receives its cells; confirm against `CircMap::add_ent`.
        sender: CircuitRxSender,
        /// Where the reactor reports the outcome: the allocated circuit ID
        /// and the circuit's unique identifier, or an error.
        tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
    },
    /// Apply a set of channel-padding updates.
    ///
    /// The reactor ignores this message when the link protocol is 4.
    ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
    /// Apply new KIST parameters (see `apply_kist_params`).
    KistConfigUpdate(KistParams),
}
101
/// State for a channel's reactor: the object that moves cells between the
/// channel's transport and its circuits, and handles control messages.
///
/// The reactor does nothing until [`Reactor::run`] is called.
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
pub struct Reactor<S: SleepProvider> {
    /// Incoming control messages; a closed queue is treated as a shutdown.
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
    /// Used by `run` to broadcast the final outcome when the reactor stops.
    pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
    /// Queue of outgoing cells waiting to be written to `output`.
    pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
    /// Fused stream of incoming cells (or codec errors).
    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
    /// Sink to which outgoing cells are written.
    pub(super) output: BoxedChannelSink,
    /// Handle used to adjust socket-level options (KIST).
    pub(super) streamops: BoxedChannelStreamOps,
    /// Timer that yields padding cells and is told whenever a cell is sent.
    pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
    /// Outgoing cells that are sent ahead of anything in `cells`.
    pub(super) special_outgoing: SpecialOutgoing,
    /// Map from circuit ID to the state of each circuit on this channel.
    pub(super) circs: CircMap,
    /// Identifier for this channel; this is what `Display` prints.
    pub(super) unique_id: super::UniqId,
    /// Shared channel details; the reactor maintains `unused_since` here.
    pub(super) details: Arc<ChannelDetails>,
    /// Generator of unique identifiers for circuits allocated on this channel.
    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
    /// Link protocol version in use; consulted when handling padding updates.
    #[allow(dead_code)]
    pub(super) link_protocol: u16,
}
143
/// Outgoing cells that the reactor sends ahead of the regular cell queue.
#[derive(Default, Debug, Clone)]
pub(super) struct SpecialOutgoing {
    /// A `PaddingNegotiate` message waiting to be sent, if any.
    pub(super) padding_negotiate: Option<PaddingNegotiate>,
}
150
151impl SpecialOutgoing {
152 #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
157 pub(super) fn next(&mut self) -> Option<AnyChanCell> {
158 if let Some(p) = self.padding_negotiate.take() {
161 return Some(p.into());
162 }
163 None
164 }
165}
166
167impl<S: SleepProvider> fmt::Display for Reactor<S> {
172 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173 fmt::Debug::fmt(&self.unique_id, f)
174 }
175}
176
177impl<S: SleepProvider> Reactor<S> {
178 pub async fn run(mut self) -> Result<()> {
184 trace!("{}: Running reactor", &self);
185 let result: Result<()> = loop {
186 match self.run_once().await {
187 Ok(()) => (),
188 Err(ReactorError::Shutdown) => break Ok(()),
189 Err(ReactorError::Err(e)) => break Err(e),
190 }
191 };
192 debug!("{}: Reactor stopped: {:?}", &self, result);
193 let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
195 self.reactor_closed_tx.send(close_msg);
196 result
197 }
198
/// Perform one step of reactor work.
///
/// Waits for whichever happens first: the output sink can take a cell and
/// there is one to send, a control message arrives, or an incoming cell
/// arrives. Returns `Err(ReactorError::Shutdown)` when the reactor should
/// stop cleanly, and `Err(ReactorError::Err(_))` on failure.
async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
    select! {

        // Outgoing direction: the inner async block produces the next cell to
        // transmit, which is then handed to the sink.
        // NOTE(review): `prepare_send_from` presumably runs the block only once
        // the sink is ready to accept an item; confirm in tor_async_utils.
        ret = self.output.prepare_send_from(async {
            // Special cells take priority over everything else.
            if let Some(l) = self.special_outgoing.next() {
                // Tell the padding timer that real traffic went out.
                self.padding_timer.as_mut().note_cell_sent();
                return Some(l)
            }

            // `select_biased!` polls its branches in the order written, so a
            // queued cell is preferred over a padding cell.
            select_biased! {
                n = self.cells.next() => {
                    self.padding_timer.as_mut().note_cell_sent();
                    // `None` here means the cell queue has ended.
                    n
                },
                p = self.padding_timer.as_mut().next() => {
                    Some(p.into())
                },
            }
        }) => {
            let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
            // No cell to send means the cell queue ended: shut down.
            let msg = msg.ok_or(ReactorError::Shutdown)?;
            sendable.send(msg).map_err(codec_err_to_chan)?;
        }

        // Control direction: requests from the channel handle.
        ret = self.control.next() => {
            let ctrl = match ret {
                // A closed control queue is treated like an explicit shutdown.
                None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
                Some(x) => x,
            };
            self.handle_control(ctrl).await?;
        }

        // Incoming direction: cells arriving on the channel.
        ret = self.input.next() => {
            // End of the input stream is a clean shutdown; a codec error is not.
            let item = ret
                .ok_or(ReactorError::Shutdown)?
                .map_err(codec_err_to_chan)?;
            crate::note_incoming_traffic();
            self.handle_cell(item).await?;
        }

    }
    Ok(())
}
266
267 async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
269 trace!("{}: reactor received {:?}", &self, msg);
270 match msg {
271 CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
273 CtrlMsg::AllocateCircuit {
274 created_sender,
275 sender,
276 tx,
277 } => {
278 let mut rng = rand::thread_rng();
279 let my_unique_id = self.unique_id;
280 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
281 let ret: Result<_> = self
282 .circs
283 .add_ent(&mut rng, created_sender, sender)
284 .map(|id| (id, circ_unique_id));
285 let _ = tx.send(ret); self.update_disused_since();
287 }
288 CtrlMsg::ConfigUpdate(updates) => {
289 if self.link_protocol == 4 {
290 return Ok(());
294 }
295
296 let ChannelPaddingInstructionsUpdates {
297 padding_enable,
300 padding_parameters,
301 padding_negotiate,
302 } = &*updates;
303 if let Some(parameters) = padding_parameters {
304 self.padding_timer.as_mut().reconfigure(parameters);
305 }
306 if let Some(enable) = padding_enable {
307 if *enable {
308 self.padding_timer.as_mut().enable();
309 } else {
310 self.padding_timer.as_mut().disable();
311 }
312 }
313 if let Some(padding_negotiate) = padding_negotiate {
314 self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
318 }
319 }
320 CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
321 }
322 Ok(())
323 }
324
325 async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
328 let (circid, msg) = cell.into_circid_and_msg();
329 use OpenChanMsgS2C::*;
330
331 match msg {
332 Relay(_) | Padding(_) | Vpadding(_) => {} _ => trace!(
334 "{}: received {} for {}",
335 &self,
336 msg.cmd(),
337 CircId::get_or_zero(circid)
338 ),
339 }
340
341 match msg {
342 Relay(_) => self.deliver_relay(circid, msg.into()).await,
344
345 Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
346
347 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
348
349 Padding(_) | Vpadding(_) => Ok(()),
351 }
352 }
353
/// Hand an incoming RELAY cell to the circuit it belongs to.
///
/// # Errors
///
/// Returns `Error::ChanProto` when the cell has no circuit ID, names a
/// circuit that is not in the map, or arrives on a circuit that is still
/// opening. For a circuit in the `DestroySent` state the result is whatever
/// `receive_cell` on its half-closed record returns.
async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
    let Some(circid) = circid else {
        return Err(Error::ChanProto("Relay cell without circuit ID".into()));
    };

    let mut ent = self
        .circs
        .get_mut(circid)
        .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;

    match &mut *ent {
        CircEnt::Open(s) => {
            // Forward the cell to the open circuit's queue.
            if s.send(msg.try_into()?).await.is_err() {
                // Release the map entry before touching `self` again.
                drop(ent);
                // The circuit could not accept the cell, so tear it down.
                self.outbound_destroy_circ(circid).await?;
            }
            Ok(())
        }
        CircEnt::Opening(_, _) => Err(Error::ChanProto(
            "Relay cell on pending circuit before CREATED* received".into(),
        )),
        // We already sent DESTROY; the half-closed record decides whether
        // this late cell is still acceptable.
        CircEnt::DestroySent(hs) => hs.receive_cell(),
    }
}
381
382 async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
385 let Some(circid) = circid else {
386 return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
387 };
388
389 let target = self.circs.advance_from_opening(circid)?;
390 let created = msg.try_into()?;
391 target.send(created).map_err(|_| {
394 Error::from(internal!(
395 "Circuit queue rejected created message. Is it closing?"
396 ))
397 })
398 }
399
400 async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
403 let Some(circid) = circid else {
404 return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
405 };
406
407 let entry = self.circs.remove(circid);
409 self.update_disused_since();
410 match entry {
411 Some(CircEnt::Opening(oneshot, _)) => {
414 trace!("{}: Passing destroy to pending circuit {}", &self, circid);
415 oneshot
416 .send(msg.try_into()?)
417 .map_err(|_| {
420 internal!("pending circuit wasn't interested in destroy cell?").into()
421 })
422 }
423 Some(CircEnt::Open(mut sink)) => {
425 trace!("{}: Passing destroy to open circuit {}", &self, circid);
426 sink.send(msg.try_into()?)
427 .await
428 .map_err(|_| {
431 internal!("open circuit wasn't interested in destroy cell?").into()
432 })
433 }
434 Some(CircEnt::DestroySent(_)) => Ok(()),
436 None => {
438 trace!("{}: Destroy for nonexistent circuit {}", &self, circid);
439 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
440 }
441 }
442 }
443
444 async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
446 self.output.send(cell).await.map_err(codec_err_to_chan)?;
447 Ok(())
448 }
449
450 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
453 trace!("{}: Circuit {} is gone; sending DESTROY", &self, id);
454 self.circs.destroy_sent(id, HalfCirc::new(3000));
459 self.update_disused_since();
460 let destroy = Destroy::new(DestroyReason::NONE).into();
461 let cell = AnyChanCell::new(Some(id), destroy);
462 self.send_cell(cell).await?;
463
464 Ok(())
465 }
466
467 fn update_disused_since(&self) {
469 if self.circs.open_ent_count() == 0 {
470 self.details.unused_since.update_if_none();
472 } else {
473 self.details.unused_since.clear();
475 }
476 }
477
478 #[cfg(target_os = "linux")]
480 fn apply_kist_params(&self, params: &KistParams) {
481 use super::kist::KistMode;
482
483 let set_tcp_notsent_lowat = |v: u32| {
484 if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
485 error_report!(e, "Failed to set KIST socket options");
488 }
489 };
490
491 match params.kist_enabled() {
492 KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
493 KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
494 }
495 }
496
/// Fallback for platforms other than Linux: KIST cannot be applied, so a
/// request to enable it only produces a warning.
#[cfg(not(target_os = "linux"))]
fn apply_kist_params(&self, params: &KistParams) {
    use super::kist::KistMode;

    if params.kist_enabled() == KistMode::Disabled {
        return;
    }
    tracing::warn!("KIST not currently supported on non-linux platforms");
}
505}
506
507#[cfg(test)]
508pub(crate) mod test {
509 #![allow(clippy::unwrap_used)]
510 use super::*;
511 use crate::channel::{ClosedUnexpectedly, UniqId};
512 use crate::fake_mpsc;
513 use crate::tunnel::circuit::CircParameters;
514 use crate::util::fake_mq;
515 use futures::sink::SinkExt;
516 use futures::stream::StreamExt;
517 use futures::task::SpawnExt;
518 use tor_cell::chancell::msg;
519 use tor_linkspec::OwnedChanTarget;
520 use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
521
/// Item type fed to the test reactor's input stream: a cell, or a codec
/// error.
type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
523
524 pub(crate) fn new_reactor<R: Runtime>(
525 runtime: R,
526 ) -> (
527 Arc<crate::channel::Channel>,
528 Reactor<R>,
529 mpsc::Receiver<AnyChanCell>,
530 mpsc::Sender<CodecResult>,
531 ) {
532 let link_protocol = 4;
533 let (send1, recv1) = mpsc::channel(32);
534 let (send2, recv2) = mpsc::channel(32);
535 let unique_id = UniqId::new();
536 let dummy_target = OwnedChanTarget::builder()
537 .ed_identity([6; 32].into())
538 .rsa_identity([10; 20].into())
539 .build()
540 .unwrap();
541 let send1 = send1.sink_map_err(|e| {
542 trace!("got sink error: {:?}", e);
543 CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
544 });
545 let stream_ops = NoOpStreamOpsHandle::default();
546 let (chan, reactor) = crate::channel::Channel::new(
547 link_protocol,
548 Box::new(send1),
549 Box::new(recv2),
550 Box::new(stream_ops),
551 unique_id,
552 dummy_target,
553 crate::ClockSkew::None,
554 runtime,
555 fake_mq(),
556 )
557 .expect("channel create failed");
558 (chan, reactor, recv1, send2)
559 }
560
/// Terminating the channel makes the next reactor step report a shutdown.
#[test]
fn shutdown() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, mut reactor, _output, _input) = new_reactor(rt);

        chan.terminate();
        let outcome = reactor.run_once().await;
        assert!(matches!(outcome, Err(ReactorError::Shutdown)));
    });
}
572
// Terminating the channel while `run()` is in progress makes `run()` finish
// with `Ok`.
#[test]
fn shutdown2() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        use futures::future::FutureExt;
        use futures::join;

        let (chan, reactor, _output, _input) = new_reactor(rt);
        // Reduce the run result to "did it succeed?" and make the future
        // shareable so it can be both awaited and peeked at.
        let run_reactor = reactor.run().map(|x| x.is_ok()).shared();

        let rr = run_reactor.clone();

        let exit_then_check = async {
            // The reactor must still be running before we terminate.
            assert!(rr.peek().is_none());
            chan.terminate();
        };

        let (rr_s, _) = join!(run_reactor, exit_then_check);

        // The reactor exited cleanly.
        assert!(rr_s);
    });
}
600
// Allocating a circuit and then dropping its pending handle makes the reactor
// send a DESTROY cell and mark the circuit as `DestroySent`.
#[test]
fn new_circ_closed() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
        // A fresh channel with no circuits counts as unused.
        assert!(chan.duration_unused().is_some());
        // `new_circ` needs the reactor to take a step to service its request.
        let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
        let (pending, circr) = ret.unwrap();
        rt.spawn(async {
            let _ignore = circr.run().await;
        })
        .unwrap();
        assert!(reac.is_ok());

        let id = pending.peek_circid();

        let ent = reactor.circs.get_mut(id);
        assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
        // With a circuit allocated, the channel is in use.
        assert!(chan.duration_unused().is_none());
        // Drop the pending circuit; the next reactor step should tear it down.
        drop(pending);

        reactor.run_once().await.unwrap();
        let ent = reactor.circs.get_mut(id);
        assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
        let cell = output.next().await.unwrap();
        assert_eq!(cell.circid(), Some(id));
        assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
        // No open circuits remain, so the channel is unused again.
        assert!(chan.duration_unused().is_some());
    });
}
634
// A circuit whose CREATE_FAST handshake gets a bogus CREATED_FAST reply fails
// with `BadCircHandshakeAuth` and ends up in the `DestroySent` state.
// NOTE(review): this test is `#[ignore]`d; the reason is not visible in this
// file, so confirm before re-enabling.
#[test]
#[ignore]
fn new_circ_create_failure() {
    use std::time::Duration;
    use tor_rtcompat::SleepProvider;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());

        let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
        let (pending, circr) = ret.unwrap();
        rt.spawn(async {
            let _ignore = circr.run().await;
        })
        .unwrap();
        assert!(reac.is_ok());

        let circparams = CircParameters::default();

        let id = pending.peek_circid();

        let ent = reactor.circs.get_mut(id);
        assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));

        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        let send_response = async {
            // Wait a little before replying, so the handshake has started.
            rtc.sleep(Duration::from_millis(100)).await;
            trace!("sending createdfast");
            // The payload is deliberately not a valid handshake reply.
            let created_cell =
                OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
            input.send(Ok(created_cell)).await.unwrap();
            reactor.run_once().await.unwrap();
        };

        let (circ, _) =
            futures::join!(pending.create_firsthop_fast(&circparams), send_response);
        // The handshake must fail.
        assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));

        reactor.run_once().await.unwrap();

        // The first cell on the output is the CREATE_FAST request.
        let cell_sent = output.next().await.unwrap();
        assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));

        // The circuit has been torn down from our side.
        let ent = reactor.circs.get_mut(id);
        assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
    });
}
688
// Cells that refer to circuits the reactor does not know about are rejected
// as channel protocol violations.
#[test]
fn bad_cells() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (_chan, mut reactor, _output, mut input) = new_reactor(rt);

        // A CREATED2 cell for a circuit that is not opening.
        let created2_cell = msg::Created2::new(*b"hihi").into();
        input
            .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
            .await
            .unwrap();

        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
        );

        // A RELAY cell for a circuit that does not exist.
        let relay_cell = msg::Relay::new(b"abc").into();
        input
            .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
            .await
            .unwrap();
        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Relay cell on nonexistent circuit"
        );

    });
}
725
// Exercise RELAY-cell delivery for each circuit state: open, opening,
// nonexistent, and already destroyed.
#[test]
fn deliver_relay() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
        use oneshot_fused_workaround as oneshot;

        let (_chan, mut reactor, _output, mut input) = new_reactor(rt);

        // Populate the circuit map by hand: circuit 7 is opening, 13 is open,
        // and 23 has had a DESTROY sent with a `HalfCirc` built from 25.
        let (_circ_stream_7, mut circ_stream_13) = {
            let (snd1, _rcv1) = oneshot::channel();
            let (snd2, rcv2) = fake_mpsc(64);
            reactor
                .circs
                .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));

            let (snd3, rcv3) = fake_mpsc(64);
            reactor
                .circs
                .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));

            reactor.circs.put_unchecked(
                CircId::new(23).unwrap(),
                CircEnt::DestroySent(HalfCirc::new(25)),
            );
            (rcv2, rcv3)
        };

        // A RELAY cell on the open circuit is delivered to that circuit.
        let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
        input
            .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
            .await
            .unwrap();
        reactor.run_once().await.unwrap();
        let got = circ_stream_13.next().await.unwrap();
        assert!(matches!(got, ClientCircChanMsg::Relay(_)));

        // A RELAY cell on the opening circuit is a protocol violation.
        input
            .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
            .await
            .unwrap();
        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
        );

        // A RELAY cell on an unknown circuit is a protocol violation.
        input
            .send(Ok(OpenChanCellS2C::new(
                CircId::new(101),
                relaycell.clone(),
            )))
            .await
            .unwrap();
        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Relay cell on nonexistent circuit"
        );

        // The destroyed circuit tolerates 25 late cells...
        for _ in 0..25 {
            input
                .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
        }

        // ...but the 26th is rejected.
        input
            .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
            .await
            .unwrap();
        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Too many cells received on destroyed circuit"
        );
    });
}
813
// Exercise DESTROY-cell delivery for each circuit state: opening, open,
// already destroyed, and nonexistent.
#[test]
fn deliver_destroy() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        use crate::tunnel::circuit::celltypes::*;
        use oneshot_fused_workaround as oneshot;

        let (_chan, mut reactor, _output, mut input) = new_reactor(rt);

        // Populate the circuit map by hand: circuit 7 is opening, 13 is open,
        // and 23 has already had a DESTROY sent.
        let (circ_oneshot_7, mut circ_stream_13) = {
            let (snd1, rcv1) = oneshot::channel();
            let (snd2, _rcv2) = fake_mpsc(64);
            reactor
                .circs
                .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));

            let (snd3, rcv3) = fake_mpsc(64);
            reactor
                .circs
                .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));

            reactor.circs.put_unchecked(
                CircId::new(23).unwrap(),
                CircEnt::DestroySent(HalfCirc::new(25)),
            );
            (rcv1, rcv3)
        };

        // DESTROY on the opening circuit arrives on its one-shot channel.
        let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
        input
            .send(Ok(OpenChanCellS2C::new(
                CircId::new(7),
                destroycell.clone(),
            )))
            .await
            .unwrap();
        reactor.run_once().await.unwrap();
        let msg = circ_oneshot_7.await;
        assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));

        // DESTROY on the open circuit arrives on its cell queue.
        input
            .send(Ok(OpenChanCellS2C::new(
                CircId::new(13),
                destroycell.clone(),
            )))
            .await
            .unwrap();
        reactor.run_once().await.unwrap();
        let msg = circ_stream_13.next().await.unwrap();
        assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));

        // DESTROY on a circuit we already destroyed is accepted silently.
        input
            .send(Ok(OpenChanCellS2C::new(
                CircId::new(23),
                destroycell.clone(),
            )))
            .await
            .unwrap();
        reactor.run_once().await.unwrap();

        // DESTROY on an unknown circuit is a protocol violation.
        input
            .send(Ok(OpenChanCellS2C::new(
                CircId::new(101),
                destroycell.clone(),
            )))
            .await
            .unwrap();
        let e = reactor.run_once().await.unwrap_err().unwrap_err();
        assert_eq!(
            format!("{}", e),
            "Channel protocol violation: Destroy for nonexistent circuit"
        );
    });
}
891
/// Dropping the reactor without running it marks the channel as closing, and
/// waiting for the close reports `ReactorDropped`.
#[test]
fn closing_if_reactor_dropped() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, reactor, _output, _input) = new_reactor(rt);

        assert!(!chan.is_closing());
        drop(reactor);
        assert!(chan.is_closing());

        let close_result = chan.wait_for_close().await;
        assert!(matches!(
            close_result,
            Err(ClosedUnexpectedly::ReactorDropped),
        ));
    });
}
907
/// After `terminate()`, the channel only reports closing once the reactor has
/// actually run and shut down; the close is then a clean one.
#[test]
fn closing_if_reactor_shutdown() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, reactor, _output, _input) = new_reactor(rt);

        assert!(!chan.is_closing());
        chan.terminate();
        // The request is queued, but the reactor has not acted on it yet.
        assert!(!chan.is_closing());

        let run_result = reactor.run().await;
        assert!(run_result.is_ok());
        assert!(chan.is_closing());

        let close_result = chan.wait_for_close().await;
        assert!(close_result.is_ok());
    });
}
924
/// When the reactor stops with an error, `wait_for_close()` reports that same
/// error.
#[test]
fn reactor_error_wait_for_close() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, reactor, _output, mut input) = new_reactor(rt);

        // An unexpected CREATED2 cell makes the reactor fail.
        let bad_cell = OpenChanCellS2C::new(CircId::new(7), msg::Created2::new(*b"hihi").into());
        input.send(Ok(bad_cell)).await.unwrap();

        let run_error = reactor.run().await.unwrap_err();

        let wait_error = match chan.wait_for_close().await {
            Err(ClosedUnexpectedly::ReactorError(e)) => e,
            _ => panic!("Expected a 'ReactorError'"),
        };

        assert_eq!(run_error.to_string(), wait_error.to_string());
    });
}
950}