1use super::circmap::{CircEnt, CircMap};
10use super::OpenChanCellS2C;
11use crate::channel::OpenChanMsgS2C;
12use crate::tunnel::circuit::halfcirc::HalfCirc;
13use crate::util::err::ReactorError;
14use crate::util::oneshot_broadcast;
15use crate::{Error, Result};
16use tor_async_utils::SinkPrepareExt as _;
17use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
18use tor_cell::chancell::ChanMsg;
19use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};
20use tor_memquota::mq_queue;
21use tor_rtcompat::SleepProvider;
22
23#[cfg_attr(not(target_os = "linux"), allow(unused))]
24use tor_error::error_report;
25#[cfg_attr(not(target_os = "linux"), allow(unused))]
26use tor_rtcompat::StreamOps;
27
28use futures::channel::mpsc;
29use oneshot_fused_workaround as oneshot;
30
31use futures::sink::SinkExt;
32use futures::stream::Stream;
33use futures::Sink;
34use futures::StreamExt as _;
35use futures::{select, select_biased};
36use tor_error::internal;
37
38use std::fmt;
39use std::pin::Pin;
40use std::sync::Arc;
41
42use crate::channel::{
43 codec::CodecError, kist::KistParams, padding, params::*, unique_id, ChannelDetails, CloseInfo,
44};
45use crate::tunnel::circuit::{celltypes::CreateResponse, CircuitRxSender};
46use tracing::{debug, trace};
47
48pub(super) type BoxedChannelStream = Box<
50 dyn Stream<Item = std::result::Result<OpenChanCellS2C, CodecError>> + Send + Unpin + 'static,
51>;
52pub(super) type BoxedChannelSink =
54 Box<dyn Sink<AnyChanCell, Error = CodecError> + Send + Unpin + 'static>;
55pub(super) type BoxedChannelStreamOps = Box<dyn StreamOps + Send + Unpin + 'static>;
57pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
59
60fn codec_err_to_chan(err: CodecError) -> Error {
63 match err {
64 CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
65 CodecError::EncCell(err) => Error::from_cell_enc(err, "channel cell"),
66 CodecError::DecCell(err) => Error::from_cell_dec(err, "channel cell"),
67 }
68}
69
70#[cfg_attr(docsrs, doc(cfg(feature = "testing")))]
72#[derive(Debug)]
73#[allow(unreachable_pub)] #[allow(clippy::exhaustive_enums)]
75pub enum CtrlMsg {
76 Shutdown,
78 CloseCircuit(CircId),
80 AllocateCircuit {
83 created_sender: oneshot::Sender<CreateResponse>,
85 sender: CircuitRxSender,
87 tx: ReactorResultChannel<(CircId, crate::tunnel::circuit::UniqId)>,
89 },
90 ConfigUpdate(Arc<ChannelPaddingInstructionsUpdates>),
99 KistConfigUpdate(KistParams),
105}
106
107#[must_use = "If you don't call run() on a reactor, the channel won't work."]
112pub struct Reactor<S: SleepProvider> {
113 pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
115 pub(super) reactor_closed_tx: oneshot_broadcast::Sender<Result<CloseInfo>>,
118 pub(super) cells: mq_queue::Receiver<AnyChanCell, mq_queue::MpscSpec>,
122 pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
126 pub(super) output: BoxedChannelSink,
130 #[cfg_attr(not(target_os = "linux"), allow(unused))]
132 pub(super) streamops: BoxedChannelStreamOps,
133 pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
135 pub(super) special_outgoing: SpecialOutgoing,
137 pub(super) circs: CircMap,
139 pub(super) unique_id: super::UniqId,
141 pub(super) details: Arc<ChannelDetails>,
143 pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
145 #[allow(dead_code)] pub(super) link_protocol: u16,
148}
149
150#[derive(Default, Debug, Clone)]
152pub(super) struct SpecialOutgoing {
153 pub(super) padding_negotiate: Option<PaddingNegotiate>,
155}
156
157impl SpecialOutgoing {
158 #[must_use = "SpecialOutgoing::next()'s return value must be actually sent"]
163 pub(super) fn next(&mut self) -> Option<AnyChanCell> {
164 if let Some(p) = self.padding_negotiate.take() {
167 return Some(p.into());
168 }
169 None
170 }
171}
172
173impl<S: SleepProvider> fmt::Display for Reactor<S> {
178 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
179 fmt::Debug::fmt(&self.unique_id, f)
180 }
181}
182
183impl<S: SleepProvider> Reactor<S> {
184 pub async fn run(mut self) -> Result<()> {
190 trace!(channel_id = %self, "Running reactor");
191 let result: Result<()> = loop {
192 match self.run_once().await {
193 Ok(()) => (),
194 Err(ReactorError::Shutdown) => break Ok(()),
195 Err(ReactorError::Err(e)) => break Err(e),
196 }
197 };
198 debug!(channel_id = %self, "Reactor stopped");
199 let close_msg = result.as_ref().map_err(Clone::clone).map(|()| CloseInfo);
201 self.reactor_closed_tx.send(close_msg);
202 result
203 }
204
205 async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
207 select! {
208
209 ret = self.output.prepare_send_from(async {
212 if let Some(l) = self.special_outgoing.next() {
215 self.padding_timer.as_mut().note_cell_sent();
218 return Some(l)
219 }
220
221 select_biased! {
222 n = self.cells.next() => {
223 self.padding_timer.as_mut().note_cell_sent();
240 n
241 },
242 p = self.padding_timer.as_mut().next() => {
243 Some(p.into())
245 },
246 }
247 }) => {
248 let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
249 let msg = msg.ok_or(ReactorError::Shutdown)?;
250 sendable.send(msg).map_err(codec_err_to_chan)?;
251 }
252
253 ret = self.control.next() => {
254 let ctrl = match ret {
255 None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
256 Some(x) => x,
257 };
258 self.handle_control(ctrl).await?;
259 }
260
261 ret = self.input.next() => {
262 let item = ret
263 .ok_or(ReactorError::Shutdown)?
264 .map_err(codec_err_to_chan)?;
265 crate::note_incoming_traffic();
266 self.handle_cell(item).await?;
267 }
268
269 }
270 Ok(()) }
272
273 async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
275 trace!(
276 channel_id = %self,
277 msg = ?msg,
278 "reactor received control message"
279 );
280
281 match msg {
282 CtrlMsg::Shutdown => panic!(), CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
284 CtrlMsg::AllocateCircuit {
285 created_sender,
286 sender,
287 tx,
288 } => {
289 let mut rng = rand::rng();
290 let my_unique_id = self.unique_id;
291 let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
292 let ret: Result<_> = self
293 .circs
294 .add_ent(&mut rng, created_sender, sender)
295 .map(|id| (id, circ_unique_id));
296 let _ = tx.send(ret); self.update_disused_since();
298 }
299 CtrlMsg::ConfigUpdate(updates) => {
300 if self.link_protocol == 4 {
301 return Ok(());
305 }
306
307 let ChannelPaddingInstructionsUpdates {
308 padding_enable,
311 padding_parameters,
312 padding_negotiate,
313 } = &*updates;
314 if let Some(parameters) = padding_parameters {
315 self.padding_timer.as_mut().reconfigure(parameters)?;
316 }
317 if let Some(enable) = padding_enable {
318 if *enable {
319 self.padding_timer.as_mut().enable();
320 } else {
321 self.padding_timer.as_mut().disable();
322 }
323 }
324 if let Some(padding_negotiate) = padding_negotiate {
325 self.special_outgoing.padding_negotiate = Some(padding_negotiate.clone());
329 }
330 }
331 CtrlMsg::KistConfigUpdate(kist) => self.apply_kist_params(&kist),
332 }
333 Ok(())
334 }
335
336 async fn handle_cell(&mut self, cell: OpenChanCellS2C) -> Result<()> {
339 let (circid, msg) = cell.into_circid_and_msg();
340 use OpenChanMsgS2C::*;
341
342 match msg {
343 Relay(_) | Padding(_) | Vpadding(_) => {} _ => trace!(
345 channel_id = %self,
346 "received {} for {}",
347 msg.cmd(),
348 CircId::get_or_zero(circid)
349 ),
350 }
351
352 match msg {
353 Relay(_) => self.deliver_relay(circid, msg.into()).await,
355
356 Destroy(_) => self.deliver_destroy(circid, msg.into()).await,
357
358 CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg.into()).await,
359
360 Padding(_) | Vpadding(_) => Ok(()),
362 }
363 }
364
365 async fn deliver_relay(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
367 let Some(circid) = circid else {
368 return Err(Error::ChanProto("Relay cell without circuit ID".into()));
369 };
370
371 let mut ent = self
372 .circs
373 .get_mut(circid)
374 .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
375
376 match &mut *ent {
377 CircEnt::Open(s) => {
378 if s.send(msg.try_into()?).await.is_err() {
380 drop(ent);
381 self.outbound_destroy_circ(circid).await?;
383 }
384 Ok(())
385 }
386 CircEnt::Opening(_, _) => Err(Error::ChanProto(
387 "Relay cell on pending circuit before CREATED* received".into(),
388 )),
389 CircEnt::DestroySent(hs) => hs.receive_cell(),
390 }
391 }
392
393 async fn deliver_created(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
396 let Some(circid) = circid else {
397 return Err(Error::ChanProto("'Created' cell without circuit ID".into()));
398 };
399
400 let target = self.circs.advance_from_opening(circid)?;
401 let created = msg.try_into()?;
402 target.send(created).map_err(|_| {
405 Error::from(internal!(
406 "Circuit queue rejected created message. Is it closing?"
407 ))
408 })
409 }
410
411 async fn deliver_destroy(&mut self, circid: Option<CircId>, msg: AnyChanMsg) -> Result<()> {
414 let Some(circid) = circid else {
415 return Err(Error::ChanProto("'Destroy' cell without circuit ID".into()));
416 };
417
418 let entry = self.circs.remove(circid);
420 self.update_disused_since();
421 match entry {
422 Some(CircEnt::Opening(oneshot, _)) => {
425 trace!(channel_id = %self, "Passing destroy to pending circuit {}", circid);
426 oneshot
427 .send(msg.try_into()?)
428 .map_err(|_| {
431 internal!("pending circuit wasn't interested in destroy cell?").into()
432 })
433 }
434 Some(CircEnt::Open(mut sink)) => {
436 trace!(channel_id = %self, "Passing destroy to open circuit {}", circid);
437 sink.send(msg.try_into()?)
438 .await
439 .map_err(|_| {
442 internal!("open circuit wasn't interested in destroy cell?").into()
443 })
444 }
445 Some(CircEnt::DestroySent(_)) => Ok(()),
447 None => {
449 trace!(channel_id = %self, "Destroy for nonexistent circuit {}", circid);
450 Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
451 }
452 }
453 }
454
455 async fn send_cell(&mut self, cell: AnyChanCell) -> Result<()> {
457 self.output.send(cell).await.map_err(codec_err_to_chan)?;
458 Ok(())
459 }
460
461 async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
464 trace!(channel_id = %self, "Circuit {} is gone; sending DESTROY", id);
465 self.circs.destroy_sent(id, HalfCirc::new(3000));
470 self.update_disused_since();
471 let destroy = Destroy::new(DestroyReason::NONE).into();
472 let cell = AnyChanCell::new(Some(id), destroy);
473 self.send_cell(cell).await?;
474
475 Ok(())
476 }
477
478 fn update_disused_since(&self) {
480 if self.circs.open_ent_count() == 0 {
481 self.details.unused_since.update_if_none();
483 } else {
484 self.details.unused_since.clear();
486 }
487 }
488
489 #[cfg(target_os = "linux")]
491 fn apply_kist_params(&self, params: &KistParams) {
492 use super::kist::KistMode;
493
494 let set_tcp_notsent_lowat = |v: u32| {
495 if let Err(e) = self.streamops.set_tcp_notsent_lowat(v) {
496 error_report!(e, "Failed to set KIST socket options");
499 }
500 };
501
502 match params.kist_enabled() {
503 KistMode::TcpNotSentLowat => set_tcp_notsent_lowat(params.tcp_notsent_lowat()),
504 KistMode::Disabled => set_tcp_notsent_lowat(u32::MAX),
505 }
506 }
507
508 #[cfg(not(target_os = "linux"))]
509 fn apply_kist_params(&self, params: &KistParams) {
510 use super::kist::KistMode;
511
512 if params.kist_enabled() != KistMode::Disabled {
513 tracing::warn!("KIST not currently supported on non-linux platforms");
514 }
515 }
516}
517
518#[cfg(test)]
519pub(crate) mod test {
520 #![allow(clippy::unwrap_used)]
521 use super::*;
522 use crate::channel::{ClosedUnexpectedly, UniqId};
523 use crate::fake_mpsc;
524 use crate::tunnel::circuit::CircParameters;
525 use crate::util::fake_mq;
526 use futures::sink::SinkExt;
527 use futures::stream::StreamExt;
528 use futures::task::SpawnExt;
529 use tor_cell::chancell::msg;
530 use tor_linkspec::OwnedChanTarget;
531 use tor_rtcompat::{NoOpStreamOpsHandle, Runtime};
532
533 type CodecResult = std::result::Result<OpenChanCellS2C, CodecError>;
534
535 pub(crate) fn new_reactor<R: Runtime>(
536 runtime: R,
537 ) -> (
538 Arc<crate::channel::Channel>,
539 Reactor<R>,
540 mpsc::Receiver<AnyChanCell>,
541 mpsc::Sender<CodecResult>,
542 ) {
543 let link_protocol = 4;
544 let (send1, recv1) = mpsc::channel(32);
545 let (send2, recv2) = mpsc::channel(32);
546 let unique_id = UniqId::new();
547 let dummy_target = OwnedChanTarget::builder()
548 .ed_identity([6; 32].into())
549 .rsa_identity([10; 20].into())
550 .build()
551 .unwrap();
552 let send1 = send1.sink_map_err(|e| {
553 trace!("got sink error: {:?}", e);
554 CodecError::DecCell(tor_cell::Error::ChanProto("dummy message".into()))
555 });
556 let stream_ops = NoOpStreamOpsHandle::default();
557 let (chan, reactor) = crate::channel::Channel::new(
558 link_protocol,
559 Box::new(send1),
560 Box::new(recv2),
561 Box::new(stream_ops),
562 unique_id,
563 dummy_target,
564 crate::ClockSkew::None,
565 runtime,
566 fake_mq(),
567 )
568 .expect("channel create failed");
569 (chan, reactor, recv1, send2)
570 }
571
572 #[test]
574 fn shutdown() {
575 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
576 let (chan, mut reactor, _output, _input) = new_reactor(rt);
577
578 chan.terminate();
579 let r = reactor.run_once().await;
580 assert!(matches!(r, Err(ReactorError::Shutdown)));
581 });
582 }
583
584 #[test]
586 fn shutdown2() {
587 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
588 use futures::future::FutureExt;
591 use futures::join;
592
593 let (chan, reactor, _output, _input) = new_reactor(rt);
594 let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
596
597 let rr = run_reactor.clone();
598
599 let exit_then_check = async {
600 assert!(rr.peek().is_none());
601 chan.terminate();
603 };
604
605 let (rr_s, _) = join!(run_reactor, exit_then_check);
606
607 assert!(rr_s);
609 });
610 }
611
612 #[test]
613 fn new_circ_closed() {
614 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
615 let (chan, mut reactor, mut output, _input) = new_reactor(rt.clone());
616 assert!(chan.duration_unused().is_some()); let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
619 let (pending, circr) = ret.unwrap();
620 rt.spawn(async {
621 let _ignore = circr.run().await;
622 })
623 .unwrap();
624 assert!(reac.is_ok());
625
626 let id = pending.peek_circid();
627
628 let ent = reactor.circs.get_mut(id);
629 assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
630 assert!(chan.duration_unused().is_none()); drop(pending);
635
636 reactor.run_once().await.unwrap();
637 let ent = reactor.circs.get_mut(id);
638 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
639 let cell = output.next().await.unwrap();
640 assert_eq!(cell.circid(), Some(id));
641 assert!(matches!(cell.msg(), AnyChanMsg::Destroy(_)));
642 assert!(chan.duration_unused().is_some()); });
644 }
645
646 #[test]
648 #[ignore] fn new_circ_create_failure() {
650 use std::time::Duration;
651 use tor_rtcompat::SleepProvider;
652
653 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
654 let (chan, mut reactor, mut output, mut input) = new_reactor(rt.clone());
655
656 let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
657 let (pending, circr) = ret.unwrap();
658 rt.spawn(async {
659 let _ignore = circr.run().await;
660 })
661 .unwrap();
662 assert!(reac.is_ok());
663
664 let circparams = CircParameters::default();
665
666 let id = pending.peek_circid();
667
668 let ent = reactor.circs.get_mut(id);
669 assert!(matches!(*ent.unwrap(), CircEnt::Opening(_, _)));
670
671 #[allow(clippy::clone_on_copy)]
672 let rtc = rt.clone();
673 let send_response = async {
674 rtc.sleep(Duration::from_millis(100)).await;
675 trace!("sending createdfast");
676 let created_cell =
678 OpenChanCellS2C::new(Some(id), msg::CreatedFast::new(*b"x").into());
679 input.send(Ok(created_cell)).await.unwrap();
680 reactor.run_once().await.unwrap();
681 };
682
683 let (circ, _) = futures::join!(pending.create_firsthop_fast(circparams), send_response);
684 assert!(matches!(circ.err().unwrap(), Error::BadCircHandshakeAuth));
686
687 reactor.run_once().await.unwrap();
688
689 let cell_sent = output.next().await.unwrap();
691 assert!(matches!(cell_sent.msg(), msg::AnyChanMsg::CreateFast(_)));
692
693 let ent = reactor.circs.get_mut(id);
695 assert!(matches!(*ent.unwrap(), CircEnt::DestroySent(_)));
696 });
697 }
698
699 #[test]
701 fn bad_cells() {
702 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
703 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
704
705 let created2_cell = msg::Created2::new(*b"hihi").into();
707 input
708 .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
709 .await
710 .unwrap();
711
712 let e = reactor.run_once().await.unwrap_err().unwrap_err();
713 assert_eq!(
714 format!("{}", e),
715 "Channel protocol violation: Unexpected CREATED* cell not on opening circuit"
716 );
717
718 let relay_cell = msg::Relay::new(b"abc").into();
720 input
721 .send(Ok(OpenChanCellS2C::new(CircId::new(4), relay_cell)))
722 .await
723 .unwrap();
724 let e = reactor.run_once().await.unwrap_err().unwrap_err();
725 assert_eq!(
726 format!("{}", e),
727 "Channel protocol violation: Relay cell on nonexistent circuit"
728 );
729
730 });
734 }
735
736 #[test]
737 fn deliver_relay() {
738 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
739 use crate::tunnel::circuit::celltypes::ClientCircChanMsg;
740 use oneshot_fused_workaround as oneshot;
741
742 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
743
744 let (_circ_stream_7, mut circ_stream_13) = {
745 let (snd1, _rcv1) = oneshot::channel();
746 let (snd2, rcv2) = fake_mpsc(64);
747 reactor
748 .circs
749 .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
750
751 let (snd3, rcv3) = fake_mpsc(64);
752 reactor
753 .circs
754 .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
755
756 reactor.circs.put_unchecked(
757 CircId::new(23).unwrap(),
758 CircEnt::DestroySent(HalfCirc::new(25)),
759 );
760 (rcv2, rcv3)
761 };
762
763 let relaycell: OpenChanMsgS2C = msg::Relay::new(b"do you suppose").into();
766 input
767 .send(Ok(OpenChanCellS2C::new(CircId::new(13), relaycell.clone())))
768 .await
769 .unwrap();
770 reactor.run_once().await.unwrap();
771 let got = circ_stream_13.next().await.unwrap();
772 assert!(matches!(got, ClientCircChanMsg::Relay(_)));
773
774 input
776 .send(Ok(OpenChanCellS2C::new(CircId::new(7), relaycell.clone())))
777 .await
778 .unwrap();
779 let e = reactor.run_once().await.unwrap_err().unwrap_err();
780 assert_eq!(
781 format!("{}", e),
782 "Channel protocol violation: Relay cell on pending circuit before CREATED* received"
783 );
784
785 input
787 .send(Ok(OpenChanCellS2C::new(
788 CircId::new(101),
789 relaycell.clone(),
790 )))
791 .await
792 .unwrap();
793 let e = reactor.run_once().await.unwrap_err().unwrap_err();
794 assert_eq!(
795 format!("{}", e),
796 "Channel protocol violation: Relay cell on nonexistent circuit"
797 );
798
799 for _ in 0..25 {
804 input
805 .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
806 .await
807 .unwrap();
808 reactor.run_once().await.unwrap(); }
810
811 input
813 .send(Ok(OpenChanCellS2C::new(CircId::new(23), relaycell.clone())))
814 .await
815 .unwrap();
816 let e = reactor.run_once().await.unwrap_err().unwrap_err();
817 assert_eq!(
818 format!("{}", e),
819 "Channel protocol violation: Too many cells received on destroyed circuit"
820 );
821 });
822 }
823
824 #[test]
825 fn deliver_destroy() {
826 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
827 use crate::tunnel::circuit::celltypes::*;
828 use oneshot_fused_workaround as oneshot;
829
830 let (_chan, mut reactor, _output, mut input) = new_reactor(rt);
831
832 let (circ_oneshot_7, mut circ_stream_13) = {
833 let (snd1, rcv1) = oneshot::channel();
834 let (snd2, _rcv2) = fake_mpsc(64);
835 reactor
836 .circs
837 .put_unchecked(CircId::new(7).unwrap(), CircEnt::Opening(snd1, snd2));
838
839 let (snd3, rcv3) = fake_mpsc(64);
840 reactor
841 .circs
842 .put_unchecked(CircId::new(13).unwrap(), CircEnt::Open(snd3));
843
844 reactor.circs.put_unchecked(
845 CircId::new(23).unwrap(),
846 CircEnt::DestroySent(HalfCirc::new(25)),
847 );
848 (rcv1, rcv3)
849 };
850
851 let destroycell: OpenChanMsgS2C = msg::Destroy::new(0.into()).into();
853 input
854 .send(Ok(OpenChanCellS2C::new(
855 CircId::new(7),
856 destroycell.clone(),
857 )))
858 .await
859 .unwrap();
860 reactor.run_once().await.unwrap();
861 let msg = circ_oneshot_7.await;
862 assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
863
864 input
866 .send(Ok(OpenChanCellS2C::new(
867 CircId::new(13),
868 destroycell.clone(),
869 )))
870 .await
871 .unwrap();
872 reactor.run_once().await.unwrap();
873 let msg = circ_stream_13.next().await.unwrap();
874 assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
875
876 input
878 .send(Ok(OpenChanCellS2C::new(
879 CircId::new(23),
880 destroycell.clone(),
881 )))
882 .await
883 .unwrap();
884 reactor.run_once().await.unwrap();
885
886 input
888 .send(Ok(OpenChanCellS2C::new(
889 CircId::new(101),
890 destroycell.clone(),
891 )))
892 .await
893 .unwrap();
894 let e = reactor.run_once().await.unwrap_err().unwrap_err();
895 assert_eq!(
896 format!("{}", e),
897 "Channel protocol violation: Destroy for nonexistent circuit"
898 );
899 });
900 }
901
902 #[test]
903 fn closing_if_reactor_dropped() {
904 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
905 let (chan, reactor, _output, _input) = new_reactor(rt);
906
907 assert!(!chan.is_closing());
908 drop(reactor);
909 assert!(chan.is_closing());
910
911 assert!(matches!(
912 chan.wait_for_close().await,
913 Err(ClosedUnexpectedly::ReactorDropped),
914 ));
915 });
916 }
917
918 #[test]
919 fn closing_if_reactor_shutdown() {
920 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
921 let (chan, reactor, _output, _input) = new_reactor(rt);
922
923 assert!(!chan.is_closing());
924 chan.terminate();
925 assert!(!chan.is_closing());
926
927 let r = reactor.run().await;
928 assert!(r.is_ok());
929 assert!(chan.is_closing());
930
931 assert!(chan.wait_for_close().await.is_ok());
932 });
933 }
934
935 #[test]
936 fn reactor_error_wait_for_close() {
937 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
938 let (chan, reactor, _output, mut input) = new_reactor(rt);
939
940 let created2_cell = msg::Created2::new(*b"hihi").into();
942 input
943 .send(Ok(OpenChanCellS2C::new(CircId::new(7), created2_cell)))
944 .await
945 .unwrap();
946
947 let run_error = reactor.run().await.unwrap_err();
949
950 let Err(ClosedUnexpectedly::ReactorError(wait_error)) = chan.wait_for_close().await
952 else {
953 panic!("Expected a 'ReactorError'");
954 };
955
956 assert_eq!(run_error.to_string(), wait_error.to_string());
958 });
959 }
960}