1pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56 StreamReader,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use tor_cell::{
69 chancell::CircId,
70 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71};
72
73use tor_error::{bad_api_usage, internal, into_internal};
74use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75use tor_protover::named;
76
77pub use crate::crypto::binding::CircuitBinding;
78pub use crate::memquota::StreamAccount;
79pub use crate::tunnel::circuit::unique_id::UniqId;
80
81#[cfg(feature = "hs-service")]
82use {
83 crate::stream::{IncomingCmdChecker, IncomingStream},
84 crate::tunnel::reactor::StreamReqInfo,
85};
86
87use futures::channel::mpsc;
88use oneshot_fused_workaround as oneshot;
89
90use crate::congestion::sendme::StreamRecvWindow;
91use crate::DynTimeProvider;
92use futures::FutureExt as _;
93use std::collections::HashMap;
94use std::net::IpAddr;
95use std::sync::{Arc, Mutex};
96use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97
98use crate::crypto::handshake::ntor::NtorPublicKey;
99
100pub use path::{Path, PathEntry};
101
102pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104
105#[cfg(feature = "send-control-msg")]
106use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107
108pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109#[cfg(feature = "send-control-msg")]
110#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112
113pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117
118pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122
123#[derive(Debug)]
124pub struct ClientCirc {
167 mutable: Arc<TunnelMutableState>,
169 unique_id: UniqId,
171 pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173 pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175 #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178 reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179 #[cfg(test)]
181 circid: CircId,
182 memquota: CircuitAccount,
184 time_provider: DynTimeProvider,
186}
187
188#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
209
210impl TunnelMutableState {
211 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
213 #[allow(unused)] let state = self
215 .0
216 .lock()
217 .expect("lock poisoned")
218 .insert(unique_id, mutable);
219
220 debug_assert!(state.is_none());
221 }
222
223 pub(super) fn remove(&self, unique_id: UniqId) {
225 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228 debug_assert!(state.is_some());
229 }
230
231 fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235 let lock = self.0.lock().expect("lock poisoned");
236 let mutable = lock
237 .get(&unique_id)
238 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239
240 Ok(mutable.path())
241 }
242
243 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248 let lock = self.0.lock().expect("lock poisoned");
249 let mutable = lock
250 .get(&unique_id)
251 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252
253 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254 path::HopDetail::Relay(r) => r,
255 #[cfg(feature = "hs-common")]
256 path::HopDetail::Virtual => {
257 panic!("somehow made a circuit with a virtual first hop.")
258 }
259 });
260
261 Ok(first_hop)
262 }
263
264 fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270 let lock = self.0.lock().expect("lock poisoned");
271 let mutable = lock
272 .get(&unique_id)
273 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274
275 Ok(mutable.last_hop_num())
276 }
277
278 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282 let lock = self.0.lock().expect("lock poisoned");
283 let mutable = lock
284 .get(&unique_id)
285 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286
287 Ok(mutable.n_hops())
288 }
289
290 fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293 let lock = self.0.lock().expect("lock poisoned");
294 let mutable = lock
295 .get(&unique_id)
296 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298 Ok(mutable.binding_key(hop))
299 }
300
301 fn n_legs(&self) -> usize {
306 let lock = self.0.lock().expect("lock poisoned");
307 lock.len()
308 }
309}
310
311#[derive(Educe, Default)]
313#[educe(Debug)]
314pub(super) struct MutableState(Mutex<CircuitState>);
315
316impl MutableState {
317 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
319 let mut mutable = self.0.lock().expect("poisoned lock");
320 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
321 mutable.binding.push(binding);
322 }
323
324 pub(super) fn path(&self) -> Arc<path::Path> {
326 let mutable = self.0.lock().expect("poisoned lock");
327 Arc::clone(&mutable.path)
328 }
329
330 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
333 let mutable = self.0.lock().expect("poisoned lock");
334
335 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
336 }
339
340 fn first_hop(&self) -> Option<HopDetail> {
342 let mutable = self.0.lock().expect("poisoned lock");
343 mutable.path.first_hop()
344 }
345
346 fn last_hop_num(&self) -> Option<HopNum> {
353 let mutable = self.0.lock().expect("poisoned lock");
354 mutable.path.last_hop_num()
355 }
356
357 fn n_hops(&self) -> usize {
364 let mutable = self.0.lock().expect("poisoned lock");
365 mutable.path.n_hops()
366 }
367}
368
369#[derive(Educe, Default)]
371#[educe(Debug)]
372pub(super) struct CircuitState {
373 path: Arc<path::Path>,
379
380 #[educe(Debug(ignore))]
388 binding: Vec<Option<CircuitBinding>>,
389}
390
391pub struct PendingClientCirc {
396 recvcreated: oneshot::Receiver<CreateResponse>,
399 circ: Arc<ClientCirc>,
401}
402
403#[non_exhaustive]
415#[derive(Clone, Debug)]
416pub struct CircParameters {
417 pub extend_by_ed25519_id: bool,
420 pub ccontrol: CongestionControlParams,
422}
423
424#[derive(Clone, Debug)]
436pub(super) struct HopSettings {
437 pub(super) ccontrol: CongestionControlParams,
439}
440
441impl HopSettings {
442 #[allow(clippy::unnecessary_wraps)] pub(super) fn from_params_and_caps(
453 params: &CircParameters,
454 caps: &tor_protover::Protocols,
455 ) -> Result<Self> {
456 let mut settings = Self {
457 ccontrol: params.ccontrol.clone(),
458 };
459
460 match settings.ccontrol.alg() {
461 crate::ccparams::Algorithm::FixedWindow(_) => {}
462 crate::ccparams::Algorithm::Vegas(_) => {
463 if !caps.supports_named_subver(named::FLOWCTRL_CC) {
465 settings.ccontrol.use_fallback_alg();
466 }
467 }
468 }
469
470 Ok(settings)
471 }
472
473 pub(super) fn without_negotiation(mut self) -> Self {
480 self.ccontrol.use_fallback_alg();
481 self
482 }
483}
484
485#[cfg(test)]
486impl std::default::Default for CircParameters {
487 fn default() -> Self {
488 Self {
489 extend_by_ed25519_id: true,
490 ccontrol: crate::congestion::test_utils::params::build_cc_fixed_params(),
491 }
492 }
493}
494
495impl CircParameters {
496 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
498 Self {
499 extend_by_ed25519_id,
500 ccontrol,
501 }
502 }
503}
504
505impl ClientCirc {
506 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
514 Ok(self
515 .mutable
516 .first_hop(self.unique_id)
517 .map_err(|_| Error::CircuitClosed)?
518 .expect("called first_hop on an un-constructed circuit"))
519 }
520
521 pub fn last_hop_num(&self) -> Result<HopNum> {
531 Ok(self
532 .mutable
533 .last_hop_num(self.unique_id)?
534 .ok_or_else(|| internal!("no last hop index"))?)
535 }
536
537 pub fn path_ref(&self) -> Result<Arc<Path>> {
542 self.mutable
543 .path_ref(self.unique_id)
544 .map_err(|_| Error::CircuitClosed)
545 }
546
547 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
551 let (tx, rx) = oneshot::channel();
552
553 self.control
554 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
555 .map_err(|_| Error::CircuitClosed)?;
556
557 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
558 }
559
560 pub fn mq_account(&self) -> &CircuitAccount {
562 &self.memquota
563 }
564
565 pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
573 self.mutable
574 .binding_key(self.unique_id, hop)
575 .map_err(|_| Error::CircuitClosed)
576 }
577
578 #[cfg(feature = "send-control-msg")]
656 pub async fn start_conversation(
657 &self,
658 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
659 reply_handler: impl MsgHandler + Send + 'static,
660 hop_num: HopNum,
661 ) -> Result<Conversation<'_>> {
662 let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
663 let conversation = Conversation(self);
664 conversation.send_internal(msg, Some(handler)).await?;
665 Ok(conversation)
666 }
667
668 #[cfg(feature = "send-control-msg")]
674 pub async fn send_raw_msg(
675 &self,
676 msg: tor_cell::relaycell::msg::AnyRelayMsg,
677 hop_num: HopNum,
678 ) -> Result<()> {
679 let (sender, receiver) = oneshot::channel();
680 let ctrl_msg = CtrlMsg::SendMsg {
681 hop_num,
682 msg,
683 sender,
684 };
685 self.control
686 .unbounded_send(ctrl_msg)
687 .map_err(|_| Error::CircuitClosed)?;
688
689 receiver.await.map_err(|_| Error::CircuitClosed)?
690 }
691
692 #[cfg(feature = "hs-service")]
712 pub async fn allow_stream_requests(
713 self: &Arc<ClientCirc>,
714 allow_commands: &[tor_cell::relaycell::RelayCmd],
715 hop_num: HopNum,
716 filter: impl crate::stream::IncomingStreamRequestFilter,
717 ) -> Result<impl futures::Stream<Item = IncomingStream>> {
718 use futures::stream::StreamExt;
719
720 use crate::tunnel::HopLocation;
721
722 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
724
725 let circ_count = self.mutable.n_legs();
727 if circ_count != 1 {
728 return Err(
729 internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
730 );
731 }
732
733 let time_prov = self.time_provider.clone();
734 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
735 let (incoming_sender, incoming_receiver) =
736 MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
737 let (tx, rx) = oneshot::channel();
738
739 self.command
740 .unbounded_send(CtrlCmd::AwaitStreamRequest {
741 cmd_checker,
742 incoming_sender,
743 hop_num,
744 done: tx,
745 filter: Box::new(filter),
746 })
747 .map_err(|_| Error::CircuitClosed)?;
748
749 rx.await.map_err(|_| Error::CircuitClosed)??;
751
752 let allowed_hop_num = hop_num;
753
754 let circ = Arc::clone(self);
755 Ok(incoming_receiver.map(move |req_ctx| {
756 let StreamReqInfo {
757 req,
758 stream_id,
759 hop_num,
760 leg,
761 receiver,
762 msg_tx,
763 memquota,
764 relay_cell_format,
765 } = req_ctx;
766
767 assert_eq!(allowed_hop_num, hop_num);
772
773 let target = StreamTarget {
780 circ: Arc::clone(&circ),
781 tx: msg_tx,
782 hop: HopLocation::Hop((leg, hop_num)),
783 stream_id,
784 relay_cell_format,
785 };
786
787 let reader = StreamReader {
788 target: target.clone(),
789 receiver,
790 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
791 ended: false,
792 };
793
794 IncomingStream::new(circ.time_provider.clone(), req, target, reader, memquota)
795 }))
796 }
797
798 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
801 where
802 Tg: CircTarget,
803 {
804 if target
816 .protovers()
817 .supports_named_subver(named::RELAY_NTORV3)
818 {
819 self.extend_ntor_v3(target, params).await
820 } else {
821 self.extend_ntor(target, params).await
822 }
823 }
824
825 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
828 where
829 Tg: CircTarget,
830 {
831 let key = NtorPublicKey {
832 id: *target
833 .rsa_identity()
834 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
835 pk: *target.ntor_onion_key(),
836 };
837 let mut linkspecs = target
838 .linkspecs()
839 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
840 if !params.extend_by_ed25519_id {
841 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
842 }
843
844 let (tx, rx) = oneshot::channel();
845
846 let peer_id = OwnedChanTarget::from_chan_target(target);
847 let settings =
848 HopSettings::from_params_and_caps(¶ms, target.protovers())?.without_negotiation();
849 self.control
850 .unbounded_send(CtrlMsg::ExtendNtor {
851 peer_id,
852 public_key: key,
853 linkspecs,
854 settings,
855 done: tx,
856 })
857 .map_err(|_| Error::CircuitClosed)?;
858
859 rx.await.map_err(|_| Error::CircuitClosed)??;
860
861 Ok(())
862 }
863
864 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
867 where
868 Tg: CircTarget,
869 {
870 let key = NtorV3PublicKey {
871 id: *target
872 .ed_identity()
873 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
874 pk: *target.ntor_onion_key(),
875 };
876 let mut linkspecs = target
877 .linkspecs()
878 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
879 if !params.extend_by_ed25519_id {
880 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
881 }
882
883 let (tx, rx) = oneshot::channel();
884
885 let peer_id = OwnedChanTarget::from_chan_target(target);
886 let settings = HopSettings::from_params_and_caps(¶ms, target.protovers())?;
887 self.control
888 .unbounded_send(CtrlMsg::ExtendNtorV3 {
889 peer_id,
890 public_key: key,
891 linkspecs,
892 settings,
893 done: tx,
894 })
895 .map_err(|_| Error::CircuitClosed)?;
896
897 rx.await.map_err(|_| Error::CircuitClosed)??;
898
899 Ok(())
900 }
901
902 #[cfg(feature = "hs-common")]
926 pub async fn extend_virtual(
927 &self,
928 protocol: handshake::RelayProtocol,
929 role: handshake::HandshakeRole,
930 seed: impl handshake::KeyGenerator,
931 params: &CircParameters,
932 capabilities: &tor_protover::Protocols,
933 ) -> Result<()> {
934 use self::handshake::BoxedClientLayer;
935
936 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
937 let relay_cell_format = protocol.relay_cell_format();
938
939 let BoxedClientLayer { fwd, back, binding } =
940 protocol.construct_client_layers(role, seed)?;
941
942 let settings = HopSettings::from_params_and_caps(params, capabilities)?
943 .without_negotiation();
945 let (tx, rx) = oneshot::channel();
946 let message = CtrlCmd::ExtendVirtual {
947 relay_cell_format,
948 cell_crypto: (fwd, back, binding),
949 settings,
950 done: tx,
951 };
952
953 self.command
954 .unbounded_send(message)
955 .map_err(|_| Error::CircuitClosed)?;
956
957 rx.await.map_err(|_| Error::CircuitClosed)?
958 }
959
960 async fn begin_stream_impl(
968 self: &Arc<ClientCirc>,
969 begin_msg: AnyRelayMsg,
970 cmd_checker: AnyCmdChecker,
971 ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
972 let hop = TargetHop::LastHop;
975
976 let time_prov = self.time_provider.clone();
977
978 let memquota = StreamAccount::new(self.mq_account())?;
979 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
980 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
981 let (tx, rx) = oneshot::channel();
982 let (msg_tx, msg_rx) =
983 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
984
985 self.control
986 .unbounded_send(CtrlMsg::BeginStream {
987 hop,
988 message: begin_msg,
989 sender,
990 rx: msg_rx,
991 done: tx,
992 cmd_checker,
993 })
994 .map_err(|_| Error::CircuitClosed)?;
995
996 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
997
998 let target = StreamTarget {
999 circ: self.clone(),
1000 tx: msg_tx,
1001 hop,
1002 stream_id,
1003 relay_cell_format,
1004 };
1005
1006 let reader = StreamReader {
1007 target: target.clone(),
1008 receiver,
1009 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1010 ended: false,
1011 };
1012
1013 Ok((reader, target, memquota))
1014 }
1015
1016 async fn begin_data_stream(
1019 self: &Arc<ClientCirc>,
1020 msg: AnyRelayMsg,
1021 optimistic: bool,
1022 ) -> Result<DataStream> {
1023 let (reader, target, memquota) = self
1024 .begin_stream_impl(msg, DataCmdChecker::new_any())
1025 .await?;
1026 let mut stream = DataStream::new(self.time_provider.clone(), reader, target, memquota);
1027 if !optimistic {
1028 stream.wait_for_connection().await?;
1029 }
1030 Ok(stream)
1031 }
1032
1033 pub async fn begin_stream(
1039 self: &Arc<ClientCirc>,
1040 target: &str,
1041 port: u16,
1042 parameters: Option<StreamParameters>,
1043 ) -> Result<DataStream> {
1044 let parameters = parameters.unwrap_or_default();
1045 let begin_flags = parameters.begin_flags();
1046 let optimistic = parameters.is_optimistic();
1047 let target = if parameters.suppressing_hostname() {
1048 ""
1049 } else {
1050 target
1051 };
1052 let beginmsg = Begin::new(target, port, begin_flags)
1053 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1054 self.begin_data_stream(beginmsg.into(), optimistic).await
1055 }
1056
1057 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1060 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1065 .await
1066 }
1067
1068 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1074 let resolve_msg = Resolve::new(hostname);
1075
1076 let resolved_msg = self.try_resolve(resolve_msg).await?;
1077
1078 resolved_msg
1079 .into_answers()
1080 .into_iter()
1081 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1082 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1083 Ok(_) => None,
1084 Err(e) => Some(Err(e)),
1085 })
1086 .collect()
1087 }
1088
1089 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1095 let resolve_ptr_msg = Resolve::new_reverse(&addr);
1096
1097 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1098
1099 resolved_msg
1100 .into_answers()
1101 .into_iter()
1102 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1103 Ok(ResolvedVal::Hostname(v)) => Some(
1104 String::from_utf8(v)
1105 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1106 ),
1107 Ok(_) => None,
1108 Err(e) => Some(Err(e)),
1109 })
1110 .collect()
1111 }
1112
1113 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1116 let (reader, _target, memquota) = self
1117 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1118 .await?;
1119 let mut resolve_stream = ResolveStream::new(reader, memquota);
1120 resolve_stream.read_msg().await
1121 }
1122
1123 pub fn terminate(&self) {
1134 let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
1135 }
1136
1137 pub(crate) fn protocol_error(&self) {
1145 self.terminate();
1146 }
1147
1148 pub fn is_closing(&self) -> bool {
1150 self.control.is_closed()
1151 }
1152
1153 pub fn unique_id(&self) -> UniqId {
1155 self.unique_id
1156 }
1157
1158 pub fn n_hops(&self) -> Result<usize> {
1165 self.mutable
1166 .n_hops(self.unique_id)
1167 .map_err(|_| Error::CircuitClosed)
1168 }
1169
1170 #[cfg(feature = "experimental-api")]
1177 pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
1178 self.reactor_closed_rx.clone().map(|_| ())
1179 }
1180}
1181
1182#[cfg(feature = "send-control-msg")]
1190#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1191pub struct Conversation<'r>(&'r ClientCirc);
1192
1193#[cfg(feature = "send-control-msg")]
1194#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
1195impl Conversation<'_> {
1196 pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
1201 self.send_internal(Some(msg), None).await
1202 }
1203
1204 pub(crate) async fn send_internal(
1208 &self,
1209 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
1210 handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
1211 ) -> Result<()> {
1212 let msg = msg.map(|msg| tor_cell::relaycell::AnyRelayMsgOuter::new(None, msg));
1213 let (sender, receiver) = oneshot::channel();
1214
1215 let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler {
1216 msg,
1217 handler,
1218 sender,
1219 };
1220 self.0
1221 .control
1222 .unbounded_send(ctrl_msg)
1223 .map_err(|_| Error::CircuitClosed)?;
1224
1225 receiver.await.map_err(|_| Error::CircuitClosed)?
1226 }
1227}
1228
1229impl PendingClientCirc {
1230 pub(crate) fn new(
1236 id: CircId,
1237 channel: Arc<Channel>,
1238 createdreceiver: oneshot::Receiver<CreateResponse>,
1239 input: CircuitRxReceiver,
1240 unique_id: UniqId,
1241 runtime: DynTimeProvider,
1242 memquota: CircuitAccount,
1243 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1244 let time_provider = channel.time_provider().clone();
1245 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1246 Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1247
1248 let circuit = ClientCirc {
1249 mutable,
1250 unique_id,
1251 control: control_tx,
1252 command: command_tx,
1253 reactor_closed_rx: reactor_closed_rx.shared(),
1254 #[cfg(test)]
1255 circid: id,
1256 memquota,
1257 time_provider,
1258 };
1259
1260 let pending = PendingClientCirc {
1261 recvcreated: createdreceiver,
1262 circ: Arc::new(circuit),
1263 };
1264 (pending, reactor)
1265 }
1266
1267 pub fn peek_unique_id(&self) -> UniqId {
1269 self.circ.unique_id
1270 }
1271
1272 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1279 let protocols = tor_protover::Protocols::new();
1283 let settings =
1284 HopSettings::from_params_and_caps(¶ms, &protocols)?.without_negotiation();
1285
1286 let (tx, rx) = oneshot::channel();
1287 self.circ
1288 .control
1289 .unbounded_send(CtrlMsg::Create {
1290 recv_created: self.recvcreated,
1291 handshake: CircuitHandshake::CreateFast,
1292 settings,
1293 done: tx,
1294 })
1295 .map_err(|_| Error::CircuitClosed)?;
1296
1297 rx.await.map_err(|_| Error::CircuitClosed)??;
1298
1299 Ok(self.circ)
1300 }
1301
1302 pub async fn create_firsthop<Tg>(
1307 self,
1308 target: &Tg,
1309 params: CircParameters,
1310 ) -> Result<Arc<ClientCirc>>
1311 where
1312 Tg: tor_linkspec::CircTarget,
1313 {
1314 if target
1316 .protovers()
1317 .supports_named_subver(named::RELAY_NTORV3)
1318 {
1319 self.create_firsthop_ntor_v3(target, params).await
1320 } else {
1321 self.create_firsthop_ntor(target, params).await
1322 }
1323 }
1324
1325 pub async fn create_firsthop_ntor<Tg>(
1330 self,
1331 target: &Tg,
1332 params: CircParameters,
1333 ) -> Result<Arc<ClientCirc>>
1334 where
1335 Tg: tor_linkspec::CircTarget,
1336 {
1337 let (tx, rx) = oneshot::channel();
1338 let settings =
1339 HopSettings::from_params_and_caps(¶ms, target.protovers())?.without_negotiation();
1340
1341 self.circ
1342 .control
1343 .unbounded_send(CtrlMsg::Create {
1344 recv_created: self.recvcreated,
1345 handshake: CircuitHandshake::Ntor {
1346 public_key: NtorPublicKey {
1347 id: *target
1348 .rsa_identity()
1349 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1350 pk: *target.ntor_onion_key(),
1351 },
1352 ed_identity: *target
1353 .ed_identity()
1354 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1355 },
1356 settings,
1357 done: tx,
1358 })
1359 .map_err(|_| Error::CircuitClosed)?;
1360
1361 rx.await.map_err(|_| Error::CircuitClosed)??;
1362
1363 Ok(self.circ)
1364 }
1365
1366 pub async fn create_firsthop_ntor_v3<Tg>(
1375 self,
1376 target: &Tg,
1377 params: CircParameters,
1378 ) -> Result<Arc<ClientCirc>>
1379 where
1380 Tg: tor_linkspec::CircTarget,
1381 {
1382 let settings = HopSettings::from_params_and_caps(¶ms, target.protovers())?;
1383 let (tx, rx) = oneshot::channel();
1384
1385 self.circ
1386 .control
1387 .unbounded_send(CtrlMsg::Create {
1388 recv_created: self.recvcreated,
1389 handshake: CircuitHandshake::NtorV3 {
1390 public_key: NtorV3PublicKey {
1391 id: *target
1392 .ed_identity()
1393 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1394 pk: *target.ntor_onion_key(),
1395 },
1396 },
1397 settings,
1398 done: tx,
1399 })
1400 .map_err(|_| Error::CircuitClosed)?;
1401
1402 rx.await.map_err(|_| Error::CircuitClosed)??;
1403
1404 Ok(self.circ)
1405 }
1406}
1407
1408fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1411 match val {
1412 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1413 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1414 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1415 _ => Ok(val),
1416 }
1417}
1418
1419#[cfg(test)]
1420pub(crate) mod test {
1421 #![allow(clippy::bool_assert_comparison)]
1423 #![allow(clippy::clone_on_copy)]
1424 #![allow(clippy::dbg_macro)]
1425 #![allow(clippy::mixed_attributes_style)]
1426 #![allow(clippy::print_stderr)]
1427 #![allow(clippy::print_stdout)]
1428 #![allow(clippy::single_char_pattern)]
1429 #![allow(clippy::unwrap_used)]
1430 #![allow(clippy::unchecked_duration_subtraction)]
1431 #![allow(clippy::useless_vec)]
1432 #![allow(clippy::needless_pass_by_value)]
1433 use super::*;
1436 use crate::channel::OpenChanCellS2C;
1437 use crate::channel::{test::new_reactor, CodecError};
1438 use crate::congestion::test_utils::params::build_cc_vegas_params;
1439 use crate::crypto::cell::RelayCellBody;
1440 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1441 #[cfg(feature = "hs-service")]
1442 use crate::stream::IncomingStreamRequestFilter;
1443 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1444 use futures::channel::mpsc::{Receiver, Sender};
1445 use futures::io::{AsyncReadExt, AsyncWriteExt};
1446 use futures::sink::SinkExt;
1447 use futures::stream::StreamExt;
1448 use futures::task::SpawnExt;
1449 use hex_literal::hex;
1450 use std::collections::{HashMap, VecDeque};
1451 use std::fmt::Debug;
1452 use std::time::Duration;
1453 use tor_basic_utils::test_rng::testing_rng;
1454 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1455 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1456 use tor_cell::relaycell::msg::SendmeTag;
1457 use tor_cell::relaycell::{
1458 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1459 };
1460 use tor_linkspec::OwnedCircTarget;
1461 use tor_memquota::HasMemoryCost;
1462 use tor_rtcompat::Runtime;
1463 use tracing::trace;
1464 use tracing_test::traced_test;
1465
1466 #[cfg(feature = "conflux")]
1467 use {
1468 crate::tunnel::reactor::ConfluxHandshakeResult,
1469 crate::util::err::ConfluxHandshakeError,
1470 std::result::Result as StdResult,
1471 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1472 tor_cell::relaycell::msg::ConfluxLink,
1473 tor_rtmock::MockRuntime,
1474 };
1475
1476 impl PendingClientCirc {
1477 pub(crate) fn peek_circid(&self) -> CircId {
1479 self.circ.circid
1480 }
1481 }
1482
1483 impl ClientCirc {
1484 pub(crate) fn peek_circid(&self) -> CircId {
1486 self.circid
1487 }
1488 }
1489
1490 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1491 let rfmt = RelayCellFormat::V0;
1493 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1494 .encode(rfmt, &mut testing_rng())
1495 .unwrap();
1496 let chanmsg = chanmsg::Relay::from(body);
1497 ClientCircChanMsg::Relay(chanmsg)
1498 }
1499
1500 const EXAMPLE_SK: [u8; 32] =
1502 hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1503 const EXAMPLE_PK: [u8; 32] =
1504 hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1505 const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1506 const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1507
1508 #[cfg(test)]
1510 pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
1511 buffer: usize,
1512 ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
1513 crate::fake_mpsc(buffer)
1514 }
1515
1516 fn example_target() -> OwnedCircTarget {
1518 let mut builder = OwnedCircTarget::builder();
1519 builder
1520 .chan_target()
1521 .ed_identity(EXAMPLE_ED_ID.into())
1522 .rsa_identity(EXAMPLE_RSA_ID.into());
1523 builder
1524 .ntor_onion_key(EXAMPLE_PK.into())
1525 .protocols("FlowCtrl=1-2".parse().unwrap())
1526 .build()
1527 .unwrap()
1528 }
1529 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1530 crate::crypto::handshake::ntor::NtorSecretKey::new(
1531 EXAMPLE_SK.into(),
1532 EXAMPLE_PK.into(),
1533 EXAMPLE_RSA_ID.into(),
1534 )
1535 }
1536 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1537 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1538 EXAMPLE_SK.into(),
1539 EXAMPLE_PK.into(),
1540 EXAMPLE_ED_ID.into(),
1541 )
1542 }
1543
1544 fn working_fake_channel<R: Runtime>(
1545 rt: &R,
1546 ) -> (
1547 Arc<Channel>,
1548 Receiver<AnyChanCell>,
1549 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1550 ) {
1551 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1552 rt.spawn(async {
1553 let _ignore = chan_reactor.run().await;
1554 })
1555 .unwrap();
1556 (channel, rx, tx)
1557 }
1558
1559 #[derive(Copy, Clone)]
1561 enum HandshakeType {
1562 Fast,
1563 Ntor,
1564 NtorV3,
1565 }
1566
1567 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1568 use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1572
1573 let (chan, mut rx, _sink) = working_fake_channel(rt);
1574 let circid = CircId::new(128).unwrap();
1575 let (created_send, created_recv) = oneshot::channel();
1576 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1577 let unique_id = UniqId::new(23, 17);
1578
1579 let (pending, reactor) = PendingClientCirc::new(
1580 circid,
1581 chan,
1582 created_recv,
1583 circmsg_recv,
1584 unique_id,
1585 DynTimeProvider::new(rt.clone()),
1586 CircuitAccount::new_noop(),
1587 );
1588
1589 rt.spawn(async {
1590 let _ignore = reactor.run().await;
1591 })
1592 .unwrap();
1593
1594 let simulate_relay_fut = async move {
1596 let mut rng = testing_rng();
1597 let create_cell = rx.next().await.unwrap();
1598 assert_eq!(create_cell.circid(), Some(circid));
1599 let reply = match handshake_type {
1600 HandshakeType::Fast => {
1601 let cf = match create_cell.msg() {
1602 AnyChanMsg::CreateFast(cf) => cf,
1603 other => panic!("{:?}", other),
1604 };
1605 let (_, rep) = CreateFastServer::server(
1606 &mut rng,
1607 &mut |_: &()| Some(()),
1608 &[()],
1609 cf.handshake(),
1610 )
1611 .unwrap();
1612 CreateResponse::CreatedFast(CreatedFast::new(rep))
1613 }
1614 HandshakeType::Ntor => {
1615 let c2 = match create_cell.msg() {
1616 AnyChanMsg::Create2(c2) => c2,
1617 other => panic!("{:?}", other),
1618 };
1619 let (_, rep) = NtorServer::server(
1620 &mut rng,
1621 &mut |_: &()| Some(()),
1622 &[example_ntor_key()],
1623 c2.body(),
1624 )
1625 .unwrap();
1626 CreateResponse::Created2(Created2::new(rep))
1627 }
1628 HandshakeType::NtorV3 => {
1629 let c2 = match create_cell.msg() {
1630 AnyChanMsg::Create2(c2) => c2,
1631 other => panic!("{:?}", other),
1632 };
1633 let mut reply_fn = if with_cc {
1634 |client_exts: &[CircRequestExt]| {
1635 let _ = client_exts
1636 .iter()
1637 .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1638 .expect("Client failed to request CC");
1639 Some(vec![CircResponseExt::CcResponse(
1642 extend_ext::CcResponse::new(31),
1643 )])
1644 }
1645 } else {
1646 |_: &_| Some(vec![])
1647 };
1648 let (_, rep) = NtorV3Server::server(
1649 &mut rng,
1650 &mut reply_fn,
1651 &[example_ntor_v3_key()],
1652 c2.body(),
1653 )
1654 .unwrap();
1655 CreateResponse::Created2(Created2::new(rep))
1656 }
1657 };
1658 created_send.send(reply).unwrap();
1659 };
1660 let client_fut = async move {
1662 let target = example_target();
1663 let params = CircParameters::default();
1664 let ret = match handshake_type {
1665 HandshakeType::Fast => {
1666 trace!("doing fast create");
1667 pending.create_firsthop_fast(params).await
1668 }
1669 HandshakeType::Ntor => {
1670 trace!("doing ntor create");
1671 pending.create_firsthop_ntor(&target, params).await
1672 }
1673 HandshakeType::NtorV3 => {
1674 let params = if with_cc {
1675 CircParameters::new(true, build_cc_vegas_params())
1677 } else {
1678 params
1679 };
1680 trace!("doing ntor_v3 create");
1681 pending.create_firsthop_ntor_v3(&target, params).await
1682 }
1683 };
1684 trace!("create done: result {:?}", ret);
1685 ret
1686 };
1687
1688 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1689
1690 let _circ = circ.unwrap();
1691
1692 assert_eq!(_circ.n_hops().unwrap(), 1);
1694 }
1695
1696 #[traced_test]
1697 #[test]
1698 fn test_create_fast() {
1699 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1700 test_create(&rt, HandshakeType::Fast, false).await;
1701 });
1702 }
1703 #[traced_test]
1704 #[test]
1705 fn test_create_ntor() {
1706 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1707 test_create(&rt, HandshakeType::Ntor, false).await;
1708 });
1709 }
1710 #[traced_test]
1711 #[test]
1712 fn test_create_ntor_v3() {
1713 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1714 test_create(&rt, HandshakeType::NtorV3, false).await;
1715 });
1716 }
1717 #[traced_test]
1718 #[test]
1719 #[cfg(feature = "flowctl-cc")]
1720 fn test_create_ntor_v3_with_cc() {
1721 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1722 test_create(&rt, HandshakeType::NtorV3, true).await;
1723 });
1724 }
1725
1726 pub(crate) struct DummyCrypto {
1729 counter_tag: [u8; 20],
1730 counter: u32,
1731 lasthop: bool,
1732 }
1733 impl DummyCrypto {
1734 fn next_tag(&mut self) -> SendmeTag {
1735 #![allow(clippy::identity_op)]
1736 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1737 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1738 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1739 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1740 self.counter += 1;
1741 self.counter_tag.into()
1742 }
1743 }
1744
1745 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1746 fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1747 self.next_tag()
1748 }
1749 fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1750 }
1751 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1752 fn decrypt_inbound(
1753 &mut self,
1754 _cmd: ChanCmd,
1755 _cell: &mut RelayCellBody,
1756 ) -> Option<SendmeTag> {
1757 if self.lasthop {
1758 Some(self.next_tag())
1759 } else {
1760 None
1761 }
1762 }
1763 }
1764 impl DummyCrypto {
1765 pub(crate) fn new(lasthop: bool) -> Self {
1766 DummyCrypto {
1767 counter_tag: [0; 20],
1768 counter: 0,
1769 lasthop,
1770 }
1771 }
1772 }
1773
1774 async fn newcirc_ext<R: Runtime>(
1777 rt: &R,
1778 unique_id: UniqId,
1779 chan: Arc<Channel>,
1780 hops: Vec<path::HopDetail>,
1781 next_msg_from: HopNum,
1782 params: CircParameters,
1783 ) -> (Arc<ClientCirc>, CircuitRxSender) {
1784 let circid = CircId::new(128).unwrap();
1785 let (_created_send, created_recv) = oneshot::channel();
1786 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1787
1788 let (pending, reactor) = PendingClientCirc::new(
1789 circid,
1790 chan,
1791 created_recv,
1792 circmsg_recv,
1793 unique_id,
1794 DynTimeProvider::new(rt.clone()),
1795 CircuitAccount::new_noop(),
1796 );
1797
1798 rt.spawn(async {
1799 let _ignore = reactor.run().await;
1800 })
1801 .unwrap();
1802
1803 let PendingClientCirc {
1804 circ,
1805 recvcreated: _,
1806 } = pending;
1807
1808 let relay_cell_format = RelayCellFormat::V0;
1810
1811 let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1812 for (idx, peer_id) in hops.into_iter().enumerate() {
1813 let (tx, rx) = oneshot::channel();
1814 let idx = idx as u8;
1815
1816 circ.command
1817 .unbounded_send(CtrlCmd::AddFakeHop {
1818 relay_cell_format,
1819 fwd_lasthop: idx == last_hop_num,
1820 rev_lasthop: idx == u8::from(next_msg_from),
1821 peer_id,
1822 params: params.clone(),
1823 done: tx,
1824 })
1825 .unwrap();
1826 rx.await.unwrap().unwrap();
1827 }
1828
1829 (circ, circmsg_send)
1830 }
1831
1832 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1835 let hops = std::iter::repeat_with(|| {
1836 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1837 .ed_identity([4; 32].into())
1838 .rsa_identity([5; 20].into())
1839 .build()
1840 .expect("Could not construct fake hop");
1841
1842 path::HopDetail::Relay(peer_id)
1843 })
1844 .take(3)
1845 .collect();
1846
1847 let unique_id = UniqId::new(23, 17);
1848 newcirc_ext(
1849 rt,
1850 unique_id,
1851 chan,
1852 hops,
1853 2.into(),
1854 CircParameters::default(),
1855 )
1856 .await
1857 }
1858
1859 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1862 (0..n)
1863 .map(|idx| {
1864 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1865 .ed_identity([idx + start_idx; 32].into())
1866 .rsa_identity([idx + start_idx + 1; 20].into())
1867 .build()
1868 .expect("Could not construct fake hop");
1869
1870 path::HopDetail::Relay(peer_id)
1871 })
1872 .collect()
1873 }
1874
1875 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1876 use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1877
1878 let (chan, mut rx, _sink) = working_fake_channel(rt);
1879 let (circ, mut sink) = newcirc(rt, chan).await;
1880 let circid = circ.peek_circid();
1881 let params = CircParameters::default();
1882
1883 let extend_fut = async move {
1884 let target = example_target();
1885 match handshake_type {
1886 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1887 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1888 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1889 };
1890 circ };
1892 let reply_fut = async move {
1893 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1896 assert_eq!(id, Some(circid));
1897 let rmsg = match chmsg {
1898 AnyChanMsg::RelayEarly(r) => {
1899 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1900 .unwrap()
1901 }
1902 other => panic!("{:?}", other),
1903 };
1904 let e2 = match rmsg.msg() {
1905 AnyRelayMsg::Extend2(e2) => e2,
1906 other => panic!("{:?}", other),
1907 };
1908 let mut rng = testing_rng();
1909 let reply = match handshake_type {
1910 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1911 HandshakeType::Ntor => {
1912 let (_keygen, reply) = NtorServer::server(
1913 &mut rng,
1914 &mut |_: &()| Some(()),
1915 &[example_ntor_key()],
1916 e2.handshake(),
1917 )
1918 .unwrap();
1919 reply
1920 }
1921 HandshakeType::NtorV3 => {
1922 let (_keygen, reply) = NtorV3Server::server(
1923 &mut rng,
1924 &mut |_: &[CircRequestExt]| Some(vec![]),
1925 &[example_ntor_v3_key()],
1926 e2.handshake(),
1927 )
1928 .unwrap();
1929 reply
1930 }
1931 };
1932
1933 let extended2 = relaymsg::Extended2::new(reply).into();
1934 sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1935 (sink, rx) };
1937
1938 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1939
1940 assert_eq!(circ.n_hops().unwrap(), 4);
1942
1943 {
1945 let path = circ.path_ref().unwrap();
1946 let path = path
1947 .all_hops()
1948 .filter_map(|hop| match hop {
1949 path::HopDetail::Relay(r) => Some(r),
1950 #[cfg(feature = "hs-common")]
1951 path::HopDetail::Virtual => None,
1952 })
1953 .collect::<Vec<_>>();
1954
1955 assert_eq!(path.len(), 4);
1956 use tor_linkspec::HasRelayIds;
1957 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1958 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1959 }
1960 {
1961 let path = circ.path_ref().unwrap();
1962 assert_eq!(path.n_hops(), 4);
1963 use tor_linkspec::HasRelayIds;
1964 assert_eq!(
1965 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1966 example_target().ed_identity()
1967 );
1968 assert_ne!(
1969 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1970 example_target().ed_identity()
1971 );
1972 }
1973 }
1974
1975 #[traced_test]
1976 #[test]
1977 fn test_extend_ntor() {
1978 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1979 test_extend(&rt, HandshakeType::Ntor).await;
1980 });
1981 }
1982
1983 #[traced_test]
1984 #[test]
1985 fn test_extend_ntor_v3() {
1986 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1987 test_extend(&rt, HandshakeType::NtorV3).await;
1988 });
1989 }
1990
1991 async fn bad_extend_test_impl<R: Runtime>(
1992 rt: &R,
1993 reply_hop: HopNum,
1994 bad_reply: ClientCircChanMsg,
1995 ) -> Error {
1996 let (chan, _rx, _sink) = working_fake_channel(rt);
1997 let hops = std::iter::repeat_with(|| {
1998 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1999 .ed_identity([4; 32].into())
2000 .rsa_identity([5; 20].into())
2001 .build()
2002 .expect("Could not construct fake hop");
2003
2004 path::HopDetail::Relay(peer_id)
2005 })
2006 .take(3)
2007 .collect();
2008
2009 let unique_id = UniqId::new(23, 17);
2010 let (circ, mut sink) = newcirc_ext(
2011 rt,
2012 unique_id,
2013 chan,
2014 hops,
2015 reply_hop,
2016 CircParameters::default(),
2017 )
2018 .await;
2019 let params = CircParameters::default();
2020
2021 let target = example_target();
2022 #[allow(clippy::clone_on_copy)]
2023 let rtc = rt.clone();
2024 let sink_handle = rt
2025 .spawn_with_handle(async move {
2026 rtc.sleep(Duration::from_millis(100)).await;
2027 sink.send(bad_reply).await.unwrap();
2028 sink
2029 })
2030 .unwrap();
2031 let outcome = circ.extend_ntor(&target, params).await;
2032 let _sink = sink_handle.await;
2033
2034 assert_eq!(circ.n_hops().unwrap(), 3);
2035 assert!(outcome.is_err());
2036 outcome.unwrap_err()
2037 }
2038
2039 #[traced_test]
2040 #[test]
2041 fn bad_extend_wronghop() {
2042 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2043 let extended2 = relaymsg::Extended2::new(vec![]).into();
2044 let cc = rmsg_to_ccmsg(None, extended2);
2045
2046 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
2047 match error {
2052 Error::CircuitClosed => {}
2053 x => panic!("got other error: {}", x),
2054 }
2055 });
2056 }
2057
2058 #[traced_test]
2059 #[test]
2060 fn bad_extend_wrongtype() {
2061 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062 let extended = relaymsg::Extended::new(vec![7; 200]).into();
2063 let cc = rmsg_to_ccmsg(None, extended);
2064
2065 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2066 match error {
2067 Error::BytesErr {
2068 err: tor_bytes::Error::InvalidMessage(_),
2069 object: "extended2 message",
2070 } => {}
2071 other => panic!("{:?}", other),
2072 }
2073 });
2074 }
2075
2076 #[traced_test]
2077 #[test]
2078 fn bad_extend_destroy() {
2079 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2080 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2081 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2082 match error {
2083 Error::CircuitClosed => {}
2084 other => panic!("{:?}", other),
2085 }
2086 });
2087 }
2088
2089 #[traced_test]
2090 #[test]
2091 fn bad_extend_crypto() {
2092 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2093 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
2094 let cc = rmsg_to_ccmsg(None, extended2);
2095 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2096 assert!(matches!(error, Error::BadCircHandshakeAuth));
2097 });
2098 }
2099
2100 #[traced_test]
2101 #[test]
2102 fn begindir() {
2103 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2104 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2105 let (circ, mut sink) = newcirc(&rt, chan).await;
2106 let circid = circ.peek_circid();
2107
2108 let begin_and_send_fut = async move {
2109 let mut stream = circ.begin_dir_stream().await.unwrap();
2112 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2113 stream.flush().await.unwrap();
2114 let mut buf = [0_u8; 1024];
2115 let n = stream.read(&mut buf).await.unwrap();
2116 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2117 let n = stream.read(&mut buf).await.unwrap();
2118 assert_eq!(n, 0);
2119 stream
2120 };
2121 let reply_fut = async move {
2122 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2125 assert_eq!(id, Some(circid));
2126 let rmsg = match chmsg {
2127 AnyChanMsg::Relay(r) => {
2128 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2129 .unwrap()
2130 }
2131 other => panic!("{:?}", other),
2132 };
2133 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2134 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2135
2136 let connected = relaymsg::Connected::new_empty().into();
2138 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2139
2140 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2142 assert_eq!(id, Some(circid));
2143 let rmsg = match chmsg {
2144 AnyChanMsg::Relay(r) => {
2145 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2146 .unwrap()
2147 }
2148 other => panic!("{:?}", other),
2149 };
2150 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2151 assert_eq!(streamid_2, streamid);
2152 if let AnyRelayMsg::Data(d) = rmsg {
2153 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2154 } else {
2155 panic!();
2156 }
2157
2158 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2160 .unwrap()
2161 .into();
2162 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2163
2164 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2166 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2167
2168 (rx, sink) };
2170
2171 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2172 });
2173 }
2174
2175 fn close_stream_helper(by_drop: bool) {
2177 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2178 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2179 let (circ, mut sink) = newcirc(&rt, chan).await;
2180
2181 let stream_fut = async move {
2182 let stream = circ
2183 .begin_stream("www.example.com", 80, None)
2184 .await
2185 .unwrap();
2186
2187 let (r, mut w) = stream.split();
2188 if by_drop {
2189 drop(r);
2191 drop(w);
2192 (None, circ) } else {
2194 w.close().await.unwrap();
2196 (Some(r), circ)
2197 }
2198 };
2199 let handler_fut = async {
2200 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2202 let rmsg = match msg {
2203 AnyChanMsg::Relay(r) => {
2204 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2205 .unwrap()
2206 }
2207 other => panic!("{:?}", other),
2208 };
2209 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2210 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2211
2212 let connected =
2214 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2215 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2216
2217 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2219 let rmsg = match msg {
2220 AnyChanMsg::Relay(r) => {
2221 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2222 .unwrap()
2223 }
2224 other => panic!("{:?}", other),
2225 };
2226 let (_, rmsg) = rmsg.into_streamid_and_msg();
2227 assert_eq!(rmsg.cmd(), RelayCmd::END);
2228
2229 (rx, sink) };
2231
2232 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2233 });
2234 }
2235
2236 #[traced_test]
2237 #[test]
2238 fn drop_stream() {
2239 close_stream_helper(true);
2240 }
2241
2242 #[traced_test]
2243 #[test]
2244 fn close_stream() {
2245 close_stream_helper(false);
2246 }
2247
2248 async fn setup_incoming_sendme_case<R: Runtime>(
2250 rt: &R,
2251 n_to_send: usize,
2252 ) -> (
2253 Arc<ClientCirc>,
2254 DataStream,
2255 CircuitRxSender,
2256 Option<StreamId>,
2257 usize,
2258 Receiver<AnyChanCell>,
2259 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2260 ) {
2261 let (chan, mut rx, sink2) = working_fake_channel(rt);
2262 let (circ, mut sink) = newcirc(rt, chan).await;
2263 let circid = circ.peek_circid();
2264
2265 let begin_and_send_fut = {
2266 let circ = circ.clone();
2267 async move {
2268 let mut stream = circ
2270 .begin_stream("www.example.com", 443, None)
2271 .await
2272 .unwrap();
2273 let junk = [0_u8; 1024];
2274 let mut remaining = n_to_send;
2275 while remaining > 0 {
2276 let n = std::cmp::min(remaining, junk.len());
2277 stream.write_all(&junk[..n]).await.unwrap();
2278 remaining -= n;
2279 }
2280 stream.flush().await.unwrap();
2281 stream
2282 }
2283 };
2284
2285 let receive_fut = async move {
2286 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2288 let rmsg = match chmsg {
2289 AnyChanMsg::Relay(r) => {
2290 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2291 .unwrap()
2292 }
2293 other => panic!("{:?}", other),
2294 };
2295 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2296 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2297 let connected = relaymsg::Connected::new_empty().into();
2299 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2300 let mut bytes_received = 0_usize;
2302 let mut cells_received = 0_usize;
2303 while bytes_received < n_to_send {
2304 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2306 assert_eq!(id, Some(circid));
2307
2308 let rmsg = match chmsg {
2309 AnyChanMsg::Relay(r) => {
2310 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2311 .unwrap()
2312 }
2313 other => panic!("{:?}", other),
2314 };
2315 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2316 assert_eq!(streamid2, streamid);
2317 if let AnyRelayMsg::Data(dat) = rmsg {
2318 cells_received += 1;
2319 bytes_received += dat.as_ref().len();
2320 } else {
2321 panic!();
2322 }
2323 }
2324
2325 (sink, streamid, cells_received, rx)
2326 };
2327
2328 let (stream, (sink, streamid, cells_received, rx)) =
2329 futures::join!(begin_and_send_fut, receive_fut);
2330
2331 (circ, stream, sink, streamid, cells_received, rx, sink2)
2332 }
2333
2334 #[traced_test]
2335 #[test]
2336 fn accept_valid_sendme() {
2337 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2338 let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2339 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2340
2341 assert_eq!(cells_received, 301);
2342
2343 {
2345 let (tx, rx) = oneshot::channel();
2346 circ.command
2347 .unbounded_send(CtrlCmd::QuerySendWindow {
2348 hop: 2.into(),
2349 done: tx,
2350 })
2351 .unwrap();
2352 let (window, tags) = rx.await.unwrap().unwrap();
2353 assert_eq!(window, 1000 - 301);
2354 assert_eq!(tags.len(), 3);
2355 assert_eq!(
2357 tags[0],
2358 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2359 );
2360 assert_eq!(
2362 tags[1],
2363 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2364 );
2365 assert_eq!(
2367 tags[2],
2368 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2369 );
2370 }
2371
2372 let reply_with_sendme_fut = async move {
2373 let c_sendme =
2375 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2376 .into();
2377 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2378
2379 let s_sendme = relaymsg::Sendme::new_empty().into();
2381 sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2382
2383 sink
2384 };
2385
2386 let _sink = reply_with_sendme_fut.await;
2387
2388 rt.advance_until_stalled().await;
2389
2390 {
2393 let (tx, rx) = oneshot::channel();
2394 circ.command
2395 .unbounded_send(CtrlCmd::QuerySendWindow {
2396 hop: 2.into(),
2397 done: tx,
2398 })
2399 .unwrap();
2400 let (window, _tags) = rx.await.unwrap().unwrap();
2401 assert_eq!(window, 1000 - 201);
2402 }
2403 });
2404 }
2405
2406 #[traced_test]
2407 #[test]
2408 fn invalid_circ_sendme() {
2409 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2410 let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2414 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2415
2416 let reply_with_sendme_fut = async move {
2417 let c_sendme =
2419 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2420 .into();
2421 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2422 sink
2423 };
2424
2425 let _sink = reply_with_sendme_fut.await;
2426
2427 rt.advance_until_stalled().await;
2429 assert!(circ.is_closing());
2430 });
2431 }
2432
2433 #[traced_test]
2434 #[test]
2435 fn test_busy_stream_fairness() {
2436 const N_STREAMS: usize = 3;
2438 const N_CELLS: usize = 20;
2440 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2443 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2450 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2451
2452 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2453 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2454 let (circ, mut sink) = newcirc(&rt, chan).await;
2455
2456 rt.spawn({
2462 let circ = circ.clone();
2465 async move {
2466 let mut clients = VecDeque::new();
2467 struct Client {
2468 stream: DataStream,
2469 to_write: &'static [u8],
2470 }
2471 for _ in 0..N_STREAMS {
2472 clients.push_back(Client {
2473 stream: circ
2474 .begin_stream("www.example.com", 80, None)
2475 .await
2476 .unwrap(),
2477 to_write: &[0_u8; N_BYTES][..],
2478 });
2479 }
2480 while let Some(mut client) = clients.pop_front() {
2481 if client.to_write.is_empty() {
2482 continue;
2484 }
2485 let written = client.stream.write(client.to_write).await.unwrap();
2486 client.to_write = &client.to_write[written..];
2487 clients.push_back(client);
2488 }
2489 }
2490 })
2491 .unwrap();
2492
2493 let channel_handler_fut = async {
2494 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2495 let mut total_bytes_received = 0;
2496
2497 loop {
2498 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2499 let rmsg = match msg {
2500 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2501 RelayCellFormat::V0,
2502 r.into_relay_body(),
2503 )
2504 .unwrap(),
2505 other => panic!("Unexpected chanmsg: {other:?}"),
2506 };
2507 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2508 match rmsg.cmd() {
2509 RelayCmd::BEGIN => {
2510 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2512 assert_eq!(prev, None);
2513 let connected = relaymsg::Connected::new_with_addr(
2515 "10.0.0.1".parse().unwrap(),
2516 1234,
2517 )
2518 .into();
2519 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2520 }
2521 RelayCmd::DATA => {
2522 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2523 let nbytes = data_msg.as_ref().len();
2524 total_bytes_received += nbytes;
2525 let streamid = streamid.unwrap();
2526 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2527 *stream_bytes += nbytes;
2528 if total_bytes_received >= N_BYTES {
2529 break;
2530 }
2531 }
2532 RelayCmd::END => {
2533 continue;
2538 }
2539 other => {
2540 panic!("Unexpected command {other:?}");
2541 }
2542 }
2543 }
2544
2545 (total_bytes_received, stream_bytes_received, rx, sink)
2548 };
2549
2550 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2551 channel_handler_fut.await;
2552 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2553 for (sid, stream_bytes) in stream_bytes_received {
2554 assert!(
2555 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2556 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2557 );
2558 }
2559 });
2560 }
2561
2562 #[test]
2563 fn basic_params() {
2564 use super::CircParameters;
2565 let mut p = CircParameters::default();
2566 assert!(p.extend_by_ed25519_id);
2567
2568 p.extend_by_ed25519_id = false;
2569 assert!(!p.extend_by_ed25519_id);
2570 }
2571
2572 #[cfg(feature = "hs-service")]
2573 struct AllowAllStreamsFilter;
2574 #[cfg(feature = "hs-service")]
2575 impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2576 fn disposition(
2577 &mut self,
2578 _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2579 _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2580 ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2581 Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2582 }
2583 }
2584
2585 #[traced_test]
2586 #[test]
2587 #[cfg(feature = "hs-service")]
2588 fn allow_stream_requests_twice() {
2589 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2590 let (chan, _rx, _sink) = working_fake_channel(&rt);
2591 let (circ, _send) = newcirc(&rt, chan).await;
2592
2593 let _incoming = circ
2594 .allow_stream_requests(
2595 &[tor_cell::relaycell::RelayCmd::BEGIN],
2596 circ.last_hop_num().unwrap(),
2597 AllowAllStreamsFilter,
2598 )
2599 .await
2600 .unwrap();
2601
2602 let incoming = circ
2603 .allow_stream_requests(
2604 &[tor_cell::relaycell::RelayCmd::BEGIN],
2605 circ.last_hop_num().unwrap(),
2606 AllowAllStreamsFilter,
2607 )
2608 .await;
2609
2610 assert!(incoming.is_err());
2612 });
2613 }
2614
2615 #[traced_test]
2616 #[test]
2617 #[cfg(feature = "hs-service")]
2618 fn allow_stream_requests() {
2619 use tor_cell::relaycell::msg::BeginFlags;
2620
2621 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2622 const TEST_DATA: &[u8] = b"ping";
2623
2624 let (chan, _rx, _sink) = working_fake_channel(&rt);
2625 let (circ, mut send) = newcirc(&rt, chan).await;
2626
2627 let rfmt = RelayCellFormat::V0;
2628
2629 let (tx, rx) = oneshot::channel();
2631 let mut incoming = circ
2632 .allow_stream_requests(
2633 &[tor_cell::relaycell::RelayCmd::BEGIN],
2634 circ.last_hop_num().unwrap(),
2635 AllowAllStreamsFilter,
2636 )
2637 .await
2638 .unwrap();
2639
2640 let simulate_service = async move {
2641 let stream = incoming.next().await.unwrap();
2642 let mut data_stream = stream
2643 .accept_data(relaymsg::Connected::new_empty())
2644 .await
2645 .unwrap();
2646 tx.send(()).unwrap();
2648
2649 let mut buf = [0_u8; TEST_DATA.len()];
2651 data_stream.read_exact(&mut buf).await.unwrap();
2652 assert_eq!(&buf, TEST_DATA);
2653
2654 circ
2655 };
2656
2657 let simulate_client = async move {
2658 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2659 let body: BoxedCellBody =
2660 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2661 .encode(rfmt, &mut testing_rng())
2662 .unwrap();
2663 let begin_msg = chanmsg::Relay::from(body);
2664
2665 send.send(ClientCircChanMsg::Relay(begin_msg))
2667 .await
2668 .unwrap();
2669
2670 rx.await.unwrap();
2676 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2678 let body: BoxedCellBody =
2679 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2680 .encode(rfmt, &mut testing_rng())
2681 .unwrap();
2682 let data_msg = chanmsg::Relay::from(body);
2683
2684 send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2685 send
2686 };
2687
2688 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2689 });
2690 }
2691
2692 #[traced_test]
2693 #[test]
2694 #[cfg(feature = "hs-service")]
2695 fn accept_stream_after_reject() {
2696 use tor_cell::relaycell::msg::BeginFlags;
2697 use tor_cell::relaycell::msg::EndReason;
2698
2699 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2700 const TEST_DATA: &[u8] = b"ping";
2701 const STREAM_COUNT: usize = 2;
2702 let rfmt = RelayCellFormat::V0;
2703
2704 let (chan, _rx, _sink) = working_fake_channel(&rt);
2705 let (circ, mut send) = newcirc(&rt, chan).await;
2706
2707 let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2709
2710 let mut incoming = circ
2711 .allow_stream_requests(
2712 &[tor_cell::relaycell::RelayCmd::BEGIN],
2713 circ.last_hop_num().unwrap(),
2714 AllowAllStreamsFilter,
2715 )
2716 .await
2717 .unwrap();
2718
2719 let simulate_service = async move {
2720 for i in 0..STREAM_COUNT {
2722 let stream = incoming.next().await.unwrap();
2723
2724 if i == 0 {
2726 stream
2727 .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2728 .await
2729 .unwrap();
2730 tx.send(()).await.unwrap();
2732 continue;
2733 }
2734
2735 let mut data_stream = stream
2736 .accept_data(relaymsg::Connected::new_empty())
2737 .await
2738 .unwrap();
2739 tx.send(()).await.unwrap();
2741
2742 let mut buf = [0_u8; TEST_DATA.len()];
2744 data_stream.read_exact(&mut buf).await.unwrap();
2745 assert_eq!(&buf, TEST_DATA);
2746 }
2747
2748 circ
2749 };
2750
2751 let simulate_client = async move {
2752 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2753 let body: BoxedCellBody =
2754 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2755 .encode(rfmt, &mut testing_rng())
2756 .unwrap();
2757 let begin_msg = chanmsg::Relay::from(body);
2758
2759 for _ in 0..STREAM_COUNT {
2762 send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2763 .await
2764 .unwrap();
2765
2766 rx.next().await.unwrap();
2768 }
2769
2770 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2772 let body: BoxedCellBody =
2773 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2774 .encode(rfmt, &mut testing_rng())
2775 .unwrap();
2776 let data_msg = chanmsg::Relay::from(body);
2777
2778 send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2779 send
2780 };
2781
2782 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2783 });
2784 }
2785
2786 #[traced_test]
2787 #[test]
2788 #[cfg(feature = "hs-service")]
2789 fn incoming_stream_bad_hop() {
2790 use tor_cell::relaycell::msg::BeginFlags;
2791
2792 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2793 const EXPECTED_HOP: u8 = 1;
2795 let rfmt = RelayCellFormat::V0;
2796
2797 let (chan, _rx, _sink) = working_fake_channel(&rt);
2798 let (circ, mut send) = newcirc(&rt, chan).await;
2799
2800 let mut incoming = circ
2802 .allow_stream_requests(
2803 &[tor_cell::relaycell::RelayCmd::BEGIN],
2804 EXPECTED_HOP.into(),
2805 AllowAllStreamsFilter,
2806 )
2807 .await
2808 .unwrap();
2809
2810 let simulate_service = async move {
2811 assert!(incoming.next().await.is_none());
2814 circ
2815 };
2816
2817 let simulate_client = async move {
2818 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2819 let body: BoxedCellBody =
2820 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2821 .encode(rfmt, &mut testing_rng())
2822 .unwrap();
2823 let begin_msg = chanmsg::Relay::from(body);
2824
2825 send.send(ClientCircChanMsg::Relay(begin_msg))
2827 .await
2828 .unwrap();
2829
2830 send
2831 };
2832
2833 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2834 });
2835 }
2836
2837 #[traced_test]
2838 #[test]
2839 #[cfg(feature = "conflux")]
2840 fn multipath_circ_validation() {
2841 use std::error::Error as _;
2842
2843 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2844 let params = CircParameters::default();
2845 let invalid_tunnels = [
2846 setup_bad_conflux_tunnel(&rt).await,
2847 setup_conflux_tunnel(&rt, true, params).await,
2848 ];
2849
2850 for tunnel in invalid_tunnels {
2851 let TestTunnelCtx {
2852 tunnel: _tunnel,
2853 circs: _circs,
2854 conflux_link_rx,
2855 } = tunnel;
2856
2857 let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
2858 let err_src = conflux_hs_err.source().unwrap();
2859
2860 assert!(err_src
2863 .to_string()
2864 .contains("one more more conflux circuits are invalid"));
2865 }
2866 });
2867 }
2868
2869 #[derive(Debug)]
2873 #[allow(unused)]
2874 #[cfg(feature = "conflux")]
2875 struct TestCircuitCtx {
2876 chan_rx: Receiver<AnyChanCell>,
2877 chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2878 circ_sink: CircuitRxSender,
2879 }
2880
2881 #[derive(Debug)]
2882 #[cfg(feature = "conflux")]
2883 struct TestTunnelCtx {
2884 tunnel: Arc<ClientCirc>,
2885 circs: Vec<TestCircuitCtx>,
2886 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
2887 }
2888
2889 #[cfg(feature = "conflux")]
2891 async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
2892 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2894 let rmsg = match chmsg {
2895 AnyChanMsg::Relay(r) => {
2896 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2897 .unwrap()
2898 }
2899 other => panic!("{:?}", other),
2900 };
2901 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2902
2903 let link = match rmsg {
2904 AnyRelayMsg::ConfluxLink(link) => link,
2905 _ => panic!("unexpected relay message {rmsg:?}"),
2906 };
2907
2908 assert!(streamid.is_none());
2909
2910 link
2911 }
2912
2913 #[cfg(feature = "conflux")]
2914 async fn setup_conflux_tunnel(
2915 rt: &MockRuntime,
2916 same_hops: bool,
2917 params: CircParameters,
2918 ) -> TestTunnelCtx {
2919 let hops1 = hop_details(3, 0);
2920 let hops2 = if same_hops {
2921 hops1.clone()
2922 } else {
2923 hop_details(3, 10)
2924 };
2925
2926 let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
2927 let (circ1, sink1) = newcirc_ext(
2928 rt,
2929 UniqId::new(1, 3),
2930 chan1,
2931 hops1,
2932 2.into(),
2933 params.clone(),
2934 )
2935 .await;
2936
2937 let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
2938
2939 let (circ2, sink2) =
2940 newcirc_ext(rt, UniqId::new(2, 4), chan2, hops2, 2.into(), params).await;
2941
2942 let (answer_tx, answer_rx) = oneshot::channel();
2943 circ2
2944 .command
2945 .unbounded_send(CtrlCmd::ShutdownAndReturnCircuit { answer: answer_tx })
2946 .unwrap();
2947
2948 let circuit = answer_rx.await.unwrap().unwrap();
2949 rt.advance_until_stalled().await;
2951 assert!(circ2.is_closing());
2952
2953 let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
2954 circ1
2956 .control
2957 .unbounded_send(CtrlMsg::LinkCircuits {
2958 circuits: vec![circuit],
2959 answer: conflux_link_tx,
2960 })
2961 .unwrap();
2962
2963 let circ_ctx1 = TestCircuitCtx {
2964 chan_rx: rx1,
2965 chan_tx: chan_sink1,
2966 circ_sink: sink1,
2967 };
2968
2969 let circ_ctx2 = TestCircuitCtx {
2970 chan_rx: rx2,
2971 chan_tx: chan_sink2,
2972 circ_sink: sink2,
2973 };
2974
2975 TestTunnelCtx {
2976 tunnel: circ1,
2977 circs: vec![circ_ctx1, circ_ctx2],
2978 conflux_link_rx,
2979 }
2980 }
2981
2982 #[cfg(feature = "conflux")]
2983 async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2984 let same_hops = true;
2990 let params = CircParameters::new(true, build_cc_vegas_params());
2991 setup_conflux_tunnel(rt, same_hops, params).await
2992 }
2993
2994 #[cfg(feature = "conflux")]
2995 async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
2996 let same_hops = false;
3000 let params = CircParameters::new(true, build_cc_vegas_params());
3001 setup_conflux_tunnel(rt, same_hops, params).await
3002 }
3003
3004 #[traced_test]
3005 #[test]
3006 #[cfg(feature = "conflux")]
3007 fn reject_conflux_linked_before_hs() {
3008 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3009 let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3010 let (circ, mut sink) = newcirc(&rt, chan).await;
3011
3012 let nonce = V1Nonce::new(&mut testing_rng());
3013 let payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3014 let linked = relaymsg::ConfluxLinked::new(payload).into();
3016 sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3017
3018 rt.advance_until_stalled().await;
3019 assert!(circ.is_closing());
3020 });
3021 }
3022
3023 #[traced_test]
3024 #[test]
3025 #[cfg(feature = "conflux")]
3026 fn conflux_hs_timeout() {
3027 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3028 let TestTunnelCtx {
3029 tunnel: _tunnel,
3030 circs,
3031 conflux_link_rx,
3032 } = setup_good_conflux_tunnel(&rt).await;
3033
3034 let [mut circ1, _circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3035
3036 let link = await_link_payload(&mut circ1.chan_rx).await;
3038
3039 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3041 circ1
3042 .circ_sink
3043 .send(rmsg_to_ccmsg(None, linked))
3044 .await
3045 .unwrap();
3046
3047 rt.advance_by(Duration::from_secs(60)).await;
3049
3050 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3051
3052 let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
3054 conflux_hs_res.try_into().unwrap();
3055
3056 assert!(res1.is_ok());
3057
3058 let err = res2.unwrap_err();
3059 assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
3060 });
3061 }
3062
3063 #[traced_test]
3064 #[test]
3065 #[cfg(feature = "conflux")]
3066 fn conflux_bad_hs() {
3067 use crate::util::err::ConfluxHandshakeError;
3068
3069 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3070 let nonce = V1Nonce::new(&mut testing_rng());
3071 let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3072 let bad_hs_responses = [
3074 (
3075 rmsg_to_ccmsg(
3076 None,
3077 relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into(),
3078 ),
3079 "Received CONFLUX_LINKED cell with mismatched nonce",
3080 ),
3081 (
3082 rmsg_to_ccmsg(None, relaymsg::ConfluxLink::new(bad_link_payload).into()),
3083 "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
3084 ),
3085 (
3086 rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3087 "Received CONFLUX_SWITCH on unlinked circuit?!",
3088 ),
3089 ];
3098
3099 for (bad_cell, expected_err) in bad_hs_responses {
3100 let TestTunnelCtx {
3101 tunnel,
3102 circs,
3103 conflux_link_rx,
3104 } = setup_good_conflux_tunnel(&rt).await;
3105
3106 let [mut _circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3107
3108 circ2.circ_sink.send(bad_cell).await.unwrap();
3110
3111 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3112 let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
3116 conflux_hs_res.try_into().unwrap();
3117
3118 match res2.unwrap_err() {
3119 ConfluxHandshakeError::Link(Error::CircProto(e)) => {
3120 assert_eq!(e, expected_err);
3121 }
3122 e => panic!("unexpected error: {e:?}"),
3123 }
3124
3125 assert!(tunnel.is_closing());
3126 }
3127 });
3128 }
3129
3130 #[traced_test]
3131 #[test]
3132 #[cfg(feature = "conflux")]
3133 fn unexpected_conflux_cell() {
3134 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3135 let nonce = V1Nonce::new(&mut testing_rng());
3136 let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);
3137 let bad_cells = [
3138 rmsg_to_ccmsg(
3139 None,
3140 relaymsg::ConfluxLinked::new(link_payload.clone()).into(),
3141 ),
3142 rmsg_to_ccmsg(
3143 None,
3144 relaymsg::ConfluxLink::new(link_payload.clone()).into(),
3145 ),
3146 rmsg_to_ccmsg(None, relaymsg::ConfluxSwitch::new(0).into()),
3147 ];
3148
3149 for bad_cell in bad_cells {
3150 let (chan, mut _rx, _sink) = working_fake_channel(&rt);
3151 let (circ, mut sink) = newcirc(&rt, chan).await;
3152
3153 sink.send(bad_cell).await.unwrap();
3154 rt.advance_until_stalled().await;
3155
3156 assert!(circ.is_closing());
3160 }
3161 });
3162 }
3163
3164 #[traced_test]
3165 #[test]
3166 #[cfg(feature = "conflux")]
3167 fn conflux_bad_linked() {
3168 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3169 let TestTunnelCtx {
3170 tunnel,
3171 circs,
3172 conflux_link_rx: _,
3173 } = setup_good_conflux_tunnel(&rt).await;
3174
3175 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3176
3177 let link = await_link_payload(&mut circ1.chan_rx).await;
3178
3179 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3181 circ1
3182 .circ_sink
3183 .send(rmsg_to_ccmsg(None, linked))
3184 .await
3185 .unwrap();
3186
3187 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3189 circ2
3190 .circ_sink
3191 .send(rmsg_to_ccmsg(None, linked))
3192 .await
3193 .unwrap();
3194 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3195 circ2
3196 .circ_sink
3197 .send(rmsg_to_ccmsg(None, linked))
3198 .await
3199 .unwrap();
3200
3201 rt.advance_until_stalled().await;
3202
3203 assert!(tunnel.is_closing());
3206 });
3207 }
3208
3209 #[traced_test]
3210 #[test]
3211 #[cfg(feature = "conflux")]
3212 fn conflux_bad_switch() {
3213 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3214 let bad_switch = [
3215 relaymsg::ConfluxSwitch::new(0),
3217 ];
3226
3227 for bad_cell in bad_switch {
3228 let TestTunnelCtx {
3229 tunnel,
3230 circs,
3231 conflux_link_rx,
3232 } = setup_good_conflux_tunnel(&rt).await;
3233
3234 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3235
3236 let link = await_link_payload(&mut circ1.chan_rx).await;
3237
3238 for circ in [&mut circ1, &mut circ2] {
3240 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3241 circ.circ_sink
3242 .send(rmsg_to_ccmsg(None, linked))
3243 .await
3244 .unwrap();
3245 }
3246
3247 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3248 assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3249
3250 for circ in [&mut circ1, &mut circ2] {
3254 let msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
3255 circ.circ_sink.send(msg).await.unwrap();
3256 }
3257
3258 rt.advance_until_stalled().await;
3260 assert!(tunnel.is_closing());
3261 }
3262 });
3263 }
3264
3265 #[cfg(feature = "conflux")]
3269 #[derive(Debug)]
3270 enum ConfluxTestEndpoint {
3271 Relay(ConfluxExitState),
3273 Client {
3275 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3277 tunnel: Arc<ClientCirc>,
3279 stream_data: Vec<u8>,
3281 },
3282 }
3283
3284 #[allow(unused)]
3287 #[derive(Debug)]
3288 #[cfg(feature = "conflux")]
3289 enum ConfluxEndpointResult {
3290 Circuit(Arc<ClientCirc>),
3291 Relay {
3292 rx: Receiver<ChanCell<AnyChanMsg>>,
3293 sink: CircuitRxSender,
3294 },
3295 }
3296
3297 #[derive(Debug)]
3299 #[cfg(feature = "conflux")]
3300 struct ConfluxStreamState {
3301 data_recvd: Vec<u8>,
3302 expected_data_len: usize,
3303 begin_recvd: bool,
3304 end_recvd: bool,
3305 }
3306
3307 #[derive(Debug)]
3308 #[cfg(feature = "conflux")]
3309 struct ConfluxExitState {
3310 runtime: MockRuntime,
3311 init_rtt_delay: Option<Duration>,
3312 rtt_delay: Option<Duration>,
3313 leg: usize,
3314 rx: Receiver<ChanCell<AnyChanMsg>>,
3315 sink: CircuitRxSender,
3316 stream_state: Arc<Mutex<ConfluxStreamState>>,
3317 expect_switch: Vec<usize>,
3318 }
3319
3320 #[cfg(feature = "conflux")]
3321 async fn good_exit_handshake(
3322 runtime: &MockRuntime,
3323 init_rtt_delay: Option<Duration>,
3324 rx: &mut Receiver<ChanCell<AnyChanMsg>>,
3325 sink: &mut CircuitRxSender,
3326 ) {
3327 let link = await_link_payload(rx).await;
3329
3330 if let Some(init_rtt_delay) = init_rtt_delay {
3333 runtime.advance_by(init_rtt_delay).await;
3334 }
3335
3336 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3338 sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
3339
3340 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3342 let rmsg = match chmsg {
3343 AnyChanMsg::Relay(r) => {
3344 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3345 .unwrap()
3346 }
3347 other => panic!("{other:?}"),
3348 };
3349 let (_streamid, rmsg) = rmsg.into_streamid_and_msg();
3350
3351 assert!(matches!(rmsg, AnyRelayMsg::ConfluxLinkedAck(_)));
3352 }
3353
3354 #[cfg(feature = "conflux")]
3355 async fn run_mock_conflux_exit(state: ConfluxExitState) -> ConfluxEndpointResult {
3356 let ConfluxExitState {
3357 runtime,
3358 init_rtt_delay,
3359 rtt_delay,
3360 leg,
3361 mut rx,
3362 mut sink,
3363 stream_state,
3364 mut expect_switch,
3365 } = state;
3366
3367 good_exit_handshake(&runtime, init_rtt_delay, &mut rx, &mut sink).await;
3369
3370 let stream_len = stream_state.lock().unwrap().expected_data_len;
3372 let mut data_cells_received = 0_usize;
3373 let mut cell_count = 0_usize;
3374 while stream_state.lock().unwrap().data_recvd.len() < stream_len {
3375 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
3377 cell_count += 1;
3378 let rmsg = match chmsg {
3379 AnyChanMsg::Relay(r) => {
3380 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
3381 .unwrap()
3382 }
3383 other => panic!("{:?}", other),
3384 };
3385 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
3386
3387 let begin_recvd = stream_state.lock().unwrap().begin_recvd;
3388 let end_recvd = stream_state.lock().unwrap().end_recvd;
3389 match rmsg {
3390 AnyRelayMsg::Begin(_) if begin_recvd => {
3391 panic!("client tried to open two streams?!");
3392 }
3393 AnyRelayMsg::Begin(_) if !begin_recvd => {
3394 stream_state.lock().unwrap().begin_recvd = true;
3395 let connected = relaymsg::Connected::new_empty().into();
3397 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
3398 }
3399 AnyRelayMsg::End(_) if !end_recvd => {
3400 stream_state.lock().unwrap().end_recvd = true;
3401 break;
3402 }
3403 AnyRelayMsg::End(_) if end_recvd => {
3404 panic!("received two END cells for the same stream?!");
3405 }
3406 AnyRelayMsg::ConfluxSwitch(_) => {
3407 let cells_until_switch = expect_switch.remove(0);
3409
3410 assert_eq!(cells_until_switch, cell_count);
3411
3412 continue;
3418 }
3419 AnyRelayMsg::Data(dat) => {
3420 data_cells_received += 1;
3421
3422 stream_state
3423 .lock()
3424 .unwrap()
3425 .data_recvd
3426 .extend_from_slice(dat.as_ref());
3427 }
3428 _ => panic!("unexpected message {rmsg:?} on leg {leg}"),
3429 }
3430
3431 if data_cells_received == 100 {
3432 if let Some(rtt_delay) = rtt_delay {
3435 runtime.advance_by(rtt_delay).await;
3436 }
3437
3438 let sendme =
3440 relaymsg::Sendme::new_tag(hex!("2100000000000000000000000000000000000000"))
3441 .into();
3442 sink.send(rmsg_to_ccmsg(None, sendme)).await.unwrap();
3443 }
3444 }
3445
3446 ConfluxEndpointResult::Relay { rx, sink }
3447 }
3448
3449 #[cfg(feature = "conflux")]
3450 async fn run_conflux_client(
3451 tunnel: Arc<ClientCirc>,
3452 conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
3453 stream_data: Vec<u8>,
3454 ) -> ConfluxEndpointResult {
3455 let res = conflux_link_rx.await;
3456
3457 let res = res.unwrap().unwrap();
3458 assert_eq!(res.len(), 2);
3459
3460 let mut stream = tunnel
3465 .begin_stream("www.example.com", 443, None)
3466 .await
3467 .unwrap();
3468
3469 stream.write_all(&stream_data).await.unwrap();
3470 stream.flush().await.unwrap();
3471
3472 ConfluxEndpointResult::Circuit(tunnel)
3473 }
3474
3475 #[cfg(feature = "conflux")]
3476 async fn run_conflux_endpoint(endpoint: ConfluxTestEndpoint) -> ConfluxEndpointResult {
3477 match endpoint {
3478 ConfluxTestEndpoint::Relay(state) => run_mock_conflux_exit(state).await,
3479 ConfluxTestEndpoint::Client {
3480 tunnel,
3481 conflux_link_rx,
3482 stream_data,
3483 } => run_conflux_client(tunnel, conflux_link_rx, stream_data).await,
3484 }
3485 }
3486
3487 #[traced_test]
3488 #[test]
3489 #[cfg(feature = "conflux")]
3490 fn multipath_stream() {
3491 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3492 let TestTunnelCtx {
3493 tunnel,
3494 circs,
3495 conflux_link_rx,
3496 } = setup_good_conflux_tunnel(&rt).await;
3497 let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3498
3499 let stream_data = (0..255_u8).cycle().take(300 * 498).collect::<Vec<_>>();
3501 let stream_state = Arc::new(Mutex::new(ConfluxStreamState {
3502 data_recvd: vec![],
3503 expected_data_len: stream_data.len(),
3504 begin_recvd: false,
3505 end_recvd: false,
3506 }));
3507
3508 let mut tasks = vec![];
3509 let relays = [
3514 (
3515 circ1.chan_rx,
3516 circ1.circ_sink,
3517 vec![2],
3520 None,
3521 Some(Duration::from_millis(300)),
3522 ),
3523 (
3524 circ2.chan_rx,
3525 circ2.circ_sink,
3526 vec![1],
3527 Some(Duration::from_millis(200)),
3528 None,
3529 ),
3530 ];
3531
3532 for (leg, (rx, sink, expect_switch, init_rtt_delay, rtt_delay)) in
3533 relays.into_iter().enumerate()
3534 {
3535 let relay = ConfluxTestEndpoint::Relay(ConfluxExitState {
3536 runtime: rt.clone(),
3537 leg,
3538 init_rtt_delay,
3539 rtt_delay,
3540 rx,
3541 sink,
3542 stream_state: Arc::clone(&stream_state),
3543 expect_switch,
3544 });
3545
3546 tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
3547 }
3548
3549 tasks.push(rt.spawn_join(
3550 "client task".to_string(),
3551 run_conflux_endpoint(ConfluxTestEndpoint::Client {
3552 tunnel,
3553 conflux_link_rx,
3554 stream_data: stream_data.clone(),
3555 }),
3556 ));
3557 let _sinks = futures::future::join_all(tasks).await;
3558
3559 let stream_state = stream_state.lock().unwrap();
3560 assert!(stream_state.begin_recvd);
3561 assert!(stream_state.end_recvd);
3562
3563 assert_eq!(stream_state.data_recvd, stream_data);
3566 });
3567 }
3568
3569 #[traced_test]
3570 #[test]
3571 #[cfg(all(feature = "conflux", feature = "hs-service"))]
3572 fn conflux_incoming_stream() {
3573 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
3574 use std::error::Error as _;
3575
3576 const EXPECTED_HOP: u8 = 1;
3577
3578 let TestTunnelCtx {
3579 tunnel,
3580 circs,
3581 conflux_link_rx,
3582 } = setup_good_conflux_tunnel(&rt).await;
3583
3584 let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();
3585
3586 let link = await_link_payload(&mut circ1.chan_rx).await;
3587 for circ in [&mut circ1, &mut circ2] {
3588 let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
3589 circ.circ_sink
3590 .send(rmsg_to_ccmsg(None, linked))
3591 .await
3592 .unwrap();
3593 }
3594
3595 let conflux_hs_res = conflux_link_rx.await.unwrap().unwrap();
3596 assert!(conflux_hs_res.iter().all(|res| res.is_ok()));
3597
3598 let err = tunnel
3600 .allow_stream_requests(
3601 &[tor_cell::relaycell::RelayCmd::BEGIN],
3602 EXPECTED_HOP.into(),
3603 AllowAllStreamsFilter,
3604 )
3605 .await
3606 .map(|_| ())
3608 .unwrap_err();
3609
3610 let err_src = err.source().unwrap();
3611 assert!(err_src
3612 .to_string()
3613 .contains("Cannot allow stream requests on tunnel with 2 legs"));
3614 });
3615 }
3616}