1pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::circuit::handshake::RelayCryptLayerProtocol;
51use crate::congestion::params::CongestionControlParams;
52use crate::crypto::cell::HopNum;
53use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
54use crate::memquota::{CircuitAccount, SpecificAccount as _};
55use crate::stream::queue::stream_queue;
56use crate::stream::xon_xoff::XonXoffReaderCtrl;
57use crate::stream::{
58 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
59 StreamRateLimit, StreamReceiver,
60};
61use crate::tunnel::circuit::celltypes::*;
62use crate::tunnel::reactor::CtrlCmd;
63use crate::tunnel::reactor::{
64 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
65};
66use crate::tunnel::{StreamTarget, TargetHop};
67use crate::util::notify::NotifySender;
68use crate::util::skew::ClockSkew;
69use crate::{Error, ResolveError, Result};
70use cfg_if::cfg_if;
71use educe::Educe;
72use path::HopDetail;
73use postage::watch;
74use tor_cell::relaycell::{self, RelayCellFormat};
75use tor_cell::{
76 chancell::CircId,
77 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
78};
79
80use tor_error::{bad_api_usage, internal, into_internal};
81use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
82use tor_protover::named;
83
84pub use crate::crypto::binding::CircuitBinding;
85pub use crate::memquota::StreamAccount;
86pub use crate::tunnel::circuit::unique_id::UniqId;
87
88#[cfg(feature = "hs-service")]
89use {
90 crate::stream::{IncomingCmdChecker, IncomingStream},
91 crate::tunnel::reactor::StreamReqInfo,
92};
93
94use futures::channel::mpsc;
95use oneshot_fused_workaround as oneshot;
96
97use crate::congestion::sendme::StreamRecvWindow;
98use crate::DynTimeProvider;
99use futures::FutureExt as _;
100use std::collections::HashMap;
101use std::net::IpAddr;
102use std::sync::{Arc, Mutex};
103use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
104
105use crate::crypto::handshake::ntor::NtorPublicKey;
106
107pub use path::{Path, PathEntry};
108
/// Capacity passed to `MpscSpec::new` for the per-stream message queue
/// (`msg_tx`/`msg_rx`) that `ClientCirc::begin_stream_impl` creates.
// NOTE(review): presumably counted in queued messages — confirm against `MpscSpec`.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
111
112#[cfg(feature = "send-control-msg")]
113use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
114
115pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
116#[cfg(feature = "send-control-msg")]
117#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
118pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
119
/// Sending half of a `tor_memquota` `mq_queue` (MPSC flavour) carrying `T`.
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of a `tor_memquota` `mq_queue` (MPSC flavour) carrying `T`.
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;

/// Sending half of the queue that carries `ClientCircChanMsg`s.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// Receiving half of the queue that carries `ClientCircChanMsg`s; this is the
/// `input` type accepted by `PendingClientCirc::new`.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
129
/// Client-side handle to a circuit.
///
/// The handle does not drive the circuit itself: its methods send `CtrlMsg` /
/// `CtrlCmd` values over the unbounded channels stored here (whose sending
/// halves are produced by `Reactor::new`) and usually wait for a reply on a
/// oneshot channel.
#[derive(Debug)]
pub struct ClientCirc {
    /// Per-tunnel table of circuit state, obtained from `Reactor::new`;
    /// queried for path and hop information.
    mutable: Arc<TunnelMutableState>,
    /// Key under which this circuit is looked up in `mutable`.
    unique_id: UniqId,
    /// Sender for `CtrlMsg` requests.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Sender for `CtrlCmd` requests.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// Shared future obtained from `Reactor::new`; `wait_for_close` hands out
    /// clones of it.  Only read with the `experimental-api` feature, hence the
    /// conditional `dead_code` allowance.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// Circuit ID, retained in test builds only.
    #[cfg(test)]
    circid: CircId,
    /// Memory-quota account of this circuit; see `mq_account`.
    memquota: CircuitAccount,
    /// Time provider, cloned from the channel in `PendingClientCirc::new`.
    time_provider: DynTimeProvider,
}
194
/// Mutex-protected map from a circuit's `UniqId` to that circuit's
/// [`MutableState`].
///
/// The number of entries is what `n_legs` reports.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
216
217impl TunnelMutableState {
218 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
220 #[allow(unused)] let state = self
222 .0
223 .lock()
224 .expect("lock poisoned")
225 .insert(unique_id, mutable);
226
227 debug_assert!(state.is_none());
228 }
229
230 pub(super) fn remove(&self, unique_id: UniqId) {
232 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
234
235 debug_assert!(state.is_some());
236 }
237
238 fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
242 let lock = self.0.lock().expect("lock poisoned");
243 let mutable = lock
244 .get(&unique_id)
245 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
246
247 Ok(mutable.path())
248 }
249
250 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
255 let lock = self.0.lock().expect("lock poisoned");
256 let mutable = lock
257 .get(&unique_id)
258 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
259
260 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
261 path::HopDetail::Relay(r) => r,
262 #[cfg(feature = "hs-common")]
263 path::HopDetail::Virtual => {
264 panic!("somehow made a circuit with a virtual first hop.")
265 }
266 });
267
268 Ok(first_hop)
269 }
270
271 fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
277 let lock = self.0.lock().expect("lock poisoned");
278 let mutable = lock
279 .get(&unique_id)
280 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
281
282 Ok(mutable.last_hop_num())
283 }
284
285 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
289 let lock = self.0.lock().expect("lock poisoned");
290 let mutable = lock
291 .get(&unique_id)
292 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
293
294 Ok(mutable.n_hops())
295 }
296
297 fn n_legs(&self) -> usize {
302 let lock = self.0.lock().expect("lock poisoned");
303 lock.len()
304 }
305}
306
/// Mutex-protected [`CircuitState`] of a single circuit.
///
/// Every accessor on this type takes the lock for the duration of the call.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
311
312impl MutableState {
313 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
315 let mut mutable = self.0.lock().expect("poisoned lock");
316 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
317 mutable.binding.push(binding);
318 }
319
320 pub(super) fn path(&self) -> Arc<path::Path> {
322 let mutable = self.0.lock().expect("poisoned lock");
323 Arc::clone(&mutable.path)
324 }
325
326 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
329 let mutable = self.0.lock().expect("poisoned lock");
330
331 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
332 }
335
336 fn first_hop(&self) -> Option<HopDetail> {
338 let mutable = self.0.lock().expect("poisoned lock");
339 mutable.path.first_hop()
340 }
341
342 fn last_hop_num(&self) -> Option<HopNum> {
349 let mutable = self.0.lock().expect("poisoned lock");
350 mutable.path.last_hop_num()
351 }
352
353 fn n_hops(&self) -> usize {
360 let mutable = self.0.lock().expect("poisoned lock");
361 mutable.path.n_hops()
362 }
363}
364
/// The data guarded by [`MutableState`].
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// The circuit's path; extended by `MutableState::add_hop`.
    path: Arc<path::Path>,

    /// One slot per hop, pushed by `MutableState::add_hop` alongside the hop
    /// itself and read by `MutableState::binding_key`.
    ///
    /// Left out of the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
386
/// A circuit whose first-hop handshake has not been performed yet.
///
/// Consumed by one of the `create_firsthop*` methods, which return the inner
/// [`ClientCirc`] on success.
pub struct PendingClientCirc {
    /// Receiver for the create response; handed to the reactor as
    /// `recv_created` in `CtrlMsg::Create`.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The circuit handle that is returned once creation succeeds.
    circ: Arc<ClientCirc>,
}
398
/// Caller-supplied parameters for building or extending a circuit.
///
/// `HopSettings::from_params_and_caps` turns these into per-hop settings.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// When `false`, `extend_ntor`/`extend_ntor_v3` strip the `ED25519ID`
    /// link specifier from the list they send.
    pub extend_by_ed25519_id: bool,
    /// Requested congestion-control parameters.
    pub ccontrol: CongestionControlParams,

    /// Copied verbatim into `HopSettings::n_incoming_cells_permitted`.
    // NOTE(review): presumably a cap on cells received from the hop — confirm in the reactor.
    pub n_incoming_cells_permitted: Option<u32>,

    /// Copied verbatim into `HopSettings::n_outgoing_cells_permitted`.
    // NOTE(review): presumably a cap on cells sent to the hop — confirm in the reactor.
    pub n_outgoing_cells_permitted: Option<u32>,
}
446
/// How much can be negotiated with a hop; selects the behaviour of
/// `HopSettings::from_params_and_caps`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(super) enum HopNegotiationType {
    /// Used for CREATE_FAST and ntor hops: congestion control falls back and
    /// the relay crypto layer is `Tor1` with cell format `V0`.
    None,
    /// Used for virtual (`extend_virtual`) hops: congestion control falls
    /// back and the `HsV3` crypto layer is selected (needs `hs-common`).
    HsV3,
    /// Used for ntor-v3 hops: the requested congestion control is kept if the
    /// hop's protocol capabilities allow it.
    Full,
}
470
/// Settings for one hop, derived from [`CircParameters`] and the hop's
/// protocol capabilities by `HopSettings::from_params_and_caps`.
#[derive(Clone, Debug)]
pub(super) struct HopSettings {
    /// Congestion-control parameters after any fallback has been applied.
    pub(super) ccontrol: CongestionControlParams,

    /// Copied from `CircParameters::n_incoming_cells_permitted`.
    pub(super) n_incoming_cells_permitted: Option<u32>,

    /// Copied from `CircParameters::n_outgoing_cells_permitted`.
    pub(super) n_outgoing_cells_permitted: Option<u32>,

