1use super::circmap::{CircEnt, CircMap};
10use super::OpenChanCellS2C;
11use crate::channel::OpenChanMsgS2C;
12use crate::tunnel::circuit::halfcirc::HalfCirc;
13use crate::util::err::ReactorError;
14use crate::util::oneshot_broadcast;
15use crate::{Error, Result};
16use tor_async_utils::SinkPrepareExt as _;
17use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18use tor_cell::chancell::ChanMsg;
19use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20use tor_memquota::mq_queue;
21use tor_rtcompat::SleepProvider;
22
23#[cfg_attr(not(target_os = "linux"), allow(unused))]
24use tor_error::error_report;
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_rtcompat::StreamOps;
27
28use futures::channel::mpsc;
29use oneshot_fused_workaround as oneshot;
30
31use futures::sink::SinkExt;
32use futures::stream::Stream;
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::{select, select_biased};
36use tor_error::internal;
37
38use std::fmt;
39use std::pin::Pin;
40use std::sync::Arc;
41
42use crate::channel::{
43 codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
44};
45use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
46use tracing::{debug, trace};
47
48pub(super) type BoxedChannelStream = Box<
50 dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
51>;
52pub(super) type BoxedChannelSink =
54 Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
55pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
57pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
59
60fn codec_err_to_chan(err: CodecError) -> Error {
63 match err {
64 CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
65 CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
66 CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
67 }
68}
69
70#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72#[derive(Debug)]
73#[allow(unreachable_pub)] #[allow(clippy::exhaustive_enums)]
75pub enum CtrlMsg {
76 Shutdown,
78 CloseCircuit(CircId),
80 AllocateCircuit {
83 created_sender: oneshot::Sender<CreateResponse>,
85 sender: CircuitRxSender,
87 tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
89 },
90 ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
99 KistConfigUpdate(KistParams),
105}
106
107#[must_use = "If you don't call run() on a reactor, the channel won't work."]
112pub struct Reactor<S: SleepProvider> {
113 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
115 pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
118 pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
122 pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
126 pub(super) output: BoxedChannelSink,
130 #[cfg_attr(not(target_os = "linux"), allow(unused))]
132 pub(super) streamops: BoxedChannelStreamOps,
133 pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
135 pub(super) special_outgoing: SpecialOutgoing,
137 pub(super) circs: CircMap,
139 pub(super) unique_id: super::UniqId,
141 pub(super) details: Arc<ChannelDetails>,
143 pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
145 #[allow(dead_code)] pub(super) link_protocol: u16,
148}
149
150#[derive(Default, Debug, Clone)]
152pub(super) struct SpecialOutgoing {
153 pub(super) padding_negotiate: Option<PaddingNegotiate>,
155}
156
157impl SpecialOutgoing {
158 #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
163 pub(super) fn next(&mut self) -> Option<AnyChanCell> {
164 if let Some(p) = self.padding_negotiate.take() {
167 return Some(p.into());
168 }
169 None
170 }
171}
172
173impl<S: SleepProvider> fmt::Display for Reactor<S> {
178 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179 fmt::Debug::fmt(&self.unique_id, f)
180 }
181}
182
183impl<S: SleepProvider> Reactor<S> {
184 pub async fn run(mut self) -> Result<()> {
190 trace!("{}: Running reactor", &self);
191 let result: Result<()> = loop {
192 match self.run_once().await {
193 Ok(()) => (),
194 Err(ReactorError::Shutdown) => break Ok(()),
195 Err(ReactorError::Err(e)) => break Err(e),
196 }
197 };
198 debug!("{}: Reactor stopped: {:?}", &self, result);
199 let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
201 self.reactor_closed_tx.send(close_msg);
202 result
203 }
204
205 async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
207 select! {
208
209 ret = self.output.prepare_send_from(async {
212 if let Some(l) = self.special_outgoing.next() {
215 self.padding_timer.as_mut().note_cell_sent();
218 return Some(l)
219 }
220
221 select_biased! {
222 n = self.cells.next() => {
223 self.padding_timer.as_mut().note_cell_sent();
240 n
241 },
242 p = self.padding_timer.as_mut().next() => {
243 Some(p.into())
245 },
246 }
247 }) => {
248 let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
249 let msg = msg.ok_or(ReactorError::Shutdown)?;
250 sendable.send(msg).map_err(codec_err_to_chan)?;
251 }
252
253 ret = self.control.next() => {
254 let ctrl = match ret {
255 None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
256 Some(x) => x,
257 };
258 self.handle_control(ctrl).await?;
259 }
260
261 ret = self.input.next() => {
262 let item = ret
263 .ok_or(ReactorError::Shutdown)?
264 .map_err(codec_err_to_chan)?;
265 crate::note_incoming_traffic();
266 self.handle_cell(item).await?;
267 }
268
269 }
270 Ok(()) }
272
273 async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
275 trace!("{}: reactor received {:?}", &self, msg);
276 match msg {
277 CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
279 CtrlMsg::AllocateCircuit {
280 created_sender,
281 sender,
282 tx,
283 } => {
284 let mut rng = rand::rng();
285 let my_unique_id = self.unique_id;
286 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
287 let ret: Result<_> = self
288 .circs
289 .add_ent(&mut rng, created_sender, sender)
290 .map(|id| (id, circ_unique_id));
291 let _ = tx.send(ret); self.update_disused_since();
293 }
294 CtrlMsg::ConfigUpdate(updates) => {
295 if self.link_protocol == 4 {
296 return Ok(());
300 }
301
302 let ChannelPaddingInstructionsUpdates {
303 padding_enable,
306 padding_parameters,
307 padding_negotiate,
308 } = &*updates;
309 if let Some(parameters) = padding_parameters {
310 self.padding_timer.as_mut().reconfigure(parameters)?;
311 }
312 if let Some(enable) = padding_enable {
313 if *enable {
314 self.padding_timer.as_mut().enable();
315 } else {
316 self.padding_timer.as_mut().disable();
317 }
318 }
319 if let Some(padding_negotiate) = padding_negotiate {
320 self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
324 }
325 }
326 CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
327 }
328 Ok(())
329 }
330
331 async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
334 let (circid, msg) = cell.into_circid_and_msg();
335 use OpenChanMsgS2C::*;
336
337 match msg {
338 Relay(_) | Padding(_) | Vpadding(_) => {} _ => trace!(
340 "{}: received {} for {}",
341 &self,
342 msg.cmd(),
343 CircId::get_or_zero(circid)
344 ),
345 }
346
347 match msg {
348 Relay(_) => self.deliver_relay(circid, msg.into()).await,
350
351 Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
352
353 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
354
355 Padding(_) | Vpadding(_) => Ok(()),
357 }
358 }
359
360 async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
362 let Some(circid) = circid else {
363 return Err(Error::ChanProto("Relay cell without circuit ID".into()));
364 };
365
366 let mut ent = self
367 .circs
368 .get_mut(circid)
369 .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
370
371 match &mut *ent {
372 CircEnt::Open(s) => {
373 if s.send(msg.try_into()?).await.is_err() {
375 drop(ent);
376 self.outbound_destroy_circ(circid).await?;
378 }
379 Ok(())
380 }
381 CircEnt::Opening(_, _) => Err(Error::ChanProto(
382 "Relay cell on pending circuit before CREATED* received".into(),
383 )),
384 CircEnt::DestroySent(hs) => hs.receive_cell(),
385 }
386 }
387
388 async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
391 let Some(circid) = circid else {
392 return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
393 };
394
395 let target = self.circs.advance_from_opening(circid)?;
396 let created = msg.try_into()?;
397 target.send(created).map_err(|_| {
400 Error::from(internal!(
401 "Circuit queue rejected created message. Is it closing?"
402 ))
403 })
404 }
405
406 async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
409 let Some(circid) = circid else {
410 return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
411 };
412
413 let entry = self.circs.remove(circid);
415 self.update_disused_since();
416 match entry {
417 Some(CircEnt::Opening(oneshot, _)) => {
420 trace!("{}: Passing destroy to pending circuit {}", &self, circid);
421 oneshot
422 .send(msg.try_into()?)
423 .map_err(|_| {
426 internal!("pending circuit wasn't interested in destroy cell?").into()
427 })
428 }
429 Some(CircEnt::Open(mut sink)) => {
431 trace!("{}: Passing destroy to open circuit {}", &self, circid);
432 sink.send(msg.try_into()?)
433 .await
434 .map_err(|_| {
437 internal!("open circuit wasn't interested in destroy cell?").into()
438 })
439 }
440 Some(CircEnt::DestroySent(_)) => Ok(()),
442 None => {
444 trace!("{}: Destroy for nonexistent circuit {}", &self, circid);
445 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
446 }
447 }
448 }
449
450 async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
452 self.output.send(cell).await.map_err(codec_err_to_chan)?;
453 Ok(())
454 }
455
456 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
459 trace!("{}: Circuit {} is gone; sending DESTROY", &self, id);
460 self.circs.destroy_sent(id, HalfCirc::new(3000));
465 self.update_disused_since();
466 let destroy = Destroy::new(DestroyReason::NONE).into();
467 let cell = AnyChanCell::new(Some(id), destroy);
468 self.send_cell(cell).await?;
469
470 Ok(())
471 }
472
473 fn update_disused_since(&self) {
475 if self.circs.open_ent_count() == 0 {
476 self.details.unused_since.update_if_none();
478 } else {
479 self.details.unused_since.clear();
481 }
482 }
483
484 #[cfg(target_os = "linux")]
486 fn apply_kist_params(&self, params: &KistParams) {
487 use super::kist::KistMode;
488
489 let set_tcp_notsent_lowat = |v: u32| {
490 if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
491 error_report!(e, "Failed to set KIST socket options");
494 }
495 };
496
497 match params.kist_enabled() {
498 KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
499 KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
500 }
501 }
502
503 #[cfg(not(target_os = "linux"))]
504 fn apply_kist_params(&self, params: &KistParams) {
505 use super::kist::KistMode;
506
507 if params.kist_enabled() != KistMode::Disabled {
508 tracing::warn!("KIST not currently supported on non-linux platforms");
509 }
510 }
511}
512
513#[cfg(test)]
514pub(crate) mod test {
515 #![allow(clippy::unwrap_used)]
516 use super::*;
517 use crate::channel::{ClosedUnexpectedly, UniqId};
518 use crate::fake_mpsc;
519 use crate::tunnel::circuit::CircParameters;
520 use crate::util::fake_mq;
521 use futures::sink::SinkExt;
522 use futures::stream::StreamExt;
523 use futures::task::SpawnExt;
524 use tor_cell::chancell::msg;
525 use tor_linkspec::OwnedChanTarget;
526 use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
527
528 type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
529
530 pub(crate) fn new_reactor<R: Runtime>(
531 runtime: R,
532 ) -> (
533 Arc<crate::channel::Channel>,
534 Reactor<R>,
535 mpsc::Receiver<AnyChanCell>,
536 mpsc::Sender<CodecResult>,
537 ) {
538 let link_protocol = 4;
539 let (send1, recv1) = mpsc::channel(32);
540 let (send2, recv2) = mpsc::channel(32);
541 let unique_id = UniqId::new();
542 let dummy_target = OwnedChanTarget::builder()
543 .ed_identity([6; 32].into())
544 .rsa_identity([10; 20].into())
545 .build()
546 .unwrap();
547 let send1 = send1.sink_map_err(|e| {
548 trace!("got sink error: {:?}", e);
549 CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
550 });
551 let stream_ops = NoOpStreamOpsHandle::default();
552 let (chan, reactor) = crate::channel::Channel::new(
553 link_protocol,
554 Box::new(send1),
555 Box::new(recv2),
556 Box::new(stream_ops),
557 unique_id,
558 dummy_target,
559 crate::ClockSkew::None,
560 runtime,
561 fake_mq(),
562 )
563 .expect("channel create failed");
564 (chan, reactor, recv1, send2)
565 }
566
567 #[test]
569 fn shutdown() {
570 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
571 let (chan, mut reactor, _output, _input) = new_reactor(rt);
572
573 chan.terminate();
574 let r = reactor.run_once().await;
575 assert!(matches!(r, Err(ReactorError::Shutdown)));
576 });
577 }
578
579 #[test]
581 fn shutdown2() {
582 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
583 use futures::future::FutureExt;
586 use futures::join;
587
588 let (chan, reactor, _output, _input) = new_reactor(rt);
589 let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
591
592 let rr = run_reactor.clone();
593
594 let exit_then_check = async {
595 assert!(rr.peek().is_none());
596 chan.terminate();
598 };
599
600 let (rr_s, _) = join!(run_reactor, exit_then_check);
601
602 assert!(rr_s);
604 });
605 }
606
607 #[test]
608 fn new_circ_closed() {
609 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
610 let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
611 assert!(chan.duration_unused().is_some()); let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
614 let (pending, circr) = ret.unwrap();
615 rt.spawn(async {
616 let _ignore = circr.run().await;
617 })
618 .unwrap();
619 assert!(reac.is_ok());
620
621 let id = pending.peek_circid();
622
623 let ent = reactor.circs.get_mut(id);
624 assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
625 assert!(chan.duration_unused().is_none()); drop(pending);
630
631 reactor.run_once().await.unwrap();
632 let ent = reactor.circs.get_mut(id);
633 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
634 let cell = output.next().await.unwrap();
635 assert_eq!(cell.circid(), Some(id));
636 assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
637 assert!(chan.duration_unused().is_some()); });
639 }
640
641 #[test]
643 #[ignore] fn new_circ_create_failure() {
645 use std::time::Duration;
646 use tor_rtcompat::SleepProvider;
647
648 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
649 let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
650
651 let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
652 let (pending, circr) = ret.unwrap();
653 rt.spawn(async {
654 let _ignore = circr.run().await;
655 })
656 .unwrap();
657 assert!(reac.is_ok());
658
659 let circparams = CircParameters::default();
660
661 let id = pending.peek_circid();
662
663 let ent = reactor.circs.get_mut(id);
664 assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
665
666 #[allow(clippy::clone_on_copy)]
667 let rtc = rt.clone();
668 let send_response = async {
669 rtc.sleep(Duration::from_millis(100)).await;
670 trace!("sending createdfast");
671 let created_cell =
673 OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
674 input.send(Ok(created_cell)).await.unwrap();
675 reactor.run_once().await.unwrap();
676 };
677
678 let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
679 assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
681
682 reactor.run_once().await.unwrap();
683
684 let cell_sent = output.next().await.unwrap();
686 assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
687
688 let ent = reactor.circs.get_mut(id);
690 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
691 });
692 }
693
694 #[test]
696 fn bad_cells() {
697 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
698 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
699
700 let created2_cell = msg::Created2::new(*b"hihi").into();
702 input
703 .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
704 .await
705 .unwrap();
706
707 let e = reactor.run_once().await.unwrap_err().unwrap_err();
708 assert_eq!(
709 format!("{}", e),
710 "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
711 );
712
713 let relay_cell = msg::Relay::new(b"abc").into();
715 input
716 .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
717 .await
718 .unwrap();
719 let e = reactor.run_once().await.unwrap_err().unwrap_err();
720 assert_eq!(
721 format!("{}", e),
722 "Channel protocol violation: Relay cell on nonexistent circuit"
723 );
724
725 });
729 }
730
731 #[test]
732 fn deliver_relay() {
733 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
734 use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
735 use oneshot_fused_workaround as oneshot;
736
737 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
738
739 let (_circ_stream_7, mut circ_stream_13) = {
740 let (snd1, _rcv1) = oneshot::channel();
741 let (snd2, rcv2) = fake_mpsc(64);
742 reactor
743 .circs
744 .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
745
746 let (snd3, rcv3) = fake_mpsc(64);
747 reactor
748 .circs
749 .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
750
751 reactor.circs.put_unchecked(
752 CircId::new(23).unwrap(),
753 CircEnt::DestroySent(HalfCirc::new(25)),
754 );
755 (rcv2, rcv3)
756 };
757
758 let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
761 input
762 .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
763 .await
764 .unwrap();
765 reactor.run_once().await.unwrap();
766 let got = circ_stream_13.next().await.unwrap();
767 assert!(matches!(got, ClientCircChanMsg::Relay(_)));
768
769 input
771 .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
772 .await
773 .unwrap();
774 let e = reactor.run_once().await.unwrap_err().unwrap_err();
775 assert_eq!(
776 format!("{}", e),
777 "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
778 );
779
780 input
782 .send(Ok(OpenChanCellS2C::new(
783 CircId::new(101),
784 relaycell.clone(),
785 )))
786 .await
787 .unwrap();
788 let e = reactor.run_once().await.unwrap_err().unwrap_err();
789 assert_eq!(
790 format!("{}", e),
791 "Channel protocol violation: Relay cell on nonexistent circuit"
792 );
793
794 for _ in 0..25 {
799 input
800 .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
801 .await
802 .unwrap();
803 reactor.run_once().await.unwrap(); }
805
806 input
808 .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
809 .await
810 .unwrap();
811 let e = reactor.run_once().await.unwrap_err().unwrap_err();
812 assert_eq!(
813 format!("{}", e),
814 "Channel protocol violation: Too many cells received on destroyed circuit"
815 );
816 });
817 }
818
819 #[test]
820 fn deliver_destroy() {
821 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
822 use crate::tunnel::circuit::celltypes::*;
823 use oneshot_fused_workaround as oneshot;
824
825 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
826
827 let (circ_oneshot_7, mut circ_stream_13) = {
828 let (snd1, rcv1) = oneshot::channel();
829 let (snd2, _rcv2) = fake_mpsc(64);
830 reactor
831 .circs
832 .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
833
834 let (snd3, rcv3) = fake_mpsc(64);
835 reactor
836 .circs
837 .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
838
839 reactor.circs.put_unchecked(
840 CircId::new(23).unwrap(),
841 CircEnt::DestroySent(HalfCirc::new(25)),
842 );
843 (rcv1, rcv3)
844 };
845
846 let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
848 input
849 .send(Ok(OpenChanCellS2C::new(
850 CircId::new(7),
851 destroycell.clone(),
852 )))
853 .await
854 .unwrap();
855 reactor.run_once().await.unwrap();
856 let msg = circ_oneshot_7.await;
857 assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
858
859 input
861 .send(Ok(OpenChanCellS2C::new(
862 CircId::new(13),
863 destroycell.clone(),
864 )))
865 .await
866 .unwrap();
867 reactor.run_once().await.unwrap();
868 let msg = circ_stream_13.next().await.unwrap();
869 assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
870
871 input
873 .send(Ok(OpenChanCellS2C::new(
874 CircId::new(23),
875 destroycell.clone(),
876 )))
877 .await
878 .unwrap();
879 reactor.run_once().await.unwrap();
880
881 input
883 .send(Ok(OpenChanCellS2C::new(
884 CircId::new(101),
885 destroycell.clone(),
886 )))
887 .await
888 .unwrap();
889 let e = reactor.run_once().await.unwrap_err().unwrap_err();
890 assert_eq!(
891 format!("{}", e),
892 "Channel protocol violation: Destroy for nonexistent circuit"
893 );
894 });
895 }
896
897 #[test]
898 fn closing_if_reactor_dropped() {
899 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
900 let (chan, reactor, _output, _input) = new_reactor(rt);
901
902 assert!(!chan.is_closing());
903 drop(reactor);
904 assert!(chan.is_closing());
905
906 assert!(matches!(
907 chan.wait_for_close().await,
908 Err(ClosedUnexpectedly::ReactorDropped),
909 ));
910 });
911 }
912
913 #[test]
914 fn closing_if_reactor_shutdown() {
915 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
916 let (chan, reactor, _output, _input) = new_reactor(rt);
917
918 assert!(!chan.is_closing());
919 chan.terminate();
920 assert!(!chan.is_closing());
921
922 let r = reactor.run().await;
923 assert!(r.is_ok());
924 assert!(chan.is_closing());
925
926 assert!(chan.wait_for_close().await.is_ok());
927 });
928 }
929
930 #[test]
931 fn reactor_error_wait_for_close() {
932 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
933 let (chan, reactor, _output, mut input) = new_reactor(rt);
934
935 let created2_cell = msg::Created2::new(*b"hihi").into();
937 input
938 .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
939 .await
940 .unwrap();
941
942 let run_error = reactor.run().await.unwrap_err();
944
945 let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
947 else {
948 panic!("Expected a 'ReactorError'");
949 };
950
951 assert_eq!(run_error.to_string(), wait_error.to_string());
953 });
954 }
955}