    /// Relay crypto layer to use; read through `relay_crypt_protocol()`.
    relay_crypt_protocol: RelayCryptLayerProtocol,
}
496
497impl HopSettings {
498 #[allow(clippy::unnecessary_wraps)] pub(super) fn from_params_and_caps(
509 hoptype: HopNegotiationType,
510 params: &CircParameters,
511 caps: &tor_protover::Protocols,
512 ) -> Result<Self> {
513 let mut ccontrol = params.ccontrol.clone();
514 match ccontrol.alg() {
515 crate::ccparams::Algorithm::FixedWindow(_) => {}
516 crate::ccparams::Algorithm::Vegas(_) => {
517 if !caps.supports_named_subver(named::FLOWCTRL_CC) {
519 ccontrol.use_fallback_alg();
520 }
521 }
522 };
523 if hoptype == HopNegotiationType::None {
524 ccontrol.use_fallback_alg();
525 } else if hoptype == HopNegotiationType::HsV3 {
526 ccontrol.use_fallback_alg();
530 }
531 let ccontrol = ccontrol; let relay_crypt_protocol = match hoptype {
536 HopNegotiationType::None => RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0),
537 HopNegotiationType::HsV3 => {
538 cfg_if! {
540 if #[cfg(feature = "hs-common")] {
541 RelayCryptLayerProtocol::HsV3(RelayCellFormat::V0)
542 } else {
543 return Err(
544 internal!("Unexpectedly tried to negotiate HsV3 without support!").into(),
545 );
546 }
547 }
548 }
549 HopNegotiationType::Full => {
550 cfg_if! {
551 if #[cfg(all(feature = "flowctl-cc", feature = "counter-galois-onion"))] {
552 #[allow(clippy::overly_complex_bool_expr)]
553 if ccontrol.alg().compatible_with_cgo()
554 && caps.supports_named_subver(named::RELAY_NEGOTIATE_SUBPROTO)
555 && caps.supports_named_subver(named::RELAY_CRYPT_CGO)
556 && false {
559 RelayCryptLayerProtocol::Cgo
560 } else {
561 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
562 }
563 } else {
564 RelayCryptLayerProtocol::Tor1(RelayCellFormat::V0)
565 }
566 }
567 }
568 };
569
570 Ok(Self {
571 ccontrol,
572 relay_crypt_protocol,
573 n_incoming_cells_permitted: params.n_incoming_cells_permitted,
574 n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
575 })
576 }
577
578 pub(super) fn relay_crypt_protocol(&self) -> RelayCryptLayerProtocol {
580 #[cfg(feature = "counter-galois-onion")]
583 assert!(
584 !matches!(self.relay_crypt_protocol, RelayCryptLayerProtocol::Cgo),
585 "Somehow negotiated CGO, but CGO is not yet supported!!"
586 );
587 self.relay_crypt_protocol
588 }
589}
590
/// Test-only defaults: extend by Ed25519 identity, fixed-window congestion
/// control parameters from the test utilities, and no cell limits.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        use crate::congestion::test_utils::params::build_cc_fixed_params;

        CircParameters {
            n_incoming_cells_permitted: None,
            n_outgoing_cells_permitted: None,
            extend_by_ed25519_id: true,
            ccontrol: build_cc_fixed_params(),
        }
    }
}
602
603impl CircParameters {
604 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
606 Self {
607 extend_by_ed25519_id,
608 ccontrol,
609 n_incoming_cells_permitted: None,
610 n_outgoing_cells_permitted: None,
611 }
612 }
613}
614
615impl ClientCirc {
616 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
624 Ok(self
625 .mutable
626 .first_hop(self.unique_id)
627 .map_err(|_| Error::CircuitClosed)?
628 .expect("called first_hop on an un-constructed circuit"))
629 }
630
631 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
643 let path = self.path_ref()?;
644 Ok(path
645 .hops()
646 .last()
647 .expect("Called last_hop an an un-constructed circuit")
648 .as_chan_target()
649 .map(OwnedChanTarget::from_chan_target))
650 }
651
652 pub fn last_hop_num(&self) -> Result<HopNum> {
662 Ok(self
663 .mutable
664 .last_hop_num(self.unique_id)?
665 .ok_or_else(|| internal!("no last hop index"))?)
666 }
667
668 pub fn last_hop(&self) -> Result<TargetHop> {
673 let hop_num = self
674 .mutable
675 .last_hop_num(self.unique_id)?
676 .ok_or_else(|| bad_api_usage!("no last hop"))?;
677 Ok((self.unique_id, hop_num).into())
678 }
679
680 pub fn path_ref(&self) -> Result<Arc<Path>> {
685 self.mutable
686 .path_ref(self.unique_id)
687 .map_err(|_| Error::CircuitClosed)
688 }
689
690 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
694 let (tx, rx) = oneshot::channel();
695
696 self.control
697 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
698 .map_err(|_| Error::CircuitClosed)?;
699
700 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
701 }
702
/// Return a reference to this circuit's memory-quota account.
pub fn mq_account(&self) -> &CircuitAccount {
    &self.memquota
}
707
/// Ask the reactor for the circuit binding of `hop`.
///
/// # Errors
///
/// Returns `Error::CircuitClosed` if the command cannot be sent or the
/// reply channel is dropped; otherwise returns the reactor's answer as is.
#[cfg(feature = "hs-service")]
pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
    let (done, reply) = oneshot::channel();

    self.command
        .unbounded_send(CtrlCmd::GetBindingKey { hop, done })
        .map_err(|_| Error::CircuitClosed)?;

    match reply.await {
        Ok(answer) => answer,
        Err(_) => Err(Error::CircuitClosed),
    }
}
725
/// Start a conversation with `hop`: install `reply_handler` for its replies
/// and, if `msg` is given, send it along with the installation request.
///
/// The hop is first resolved by the reactor; the handler is then wrapped in
/// a `UserMsgHandler` bound to the resolved location.
///
/// # Errors
///
/// Returns `Error::CircuitClosed` if the reactor is unreachable, or any
/// error the reactor reports while resolving the hop or installing the
/// handler.
#[cfg(feature = "send-control-msg")]
pub async fn start_conversation(
    &self,
    msg: Option<relaycell::msg::AnyRelayMsg>,
    reply_handler: impl MsgHandler + Send + 'static,
    hop: TargetHop,
) -> Result<Conversation<'_>> {
    // Step 1: have the reactor translate `hop` into a concrete location.
    let (done, resolved) = oneshot::channel();
    let resolve_cmd = CtrlCmd::ResolveTargetHop { hop, done };
    self.command
        .unbounded_send(resolve_cmd)
        .map_err(|_| Error::CircuitClosed)?;
    let hop_location = resolved.await.map_err(|_| Error::CircuitClosed)??;

    // Step 2: install the handler (and send the optional first message).
    let boxed_handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
    let conversation = Conversation(self);
    conversation.send_internal(msg, Some(boxed_handler)).await?;

    Ok(conversation)
}
822
/// Ask the reactor to send `msg` to `hop`.
///
/// # Errors
///
/// Returns `Error::CircuitClosed` if the request cannot be sent or the
/// reply channel is dropped; otherwise returns the reactor's result as is.
#[cfg(feature = "send-control-msg")]
pub async fn send_raw_msg(
    &self,
    msg: relaycell::msg::AnyRelayMsg,
    hop: TargetHop,
) -> Result<()> {
    let (sender, outcome) = oneshot::channel();

    self.control
        .unbounded_send(CtrlMsg::SendMsg { hop, msg, sender })
        .map_err(|_| Error::CircuitClosed)?;

    match outcome.await {
        Ok(result) => result,
        Err(_) => Err(Error::CircuitClosed),
    }
}
842
/// Tell the reactor to accept incoming stream requests on this circuit and
/// return them as a stream of [`IncomingStream`]s.
///
/// * `allow_commands` — relay commands the incoming-command checker is
///   built from.
/// * `hop` — the hop requests may come from; must be `TargetHop::Hop(..)`.
/// * `filter` — boxed and handed to the reactor together with the request.
///
/// # Errors
///
/// * an internal error if the tunnel does not have exactly one leg;
/// * a "bad API usage" error if `hop` is not `TargetHop::Hop(..)`.  This is
///   checked *before* the reactor is contacted, so a rejected call does not
///   leave the circuit accepting requests nobody will read;
/// * `Error::CircuitClosed` if the reactor is unreachable;
/// * any error from creating the queue or reported by the reactor.
///
/// # Panics
///
/// Polling the returned stream panics if the reactor delivers a request
/// whose hop differs from the allowed one.
#[cfg(feature = "hs-service")]
pub async fn allow_stream_requests(
    self: &Arc<ClientCirc>,
    allow_commands: &[relaycell::RelayCmd],
    hop: TargetHop,
    filter: impl crate::stream::IncomingStreamRequestFilter,
) -> Result<impl futures::Stream<Item = IncomingStream>> {
    use futures::stream::StreamExt;

    /// Capacity of the queue of pending incoming stream requests.
    const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

    let circ_count = self.mutable.n_legs();
    if circ_count != 1 {
        return Err(
            internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
        );
    }

    // Validate the argument up front: failing after the reactor has already
    // processed `AwaitStreamRequest` would leave that side effect in place.
    let allowed_hop_loc = match hop {
        TargetHop::Hop(loc) => loc,
        _ => return Err(bad_api_usage!("Expected TargetHop with HopLocation").into()),
    };

    let time_prov = self.time_provider.clone();
    let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
    let (incoming_sender, incoming_receiver) =
        MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
    let (tx, rx) = oneshot::channel();

    self.command
        .unbounded_send(CtrlCmd::AwaitStreamRequest {
            cmd_checker,
            incoming_sender,
            hop,
            done: tx,
            filter: Box::new(filter),
        })
        .map_err(|_| Error::CircuitClosed)?;

    // Wait until the reactor has accepted (or rejected) the request.
    rx.await.map_err(|_| Error::CircuitClosed)??;

    let circ = Arc::clone(self);
    Ok(incoming_receiver.map(move |req_ctx| {
        let StreamReqInfo {
            req,
            stream_id,
            hop,
            receiver,
            msg_tx,
            rate_limit_stream,
            drain_rate_request_stream,
            memquota,
            relay_cell_format,
        } = req_ctx;

        // The reactor must only forward requests from the hop we allowed.
        assert_eq!(allowed_hop_loc, hop);

        let target = StreamTarget {
            circ: Arc::clone(&circ),
            tx: msg_tx,
            hop: allowed_hop_loc,
            stream_id,
            relay_cell_format,
            rate_limit_stream,
        };

        let xon_xoff_reader_ctrl =
            XonXoffReaderCtrl::new(drain_rate_request_stream, target.clone());

        let reader = StreamReceiver {
            target: target.clone(),
            receiver,
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
            ended: false,
        };

        let components = StreamComponents {
            stream_receiver: reader,
            target,
            memquota,
            xon_xoff_reader_ctrl,
        };

        IncomingStream::new(circ.time_provider.clone(), req, components)
    }))
}
963
964 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
967 where
968 Tg: CircTarget,
969 {
970 if target
982 .protovers()
983 .supports_named_subver(named::RELAY_NTORV3)
984 {
985 self.extend_ntor_v3(target, params).await
986 } else {
987 self.extend_ntor(target, params).await
988 }
989 }
990
991 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
994 where
995 Tg: CircTarget,
996 {
997 let key = NtorPublicKey {
998 id: *target
999 .rsa_identity()
1000 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1001 pk: *target.ntor_onion_key(),
1002 };
1003 let mut linkspecs = target
1004 .linkspecs()
1005 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
1006 if !params.extend_by_ed25519_id {
1007 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
1008 }
1009
1010 let (tx, rx) = oneshot::channel();
1011
1012 let peer_id = OwnedChanTarget::from_chan_target(target);
1013 let settings = HopSettings::from_params_and_caps(
1014 HopNegotiationType::None,
1015 ¶ms,
1016 target.protovers(),
1017 )?;
1018 self.control
1019 .unbounded_send(CtrlMsg::ExtendNtor {
1020 peer_id,
1021 public_key: key,
1022 linkspecs,
1023 settings,
1024 done: tx,
1025 })
1026 .map_err(|_| Error::CircuitClosed)?;
1027
1028 rx.await.map_err(|_| Error::CircuitClosed)??;
1029
1030 Ok(())
1031 }
1032
1033 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
1036 where
1037 Tg: CircTarget,
1038 {
1039 let key = NtorV3PublicKey {
1040 id: *target
1041 .ed_identity()
1042 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1043 pk: *target.ntor_onion_key(),
1044 };
1045 let mut linkspecs = target
1046 .linkspecs()
1047 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
1048 if !params.extend_by_ed25519_id {
1049 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
1050 }
1051
1052 let (tx, rx) = oneshot::channel();
1053
1054 let peer_id = OwnedChanTarget::from_chan_target(target);
1055 let settings = HopSettings::from_params_and_caps(
1056 HopNegotiationType::Full,
1057 ¶ms,
1058 target.protovers(),
1059 )?;
1060 self.control
1061 .unbounded_send(CtrlMsg::ExtendNtorV3 {
1062 peer_id,
1063 public_key: key,
1064 linkspecs,
1065 settings,
1066 done: tx,
1067 })
1068 .map_err(|_| Error::CircuitClosed)?;
1069
1070 rx.await.map_err(|_| Error::CircuitClosed)??;
1071
1072 Ok(())
1073 }
1074
/// Add a virtual hop to the circuit, with crypto layers derived from `seed`.
///
/// * `protocol` — relay protocol of the virtual hop.
/// * `role` — which side of the handshake this end plays.
/// * `seed` — key generator used to construct the client crypto layers.
/// * `params`, `capabilities` — inputs for the hop settings.
///
/// # Errors
///
/// Fails if the crypto layers or hop settings cannot be built, with
/// `Error::CircuitClosed` if the reactor is unreachable, or with whatever
/// error the reactor reports.
#[cfg(feature = "hs-common")]
pub async fn extend_virtual(
    &self,
    protocol: handshake::RelayProtocol,
    role: handshake::HandshakeRole,
    seed: impl handshake::KeyGenerator,
    params: &CircParameters,
    capabilities: &tor_protover::Protocols,
) -> Result<()> {
    use self::handshake::BoxedClientLayer;

    let negotiation_type = match protocol {
        handshake::RelayProtocol::HsV3 => HopNegotiationType::HsV3,
    };
    let crypt_protocol = handshake::RelayCryptLayerProtocol::from(protocol);

    // Crypto layers are built before the settings, so a failure here is
    // reported first.
    let layers = crypt_protocol.construct_client_layers(role, seed)?;
    let BoxedClientLayer { fwd, back, binding } = layers;

    let settings = HopSettings::from_params_and_caps(negotiation_type, params, capabilities)?;

    let (done, outcome) = oneshot::channel();
    self.command
        .unbounded_send(CtrlCmd::ExtendVirtual {
            cell_crypto: (fwd, back, binding),
            settings,
            done,
        })
        .map_err(|_| Error::CircuitClosed)?;

    match outcome.await {
        Ok(result) => result,
        Err(_) => Err(Error::CircuitClosed),
    }
}
1132
1133 async fn begin_stream_impl(
1141 self: &Arc<ClientCirc>,
1142 begin_msg: AnyRelayMsg,
1143 cmd_checker: AnyCmdChecker,
1144 ) -> Result<StreamComponents> {
1145 let hop = TargetHop::LastHop;
1148
1149 let time_prov = self.time_provider.clone();
1150
1151 let memquota = StreamAccount::new(self.mq_account())?;
1152 let (sender, receiver) = stream_queue(STREAM_READER_BUFFER, &memquota, &time_prov)?;
1153
1154 let (tx, rx) = oneshot::channel();
1155 let (msg_tx, msg_rx) =
1156 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
1157
1158 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
1159
1160 let mut drain_rate_request_tx = NotifySender::new_typed();
1164 let drain_rate_request_rx = drain_rate_request_tx.subscribe();
1165
1166 self.control
1167 .unbounded_send(CtrlMsg::BeginStream {
1168 hop,
1169 message: begin_msg,
1170 sender,
1171 rx: msg_rx,
1172 rate_limit_notifier: rate_limit_tx,
1173 drain_rate_requester: drain_rate_request_tx,
1174 done: tx,
1175 cmd_checker,
1176 })
1177 .map_err(|_| Error::CircuitClosed)?;
1178
1179 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
1180
1181 let target = StreamTarget {
1182 circ: self.clone(),
1183 tx: msg_tx,
1184 hop,
1185 stream_id,
1186 relay_cell_format,
1187 rate_limit_stream: rate_limit_rx,
1188 };
1189
1190 let xon_xoff_reader_ctrl = XonXoffReaderCtrl::new(drain_rate_request_rx, target.clone());
1192
1193 let stream_receiver = StreamReceiver {
1194 target: target.clone(),
1195 receiver,
1196 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1197 ended: false,
1198 };
1199
1200 let components = StreamComponents {
1201 stream_receiver,
1202 target,
1203 memquota,
1204 xon_xoff_reader_ctrl,
1205 };
1206
1207 Ok(components)
1208 }
1209
1210 async fn begin_data_stream(
1213 self: &Arc<ClientCirc>,
1214 msg: AnyRelayMsg,
1215 optimistic: bool,
1216 ) -> Result<DataStream> {
1217 let components = self
1218 .begin_stream_impl(msg, DataCmdChecker::new_any())
1219 .await?;
1220
1221 let StreamComponents {
1222 stream_receiver,
1223 target,
1224 memquota,
1225 xon_xoff_reader_ctrl,
1226 } = components;
1227
1228 let mut stream = DataStream::new(
1229 self.time_provider.clone(),
1230 stream_receiver,
1231 xon_xoff_reader_ctrl,
1232 target,
1233 memquota,
1234 );
1235 if !optimistic {
1236 stream.wait_for_connection().await?;
1237 }
1238 Ok(stream)
1239 }
1240
1241 pub async fn begin_stream(
1247 self: &Arc<ClientCirc>,
1248 target: &str,
1249 port: u16,
1250 parameters: Option<StreamParameters>,
1251 ) -> Result<DataStream> {
1252 let parameters = parameters.unwrap_or_default();
1253 let begin_flags = parameters.begin_flags();
1254 let optimistic = parameters.is_optimistic();
1255 let target = if parameters.suppressing_hostname() {
1256 ""
1257 } else {
1258 target
1259 };
1260 let beginmsg = Begin::new(target, port, begin_flags)
1261 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1262 self.begin_data_stream(beginmsg.into(), optimistic).await
1263 }
1264
1265 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1268 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1273 .await
1274 }
1275
1276 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1282 let resolve_msg = Resolve::new(hostname);
1283
1284 let resolved_msg = self.try_resolve(resolve_msg).await?;
1285
1286 resolved_msg
1287 .into_answers()
1288 .into_iter()
1289 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1290 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1291 Ok(_) => None,
1292 Err(e) => Some(Err(e)),
1293 })
1294 .collect()
1295 }
1296
1297 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1303 let resolve_ptr_msg = Resolve::new_reverse(&addr);
1304
1305 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1306
1307 resolved_msg
1308 .into_answers()
1309 .into_iter()
1310 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1311 Ok(ResolvedVal::Hostname(v)) => Some(
1312 String::from_utf8(v)
1313 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1314 ),
1315 Ok(_) => None,
1316 Err(e) => Some(Err(e)),
1317 })
1318 .collect()
1319 }
1320
1321 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1324 let components = self
1325 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1326 .await?;
1327
1328 let StreamComponents {
1329 stream_receiver,
1330 target: _,
1331 memquota,
1332 xon_xoff_reader_ctrl: _,
1333 } = components;
1334
1335 let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
1336 resolve_stream.read_msg().await
1337 }
1338
/// Ask the reactor to shut this circuit down.
///
/// This only queues a `CtrlCmd::Shutdown`; it does not wait for the
/// reactor to act on it.
pub fn terminate(&self) {
    // Best effort: a send failure means the receiving end is already gone.
    let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
}
1352
/// Handle a protocol violation on this circuit.
///
/// Currently this is the same as [`terminate`](Self::terminate).
pub(crate) fn protocol_error(&self) {
    self.terminate();
}
1363
/// Return `true` once the `control` channel to the reactor is closed.
pub fn is_closing(&self) -> bool {
    self.control.is_closed()
}
1368
/// Return the unique identifier of this circuit.
pub fn unique_id(&self) -> UniqId {
    self.unique_id
}
1373
1374 pub fn n_hops(&self) -> Result<usize> {
1381 self.mutable
1382 .n_hops(self.unique_id)
1383 .map_err(|_| Error::CircuitClosed)
1384 }
1385
/// Return a future that completes when the shared `reactor_closed_rx`
/// future does, discarding its output.
///
/// The returned future owns a clone of that shared future, so it does not
/// borrow from `self`.
#[cfg(feature = "experimental-api")]
pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
    let closed = self.reactor_closed_rx.clone();
    closed.map(|_| ())
}
1396}
1397
/// Handle to an ongoing conversation on a circuit, as returned by
/// `ClientCirc::start_conversation`.
///
/// Borrows the circuit it was started on.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
1408
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send `msg` as part of this conversation, without installing a handler.
    pub async fn send_message(&self, msg: relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Send an optional message and optionally install `handler`, in one
    /// request to the reactor.
    ///
    /// The message, if any, is wrapped without a stream ID.
    ///
    /// # Errors
    ///
    /// Returns `Error::CircuitClosed` if the request cannot be sent or the
    /// reply channel is dropped; otherwise returns the reactor's result.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (sender, outcome) = oneshot::channel();
        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg: msg.map(|inner| relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender,
        };

        let circ = self.0;
        circ.control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        match outcome.await {
            Ok(result) => result,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1444
1445impl PendingClientCirc {
1446 pub(crate) fn new(
1452 id: CircId,
1453 channel: Arc<Channel>,
1454 createdreceiver: oneshot::Receiver<CreateResponse>,
1455 input: CircuitRxReceiver,
1456 unique_id: UniqId,
1457 runtime: DynTimeProvider,
1458 memquota: CircuitAccount,
1459 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1460 let time_provider = channel.time_provider().clone();
1461 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1462 Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1463
1464 let circuit = ClientCirc {
1465 mutable,
1466 unique_id,
1467 control: control_tx,
1468 command: command_tx,
1469 reactor_closed_rx: reactor_closed_rx.shared(),
1470 #[cfg(test)]
1471 circid: id,
1472 memquota,
1473 time_provider,
1474 };
1475
1476 let pending = PendingClientCirc {
1477 recvcreated: createdreceiver,
1478 circ: Arc::new(circuit),
1479 };
1480 (pending, reactor)
1481 }
1482
/// Return the unique identifier of the circuit being created.
pub fn peek_unique_id(&self) -> UniqId {
    self.circ.unique_id
}
1487
1488 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1495 let protocols = tor_protover::Protocols::new();
1499 let settings =
1500 HopSettings::from_params_and_caps(HopNegotiationType::None, ¶ms, &protocols)?;
1501 let (tx, rx) = oneshot::channel();
1502 self.circ
1503 .control
1504 .unbounded_send(CtrlMsg::Create {
1505 recv_created: self.recvcreated,
1506 handshake: CircuitHandshake::CreateFast,
1507 settings,
1508 done: tx,
1509 })
1510 .map_err(|_| Error::CircuitClosed)?;
1511
1512 rx.await.map_err(|_| Error::CircuitClosed)??;
1513
1514 Ok(self.circ)
1515 }
1516
1517 pub async fn create_firsthop<Tg>(
1522 self,
1523 target: &Tg,
1524 params: CircParameters,
1525 ) -> Result<Arc<ClientCirc>>
1526 where
1527 Tg: tor_linkspec::CircTarget,
1528 {
1529 if target
1531 .protovers()
1532 .supports_named_subver(named::RELAY_NTORV3)
1533 {
1534 self.create_firsthop_ntor_v3(target, params).await
1535 } else {
1536 self.create_firsthop_ntor(target, params).await
1537 }
1538 }
1539
1540 pub async fn create_firsthop_ntor<Tg>(
1545 self,
1546 target: &Tg,
1547 params: CircParameters,
1548 ) -> Result<Arc<ClientCirc>>
1549 where
1550 Tg: tor_linkspec::CircTarget,
1551 {
1552 let (tx, rx) = oneshot::channel();
1553 let settings = HopSettings::from_params_and_caps(
1554 HopNegotiationType::None,
1555 ¶ms,
1556 target.protovers(),
1557 )?;
1558
1559 self.circ
1560 .control
1561 .unbounded_send(CtrlMsg::Create {
1562 recv_created: self.recvcreated,
1563 handshake: CircuitHandshake::Ntor {
1564 public_key: NtorPublicKey {
1565 id: *target
1566 .rsa_identity()
1567 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1568 pk: *target.ntor_onion_key(),
1569 },
1570 ed_identity: *target
1571 .ed_identity()
1572 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1573 },
1574 settings,
1575 done: tx,
1576 })
1577 .map_err(|_| Error::CircuitClosed)?;
1578
1579 rx.await.map_err(|_| Error::CircuitClosed)??;
1580
1581 Ok(self.circ)
1582 }
1583
1584 pub async fn create_firsthop_ntor_v3<Tg>(
1593 self,
1594 target: &Tg,
1595 params: CircParameters,
1596 ) -> Result<Arc<ClientCirc>>
1597 where
1598 Tg: tor_linkspec::CircTarget,
1599 {
1600 let settings = HopSettings::from_params_and_caps(
1601 HopNegotiationType::Full,
1602 ¶ms,
1603 target.protovers(),
1604 )?;
1605 let (tx, rx) = oneshot::channel();
1606
1607 self.circ
1608 .control
1609 .unbounded_send(CtrlMsg::Create {
1610 recv_created: self.recvcreated,
1611 handshake: CircuitHandshake::NtorV3 {
1612 public_key: NtorV3PublicKey {
1613 id: *target
1614 .ed_identity()
1615 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1616 pk: *target.ntor_onion_key(),
1617 },
1618 },
1619 settings,
1620 done: tx,
1621 })
1622 .map_err(|_| Error::CircuitClosed)?;
1623
1624 rx.await.map_err(|_| Error::CircuitClosed)??;
1625
1626 Ok(self.circ)
1627 }
1628}
1629
/// The pieces a stream object is assembled from, as produced by
/// `ClientCirc::begin_stream_impl` and `ClientCirc::allow_stream_requests`.
#[derive(Debug)]
pub(crate) struct StreamComponents {
    /// Receiving side of the stream.
    pub(crate) stream_receiver: StreamReceiver,
    /// Handle identifying the stream on its circuit.
    pub(crate) target: StreamTarget,
    /// Memory-quota account of the stream.
    pub(crate) memquota: StreamAccount,
    /// XON/XOFF reader control, built from the stream's drain-rate request
    /// stream and a clone of `target`.
    pub(crate) xon_xoff_reader_ctrl: XonXoffReaderCtrl,
}
1648
1649fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1652 match val {
1653 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1654 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1655 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1656 _ => Ok(val),
1657 }
1658}
1659
1660#[cfg(test)]
1661pub(crate) mod test {
1662 #![allow(clippy::bool_assert_comparison)]
1664 #![allow(clippy::clone_on_copy)]
1665 #![allow(clippy::dbg_macro)]
1666 #![allow(clippy::mixed_attributes_style)]
1667 #![allow(clippy::print_stderr)]
1668 #![allow(clippy::print_stdout)]
1669 #![allow(clippy::single_char_pattern)]
1670 #![allow(clippy::unwrap_used)]
1671 #![allow(clippy::unchecked_duration_subtraction)]
1672 #![allow(clippy::useless_vec)]
1673 #![allow(clippy::needless_pass_by_value)]
1674 use super::*;
1677 use crate::channel::OpenChanCellS2C;
1678 use crate::channel::{test::new_reactor, CodecError};
1679 use crate::congestion::test_utils::params::build_cc_vegas_params;
1680 use crate::crypto::cell::RelayCellBody;
1681 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1682 #[cfg(feature = "hs-service")]
1683 use crate::stream::IncomingStreamRequestFilter;
1684 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1685 use futures::channel::mpsc::{Receiver, Sender};
1686 use futures::io::{AsyncReadExt, AsyncWriteExt};
1687 use futures::sink::SinkExt;
1688 use futures::stream::StreamExt;
1689 use futures::task::SpawnExt;
1690 use hex_literal::hex;
1691 use std::collections::{HashMap, VecDeque};
1692 use std::fmt::Debug;
1693 use std::time::Duration;
1694 use tor_basic_utils::test_rng::testing_rng;
1695 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1696 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1697 use tor_cell::relaycell::msg::SendmeTag;
1698 use tor_cell::relaycell::{
1699 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1700 };
1701 use tor_linkspec::OwnedCircTarget;
1702 use tor_memquota::HasMemoryCost;
1703 use tor_rtcompat::Runtime;
1704 use tracing::trace;
1705 use tracing_test::traced_test;
1706
1707 #[cfg(feature = "conflux")]
1708 use {
1709 crate::tunnel::reactor::ConfluxHandshakeResult,
1710 crate::util::err::ConfluxHandshakeError,
1711 futures::future::FusedFuture,
1712 futures::lock::Mutex as AsyncMutex,
1713 std::pin::Pin,
1714 std::result::Result as StdResult,
1715 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1716 tor_cell::relaycell::msg::ConfluxLink,
1717 tor_rtmock::MockRuntime,
1718 };
1719
    impl PendingClientCirc {
        /// Testing only: return the circuit ID of the circuit held by this
        /// `PendingClientCirc`.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
1726
    impl ClientCirc {
        /// Testing only: return the circuit ID stored in this `ClientCirc`.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
1733
1734 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1735 let rfmt = RelayCellFormat::V0;
1737 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1738 .encode(rfmt, &mut testing_rng())
1739 .unwrap();
1740 let chanmsg = chanmsg::Relay::from(body);
1741 ClientCircChanMsg::Relay(chanmsg)
1742 }
1743
    // Fixed key material and identities shared by the example-target and
    // example-key helpers in this module.

    /// First key argument passed to the example ntor / ntor-v3 secret key constructors.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Second key argument passed to those constructors, and the ntor onion key
    /// of `example_target()`.
    // NOTE(review): presumably the public key matching EXAMPLE_SK; confirm.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity bytes used for the example target and the ntor-v3 key.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity bytes used for the example target and the ntor key.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1751
    /// Create a sender/receiver pair with the given `buffer` size by
    /// forwarding to `crate::fake_mpsc`.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1759
1760 fn example_target() -> OwnedCircTarget {
1762 let mut builder = OwnedCircTarget::builder();
1763 builder
1764 .chan_target()
1765 .ed_identity(EXAMPLE_ED_ID.into())
1766 .rsa_identity(EXAMPLE_RSA_ID.into());
1767 builder
1768 .ntor_onion_key(EXAMPLE_PK.into())
1769 .protocols("FlowCtrl=1-2".parse().unwrap())
1770 .build()
1771 .unwrap()
1772 }
1773 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1774 crate::crypto::handshake::ntor::NtorSecretKey::new(
1775 EXAMPLE_SK.into(),
1776 EXAMPLE_PK.into(),
1777 EXAMPLE_RSA_ID.into(),
1778 )
1779 }
1780 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1781 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1782 EXAMPLE_SK.into(),
1783 EXAMPLE_PK.into(),
1784 EXAMPLE_ED_ID.into(),
1785 )
1786 }
1787
1788 fn working_fake_channel<R: Runtime>(
1789 rt: &R,
1790 ) -> (
1791 Arc<Channel>,
1792 Receiver<AnyChanCell>,
1793 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1794 ) {
1795 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1796 rt.spawn(async {
1797 let _ignore = chan_reactor.run().await;
1798 })
1799 .unwrap();
1800 (channel, rx, tx)
1801 }
1802
    /// Which circuit handshake a test should exercise.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake (only usable for the first hop in these tests).
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor-v3 handshake.
        NtorV3,
    }
1810
1811 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1812 use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1816
1817 let (chan, mut rx, _sink) = working_fake_channel(rt);
1818 let circid = CircId::new(128).unwrap();
1819 let (created_send, created_recv) = oneshot::channel();
1820 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1821 let unique_id = UniqId::new(23, 17);
1822
1823 let (pending, reactor) = PendingClientCirc::new(
1824 circid,
1825 chan,
1826 created_recv,
1827 circmsg_recv,
1828 unique_id,
1829 DynTimeProvider::new(rt.clone()),
1830 CircuitAccount::new_noop(),
1831 );
1832
1833 rt.spawn(async {
1834 let _ignore = reactor.run().await;
1835 })
1836 .unwrap();
1837
1838 let simulate_relay_fut = async move {
1840 let mut rng = testing_rng();
1841 let create_cell = rx.next().await.unwrap();
1842 assert_eq!(create_cell.circid(), Some(circid));
1843 let reply = match handshake_type {
1844 HandshakeType::Fast => {
1845 let cf = match create_cell.msg() {
1846 AnyChanMsg::CreateFast(cf) => cf,
1847 other => panic!("{:?}", other),
1848 };
1849 let (_, rep) = CreateFastServer::server(
1850 &mut rng,
1851 &mut |_: &()| Some(()),
1852 &[()],
1853 cf.handshake(),
1854 )
1855 .unwrap();
1856 CreateResponse::CreatedFast(CreatedFast::new(rep))
1857 }
1858 HandshakeType::Ntor => {
1859 let c2 = match create_cell.msg() {
1860 AnyChanMsg::Create2(c2) => c2,
1861 other => panic!("{:?}", other),
1862 };
1863 let (_, rep) = NtorServer::server(
1864 &mut rng,
1865 &mut |_: &()| Some(()),
1866 &[example_ntor_key()],
1867 c2.body(),
1868 )
1869 .unwrap();
1870 CreateResponse::Created2(Created2::new(rep))
1871 }
1872 HandshakeType::NtorV3 => {
1873 let c2 = match create_cell.msg() {
1874 AnyChanMsg::Create2(c2) => c2,
1875 other => panic!("{:?}", other),
1876 };
1877 let mut reply_fn = if with_cc {
1878 |client_exts: &[CircRequestExt]| {
1879 let _ = client_exts
1880 .iter()
1881 .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1882 .expect("Client failed to request CC");
1883 Some(vec![CircResponseExt::CcResponse(
1886 extend_ext::CcResponse::new(31),
1887 )])
1888 }
1889 } else {
1890 |_: &_| Some(vec![])
1891 };
1892 let (_, rep) = NtorV3Server::server(
1893 &mut rng,
1894 &mut reply_fn,
1895 &[example_ntor_v3_key()],
1896 c2.body(),
1897 )
1898 .unwrap();
1899 CreateResponse::Created2(Created2::new(rep))
1900 }
1901 };
1902 created_send.send(reply).unwrap();
1903 };
1904 let client_fut = async move {
1906 let target = example_target();
1907 let params = CircParameters::default();
1908 let ret = match handshake_type {
1909 HandshakeType::Fast => {
1910 trace!("doing fast create");
1911 pending.create_firsthop_fast(params).await
1912 }
1913 HandshakeType::Ntor => {
1914 trace!("doing ntor create");
1915 pending.create_firsthop_ntor(&target, params).await
1916 }
1917 HandshakeType::NtorV3 => {
1918 let params = if with_cc {
1919 CircParameters::new(true, build_cc_vegas_params())
1921 } else {
1922 params
1923 };
1924 trace!("doing ntor_v3 create");
1925 pending.create_firsthop_ntor_v3(&target, params).await
1926 }
1927 };
1928 trace!("create done: result {:?}", ret);
1929 ret
1930 };
1931
1932 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1933
1934 let _circ = circ.unwrap();
1935
1936 assert_eq!(_circ.n_hops().unwrap(), 1);
1938 }
1939
    /// Run `test_create` with the `Fast` handshake via `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// Run `test_create` with the `Ntor` handshake via `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// Run `test_create` with the `NtorV3` handshake, without congestion control.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
    /// Run `test_create` with the `NtorV3` handshake and congestion control
    /// negotiation enabled.  Only built with the `flowctl-cc` feature.
    #[traced_test]
    #[test]
    #[cfg(feature = "flowctl-cc")]
    fn test_create_ntor_v3_with_cc() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, true).await;
        });
    }
1969
    /// A crypto layer for tests that leaves cells untouched and hands out
    /// tags derived from a counter.
    pub(crate) struct DummyCrypto {
        /// Buffer holding the most recent tag; `next_tag` rewrites its first four bytes.
        counter_tag: [u8; 20],
        /// Value written into the next tag; incremented on every `next_tag` call.
        counter: u32,
        /// Whether `decrypt_inbound` should produce a tag for inbound cells.
        lasthop: bool,
    }
1977 impl DummyCrypto {
1978 fn next_tag(&mut self) -> SendmeTag {
1979 #![allow(clippy::identity_op)]
1980 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1981 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1982 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1983 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1984 self.counter += 1;
1985 self.counter_tag.into()
1986 }
1987 }
1988
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
        /// Leave the cell untouched and return the next counter-derived tag.
        fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
            self.next_tag()
        }
        /// Do nothing: this dummy layer performs no encryption.
        fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
    }
1995 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1996 fn decrypt_inbound(
1997 &mut self,
1998 _cmd: ChanCmd,
1999 _cell: &mut RelayCellBody,
2000 ) -> Option<SendmeTag> {
2001 if self.lasthop {
2002 Some(self.next_tag())
2003 } else {
2004 None
2005 }
2006 }
2007 }
2008 impl DummyCrypto {
2009 pub(crate) fn new(lasthop: bool) -> Self {
2010 DummyCrypto {
2011 counter_tag: [0; 20],
2012 counter: 0,
2013 lasthop,
2014 }
2015 }
2016 }
2017
    /// Build a fake client circuit on `chan` with one fake hop per entry of `hops`.
    ///
    /// A `PendingClientCirc` and its reactor are created (the reactor is
    /// spawned on `rt`), and each hop is installed with
    /// `CtrlCmd::AddFakeHop` rather than through a real handshake.  The final
    /// entry of `hops` is flagged `fwd_lasthop`; the hop whose index equals
    /// `next_msg_from` is flagged `rev_lasthop`.  Every hop gets a clone of
    /// `params` and the V0 relay cell format.
    ///
    /// Returns the circuit and the sender used to feed it incoming circuit messages.
    ///
    /// Panics if `hops` is empty, if its last index does not fit in a `u8`,
    /// or if adding any hop fails.
    async fn newcirc_ext<R: Runtime>(
        rt: &R,
        unique_id: UniqId,
        chan: Arc<Channel>,
        hops: Vec<path::HopDetail>,
        next_msg_from: HopNum,
        params: CircParameters,
    ) -> (Arc<ClientCirc>, CircuitRxSender) {
        let circid = CircId::new(128).unwrap();
        // The "created" sender is never used: hops are added directly below.
        let (_created_send, created_recv) = oneshot::channel();
        let (circmsg_send, circmsg_recv) = fake_mpsc(64);

        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );

        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();

        let PendingClientCirc {
            circ,
            recvcreated: _,
        } = pending;

        let relay_cell_format = RelayCellFormat::V0;

        let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
        for (idx, peer_id) in hops.into_iter().enumerate() {
            let (tx, rx) = oneshot::channel();
            // Cannot truncate: idx <= last_hop_num, which was checked to fit in a u8.
            let idx = idx as u8;

            circ.command
                .unbounded_send(CtrlCmd::AddFakeHop {
                    relay_cell_format,
                    fwd_lasthop: idx == last_hop_num,
                    rev_lasthop: idx == u8::from(next_msg_from),
                    peer_id,
                    params: params.clone(),
                    done: tx,
                })
                .unwrap();
            // Wait for the reactor to confirm the hop before adding the next one.
            rx.await.unwrap().unwrap();
        }

        (circ, circmsg_send)
    }
2075
2076 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
2079 let hops = std::iter::repeat_with(|| {
2080 let peer_id = tor_linkspec::OwnedChanTarget::builder()
2081 .ed_identity([4; 32].into())
2082 .rsa_identity([5; 20].into())
2083 .build()
2084 .expect("Could not construct fake hop");
2085
2086 path::HopDetail::Relay(peer_id)
2087 })
2088 .take(3)
2089 .collect();
2090
2091 let unique_id = UniqId::new(23, 17);
2092 newcirc_ext(
2093 rt,
2094 unique_id,
2095 chan,
2096 hops,
2097 2.into(),
2098 CircParameters::default(),
2099 )
2100 .await
2101 }
2102
2103 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
2106 (0..n)
2107 .map(|idx| {
2108 let peer_id = tor_linkspec::OwnedChanTarget::builder()
2109 .ed_identity([idx + start_idx; 32].into())
2110 .rsa_identity([idx + start_idx + 1; 20].into())
2111 .build()
2112 .expect("Could not construct fake hop");
2113
2114 path::HopDetail::Relay(peer_id)
2115 })
2116 .collect()
2117 }
2118
    /// Extend a fake three-hop circuit by one hop using `handshake_type`.
    ///
    /// One future asks the circuit to extend to `example_target()`; the other
    /// plays the relay: it reads the RELAY_EARLY cell from the channel,
    /// decodes the EXTEND2 message, runs the server side of the handshake and
    /// sends back an EXTENDED2.  Afterwards the circuit must report four hops,
    /// with the example target as the last one.
    ///
    /// Panics for `HandshakeType::Fast`, which cannot be used to extend.
    async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};

        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let (circ, mut sink) = newcirc(rt, chan).await;
        let circid = circ.peek_circid();
        let params = CircParameters::default();

        let extend_fut = async move {
            let target = example_target();
            match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
                HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
            };
            // Hand the circuit back so it can be inspected after the join.
            circ
        };
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, Some(circid));
            let rmsg = match chmsg {
                AnyChanMsg::RelayEarly(r) => {
                    AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                        .unwrap()
                }
                other => panic!("{:?}", other),
            };
            let e2 = match rmsg.msg() {
                AnyRelayMsg::Extend2(e2) => e2,
                other => panic!("{:?}", other),
            };
            let mut rng = testing_rng();
            let reply = match handshake_type {
                HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
                HandshakeType::Ntor => {
                    let (_keygen, reply) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
                HandshakeType::NtorV3 => {
                    let (_keygen, reply) = NtorV3Server::server(
                        &mut rng,
                        &mut |_: &[CircRequestExt]| Some(vec![]),
                        &[example_ntor_v3_key()],
                        e2.handshake(),
                    )
                    .unwrap();
                    reply
                }
            };

            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
            // Return both ends so they stay alive until the join completes.
            (sink, rx)
        };

        let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);

        // Did we really add another hop?
        assert_eq!(circ.n_hops().unwrap(), 4);

        // Do the path accessors report a reasonable outcome?
        {
            let path = circ.path_ref().unwrap();
            let path = path
                .all_hops()
                .filter_map(|hop| match hop {
                    path::HopDetail::Relay(r) => Some(r),
                    #[cfg(feature = "hs-common")]
                    path::HopDetail::Virtual => None,
                })
                .collect::<Vec<_>>();

            assert_eq!(path.len(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(path[3].ed_identity(), example_target().ed_identity());
            assert_ne!(path[0].ed_identity(), example_target().ed_identity());
        }
        // Same checks through the `n_hops()` / `hops()` accessors.
        {
            let path = circ.path_ref().unwrap();
            assert_eq!(path.n_hops(), 4);
            use tor_linkspec::HasRelayIds;
            assert_eq!(
                path.hops()[3].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
            assert_ne!(
                path.hops()[0].as_chan_target().unwrap().ed_identity(),
                example_target().ed_identity()
            );
        }
    }
2218
    /// Run `test_extend` with the `Ntor` handshake via `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_extend_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::Ntor).await;
        });
    }
2226
    /// Run `test_extend` with the `NtorV3` handshake via `test_with_all_runtimes!`.
    #[traced_test]
    #[test]
    fn test_extend_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_extend(&rt, HandshakeType::NtorV3).await;
        });
    }
2234
    /// Attempt an ntor extend that is answered with `bad_reply`, and return the resulting error.
    ///
    /// A fake three-hop circuit is built whose incoming messages are treated
    /// as coming from `reply_hop`.  A spawned task waits 100 ms and then
    /// injects `bad_reply`, while this function awaits `extend_ntor`.
    ///
    /// Panics if the circuit does not still have three hops afterwards, or if
    /// the extend unexpectedly succeeded.
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (chan, _rx, _sink) = working_fake_channel(rt);
        let hops = std::iter::repeat_with(|| {
            let peer_id = tor_linkspec::OwnedChanTarget::builder()
                .ed_identity([4; 32].into())
                .rsa_identity([5; 20].into())
                .build()
                .expect("Could not construct fake hop");

            path::HopDetail::Relay(peer_id)
        })
        .take(3)
        .collect();

        let unique_id = UniqId::new(23, 17);
        let (circ, mut sink) = newcirc_ext(
            rt,
            unique_id,
            chan,
            hops,
            reply_hop,
            CircParameters::default(),
        )
        .await;
        let params = CircParameters::default();

        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let rtc = rt.clone();
        // Deliver the bad reply from a separate task, after a short delay.
        let sink_handle = rt
            .spawn_with_handle(async move {
                rtc.sleep(Duration::from_millis(100)).await;
                sink.send(bad_reply).await.unwrap();
                sink
            })
            .unwrap();
        let outcome = circ.extend_ntor(&target, params).await;
        // Keep the sender alive until the extend attempt has finished.
        let _sink = sink_handle.await;

        assert_eq!(circ.n_hops().unwrap(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }
2282
2283 #[traced_test]
2284 #[test]
2285 fn bad_extend_wronghop() {
2286 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2287 let extended2 = relaymsg::Extended2::new(vec![]).into();
2288 let cc = rmsg_to_ccmsg(None, extended2);
2289
2290 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
2291 match error {
2296 Error::CircuitClosed => {}
2297 x => panic!("got other error: {}", x),
2298 }
2299 });
2300 }
2301
2302 #[traced_test]
2303 #[test]
2304 fn bad_extend_wrongtype() {
2305 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2306 let extended = relaymsg::Extended::new(vec![7; 200]).into();
2307 let cc = rmsg_to_ccmsg(None, extended);
2308
2309 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2310 match error {
2311 Error::BytesErr {
2312 err: tor_bytes::Error::InvalidMessage(_),
2313 object: "extended2 message",
2314 } => {}
2315 other => panic!("{:?}", other),
2316 }
2317 });
2318 }
2319
2320 #[traced_test]
2321 #[test]
2322 fn bad_extend_destroy() {
2323 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2324 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2325 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2326 match error {
2327 Error::CircuitClosed => {}
2328 other => panic!("{:?}", other),
2329 }
2330 });
2331 }
2332
    /// An EXTENDED2 whose handshake body is 256 bytes of `99` must make the
    /// extend fail with `Error::BadCircHandshakeAuth`.
    #[traced_test]
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
            let cc = rmsg_to_ccmsg(None, extended2);
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            assert!(matches!(error, Error::BadCircHandshakeAuth));
        });
    }
2343
2344 #[traced_test]
2345 #[test]
2346 fn begindir() {
2347 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2348 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2349 let (circ, mut sink) = newcirc(&rt, chan).await;
2350 let circid = circ.peek_circid();
2351
2352 let begin_and_send_fut = async move {
2353 let mut stream = circ.begin_dir_stream().await.unwrap();
2356 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2357 stream.flush().await.unwrap();
2358 let mut buf = [0_u8; 1024];
2359 let n = stream.read(&mut buf).await.unwrap();
2360 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2361 let n = stream.read(&mut buf).await.unwrap();
2362 assert_eq!(n, 0);
2363 stream
2364 };
2365 let reply_fut = async move {
2366 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2369 assert_eq!(id, Some(circid));
2370 let rmsg = match chmsg {
2371 AnyChanMsg::Relay(r) => {
2372 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2373 .unwrap()
2374 }
2375 other => panic!("{:?}", other),
2376 };
2377 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2378 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2379
2380 let connected = relaymsg::Connected::new_empty().into();
2382 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2383
2384 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2386 assert_eq!(id, Some(circid));
2387 let rmsg = match chmsg {
2388 AnyChanMsg::Relay(r) => {
2389 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2390 .unwrap()
2391 }
2392 other => panic!("{:?}", other),
2393 };
2394 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2395 assert_eq!(streamid_2, streamid);
2396 if let AnyRelayMsg::Data(d) = rmsg {
2397 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2398 } else {
2399 panic!();
2400 }
2401
2402 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2404 .unwrap()
2405 .into();
2406 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2407
2408 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2410 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2411
2412 (rx, sink) };
2414
2415 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2416 });
2417 }
2418
    /// Check that ending a stream sends an END message.
    ///
    /// A data stream is opened to `www.example.com:80` and split.  If
    /// `by_drop` is true, both halves are dropped; otherwise the writer is
    /// closed explicitly and the reader kept.  The simulated relay expects a
    /// BEGIN, answers with CONNECTED, and then expects an END.
    fn close_stream_helper(by_drop: bool) {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;

            let stream_fut = async move {
                let stream = circ
                    .begin_stream("www.example.com", 80, None)
                    .await
                    .unwrap();

                let (r, mut w) = stream.split();
                if by_drop {
                    // Drop the writer and the reader, which should close the stream.
                    drop(r);
                    drop(w);
                    // Hand back the circuit so it stays alive until the join completes.
                    (None, circ)
                } else {
                    // Call close on the writer, while keeping the reader alive.
                    w.close().await.unwrap();
                    (Some(r), circ)
                }
            };
            let handler_fut = async {
                // Read the BEGIN message.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);

                // Reply with a CONNECTED.
                let connected =
                    relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
                sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();

                // Expect an END.
                let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                let rmsg = match msg {
                    AnyChanMsg::Relay(r) => {
                        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                            .unwrap()
                    }
                    other => panic!("{:?}", other),
                };
                let (_, rmsg) = rmsg.into_streamid_and_msg();
                assert_eq!(rmsg.cmd(), RelayCmd::END);

                // Return both ends so they stay alive until the join completes.
                (rx, sink)
            };

            let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
        });
    }
2479
    /// Dropping both halves of a stream must send an END.
    #[traced_test]
    #[test]
    fn drop_stream() {
        close_stream_helper(true);
    }
2485
    /// Explicitly closing the writer half of a stream must send an END.
    #[traced_test]
    #[test]
    fn close_stream() {
        close_stream_helper(false);
    }
2491
2492 async fn setup_incoming_sendme_case<R: Runtime>(
2494 rt: &R,
2495 n_to_send: usize,
2496 ) -> (
2497 Arc<ClientCirc>,
2498 DataStream,
2499 CircuitRxSender,
2500 Option<StreamId>,
2501 usize,
2502 Receiver<AnyChanCell>,
2503 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2504 ) {
2505 let (chan, mut rx, sink2) = working_fake_channel(rt);
2506 let (circ, mut sink) = newcirc(rt, chan).await;
2507 let circid = circ.peek_circid();
2508
2509 let begin_and_send_fut = {
2510 let circ = circ.clone();
2511 async move {
2512 let mut stream = circ
2514 .begin_stream("www.example.com", 443, None)
2515 .await
2516 .unwrap();
2517 let junk = [0_u8; 1024];
2518 let mut remaining = n_to_send;
2519 while remaining > 0 {
2520 let n = std::cmp::min(remaining, junk.len());
2521 stream.write_all(&junk[..n]).await.unwrap();
2522 remaining -= n;
2523 }
2524 stream.flush().await.unwrap();
2525 stream
2526 }
2527 };
2528
2529 let receive_fut = async move {
2530 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2532 let rmsg = match chmsg {
2533 AnyChanMsg::Relay(r) => {
2534 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2535 .unwrap()
2536 }
2537 other => panic!("{:?}", other),
2538 };
2539 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2540 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2541 let connected = relaymsg::Connected::new_empty().into();
2543 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2544 let mut bytes_received = 0_usize;
2546 let mut cells_received = 0_usize;
2547 while bytes_received < n_to_send {
2548 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2550 assert_eq!(id, Some(circid));
2551
2552 let rmsg = match chmsg {
2553 AnyChanMsg::Relay(r) => {
2554 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2555 .unwrap()
2556 }
2557 other => panic!("{:?}", other),
2558 };
2559 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2560 assert_eq!(streamid2, streamid);
2561 if let AnyRelayMsg::Data(dat) = rmsg {
2562 cells_received += 1;
2563 bytes_received += dat.as_ref().len();
2564 } else {
2565 panic!();
2566 }
2567 }
2568
2569 (sink, streamid, cells_received, rx)
2570 };
2571
2572 let (stream, (sink, streamid, cells_received, rx)) =
2573 futures::join!(begin_and_send_fut, receive_fut);
2574
2575 (circ, stream, sink, streamid, cells_received, rx, sink2)
2576 }
2577
    /// A circuit-level SENDME carrying an expected tag must be accepted and
    /// enlarge the send window.
    ///
    /// After sending `300 * 498 + 3` bytes (301 DATA cells), hop 2's send
    /// window is expected to be `1000 - 301` with three tags pending.  A
    /// circuit SENDME with the first pending tag and an empty stream SENDME
    /// are then injected, and the window is expected to be `1000 - 201`.
    #[traced_test]
    #[test]
    fn accept_valid_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            assert_eq!(cells_received, 301);

            // Make sure that the circuit is indeed expecting the right sendmes.
            {
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: circ.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                // 0x64 == 100, little-endian in the first tag bytes.
                assert_eq!(
                    tags[0],
                    SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
                );
                // 0xc8 == 200.
                assert_eq!(
                    tags[1],
                    SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
                );
                // 0x012c == 300.
                assert_eq!(
                    tags[2],
                    SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
                );
            }

            let reply_with_sendme_fut = async move {
                // Make and send a circuit-level sendme using the first pending tag.
                let c_sendme =
                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();

                // Make and send a stream-level sendme.
                let s_sendme = relaymsg::Sendme::new_empty().into();
                sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();

                sink
            };

            let _sink = reply_with_sendme_fut.await;

            // Let the reactor process the injected messages.
            rt.advance_until_stalled().await;

            // Now make sure that the circuit is still happy, and its
            // window is updated.
            {
                let (tx, rx) = oneshot::channel();
                circ.command
                    .unbounded_send(CtrlCmd::QuerySendWindow {
                        hop: 2.into(),
                        leg: circ.unique_id(),
                        done: tx,
                    })
                    .unwrap();
                let (window, _tags) = rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }
2651
    /// A circuit-level SENDME carrying a tag the circuit is not expecting
    /// must put the circuit into the closing state.
    #[traced_test]
    #[test]
    fn invalid_circ_sendme() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            // Same setup as accept_valid_sendme(), but the injected SENDME
            // carries a tag that does not match any pending one.
            let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            let reply_with_sendme_fut = async move {
                // Make and send a circuit-level sendme with a bad tag.
                let c_sendme =
                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
                        .into();
                sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
                sink
            };

            let _sink = reply_with_sendme_fut.await;

            // Let the reactor process the bad SENDME before checking the circuit state.
            rt.advance_until_stalled().await;
            assert!(circ.is_closing());
        });
    }
2678
    /// Check that several busy streams on one circuit each get a fair share of the bandwidth.
    ///
    /// A spawned task opens `N_STREAMS` streams and writes to them in
    /// round-robin order.  The simulated relay counts DATA bytes per stream
    /// and stops once `N_BYTES` have arrived in total; every stream must then
    /// have contributed at least `MIN_EXPECTED_BYTES_PER_STREAM`.
    #[traced_test]
    #[test]
    fn test_busy_stream_fairness() {
        // Number of streams to multiplex over the circuit.
        const N_STREAMS: usize = 3;
        // Number of cells (roughly) for each stream to send.
        const N_CELLS: usize = 20;
        // Number of bytes each stream has to write, and the total at which
        // the receiving side stops counting.
        const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
        // An equal share of N_BYTES, minus one maximum-size DATA payload of slack.
        const MIN_EXPECTED_BYTES_PER_STREAM: usize =
            N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;

            // Run the clients in a single task, doing our own round-robin
            // scheduling of writes to the streams.
            rt.spawn({
                // Clone the circuit to keep it alive after writers have
                // finished with it.
                let circ = circ.clone();
                async move {
                    let mut clients = VecDeque::new();
                    struct Client {
                        stream: DataStream,
                        to_write: &'static [u8],
                    }
                    for _ in 0..N_STREAMS {
                        clients.push_back(Client {
                            stream: circ
                                .begin_stream("www.example.com", 80, None)
                                .await
                                .unwrap(),
                            to_write: &[0_u8; N_BYTES][..],
                        });
                    }
                    while let Some(mut client) = clients.pop_front() {
                        if client.to_write.is_empty() {
                            // Client is done; it is not pushed back, so its stream is dropped here.
                            continue;
                        }
                        let written = client.stream.write(client.to_write).await.unwrap();
                        client.to_write = &client.to_write[written..];
                        clients.push_back(client);
                    }
                }
            })
            .unwrap();

            let channel_handler_fut = async {
                let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
                let mut total_bytes_received = 0;

                loop {
                    let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
                    let rmsg = match msg {
                        AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
                            RelayCellFormat::V0,
                            r.into_relay_body(),
                        )
                        .unwrap(),
                        other => panic!("Unexpected chanmsg: {other:?}"),
                    };
                    let (streamid, rmsg) = rmsg.into_streamid_and_msg();
                    match rmsg.cmd() {
                        RelayCmd::BEGIN => {
                            // Add an entry for this stream; it must be new.
                            let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
                            assert_eq!(prev, None);
                            // Reply with a CONNECTED.
                            let connected = relaymsg::Connected::new_with_addr(
                                "10.0.0.1".parse().unwrap(),
                                1234,
                            )
                            .into();
                            sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
                        }
                        RelayCmd::DATA => {
                            let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
                            let nbytes = data_msg.as_ref().len();
                            total_bytes_received += nbytes;
                            let streamid = streamid.unwrap();
                            let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
                            *stream_bytes += nbytes;
                            if total_bytes_received >= N_BYTES {
                                break;
                            }
                        }
                        RelayCmd::END => {
                            // A stream finished; keep counting the others.
                            continue;
                        }
                        other => {
                            panic!("Unexpected command {other:?}");
                        }
                    }
                }

                // Return our stats, along with the `rx` and `sink` to keep the
                // reactors alive.
                (total_bytes_received, stream_bytes_received, rx, sink)
            };

            let (total_bytes_received, stream_bytes_received, _rx, _sink) =
                channel_handler_fut.await;
            assert_eq!(stream_bytes_received.len(), N_STREAMS);
            for (sid, stream_bytes) in stream_bytes_received {
                assert!(
                    stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
                    "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
                );
            }
        });
    }
2807
    /// `CircParameters::default()` enables `extend_by_ed25519_id`, and the
    /// field can be switched off.
    #[test]
    fn basic_params() {
        use super::CircParameters;
        let mut p = CircParameters::default();
        assert!(p.extend_by_ed25519_id);

        p.extend_by_ed25519_id = false;
        assert!(!p.extend_by_ed25519_id);
    }
2817
    /// An incoming-stream filter that accepts every request.
    #[cfg(feature = "hs-service")]
    struct AllowAllStreamsFilter;
    #[cfg(feature = "hs-service")]
    impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
        /// Always answer `Accept`, ignoring both the request context and the circuit view.
        fn disposition(
            &mut self,
            _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
            _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
        ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
            Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
        }
    }
2830
    /// Calling `allow_stream_requests` a second time on the same circuit must fail.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests_twice() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, _send) = newcirc(&rt, chan).await;

            // The first call succeeds; its result is kept alive for the second call.
            let _incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await;

            // There can only be one IncomingStream at a time on any given circuit.
            assert!(incoming.is_err());
        });
    }
2860
    /// Accept an incoming stream request and read data from the accepted stream.
    ///
    /// The "service" side waits for an incoming stream, accepts it as a data
    /// stream, signals the "client" side through a oneshot, and then reads
    /// `TEST_DATA`.  The "client" side injects a BEGIN on stream 12, waits
    /// for the signal, and then injects a DATA message carrying `TEST_DATA`.
    #[traced_test]
    #[test]
    #[cfg(feature = "hs-service")]
    fn allow_stream_requests() {
        use tor_cell::relaycell::msg::BeginFlags;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            const TEST_DATA: &[u8] = b"ping";

            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut send) = newcirc(&rt, chan).await;

            let rfmt = RelayCellFormat::V0;

            // A helper channel for coordinating the "client"/"service" interaction.
            let (tx, rx) = oneshot::channel();
            let mut incoming = circ
                .allow_stream_requests(
                    &[tor_cell::relaycell::RelayCmd::BEGIN],
                    circ.last_hop().unwrap(),
                    AllowAllStreamsFilter,
                )
                .await
                .unwrap();

            let simulate_service = async move {
                let stream = incoming.next().await.unwrap();
                let mut data_stream = stream
                    .accept_data(relaymsg::Connected::new_empty())
                    .await
                    .unwrap();
                // Notify the client task that the stream has been accepted.
                tx.send(()).unwrap();

                // Read the data the client sent us.
                let mut buf = [0_u8; TEST_DATA.len()];
                data_stream.read_exact(&mut buf).await.unwrap();
                assert_eq!(&buf, TEST_DATA);

                // Hand back the circuit so it stays alive until the join completes.
                circ
            };

            let simulate_client = async move {
                let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let begin_msg = chanmsg::Relay::from(body);

                // Pretend to be a client sending a BEGIN message.
                send.send(ClientCircChanMsg::Relay(begin_msg))
                    .await
                    .unwrap();

                // Wait until the service has accepted the stream before sending data on it.
                rx.await.unwrap();
                // Now send some data along the newly established stream.
                let data = relaymsg::Data::new(TEST_DATA).unwrap();
                let body: BoxedCellBody =
                    AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                        .encode(rfmt, &mut testing_rng())
                        .unwrap();
                let data_msg = chanmsg::Relay::from(body);

                send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
                send
            };

            let (_circ, _send) = futures::join!(simulate_service, simulate_client);
        });
    }
2937
/// After rejecting a first incoming stream request, a second request reusing
/// the same stream ID can still be accepted and carry data.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::{BeginFlags, EndReason};
    use tor_cell::relaycell::RelayCmd;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // One notification per handled request (rejected or accepted).
        let (mut handled_tx, mut handled_rx) = mpsc::channel(STREAM_COUNT);

        let mut incoming = circ
            .allow_stream_requests(
                &[RelayCmd::BEGIN],
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let service_half = async move {
            for attempt in 0..STREAM_COUNT {
                let request = incoming.next().await.unwrap();

                if attempt == 0 {
                    // The very first request is turned down.
                    request
                        .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();
                } else {
                    // Every later request is accepted and must deliver TEST_DATA.
                    let mut data_stream = request
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    handled_tx.send(()).await.unwrap();

                    let mut received = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut received).await.unwrap();
                    assert_eq!(&received, TEST_DATA);
                }
            }

            circ
        };

        let client_half = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            let begin_msg = chanmsg::Relay::from(begin_body);

            // Send the same BEGIN once per expected request, waiting for the
            // service half to handle each one before sending the next.
            for _ in 0..STREAM_COUNT {
                send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                handled_rx.next().await.unwrap();
            }

            // Only the last (accepted) stream receives data.
            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            send.send(ClientCircChanMsg::Relay(chanmsg::Relay::from(data_body)))
                .await
                .unwrap();

            send
        };

        let (_circ, _send) = futures::join!(service_half, client_half);
    });
}
3031
/// Stream requests are only allowed from hop `EXPECTED_HOP`; when a BEGIN is
/// injected on the circuit anyway, the incoming-request stream ends (yields
/// `None`) instead of producing a request.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn incoming_stream_bad_hop() {
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::RelayCmd;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const EXPECTED_HOP: u8 = 1;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Register for requests from EXPECTED_HOP rather than `last_hop()`.
        let allowed_hop = (circ.unique_id(), EXPECTED_HOP.into()).into();
        let mut incoming = circ
            .allow_stream_requests(&[RelayCmd::BEGIN], allowed_hop, AllowAllStreamsFilter)
            .await
            .unwrap();

        let service_half = async move {
            // No request may ever be surfaced to the service.
            assert!(incoming.next().await.is_none());
            circ
        };

        let client_half = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();

            send.send(ClientCircChanMsg::Relay(chanmsg::Relay::from(begin_body)))
                .await
                .unwrap();

            send
        };

        let (_circ, _send) = futures::join!(service_half, client_half);
    });
}
3082
/// Linking must fail, with the expected error text, for two tunnel setups:
/// one whose circuits use different hops, and one that reuses the hops but
/// is built with `CircParameters::default()`.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_circ_validation() {
    use std::error::Error as _;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let default_params = CircParameters::default();
        let invalid_tunnels = [
            setup_bad_conflux_tunnel(&rt).await,
            setup_conflux_tunnel(&rt, true, default_params).await,
        ];

        for ctx in invalid_tunnels {
            // Keep the tunnel and its per-circuit handles alive while we wait
            // for the link attempt to resolve.
            let TestTunnelCtx {
                tunnel: _tunnel,
                circs: _circs,
                conflux_link_rx,
            } = ctx;

            let link_outcome = conflux_link_rx.await.unwrap();
            let conflux_hs_err = link_outcome.unwrap_err();
            let err_src = conflux_hs_err.source().unwrap();

            // NOTE: the wording (including "more more") has to match the
            // message produced by the code under test.
            assert!(err_src
                .to_string()
                .contains("one more more conflux circuits are invalid"));
        }
    });
}
3114
/// Test-side handles for a single circuit (one leg) created by
/// `setup_conflux_tunnel`.
///
/// Not every field is read by every test, hence the `allow(unused)`.
#[derive(Debug)]
#[allow(unused)]
#[cfg(feature = "conflux")]
struct TestCircuitCtx {
    /// Receiver half returned by `working_fake_channel`.
    ///
    /// The tests read the cells emitted by the client side from here
    /// (e.g. `await_link_payload` expects the LINK cell on it).
    chan_rx: Receiver<AnyChanCell>,
    /// Third value returned by `working_fake_channel` for this circuit's channel.
    chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    /// Sender returned by `newcirc_ext`; the tests push messages built with
    /// `rmsg_to_ccmsg` into it to feed them to the circuit.
    circ_tx: CircuitRxSender,
    /// The value of `unique_id()` of the circuit this context belongs to.
    unique_id: UniqId,
}
3127
/// Everything `setup_conflux_tunnel` hands back to a test.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct TestTunnelCtx {
    /// The circuit that was asked (via `CtrlMsg::LinkCircuits`) to link in
    /// the other one.
    tunnel: Arc<ClientCirc>,
    /// One context per circuit, in creation order (first circuit, then second).
    circs: Vec<TestCircuitCtx>,
    /// Resolves with the outcome of the `LinkCircuits` request.
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
}
3135
/// Wait for the next cell on `rx` and return it as a `ConfluxLink`.
///
/// Panics if the channel is closed, if the cell is not a RELAY cell, if it
/// does not decode to a CONFLUX_LINK message, or if it carries a stream ID.
#[cfg(feature = "conflux")]
async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
    let cell = rx.next().await.unwrap();
    let (_id, chmsg) = cell.into_circid_and_msg();

    let outer = match chmsg {
        AnyChanMsg::Relay(relay) => {
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                .unwrap()
        }
        other => panic!("{other:?}"),
    };
    let (streamid, rmsg) = outer.into_streamid_and_msg();

    match rmsg {
        AnyRelayMsg::ConfluxLink(link) => {
            // The LINK message must not be addressed to a stream.
            assert!(streamid.is_none());
            link
        }
        _ => panic!("unexpected relay message {rmsg:?}"),
    }
}
3159
/// Build two three-hop circuits and ask the first one to link in the second.
///
/// * `same_hops`: when true the second circuit reuses the first circuit's
///   hop details; otherwise it gets `hop_details(3, 10)`.
/// * `params`: circuit parameters used for both circuits.
///
/// The second circuit's reactor is shut down and its circuit extracted with
/// `CtrlCmd::ShutdownAndReturnCircuit`; the extracted circuit is then passed
/// to the first circuit in a `CtrlMsg::LinkCircuits` request. The returned
/// context exposes the receiver for that request's answer.
#[cfg(feature = "conflux")]
async fn setup_conflux_tunnel(
    rt: &MockRuntime,
    same_hops: bool,
    params: CircParameters,
) -> TestTunnelCtx {
    let first_hops = hop_details(3, 0);
    let second_hops = match same_hops {
        true => first_hops.clone(),
        false => hop_details(3, 10),
    };

    // First circuit.
    let (first_chan, first_chan_rx, first_chan_tx) = working_fake_channel(rt);
    let (primary, primary_sink) = newcirc_ext(
        rt,
        UniqId::new(1, 3),
        first_chan,
        first_hops,
        2.into(),
        params.clone(),
    )
    .await;

    // Second circuit.
    let (second_chan, second_chan_rx, second_chan_tx) = working_fake_channel(rt);
    let (secondary, secondary_sink) = newcirc_ext(
        rt,
        UniqId::new(2, 4),
        second_chan,
        second_hops,
        2.into(),
        params,
    )
    .await;

    // Stop the second circuit's reactor and take its circuit out of it.
    let (leg_tx, leg_rx) = oneshot::channel();
    secondary
        .command
        .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: leg_tx })
        .unwrap();

    let detached_leg = leg_rx.await.unwrap().unwrap();
    rt.advance_until_stalled().await;
    assert!(secondary.is_closing());

    // Ask the first circuit to link the detached leg.
    let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
    primary
        .control
        .unbounded_send(CtrlMsg::LinkCircuits {
            circuits: vec![detached_leg],
            answer: conflux_link_tx,
        })
        .unwrap();

    let primary_ctx = TestCircuitCtx {
        chan_rx: first_chan_rx,
        chan_tx: first_chan_tx,
        circ_tx: primary_sink,
        unique_id: primary.unique_id(),
    };
    let secondary_ctx = TestCircuitCtx {
        chan_rx: second_chan_rx,
        chan_tx: second_chan_tx,
        circ_tx: secondary_sink,
        unique_id: secondary.unique_id(),
    };

    TestTunnelCtx {
        tunnel: primary,
        circs: vec![primary_ctx, secondary_ctx],
        conflux_link_rx,
    }
}
3230
/// Set up the tunnel the tests treat as valid: both circuits share the same
/// hops and are built with `CircParameters::new(true, build_cc_vegas_params())`.
#[cfg(feature = "conflux")]
async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let vegas_params = CircParameters::new(true, build_cc_vegas_params());
    // `true`: the second circuit reuses the first circuit's hops.
    setup_conflux_tunnel(rt, true, vegas_params).await
}
3242
/// Set up a tunnel whose two circuits do NOT share hops, using the same
/// `CircParameters::new(true, build_cc_vegas_params())` parameters as
/// `setup_good_conflux_tunnel`.
#[cfg(feature = "conflux")]
async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let vegas_params = CircParameters::new(true, build_cc_vegas_params());
    // `false`: the second circuit gets its own, different hops.
    setup_conflux_tunnel(rt, false, vegas_params).await
}
3252
/// A CONFLUX_LINKED message arriving on a circuit that never started a
/// conflux handshake causes the circuit to close.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn reject_conflux_linked_before_hs() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut sink) = newcirc(&rt, chan).await;

        // Build an unsolicited LINKED message with a fresh nonce.
        let nonce = V1Nonce::new(&mut testing_rng());
        let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
        let unsolicited = rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload).into());
        sink.send(unsolicited).await.unwrap();

        rt.advance_until_stalled().await;
        assert!(circ.is_closing());
    });
}
3271
/// Only the first leg answers the LINK; after 60 seconds of mock time the
/// handshake result reports success for the first leg and a timeout for the
/// second.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_hs_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel: _tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt).await;

        let [mut responsive_leg, _silent_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Echo the LINK payload back as LINKED, on the first leg only.
        let link = await_link_payload(&mut responsive_leg.chan_rx).await;
        let linked = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
        );
        responsive_leg.circ_tx.send(linked).await.unwrap();

        // Let a full minute of mock time go by.
        rt.advance_by(Duration::from_secs(60)).await;

        let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();

        let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
            conflux_hs_res.try_into().unwrap();

        assert!(res1.is_ok());

        let err = res2.unwrap_err();
        assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
    });
}
3311
/// Each malformed handshake response sent on the second leg must fail the
/// handshake with the matching protocol error and close the tunnel.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_hs() {
    use crate::util::err::ConfluxHandshakeError;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let nonce = V1Nonce::new(&mut testing_rng());
        let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);

        // LINKED built from a freshly generated payload (not the one the client sent).
        let linked_wrong_nonce = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
        );
        // LINK sent towards the client.
        let stray_link = rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into());
        // SWITCH sent before the leg is linked.
        let early_switch = rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into());

        // (cell to inject, expected `Error::CircProto` text)
        let bad_hs_responses = [
            (
                linked_wrong_nonce,
                "Received CONFLUX_LINKED cell with mismatched nonce",
            ),
            (
                stray_link,
                "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
            ),
            (
                early_switch,
                "Received CONFLUX_SWITCH on unlinked circuit?!",
            ),
        ];

        for (bad_cell, expected_err) in bad_hs_responses {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;

            let [_circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            circ2.circ_tx.send(bad_cell).await.unwrap();

            // Exactly one per-leg result is expected here.
            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
                conflux_hs_res.try_into().unwrap();

            match res2.unwrap_err() {
                ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                    assert_eq!(e, expected_err);
                }
                e => panic!("unexpected error: {e:?}"),
            }

            assert!(tunnel.is_closing());
        }
    });
}
3378
/// Any conflux message (LINKED, LINK or SWITCH) received on a plain circuit
/// causes that circuit to close.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn unexpected_conflux_cell() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let nonce = V1Nonce::new(&mut testing_rng());
        let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);

        let linked = relaymsg::ConfluxLinked::new(link_payload.clone()).into();
        let link = relaymsg::ConfluxLink::new(link_payload.clone()).into();
        let switch = relaymsg::ConfluxSwitch::new(0).into();

        let bad_cells = [
            rmsg_to_ccmsg(None, linked),
            rmsg_to_ccmsg(None, link),
            rmsg_to_ccmsg(None, switch),
        ];

        // A fresh circuit per message, so the cases do not affect each other.
        for bad_cell in bad_cells {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;

            sink.send(bad_cell).await.unwrap();
            rt.advance_until_stalled().await;

            assert!(circ.is_closing());
        }
    });
}
3412
/// Sending LINKED once on the first leg and twice on the second leg makes
/// the tunnel close.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_linked() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The handshake-result receiver is deliberately not kept.
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt).await;

        let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        let link = await_link_payload(&mut circ1.chan_rx).await;

        // One LINKED on the first leg...
        let first_linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
        circ1
            .circ_tx
            .send(rmsg_to_ccmsg(None, first_linked))
            .await
            .unwrap();

        // ...and two back-to-back LINKED messages on the second leg.
        for _ in 0..2 {
            let repeated_linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ2
                .circ_tx
                .send(rmsg_to_ccmsg(None, repeated_linked))
                .await
                .unwrap();
        }

        rt.advance_until_stalled().await;

        assert!(tunnel.is_closing());
    });
}
3457
/// After a successful handshake on both legs, receiving a SWITCH with
/// sequence number 0 on the legs makes the tunnel close.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The list of SWITCH messages considered invalid by this test.
        let bad_switch = [relaymsg::ConfluxSwitch::new(0)];

        for bad_cell in bad_switch {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;

            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            let link = await_link_payload(&mut circ1.chan_rx).await;

            // Complete the handshake: answer with LINKED on both legs.
            for leg in [&mut circ1, &mut circ2] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                leg.circ_tx
                    .send(rmsg_to_ccmsg(None, linked))
                    .await
                    .unwrap();
            }

            let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
            assert!(conflux_hs_res.iter().all(|res| res.is_ok()));

            // Now deliver the offending SWITCH on each leg.
            for leg in [&mut circ1, &mut circ2] {
                let switch_msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
                leg.circ_tx.send(switch_msg).await.unwrap();
            }

            rt.advance_until_stalled().await;
            assert!(tunnel.is_closing());
        }
    });
}
3513
/// One participant of a conflux test, as consumed by `run_conflux_endpoint`.
///
/// `I` is the iterator type yielding the per-SENDME delays of a mock relay
/// (see `ConfluxExitState::rtt_delays`).
#[cfg(feature = "conflux")]
#[derive(Debug)]
enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
    /// A mock exit handling one leg; driven by `run_mock_conflux_exit`.
    Relay(ConfluxExitState<I>),
    /// The client side; driven by `run_conflux_client`.
    Client {
        /// Resolves with the conflux handshake result.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        /// The tunnel on which the client opens its stream.
        tunnel: Arc<ClientCirc>,
        /// Bytes the client writes to the stream.
        send_data: Vec<u8>,
        /// Bytes the client expects to read back before the stream ends.
        recv_data: Vec<u8>,
    },
}
3534
/// What an endpoint task returns when it finishes.
///
/// The payloads are handed back to the caller rather than dropped inside the
/// task; they are not necessarily inspected, hence `allow(unused)`.
#[allow(unused, clippy::large_enum_variant)]
#[derive(Debug)]
#[cfg(feature = "conflux")]
enum ConfluxEndpointResult {
    /// Returned by `run_conflux_client`.
    Circuit {
        /// The tunnel the client used.
        tunnel: Arc<ClientCirc>,
        /// The data stream the client opened on it.
        stream: DataStream,
    },
    /// Returned by `run_mock_conflux_exit`.
    Relay {
        /// The per-circuit handles of the leg the mock exit was serving.
        circ: TestCircuitCtx,
    },
}
3549
/// Stream bookkeeping shared (behind a mutex) by the mock exits of a tunnel.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxStreamState {
    /// Payload bytes of every DATA message received so far, in arrival order.
    data_recvd: Vec<u8>,
    /// Number of payload bytes the exits expect to receive in total.
    expected_data_len: usize,
    /// Set once a BEGIN has been received.
    begin_recvd: bool,
    /// Set once an END has been received.
    end_recvd: bool,
    /// Set once an END has been sent by the sending leg.
    end_sent: bool,
}

#[cfg(feature = "conflux")]
impl ConfluxStreamState {
    /// Create the initial state: nothing received, nothing sent, and
    /// `expected_data_len` bytes still expected.
    fn new(expected_data_len: usize) -> Self {
        let data_recvd = Vec::new();
        let (begin_recvd, end_recvd, end_sent) = (false, false, false);

        Self {
            data_recvd,
            expected_data_len,
            begin_recvd,
            end_recvd,
            end_sent,
        }
    }
}
3578
/// A SWITCH message a mock exit expects to receive on its leg.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ExpectedSwitch {
    /// The value the mock exit's per-leg cell counter must have when the
    /// SWITCH arrives (the counter is incremented for the SWITCH cell itself
    /// before the comparison).
    cells_so_far: usize,
    /// The sequence number the SWITCH must carry.
    seqno: u32,
}
3590
/// Feeds a scripted sequence of relay messages to the mock exits, one at a
/// time, in order.
#[cfg(feature = "conflux")]
struct CellDispatcher {
    /// For each leg (keyed by circuit `UniqId`), the channel to the mock exit
    /// serving that leg.
    leg_tx: HashMap<UniqId, mpsc::Sender<CellToSend>>,
    /// The script: `(leg, message)` pairs, dispatched front to back.
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
}
3603
#[cfg(feature = "conflux")]
impl CellDispatcher {
    /// Dispatch every scripted message to the mock exit serving its leg.
    ///
    /// Messages are sent strictly in order: the next one is only sent after
    /// the receiving exit has acknowledged the previous one through the
    /// `done_tx` it was handed.
    ///
    /// Panics if a message names a leg with no registered sender, or if an
    /// exit goes away before taking or acknowledging a message.
    async fn run(self) {
        let Self {
            mut leg_tx,
            cells_to_send,
        } = self;

        // Consume the script by value instead of repeatedly removing the
        // front element, which would shift the whole vector every time.
        for (circ_id, cell) in cells_to_send {
            let cell_tx = leg_tx.get_mut(&circ_id).unwrap();
            let (done_tx, done_rx) = oneshot::channel();
            cell_tx.send(CellToSend { done_tx, cell }).await.unwrap();
            // Wait for the exit to report that the message has gone out.
            let () = done_rx.await.unwrap();
        }
        // `leg_tx` is dropped here, closing the per-leg channels.
    }
}
3617
/// A message the `CellDispatcher` asks a mock exit to send to the client.
#[cfg(feature = "conflux")]
#[derive(Debug)]
struct CellToSend {
    /// Signalled by the mock exit once it has sent `cell`.
    done_tx: oneshot::Sender<()>,
    /// The relay message to send.
    cell: AnyRelayMsg,
}
3627
/// Everything a mock exit needs to serve one leg of a conflux tunnel
/// (consumed by `run_mock_conflux_exit`).
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
    /// The mock runtime, shared between the exits behind an async mutex so
    /// that only one of them advances it at a time.
    runtime: Arc<AsyncMutex<MockRuntime>>,
    /// The client tunnel; queried (via `CtrlCmd::QuerySendWindow`) for the
    /// tags needed to build SENDMEs.
    tunnel: Arc<ClientCirc>,
    /// Channel/circuit handles of the leg served by this exit.
    circ: TestCircuitCtx,
    /// Delays to advance the runtime by before replying: the first item is
    /// consumed for the handshake, each later item before one SENDME.
    /// A `None` item (or an exhausted iterator) means "no delay".
    rtt_delays: I,
    /// Stream state shared with the exit serving the other leg.
    stream_state: Arc<Mutex<ConfluxStreamState>>,
    /// The SWITCH messages expected on this leg, in order.
    expect_switch: Vec<ExpectedSwitch>,
    /// Events coming from the exit on the other leg.
    event_rx: mpsc::Receiver<MockExitEvent>,
    /// Events going to the exit on the other leg.
    event_tx: mpsc::Sender<MockExitEvent>,
    /// Whether this exit is the one that sends the END when it finishes
    /// without having received one.
    is_sending_leg: bool,
    /// Messages this exit is asked to send to the client.
    cells_rx: mpsc::Receiver<CellToSend>,
}
3662
/// Play the exit side of a successful conflux handshake on one leg.
///
/// Waits for the client's LINK on `rx`, optionally advances the mock runtime
/// by `init_rtt_delay`, replies on `sink` with a LINKED echoing the LINK
/// payload, and finally checks that the next message from the client is a
/// LINKED_ACK. Panics on any deviation.
#[cfg(feature = "conflux")]
async fn good_exit_handshake(
    runtime: &Arc<AsyncMutex<MockRuntime>>,
    init_rtt_delay: Option<Duration>,
    rx: &mut Receiver<ChanCell<AnyChanMsg>>,
    sink: &mut CircuitRxSender,
) {
    let link = await_link_payload(rx).await;

    // Simulate the time between receiving LINK and answering it.
    if let Some(delay) = init_rtt_delay {
        runtime.lock().await.advance_by(delay).await;
    }

    let reply = rmsg_to_ccmsg(
        None,
        relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
    );
    sink.send(reply).await.unwrap();

    // The client must acknowledge the LINKED.
    let ack_cell = rx.next().await.unwrap();
    let (_id, chmsg) = ack_cell.into_circid_and_msg();
    let outer = match chmsg {
        AnyChanMsg::Relay(relay) => {
            AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body())
                .unwrap()
        }
        other => panic!("{other:?}"),
    };
    let (_streamid, rmsg) = outer.into_streamid_and_msg();

    assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
}
3696
/// An event one mock exit sends to the exit serving the other leg.
///
/// Gated on the `conflux` feature like every helper that uses it.
#[cfg(feature = "conflux")]
#[derive(Copy, Clone, Debug)]
enum MockExitEvent {
    /// The sending exit has finished its run.
    Done,
    /// The sending exit received a BEGIN for the given stream.
    BeginRecvd(StreamId),
}
3705
/// Run a mock exit on one leg of a conflux tunnel until the stream is done.
///
/// In a loop, the exit reacts to whichever comes first:
///
/// * a cell from the client on this leg (`circ.chan_rx`): BEGIN, END, SWITCH
///   and DATA are handled, anything else panics;
/// * an event from the exit on the other leg (`event_rx`);
/// * a message to send to the client (`cells_rx`), only once the stream ID
///   is known and the message source has not been exhausted.
///
/// When the loop ends, the sending leg emits an END if none was received,
/// the other exit is told we are done, and all expected SWITCH messages must
/// have been seen. The leg's handles are returned to the caller.
#[cfg(feature = "conflux")]
async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
    state: ConfluxExitState<I>,
) -> ConfluxEndpointResult {
    let ConfluxExitState {
        runtime,
        tunnel,
        mut circ,
        rtt_delays,
        stream_state,
        mut expect_switch,
        mut event_tx,
        mut event_rx,
        is_sending_leg,
        mut cells_rx,
    } = state;

    let mut rtt_delays = rtt_delays.into_iter();

    // Total number of payload bytes expected across both legs.
    let stream_len = stream_state.lock().unwrap().expected_data_len;
    // DATA cells received on this leg (drives the SENDME cadence).
    let mut data_cells_received = 0_usize;
    // All cells received on this leg (checked against `ExpectedSwitch`).
    let mut cell_count = 0_usize;
    // SENDME tags obtained from the client, consumed front to back.
    let mut tags = vec![];
    // Stream ID, learned either from the first cell carrying one on this leg
    // or from the other exit's `BeginRecvd` event.
    let mut streamid = None;
    // Set once `cells_rx` has been closed, i.e. there is nothing left to send.
    let mut done_writing = false;

    loop {
        // Stop once the stream was opened (or ended), all expected data has
        // arrived, and there is nothing left to send.
        let should_exit = {
            let stream_state = stream_state.lock().unwrap();
            let done_reading = stream_state.data_recvd.len() >= stream_len;

            (stream_state.begin_recvd || stream_state.end_recvd) && done_reading && done_writing
        };

        if should_exit {
            break;
        }

        use futures::select;

        // Only poll for outgoing messages when we know which stream to put
        // them on and the source is still open; otherwise use a future that
        // never resolves so this `select!` branch stays idle.
        let mut next_cell = if streamid.is_some() && !done_writing {
            Box::pin(cells_rx.next().fuse())
                as Pin<Box<dyn FusedFuture<Output = Option<CellToSend>> + Send>>
        } else {
            Box::pin(std::future::pending().fuse())
        };

        let res = select! {
            // A cell from the client: handled below, after the `select!`.
            res = circ.chan_rx.next() => {
                res.unwrap()
            },
            // An event from the exit serving the other leg.
            res = event_rx.next() => {
                let Some(event) = res else {
                    // The other exit dropped its sender.
                    break;
                };

                match event {
                    MockExitEvent::Done => {
                        break;
                    },
                    MockExitEvent::BeginRecvd(id) => {
                        // The BEGIN arrived on the other leg; remember its
                        // stream ID so we can send on that stream too.
                        streamid = Some(id);
                        continue;
                    },
                }
            }
            // A message to send to the client.
            res = next_cell => {
                if let Some(cell_to_send) = res {
                    let CellToSend { cell, done_tx } = cell_to_send;

                    // SWITCH is sent without a stream ID; everything else
                    // goes on the stream.
                    let streamid = if matches!(cell, AnyRelayMsg::ConfluxSwitch(_)) {
                        None
                    } else {
                        streamid
                    };

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(streamid, cell))
                        .await
                        .unwrap();

                    // Let the client process the message before reporting
                    // it as sent.
                    runtime.lock().await.advance_until_stalled().await;
                    done_tx.send(()).unwrap();
                } else {
                    // The message source was closed.
                    done_writing = true;
                }

                continue;
            }
        };

        // Decode the cell received from the client.
        let (_id, chmsg) = res.into_circid_and_msg();
        cell_count += 1;
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{:?}", other),
        };
        let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
        if streamid.is_none() {
            streamid = new_streamid;
        }

        let begin_recvd = stream_state.lock().unwrap().begin_recvd;
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        match rmsg {
            AnyRelayMsg::Begin(_) if begin_recvd => {
                panic!("client tried to open two streams?!");
            }
            AnyRelayMsg::Begin(_) if !begin_recvd => {
                stream_state.lock().unwrap().begin_recvd = true;
                // Accept the stream...
                let connected = relaymsg::Connected::new_empty().into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(streamid, connected))
                    .await
                    .unwrap();
                // ...and tell the other exit which stream it is.
                event_tx
                    .send(MockExitEvent::BeginRecvd(streamid.unwrap()))
                    .await
                    .unwrap();
            }
            AnyRelayMsg::End(_) if !end_recvd => {
                stream_state.lock().unwrap().end_recvd = true;
                break;
            }
            AnyRelayMsg::End(_) if end_recvd => {
                panic!("received two END cells for the same stream?!");
            }
            AnyRelayMsg::ConfluxSwitch(cell) => {
                // Panics if no (further) SWITCH was expected on this leg.
                let expected = expect_switch.remove(0);

                assert_eq!(expected.cells_so_far, cell_count);
                assert_eq!(expected.seqno, cell.seqno());

                continue;
            }
            AnyRelayMsg::Data(dat) => {
                data_cells_received += 1;
                stream_state
                    .lock()
                    .unwrap()
                    .data_recvd
                    .extend_from_slice(dat.as_ref());

                // Reply with a SENDME after every 31st DATA cell on this leg.
                let is_next_cell_sendme = data_cells_received % 31 == 0;
                if is_next_cell_sendme {
                    if tags.is_empty() {
                        // Let the client settle, then ask it for the SENDME
                        // tags of this leg (hop 2).
                        runtime.lock().await.advance_until_stalled().await;
                        let (tx, rx) = oneshot::channel();
                        tunnel
                            .command
                            .unbounded_send(CtrlCmd::QuerySendWindow {
                                hop: 2.into(),
                                leg: circ.unique_id,
                                done: tx,
                            })
                            .unwrap();

                        let (_window, new_tags) = rx.await.unwrap().unwrap();
                        tags = new_tags;
                    }

                    // Panics if the client reported no tags.
                    let tag = tags.remove(0);

                    // Simulate this leg's round-trip time before answering.
                    if let Some(rtt_delay) = rtt_delays.next().flatten() {
                        runtime.lock().await.advance_by(rtt_delay).await;
                    }
                    let sendme = relaymsg::Sendme::from(tag).into();

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, sendme))
                        .await
                        .unwrap();
                }
            }
            _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
        }
    }

    let end_recvd = stream_state.lock().unwrap().end_recvd;

    // Close the stream from our side if the client has not done so.
    if is_sending_leg && !end_recvd {
        let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
        circ.circ_tx
            .send(rmsg_to_ccmsg(streamid, end))
            .await
            .unwrap();
        stream_state.lock().unwrap().end_sent = true;
    }

    // Best effort: the other exit may already be gone, so the result is ignored.
    let _ = event_tx.send(MockExitEvent::Done).await;

    // Every expected SWITCH must have been consumed.
    assert!(
        expect_switch.is_empty(),
        "expect_switch = {expect_switch:?}"
    );

    ConfluxEndpointResult::Relay { circ }
}
3934
/// Run the client side of a conflux test.
///
/// Waits for the handshake to report two per-leg results, opens a stream to
/// `www.example.com:443` on `tunnel`, writes `send_data`, then reads until
/// the stream ends and checks that exactly `recv_data` was received.
#[cfg(feature = "conflux")]
async fn run_conflux_client(
    tunnel: Arc<ClientCirc>,
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> ConfluxEndpointResult {
    // The handshake must have completed, with one result per leg.
    let handshake = conflux_link_rx.await.unwrap().unwrap();
    assert_eq!(handshake.len(), 2);

    let mut stream = tunnel
        .begin_stream("www.example.com", 443, None)
        .await
        .unwrap();

    // Upload.
    stream.write_all(&send_data).await.unwrap();
    stream.flush().await.unwrap();

    // Download everything until the stream is closed.
    let mut recv = Vec::<u8>::new();
    let recv_len = stream.read_to_end(&mut recv).await.unwrap();
    assert_eq!(recv_len, recv_data.len());
    assert_eq!(recv_data, recv);

    ConfluxEndpointResult::Circuit { tunnel, stream }
}
3966
/// Drive one test endpoint to completion: a mock exit via
/// `run_mock_conflux_exit`, or the client via `run_conflux_client`.
#[cfg(feature = "conflux")]
async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
    endpoint: ConfluxTestEndpoint<I>,
) -> ConfluxEndpointResult {
    match endpoint {
        ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            send_data,
            recv_data,
        } => {
            let client = run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data);
            client.await
        }
        ConfluxTestEndpoint::Relay(exit_state) => {
            let exit = run_mock_conflux_exit(exit_state);
            exit.await
        }
    }
}
3981
/// Send a large amount of data from the client to two mock exits over a
/// two-leg conflux tunnel, with different simulated round-trip times on each
/// leg, and check that:
///
/// * each leg sees exactly the SWITCH it is expected to see, and
/// * the bytes received across both legs are, as a multiset, the bytes sent.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_client_to_exit() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The payload is NUM_CELLS * CELL_SIZE bytes long.
        const NUM_CELLS: usize = 300;
        const CELL_SIZE: usize = 498;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt).await;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Repeating pattern of the byte values 0..=254.
        let mut send_data = (0..255_u8)
            .cycle()
            .take(NUM_CELLS * CELL_SIZE)
            .collect::<Vec<_>>();
        let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

        let mut tasks = vec![];

        // Event channels between the two mock exits (cross-wired below).
        let (tx1, rx1) = mpsc::channel(1);
        let (tx2, rx2) = mpsc::channel(1);

        // Simulated delays for leg 1: the first entry is consumed by the
        // handshake, the remaining ones before each SENDME.
        let circ1_rtt_delays = [
            Some(Duration::from_millis(100)),
            Some(Duration::from_millis(500)),
            Some(Duration::from_millis(700)),
            Some(Duration::from_millis(900)),
            Some(Duration::from_millis(1100)),
            Some(Duration::from_millis(1300)),
            Some(Duration::from_millis(1500)),
            Some(Duration::from_millis(1700)),
            Some(Duration::from_millis(1900)),
            Some(Duration::from_millis(2100)),
        ]
        .into_iter();

        // Simulated delays for leg 2, used the same way.
        let circ2_rtt_delays = [
            Some(Duration::from_millis(200)),
            Some(Duration::from_millis(400)),
            Some(Duration::from_millis(600)),
            Some(Duration::from_millis(800)),
            Some(Duration::from_millis(1000)),
            Some(Duration::from_millis(1200)),
            Some(Duration::from_millis(1400)),
            Some(Duration::from_millis(1600)),
            Some(Duration::from_millis(1800)),
            Some(Duration::from_millis(2000)),
        ]
        .into_iter();

        // Leg 1 expects a single SWITCH, as its 126th cell, carrying seqno 124.
        // NOTE(review): these values presumably follow from the delays above
        // and the client's leg-selection behaviour; confirm before changing either.
        let expected_switches1 = vec![ExpectedSwitch {
            cells_so_far: 126,
            seqno: 124,
        }];

        // Leg 2 expects a single SWITCH, as its very first cell, carrying seqno 125.
        let expected_switches2 = vec![ExpectedSwitch {
            cells_so_far: 1,
            seqno: 125,
        }];

        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));

        // The exits have nothing to send in this test: the sender halves are
        // dropped right away, so the receivers yield `None` when polled.
        let (_, cells_rx1) = mpsc::channel(1);
        let (_, cells_rx2) = mpsc::channel(1);

        let relay1 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ1,
            rtt_delays: circ1_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches1,
            event_tx: tx1,
            event_rx: rx2,
            is_sending_leg: true,
            cells_rx: cells_rx1,
        };

        let relay2 = ConfluxExitState {
            runtime: Arc::clone(&relay_runtime),
            tunnel: Arc::clone(&tunnel),
            circ: circ2,
            rtt_delays: circ2_rtt_delays,
            stream_state: Arc::clone(&stream_state),
            expect_switch: expected_switches2,
            event_tx: tx2,
            event_rx: rx1,
            is_sending_leg: false,
            cells_rx: cells_rx2,
        };

        for mut mock_relay in [relay1, relay2] {
            let leg = mock_relay.circ.unique_id;

            // Complete the handshake on this leg before spawning its exit
            // task; this consumes the leg's first delay.
            good_exit_handshake(
                &relay_runtime,
                mock_relay.rtt_delays.next().flatten(),
                &mut mock_relay.circ.chan_rx,
                &mut mock_relay.circ.circ_tx,
            )
            .await;

            let relay = ConfluxTestEndpoint::Relay(mock_relay);

            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }

        // The client sends `send_data` and expects to receive nothing back.
        tasks.push(rt.spawn_join(
            "client task".to_string(),
            run_conflux_endpoint(ConfluxTestEndpoint::Client {
                tunnel,
                conflux_link_rx,
                send_data: send_data.clone(),
                recv_data: vec![],
            }),
        ));
        let _sinks = futures::future::join_all(tasks).await;
        let mut stream_state = stream_state.lock().unwrap();
        assert!(stream_state.begin_recvd);

        // The exits append data in arrival order, which need not match the
        // send order, so compare the sorted byte sequences.
        stream_state.data_recvd.sort();
        send_data.sort();
        assert_eq!(stream_state.data_recvd, send_data);
    });
}
4168
/// Run one exit-to-client scenario on an already set-up tunnel.
///
/// * `cells_to_send`: the scripted `(leg, message)` sequence the mock exits
///   send to the client, dispatched in order by a `CellDispatcher`.
/// * `send_data`: what the client writes to its stream.
/// * `recv_data`: what the client must read back before the stream ends.
///
/// Performs the handshake on both legs, runs both mock exits and the client
/// to completion, and returns the stream state shared by the exits.
#[cfg(feature = "conflux")]
async fn run_multipath_exit_to_client_test(
    rt: MockRuntime,
    tunnel: TestTunnelCtx,
    cells_to_send: Vec<(UniqId, AnyRelayMsg)>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> Arc<Mutex<ConfluxStreamState>> {
    let TestTunnelCtx {
        tunnel,
        circs,
        conflux_link_rx,
    } = tunnel;
    let [first_leg, second_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

    let stream_state = Arc::new(Mutex::new(ConfluxStreamState::new(send_data.len())));

    let mut tasks = vec![];
    let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));

    // One message channel per leg, from the dispatcher to that leg's exit.
    let (first_cells_tx, first_cells_rx) = mpsc::channel(1);
    let (second_cells_tx, second_cells_rx) = mpsc::channel(1);

    let dispatcher = CellDispatcher {
        leg_tx: HashMap::from([
            (first_leg.unique_id, first_cells_tx),
            (second_leg.unique_id, second_cells_tx),
        ]),
        cells_to_send,
    };

    // Event channels between the two exits; each exit listens to the other.
    let (first_event_tx, first_event_rx) = mpsc::channel(1);
    let (second_event_tx, second_event_rx) = mpsc::channel(1);

    // No simulated delays and no expected SWITCH messages on either leg.
    let first_exit = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: first_leg,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        expect_switch: vec![],
        event_tx: first_event_tx,
        event_rx: second_event_rx,
        is_sending_leg: false,
        cells_rx: first_cells_rx,
    };

    let second_exit = ConfluxExitState {
        runtime: Arc::clone(&relay_runtime),
        tunnel: Arc::clone(&tunnel),
        circ: second_leg,
        rtt_delays: [].into_iter(),
        stream_state: Arc::clone(&stream_state),
        expect_switch: vec![],
        event_tx: second_event_tx,
        event_rx: first_event_rx,
        is_sending_leg: true,
        cells_rx: second_cells_rx,
    };

    // The dispatcher runs in the background, feeding the exits.
    rt.spawn(dispatcher.run()).unwrap();

    for mut exit in [first_exit, second_exit] {
        let leg = exit.circ.unique_id;

        // Finish the handshake on this leg before starting its exit task.
        good_exit_handshake(
            &relay_runtime,
            exit.rtt_delays.next().flatten(),
            &mut exit.circ.chan_rx,
            &mut exit.circ.circ_tx,
        )
        .await;

        let endpoint = ConfluxTestEndpoint::Relay(exit);
        tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(endpoint)));
    }

    let client = ConfluxTestEndpoint::Client {
        tunnel,
        conflux_link_rx,
        send_data: send_data.clone(),
        recv_data,
    };
    tasks.push(rt.spawn_join("client task".to_string(), run_conflux_endpoint(client)));

    // Wait for the exits and the client to finish.
    let _sinks = futures::future::join_all(tasks).await;

    stream_state
}
4278
/// The mock exits send `TO_SEND` to the client in pieces spread over both
/// legs, interleaved with SWITCH messages and partly out of order; the
/// client must nevertheless read back exactly `TO_SEND`.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_exit_to_client() {
    const TO_SEND: &[u8] =
        b"But something about Buster Friendly irritated John Isidore, one specific thing";

    /// Build a DATA message carrying `bytes`.
    fn data(bytes: &[u8]) -> AnyRelayMsg {
        relaymsg::Data::new(bytes).unwrap().into()
    }

    /// Build a SWITCH message carrying `seqno`.
    fn switch(seqno: u32) -> AnyRelayMsg {
        relaymsg::ConfluxSwitch::new(seqno).into()
    }

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // Indices into the tunnel's list of circuits.
        const CIRC1: usize = 0;
        const CIRC2: usize = 1;

        // Scenario 1: a single switch from the first leg to the second, with
        // one piece on the first leg delivered late.
        let simple_switch = vec![
            (CIRC1, data(&TO_SEND[0..5])),
            (CIRC1, data(&TO_SEND[5..10])),
            (CIRC2, switch(4)),
            (CIRC2, data(&TO_SEND[20..30])),
            (CIRC1, data(&TO_SEND[10..20])),
            (CIRC2, data(&TO_SEND[30..40])),
            (CIRC2, data(&TO_SEND[40..])),
        ];

        // Scenario 2: several switches back and forth between the legs.
        let multiple_switches = vec![
            (CIRC2, switch(3)),
            (CIRC2, data(&TO_SEND[15..20])),
            (CIRC2, data(&TO_SEND[20..30])),
            (CIRC1, data(&TO_SEND[0..10])),
            (CIRC1, data(&TO_SEND[10..15])),
            (CIRC1, switch(3)),
            (CIRC1, data(&TO_SEND[31..40])),
            (CIRC2, data(&TO_SEND[30..31])),
            (CIRC1, data(&TO_SEND[40..])),
            (CIRC2, switch(2)),
        ];

        let tests = [simple_switch, multiple_switches];

        for scenario in tests {
            let tunnel = setup_good_conflux_tunnel(&rt).await;
            assert_eq!(tunnel.circs.len(), 2);

            // Translate the leg indices of the script into circuit IDs.
            let circ_ids = [tunnel.circs[0].unique_id, tunnel.circs[1].unique_id];
            let cells_to_send = scenario
                .into_iter()
                .map(|(leg_idx, cell)| (circ_ids[leg_idx], cell))
                .collect();

            // The client itself sends nothing in this test.
            let send_data = vec![];
            let stream_state = run_multipath_exit_to_client_test(
                rt.clone(),
                tunnel,
                cells_to_send,
                send_data.clone(),
                TO_SEND.into(),
            )
            .await;

            let stream_state = stream_state.lock().unwrap();
            assert!(stream_state.begin_recvd);
            assert!(stream_state.data_recvd.is_empty());
        }
    });
}
4430
// Checks that `allow_stream_requests` is refused on a tunnel that has two linked
// circuits: the call must fail, and the source of the returned error must mention
// that stream requests cannot be allowed on a tunnel with 2 legs.
#[traced_test]
#[test]
#[cfg(all(feature = "conflux", feature = "hs-service"))]
fn conflux_incoming_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // Brings `source()` into scope for the error inspection at the end.
        use std::error::Error as _;

        const EXPECTED_HOP: u8 = 1;

        let TestTunnelCtx {
            tunnel,
            circs: legs,
            conflux_link_rx: link_result_rx,
        } = setup_good_conflux_tunnel(&rt).await;

        // The setup helper is expected to hand back exactly two circuits.
        let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = legs.try_into().unwrap();

        // Take the link payload seen on the first circuit's channel and answer with
        // a LINKED message carrying that same payload on both circuits.
        let link = await_link_payload(&mut circ1.chan_rx).await;
        for circ_tx in [&mut circ1.circ_tx, &mut circ2.circ_tx] {
            let reply = rmsg_to_ccmsg(
                None,
                relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
            );
            circ_tx.send(reply).await.unwrap();
        }

        // No per-circuit handshake result may be an error.
        let handshake_results = link_result_rx.await.unwrap().unwrap();
        assert!(!handshake_results.iter().any(|leg| leg.is_err()));

        let outcome = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                (tunnel.unique_id(), EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await;
        // Discard the success value before `unwrap_err()`; only the error matters here.
        let err = outcome.map(|_| ()).unwrap_err();

        let cause = err.source().unwrap().to_string();
        assert!(cause.contains("Cannot allow stream requests on tunnel with 2 legs"));
    });
}
4478}