1pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56 StreamRateLimit, StreamReceiver,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use postage::watch;
69use tor_cell::{
70 chancell::CircId,
71 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
72};
73
74use tor_error::{bad_api_usage, internal, into_internal};
75use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
76use tor_protover::named;
77
78pub use crate::crypto::binding::CircuitBinding;
79pub use crate::memquota::StreamAccount;
80pub use crate::tunnel::circuit::unique_id::UniqId;
81
82#[cfg(feature = "hs-service")]
83use {
84 crate::stream::{IncomingCmdChecker, IncomingStream},
85 crate::tunnel::reactor::StreamReqInfo,
86};
87
88use futures::channel::mpsc;
89use oneshot_fused_workaround as oneshot;
90
91use crate::congestion::sendme::StreamRecvWindow;
92use crate::DynTimeProvider;
93use futures::FutureExt as _;
94use std::collections::HashMap;
95use std::net::IpAddr;
96use std::sync::{Arc, Mutex};
97use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
98
99use crate::crypto::handshake::ntor::NtorPublicKey;
100
101pub use path::{Path, PathEntry};
102
/// Buffer size handed to `MpscSpec::new` for circuit-related bounded queues.
///
/// In this file it sizes the per-stream outbound message queue that
/// `begin_stream_impl` creates (the `msg_tx`/`msg_rx` pair).
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
105
106#[cfg(feature = "send-control-msg")]
107use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
108
109pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
110#[cfg(feature = "send-control-msg")]
111#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
112pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
113
/// Sending half of an `mq_queue` MPSC queue (from `tor_memquota`) carrying items of type `T`.
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of an `mq_queue` MPSC queue (from `tor_memquota`) carrying items of type `T`.
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
118
/// Sending half of the queue that carries `ClientCircChanMsg` values towards a circuit.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// Receiving half of the `ClientCircChanMsg` queue.
///
/// `PendingClientCirc::new` takes one of these as `input` and hands it to `Reactor::new`.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
123
/// Client-side handle to a circuit whose work is performed by a separate `Reactor`.
///
/// The methods on this handle either read the shared `mutable` state directly,
/// or send a `CtrlMsg`/`CtrlCmd` to the reactor and wait for its reply on a
/// oneshot channel.
#[derive(Debug)]
pub struct ClientCirc {
    /// Shared per-circuit state (returned by `Reactor::new`), looked up by `unique_id`.
    mutable: Arc<TunnelMutableState>,
    /// Identifier of this circuit; also the key used for lookups in `mutable`.
    unique_id: UniqId,
    /// Channel for sending `CtrlMsg` requests to the reactor.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Channel for sending `CtrlCmd` requests to the reactor.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// Shared future cloned by `wait_for_close` (only with `experimental-api`).
    ///
    /// Its item type is `void::Void`, so it cannot deliver a value.
    // NOTE(review): presumably it completes when the reactor drops its sender — confirm in the reactor.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// Circuit ID given to `PendingClientCirc::new`; kept only in test builds.
    #[cfg(test)]
    circid: CircId,
    /// Memory-quota account for this circuit, exposed through `mq_account`.
    memquota: CircuitAccount,
    /// Time provider, cloned from the channel's time provider at construction.
    time_provider: DynTimeProvider,
}
188
/// Mutex-protected map from a circuit's `UniqId` to that circuit's shared `MutableState`.
///
/// `n_legs` reports the number of entries in the map.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, Arc<MutableState>>>);
210
211impl TunnelMutableState {
212 pub(super) fn insert(&self, unique_id: UniqId, mutable: Arc<MutableState>) {
214 #[allow(unused)] let state = self
216 .0
217 .lock()
218 .expect("lock poisoned")
219 .insert(unique_id, mutable);
220
221 debug_assert!(state.is_none());
222 }
223
224 pub(super) fn remove(&self, unique_id: UniqId) {
226 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
228
229 debug_assert!(state.is_some());
230 }
231
232 fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
236 let lock = self.0.lock().expect("lock poisoned");
237 let mutable = lock
238 .get(&unique_id)
239 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
240
241 Ok(mutable.path())
242 }
243
244 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
249 let lock = self.0.lock().expect("lock poisoned");
250 let mutable = lock
251 .get(&unique_id)
252 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
253
254 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
255 path::HopDetail::Relay(r) => r,
256 #[cfg(feature = "hs-common")]
257 path::HopDetail::Virtual => {
258 panic!("somehow made a circuit with a virtual first hop.")
259 }
260 });
261
262 Ok(first_hop)
263 }
264
265 fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
271 let lock = self.0.lock().expect("lock poisoned");
272 let mutable = lock
273 .get(&unique_id)
274 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
275
276 Ok(mutable.last_hop_num())
277 }
278
279 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
283 let lock = self.0.lock().expect("lock poisoned");
284 let mutable = lock
285 .get(&unique_id)
286 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
287
288 Ok(mutable.n_hops())
289 }
290
291 fn n_legs(&self) -> usize {
296 let lock = self.0.lock().expect("lock poisoned");
297 lock.len()
298 }
299}
300
/// Mutex-protected `CircuitState` for a single circuit.
///
/// Instances are stored (behind `Arc`) in a `TunnelMutableState` map.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
305
306impl MutableState {
307 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
309 let mut mutable = self.0.lock().expect("poisoned lock");
310 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
311 mutable.binding.push(binding);
312 }
313
314 pub(super) fn path(&self) -> Arc<path::Path> {
316 let mutable = self.0.lock().expect("poisoned lock");
317 Arc::clone(&mutable.path)
318 }
319
320 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
323 let mutable = self.0.lock().expect("poisoned lock");
324
325 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
326 }
329
330 fn first_hop(&self) -> Option<HopDetail> {
332 let mutable = self.0.lock().expect("poisoned lock");
333 mutable.path.first_hop()
334 }
335
336 fn last_hop_num(&self) -> Option<HopNum> {
343 let mutable = self.0.lock().expect("poisoned lock");
344 mutable.path.last_hop_num()
345 }
346
347 fn n_hops(&self) -> usize {
354 let mutable = self.0.lock().expect("poisoned lock");
355 mutable.path.n_hops()
356 }
357}
358
/// The data guarded by `MutableState`: a circuit's path plus one binding slot per added hop.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// The circuit's path; `MutableState::add_hop` pushes onto it via `Arc::make_mut`.
    path: Arc<path::Path>,

    /// Bindings pushed by `MutableState::add_hop`, one entry per call (possibly `None`).
    ///
    /// Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
380
/// A circuit that has been allocated but whose first hop has not yet been created.
///
/// Each `create_firsthop*` method consumes this value and returns the
/// `Arc<ClientCirc>` once the reactor reports success.
pub struct PendingClientCirc {
    /// Receiver for the `CreateResponse`; handed to the reactor inside `CtrlMsg::Create`.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The circuit handle that is returned once creation succeeds.
    circ: Arc<ClientCirc>,
}
392
/// Caller-supplied parameters used when creating or extending a circuit.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// When `false`, `extend_ntor` and `extend_ntor_v3` remove `ED25519ID`
    /// link specifiers before sending the extend request.
    pub extend_by_ed25519_id: bool,
    /// Congestion-control parameters; copied into each hop's `HopSettings`.
    pub ccontrol: CongestionControlParams,

    /// Copied verbatim into `HopSettings`; `CircParameters::new` sets it to `None`.
    // NOTE(review): presumably a cap on incoming cells; enforcement is not in this section.
    pub n_incoming_cells_permitted: Option<u32>,

    /// Copied verbatim into `HopSettings`; `CircParameters::new` sets it to `None`.
    // NOTE(review): presumably a cap on outgoing cells; enforcement is not in this section.
    pub n_outgoing_cells_permitted: Option<u32>,
}
440
/// Per-hop settings derived from `CircParameters` by `HopSettings::from_params_and_caps`.
#[derive(Clone, Debug)]
pub(super) struct HopSettings {
    /// Congestion-control parameters for this hop (possibly switched to the fallback algorithm).
    pub(super) ccontrol: CongestionControlParams,

    /// Copied from `CircParameters::n_incoming_cells_permitted`.
    pub(super) n_incoming_cells_permitted: Option<u32>,

    /// Copied from `CircParameters::n_outgoing_cells_permitted`.
    pub(super) n_outgoing_cells_permitted: Option<u32>,
}
463
464impl HopSettings {
465 #[allow(clippy::unnecessary_wraps)] pub(super) fn from_params_and_caps(
476 params: &CircParameters,
477 caps: &tor_protover::Protocols,
478 ) -> Result<Self> {
479 let mut settings = Self {
480 ccontrol: params.ccontrol.clone(),
481 n_incoming_cells_permitted: params.n_incoming_cells_permitted,
482 n_outgoing_cells_permitted: params.n_outgoing_cells_permitted,
483 };
484
485 match settings.ccontrol.alg() {
486 crate::ccparams::Algorithm::FixedWindow(_) => {}
487 crate::ccparams::Algorithm::Vegas(_) => {
488 if !caps.supports_named_subver(named::FLOWCTRL_CC) {
490 settings.ccontrol.use_fallback_alg();
491 }
492 }
493 }
494
495 Ok(settings)
496 }
497
498 pub(super) fn without_negotiation(mut self) -> Self {
505 self.ccontrol.use_fallback_alg();
506 self
507 }
508}
509
#[cfg(test)]
impl std::default::Default for CircParameters {
    /// Test-only default: extend by Ed25519 ID, fixed-window test congestion
    /// parameters, and no cell limits.
    fn default() -> Self {
        // `new` leaves both cell limits as `None`.
        Self::new(
            true,
            crate::congestion::test_utils::params::build_cc_fixed_params(),
        )
    }
}
521
522impl CircParameters {
523 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
525 Self {
526 extend_by_ed25519_id,
527 ccontrol,
528 n_incoming_cells_permitted: None,
529 n_outgoing_cells_permitted: None,
530 }
531 }
532}
533
534impl ClientCirc {
535 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
543 Ok(self
544 .mutable
545 .first_hop(self.unique_id)
546 .map_err(|_| Error::CircuitClosed)?
547 .expect("called first_hop on an un-constructed circuit"))
548 }
549
550 pub fn last_hop_info(&self) -> Result<Option<OwnedChanTarget>> {
562 let path = self.path_ref()?;
563 Ok(path
564 .hops()
565 .last()
566 .expect("Called last_hop an an un-constructed circuit")
567 .as_chan_target()
568 .map(OwnedChanTarget::from_chan_target))
569 }
570
571 pub fn last_hop_num(&self) -> Result<HopNum> {
581 Ok(self
582 .mutable
583 .last_hop_num(self.unique_id)?
584 .ok_or_else(|| internal!("no last hop index"))?)
585 }
586
587 pub fn last_hop(&self) -> Result<TargetHop> {
592 let hop_num = self
593 .mutable
594 .last_hop_num(self.unique_id)?
595 .ok_or_else(|| bad_api_usage!("no last hop"))?;
596 Ok((self.unique_id, hop_num).into())
597 }
598
    /// Return a shared handle to this circuit's current path.
    ///
    /// Any lookup failure is reported as `Error::CircuitClosed`.
    pub fn path_ref(&self) -> Result<Arc<Path>> {
        self.mutable
            .path_ref(self.unique_id)
            .map_err(|_| Error::CircuitClosed)
    }
608
609 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
613 let (tx, rx) = oneshot::channel();
614
615 self.control
616 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
617 .map_err(|_| Error::CircuitClosed)?;
618
619 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
620 }
621
    /// Return a reference to this circuit's memory-quota account.
    pub fn mq_account(&self) -> &CircuitAccount {
        &self.memquota
    }
626
627 #[cfg(feature = "hs-service")]
635 pub async fn binding_key(&self, hop: TargetHop) -> Result<Option<CircuitBinding>> {
636 let (sender, receiver) = oneshot::channel();
637 let msg = CtrlCmd::GetBindingKey { hop, done: sender };
638 self.command
639 .unbounded_send(msg)
640 .map_err(|_| Error::CircuitClosed)?;
641
642 receiver.await.map_err(|_| Error::CircuitClosed)?
643 }
644
645 #[cfg(feature = "send-control-msg")]
723 pub async fn start_conversation(
724 &self,
725 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
726 reply_handler: impl MsgHandler + Send + 'static,
727 hop: TargetHop,
728 ) -> Result<Conversation<'_>> {
729 let (sender, receiver) = oneshot::channel();
732 self.command
733 .unbounded_send(CtrlCmd::ResolveTargetHop { hop, done: sender })
734 .map_err(|_| Error::CircuitClosed)?;
735 let hop_location = receiver.await.map_err(|_| Error::CircuitClosed)??;
736 let handler = Box::new(UserMsgHandler::new(hop_location, reply_handler));
737 let conversation = Conversation(self);
738 conversation.send_internal(msg, Some(handler)).await?;
739 Ok(conversation)
740 }
741
742 #[cfg(feature = "send-control-msg")]
748 pub async fn send_raw_msg(
749 &self,
750 msg: tor_cell::relaycell::msg::AnyRelayMsg,
751 hop: TargetHop,
752 ) -> Result<()> {
753 let (sender, receiver) = oneshot::channel();
754 let ctrl_msg = CtrlMsg::SendMsg { hop, msg, sender };
755 self.control
756 .unbounded_send(ctrl_msg)
757 .map_err(|_| Error::CircuitClosed)?;
758
759 receiver.await.map_err(|_| Error::CircuitClosed)?
760 }
761
762 #[cfg(feature = "hs-service")]
782 pub async fn allow_stream_requests(
783 self: &Arc<ClientCirc>,
784 allow_commands: &[tor_cell::relaycell::RelayCmd],
785 hop: TargetHop,
786 filter: impl crate::stream::IncomingStreamRequestFilter,
787 ) -> Result<impl futures::Stream<Item = IncomingStream>> {
788 use futures::stream::StreamExt;
789
790 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
792
793 let circ_count = self.mutable.n_legs();
795 if circ_count != 1 {
796 return Err(
797 internal!("Cannot allow stream requests on tunnel with {circ_count} legs",).into(),
798 );
799 }
800
801 let time_prov = self.time_provider.clone();
802 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
803 let (incoming_sender, incoming_receiver) =
804 MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
805 let (tx, rx) = oneshot::channel();
806
807 self.command
808 .unbounded_send(CtrlCmd::AwaitStreamRequest {
809 cmd_checker,
810 incoming_sender,
811 hop,
812 done: tx,
813 filter: Box::new(filter),
814 })
815 .map_err(|_| Error::CircuitClosed)?;
816
817 rx.await.map_err(|_| Error::CircuitClosed)??;
819
820 let allowed_hop_loc = match hop {
821 TargetHop::Hop(loc) => Some(loc),
822 _ => None,
823 }
824 .ok_or_else(|| bad_api_usage!("Expected TargetHop with HopLocation"))?;
825
826 let circ = Arc::clone(self);
827 Ok(incoming_receiver.map(move |req_ctx| {
828 let StreamReqInfo {
829 req,
830 stream_id,
831 hop,
832 receiver,
833 msg_tx,
834 rate_limit_stream,
835 memquota,
836 relay_cell_format,
837 } = req_ctx;
838
839 assert_eq!(allowed_hop_loc, hop);
844
845 let target = StreamTarget {
852 circ: Arc::clone(&circ),
853 tx: msg_tx,
854 hop: allowed_hop_loc,
855 stream_id,
856 relay_cell_format,
857 rate_limit_stream,
858 };
859
860 let reader = StreamReceiver {
861 target: target.clone(),
862 receiver,
863 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
864 ended: false,
865 };
866
867 IncomingStream::new(circ.time_provider.clone(), req, target, reader, memquota)
868 }))
869 }
870
871 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
874 where
875 Tg: CircTarget,
876 {
877 if target
889 .protovers()
890 .supports_named_subver(named::RELAY_NTORV3)
891 {
892 self.extend_ntor_v3(target, params).await
893 } else {
894 self.extend_ntor(target, params).await
895 }
896 }
897
898 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
901 where
902 Tg: CircTarget,
903 {
904 let key = NtorPublicKey {
905 id: *target
906 .rsa_identity()
907 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
908 pk: *target.ntor_onion_key(),
909 };
910 let mut linkspecs = target
911 .linkspecs()
912 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
913 if !params.extend_by_ed25519_id {
914 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
915 }
916
917 let (tx, rx) = oneshot::channel();
918
919 let peer_id = OwnedChanTarget::from_chan_target(target);
920 let settings =
921 HopSettings::from_params_and_caps(¶ms, target.protovers())?.without_negotiation();
922 self.control
923 .unbounded_send(CtrlMsg::ExtendNtor {
924 peer_id,
925 public_key: key,
926 linkspecs,
927 settings,
928 done: tx,
929 })
930 .map_err(|_| Error::CircuitClosed)?;
931
932 rx.await.map_err(|_| Error::CircuitClosed)??;
933
934 Ok(())
935 }
936
937 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
940 where
941 Tg: CircTarget,
942 {
943 let key = NtorV3PublicKey {
944 id: *target
945 .ed_identity()
946 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
947 pk: *target.ntor_onion_key(),
948 };
949 let mut linkspecs = target
950 .linkspecs()
951 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
952 if !params.extend_by_ed25519_id {
953 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
954 }
955
956 let (tx, rx) = oneshot::channel();
957
958 let peer_id = OwnedChanTarget::from_chan_target(target);
959 let settings = HopSettings::from_params_and_caps(¶ms, target.protovers())?;
960 self.control
961 .unbounded_send(CtrlMsg::ExtendNtorV3 {
962 peer_id,
963 public_key: key,
964 linkspecs,
965 settings,
966 done: tx,
967 })
968 .map_err(|_| Error::CircuitClosed)?;
969
970 rx.await.map_err(|_| Error::CircuitClosed)??;
971
972 Ok(())
973 }
974
975 #[cfg(feature = "hs-common")]
999 pub async fn extend_virtual(
1000 &self,
1001 protocol: handshake::RelayProtocol,
1002 role: handshake::HandshakeRole,
1003 seed: impl handshake::KeyGenerator,
1004 params: &CircParameters,
1005 capabilities: &tor_protover::Protocols,
1006 ) -> Result<()> {
1007 use self::handshake::BoxedClientLayer;
1008
1009 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
1010 let relay_cell_format = protocol.relay_cell_format();
1011
1012 let BoxedClientLayer { fwd, back, binding } =
1013 protocol.construct_client_layers(role, seed)?;
1014
1015 let settings = HopSettings::from_params_and_caps(params, capabilities)?
1016 .without_negotiation();
1018 let (tx, rx) = oneshot::channel();
1019 let message = CtrlCmd::ExtendVirtual {
1020 relay_cell_format,
1021 cell_crypto: (fwd, back, binding),
1022 settings,
1023 done: tx,
1024 };
1025
1026 self.command
1027 .unbounded_send(message)
1028 .map_err(|_| Error::CircuitClosed)?;
1029
1030 rx.await.map_err(|_| Error::CircuitClosed)?
1031 }
1032
1033 async fn begin_stream_impl(
1041 self: &Arc<ClientCirc>,
1042 begin_msg: AnyRelayMsg,
1043 cmd_checker: AnyCmdChecker,
1044 ) -> Result<(StreamReceiver, StreamTarget, StreamAccount)> {
1045 let hop = TargetHop::LastHop;
1048
1049 let time_prov = self.time_provider.clone();
1050
1051 let memquota = StreamAccount::new(self.mq_account())?;
1052 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
1053 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
1054 let (tx, rx) = oneshot::channel();
1055 let (msg_tx, msg_rx) =
1056 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
1057
1058 let (rate_limit_tx, rate_limit_rx) = watch::channel_with(StreamRateLimit::MAX);
1059
1060 self.control
1061 .unbounded_send(CtrlMsg::BeginStream {
1062 hop,
1063 message: begin_msg,
1064 sender,
1065 rx: msg_rx,
1066 rate_limit_notifier: rate_limit_tx,
1067 done: tx,
1068 cmd_checker,
1069 })
1070 .map_err(|_| Error::CircuitClosed)?;
1071
1072 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
1073
1074 let target = StreamTarget {
1075 circ: self.clone(),
1076 tx: msg_tx,
1077 hop,
1078 stream_id,
1079 relay_cell_format,
1080 rate_limit_stream: rate_limit_rx,
1081 };
1082
1083 let stream_receiver = StreamReceiver {
1084 target: target.clone(),
1085 receiver,
1086 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
1087 ended: false,
1088 };
1089
1090 Ok((stream_receiver, target, memquota))
1091 }
1092
1093 async fn begin_data_stream(
1096 self: &Arc<ClientCirc>,
1097 msg: AnyRelayMsg,
1098 optimistic: bool,
1099 ) -> Result<DataStream> {
1100 let (stream_receiver, target, memquota) = self
1101 .begin_stream_impl(msg, DataCmdChecker::new_any())
1102 .await?;
1103 let mut stream = DataStream::new(
1104 self.time_provider.clone(),
1105 stream_receiver,
1106 target,
1107 memquota,
1108 );
1109 if !optimistic {
1110 stream.wait_for_connection().await?;
1111 }
1112 Ok(stream)
1113 }
1114
1115 pub async fn begin_stream(
1121 self: &Arc<ClientCirc>,
1122 target: &str,
1123 port: u16,
1124 parameters: Option<StreamParameters>,
1125 ) -> Result<DataStream> {
1126 let parameters = parameters.unwrap_or_default();
1127 let begin_flags = parameters.begin_flags();
1128 let optimistic = parameters.is_optimistic();
1129 let target = if parameters.suppressing_hostname() {
1130 ""
1131 } else {
1132 target
1133 };
1134 let beginmsg = Begin::new(target, port, begin_flags)
1135 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
1136 self.begin_data_stream(beginmsg.into(), optimistic).await
1137 }
1138
1139 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
1142 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
1147 .await
1148 }
1149
1150 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
1156 let resolve_msg = Resolve::new(hostname);
1157
1158 let resolved_msg = self.try_resolve(resolve_msg).await?;
1159
1160 resolved_msg
1161 .into_answers()
1162 .into_iter()
1163 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1164 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1165 Ok(_) => None,
1166 Err(e) => Some(Err(e)),
1167 })
1168 .collect()
1169 }
1170
1171 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1177 let resolve_ptr_msg = Resolve::new_reverse(&addr);
1178
1179 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1180
1181 resolved_msg
1182 .into_answers()
1183 .into_iter()
1184 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1185 Ok(ResolvedVal::Hostname(v)) => Some(
1186 String::from_utf8(v)
1187 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1188 ),
1189 Ok(_) => None,
1190 Err(e) => Some(Err(e)),
1191 })
1192 .collect()
1193 }
1194
1195 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1198 let (stream_receiver, _target, memquota) = self
1199 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1200 .await?;
1201 let mut resolve_stream = ResolveStream::new(stream_receiver, memquota);
1202 resolve_stream.read_msg().await
1203 }
1204
    /// Ask the reactor to shut this circuit down.
    ///
    /// A failure to send the command is deliberately ignored.
    pub fn terminate(&self) {
        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
    }
1218
    /// Report a protocol error on this circuit.
    ///
    /// Currently this just terminates the circuit.
    pub(crate) fn protocol_error(&self) {
        self.terminate();
    }
1229
    /// Return `true` if the `control` channel to the reactor is closed.
    pub fn is_closing(&self) -> bool {
        self.control.is_closed()
    }
1234
    /// Return the unique identifier of this circuit.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
1239
    /// Return the number of hops in this circuit.
    ///
    /// Any lookup failure is reported as `Error::CircuitClosed`.
    pub fn n_hops(&self) -> Result<usize> {
        self.mutable
            .n_hops(self.unique_id)
            .map_err(|_| Error::CircuitClosed)
    }
1251
    /// Return a future that completes when the shared `reactor_closed_rx` future completes.
    ///
    /// The returned future owns a clone of that shared future, so it does not
    /// borrow from `self`.
    // NOTE(review): presumably this signals that the reactor has shut down — confirm in the reactor.
    #[cfg(feature = "experimental-api")]
    pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
        self.reactor_closed_rx.clone().map(|_| ())
    }
1262}
1263
/// Handle returned by `ClientCirc::start_conversation`, borrowing the circuit it talks on.
///
/// Further messages can be sent with `send_message`.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
1274
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send `msg` as part of this conversation, without installing a new handler.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        self.send_internal(Some(msg), None).await
    }

    /// Ask the reactor to send `msg` (if any) and install `handler` (if any).
    ///
    /// The message is wrapped without a stream ID. Returns
    /// `Error::CircuitClosed` if the reactor is unreachable; otherwise
    /// returns the reactor's answer.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let (sender, reply) = oneshot::channel();
        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender,
        };

        let ClientCirc { control, .. } = self.0;
        control
            .unbounded_send(request)
            .map_err(|_| Error::CircuitClosed)?;

        match reply.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1310
1311impl PendingClientCirc {
1312 pub(crate) fn new(
1318 id: CircId,
1319 channel: Arc<Channel>,
1320 createdreceiver: oneshot::Receiver<CreateResponse>,
1321 input: CircuitRxReceiver,
1322 unique_id: UniqId,
1323 runtime: DynTimeProvider,
1324 memquota: CircuitAccount,
1325 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1326 let time_provider = channel.time_provider().clone();
1327 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1328 Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1329
1330 let circuit = ClientCirc {
1331 mutable,
1332 unique_id,
1333 control: control_tx,
1334 command: command_tx,
1335 reactor_closed_rx: reactor_closed_rx.shared(),
1336 #[cfg(test)]
1337 circid: id,
1338 memquota,
1339 time_provider,
1340 };
1341
1342 let pending = PendingClientCirc {
1343 recvcreated: createdreceiver,
1344 circ: Arc::new(circuit),
1345 };
1346 (pending, reactor)
1347 }
1348
    /// Return the unique identifier of the circuit being created.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
1353
1354 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1361 let protocols = tor_protover::Protocols::new();
1365 let settings =
1366 HopSettings::from_params_and_caps(¶ms, &protocols)?.without_negotiation();
1367
1368 let (tx, rx) = oneshot::channel();
1369 self.circ
1370 .control
1371 .unbounded_send(CtrlMsg::Create {
1372 recv_created: self.recvcreated,
1373 handshake: CircuitHandshake::CreateFast,
1374 settings,
1375 done: tx,
1376 })
1377 .map_err(|_| Error::CircuitClosed)?;
1378
1379 rx.await.map_err(|_| Error::CircuitClosed)??;
1380
1381 Ok(self.circ)
1382 }
1383
1384 pub async fn create_firsthop<Tg>(
1389 self,
1390 target: &Tg,
1391 params: CircParameters,
1392 ) -> Result<Arc<ClientCirc>>
1393 where
1394 Tg: tor_linkspec::CircTarget,
1395 {
1396 if target
1398 .protovers()
1399 .supports_named_subver(named::RELAY_NTORV3)
1400 {
1401 self.create_firsthop_ntor_v3(target, params).await
1402 } else {
1403 self.create_firsthop_ntor(target, params).await
1404 }
1405 }
1406
1407 pub async fn create_firsthop_ntor<Tg>(
1412 self,
1413 target: &Tg,
1414 params: CircParameters,
1415 ) -> Result<Arc<ClientCirc>>
1416 where
1417 Tg: tor_linkspec::CircTarget,
1418 {
1419 let (tx, rx) = oneshot::channel();
1420 let settings =
1421 HopSettings::from_params_and_caps(¶ms, target.protovers())?.without_negotiation();
1422
1423 self.circ
1424 .control
1425 .unbounded_send(CtrlMsg::Create {
1426 recv_created: self.recvcreated,
1427 handshake: CircuitHandshake::Ntor {
1428 public_key: NtorPublicKey {
1429 id: *target
1430 .rsa_identity()
1431 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1432 pk: *target.ntor_onion_key(),
1433 },
1434 ed_identity: *target
1435 .ed_identity()
1436 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1437 },
1438 settings,
1439 done: tx,
1440 })
1441 .map_err(|_| Error::CircuitClosed)?;
1442
1443 rx.await.map_err(|_| Error::CircuitClosed)??;
1444
1445 Ok(self.circ)
1446 }
1447
1448 pub async fn create_firsthop_ntor_v3<Tg>(
1457 self,
1458 target: &Tg,
1459 params: CircParameters,
1460 ) -> Result<Arc<ClientCirc>>
1461 where
1462 Tg: tor_linkspec::CircTarget,
1463 {
1464 let settings = HopSettings::from_params_and_caps(¶ms, target.protovers())?;
1465 let (tx, rx) = oneshot::channel();
1466
1467 self.circ
1468 .control
1469 .unbounded_send(CtrlMsg::Create {
1470 recv_created: self.recvcreated,
1471 handshake: CircuitHandshake::NtorV3 {
1472 public_key: NtorV3PublicKey {
1473 id: *target
1474 .ed_identity()
1475 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1476 pk: *target.ntor_onion_key(),
1477 },
1478 },
1479 settings,
1480 done: tx,
1481 })
1482 .map_err(|_| Error::CircuitClosed)?;
1483
1484 rx.await.map_err(|_| Error::CircuitClosed)??;
1485
1486 Ok(self.circ)
1487 }
1488}
1489
1490fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1493 match val {
1494 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1495 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1496 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1497 _ => Ok(val),
1498 }
1499}
1500
1501#[cfg(test)]
1502pub(crate) mod test {
1503 #![allow(clippy::bool_assert_comparison)]
1505 #![allow(clippy::clone_on_copy)]
1506 #![allow(clippy::dbg_macro)]
1507 #![allow(clippy::mixed_attributes_style)]
1508 #![allow(clippy::print_stderr)]
1509 #![allow(clippy::print_stdout)]
1510 #![allow(clippy::single_char_pattern)]
1511 #![allow(clippy::unwrap_used)]
1512 #![allow(clippy::unchecked_duration_subtraction)]
1513 #![allow(clippy::useless_vec)]
1514 #![allow(clippy::needless_pass_by_value)]
1515 use super::*;
1518 use crate::channel::OpenChanCellS2C;
1519 use crate::channel::{test::new_reactor, CodecError};
1520 use crate::congestion::test_utils::params::build_cc_vegas_params;
1521 use crate::crypto::cell::RelayCellBody;
1522 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1523 #[cfg(feature = "hs-service")]
1524 use crate::stream::IncomingStreamRequestFilter;
1525 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1526 use futures::channel::mpsc::{Receiver, Sender};
1527 use futures::io::{AsyncReadExt, AsyncWriteExt};
1528 use futures::sink::SinkExt;
1529 use futures::stream::StreamExt;
1530 use futures::task::SpawnExt;
1531 use hex_literal::hex;
1532 use std::collections::{HashMap, VecDeque};
1533 use std::fmt::Debug;
1534 use std::time::Duration;
1535 use tor_basic_utils::test_rng::testing_rng;
1536 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCell, ChanCmd};
1537 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1538 use tor_cell::relaycell::msg::SendmeTag;
1539 use tor_cell::relaycell::{
1540 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1541 };
1542 use tor_linkspec::OwnedCircTarget;
1543 use tor_memquota::HasMemoryCost;
1544 use tor_rtcompat::Runtime;
1545 use tracing::trace;
1546 use tracing_test::traced_test;
1547
1548 #[cfg(feature = "conflux")]
1549 use {
1550 crate::tunnel::reactor::ConfluxHandshakeResult,
1551 crate::util::err::ConfluxHandshakeError,
1552 futures::lock::Mutex as AsyncMutex,
1553 std::result::Result as StdResult,
1554 tor_cell::relaycell::conflux::{V1DesiredUx, V1LinkPayload, V1Nonce},
1555 tor_cell::relaycell::msg::ConfluxLink,
1556 tor_rtmock::MockRuntime,
1557 };
1558
    impl PendingClientCirc {
        /// Testing only: return the circuit ID of the circuit being created.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
1565
    impl ClientCirc {
        /// Testing only: return the circuit ID of this circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
1572
1573 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1574 let rfmt = RelayCellFormat::V0;
1576 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1577 .encode(rfmt, &mut testing_rng())
1578 .unwrap();
1579 let chanmsg = chanmsg::Relay::from(body);
1580 ClientCircChanMsg::Relay(chanmsg)
1581 }
1582
    /// Fixed 32-byte value used as the first argument of the example ntor / ntor v3 secret keys.
    // NOTE(review): presumably the secret half matching `EXAMPLE_PK` — confirm.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Fixed 32-byte value used as the ntor onion key of `example_target` and
    /// as the second argument of the example secret keys.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Fixed Ed25519 identity bytes used by the example target and ntor v3 key.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// Fixed RSA identity bytes used by the example target and ntor key.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1590
    /// Build a stream MPSC sender/receiver pair for tests by delegating to `crate::fake_mpsc`.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1598
1599 fn example_target() -> OwnedCircTarget {
1601 let mut builder = OwnedCircTarget::builder();
1602 builder
1603 .chan_target()
1604 .ed_identity(EXAMPLE_ED_ID.into())
1605 .rsa_identity(EXAMPLE_RSA_ID.into());
1606 builder
1607 .ntor_onion_key(EXAMPLE_PK.into())
1608 .protocols("FlowCtrl=1-2".parse().unwrap())
1609 .build()
1610 .unwrap()
1611 }
1612 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1613 crate::crypto::handshake::ntor::NtorSecretKey::new(
1614 EXAMPLE_SK.into(),
1615 EXAMPLE_PK.into(),
1616 EXAMPLE_RSA_ID.into(),
1617 )
1618 }
1619 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1620 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1621 EXAMPLE_SK.into(),
1622 EXAMPLE_PK.into(),
1623 EXAMPLE_ED_ID.into(),
1624 )
1625 }
1626
1627 fn working_fake_channel<R: Runtime>(
1628 rt: &R,
1629 ) -> (
1630 Arc<Channel>,
1631 Receiver<AnyChanCell>,
1632 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1633 ) {
1634 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1635 rt.spawn(async {
1636 let _ignore = chan_reactor.run().await;
1637 })
1638 .unwrap();
1639 (channel, rx, tx)
1640 }
1641
    /// Which circuit-creation handshake `test_create` should exercise.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// The CREATE_FAST handshake.
        Fast,
        /// The ntor handshake.
        Ntor,
        /// The ntor v3 handshake.
        NtorV3,
    }
1649
1650 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1651 use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1655
1656 let (chan, mut rx, _sink) = working_fake_channel(rt);
1657 let circid = CircId::new(128).unwrap();
1658 let (created_send, created_recv) = oneshot::channel();
1659 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1660 let unique_id = UniqId::new(23, 17);
1661
1662 let (pending, reactor) = PendingClientCirc::new(
1663 circid,
1664 chan,
1665 created_recv,
1666 circmsg_recv,
1667 unique_id,
1668 DynTimeProvider::new(rt.clone()),
1669 CircuitAccount::new_noop(),
1670 );
1671
1672 rt.spawn(async {
1673 let _ignore = reactor.run().await;
1674 })
1675 .unwrap();
1676
1677 let simulate_relay_fut = async move {
1679 let mut rng = testing_rng();
1680 let create_cell = rx.next().await.unwrap();
1681 assert_eq!(create_cell.circid(), Some(circid));
1682 let reply = match handshake_type {
1683 HandshakeType::Fast => {
1684 let cf = match create_cell.msg() {
1685 AnyChanMsg::CreateFast(cf) => cf,
1686 other => panic!("{:?}", other),
1687 };
1688 let (_, rep) = CreateFastServer::server(
1689 &mut rng,
1690 &mut |_: &()| Some(()),
1691 &[()],
1692 cf.handshake(),
1693 )
1694 .unwrap();
1695 CreateResponse::CreatedFast(CreatedFast::new(rep))
1696 }
1697 HandshakeType::Ntor => {
1698 let c2 = match create_cell.msg() {
1699 AnyChanMsg::Create2(c2) => c2,
1700 other => panic!("{:?}", other),
1701 };
1702 let (_, rep) = NtorServer::server(
1703 &mut rng,
1704 &mut |_: &()| Some(()),
1705 &[example_ntor_key()],
1706 c2.body(),
1707 )
1708 .unwrap();
1709 CreateResponse::Created2(Created2::new(rep))
1710 }
1711 HandshakeType::NtorV3 => {
1712 let c2 = match create_cell.msg() {
1713 AnyChanMsg::Create2(c2) => c2,
1714 other => panic!("{:?}", other),
1715 };
1716 let mut reply_fn = if with_cc {
1717 |client_exts: &[CircRequestExt]| {
1718 let _ = client_exts
1719 .iter()
1720 .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1721 .expect("Client failed to request CC");
1722 Some(vec![CircResponseExt::CcResponse(
1725 extend_ext::CcResponse::new(31),
1726 )])
1727 }
1728 } else {
1729 |_: &_| Some(vec![])
1730 };
1731 let (_, rep) = NtorV3Server::server(
1732 &mut rng,
1733 &mut reply_fn,
1734 &[example_ntor_v3_key()],
1735 c2.body(),
1736 )
1737 .unwrap();
1738 CreateResponse::Created2(Created2::new(rep))
1739 }
1740 };
1741 created_send.send(reply).unwrap();
1742 };
1743 let client_fut = async move {
1745 let target = example_target();
1746 let params = CircParameters::default();
1747 let ret = match handshake_type {
1748 HandshakeType::Fast => {
1749 trace!("doing fast create");
1750 pending.create_firsthop_fast(params).await
1751 }
1752 HandshakeType::Ntor => {
1753 trace!("doing ntor create");
1754 pending.create_firsthop_ntor(&target, params).await
1755 }
1756 HandshakeType::NtorV3 => {
1757 let params = if with_cc {
1758 CircParameters::new(true, build_cc_vegas_params())
1760 } else {
1761 params
1762 };
1763 trace!("doing ntor_v3 create");
1764 pending.create_firsthop_ntor_v3(&target, params).await
1765 }
1766 };
1767 trace!("create done: result {:?}", ret);
1768 ret
1769 };
1770
1771 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1772
1773 let _circ = circ.unwrap();
1774
1775 assert_eq!(_circ.n_hops().unwrap(), 1);
1777 }
1778
1779 #[traced_test]
1780 #[test]
1781 fn test_create_fast() {
1782 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1783 test_create(&rt, HandshakeType::Fast, false).await;
1784 });
1785 }
1786 #[traced_test]
1787 #[test]
1788 fn test_create_ntor() {
1789 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1790 test_create(&rt, HandshakeType::Ntor, false).await;
1791 });
1792 }
1793 #[traced_test]
1794 #[test]
1795 fn test_create_ntor_v3() {
1796 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1797 test_create(&rt, HandshakeType::NtorV3, false).await;
1798 });
1799 }
/// Run `test_create` with `HandshakeType::NtorV3` and congestion control requested.
///
/// Only built when the `flowctl-cc` feature is enabled.
#[traced_test]
#[test]
#[cfg(feature = "flowctl-cc")]
fn test_create_ntor_v3_with_cc() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let handshake = HandshakeType::NtorV3;
        test_create(&rt, handshake, true).await;
    });
}
1808
1809 pub(crate) struct DummyCrypto {
1812 counter_tag: [u8; 20],
1813 counter: u32,
1814 lasthop: bool,
1815 }
1816 impl DummyCrypto {
1817 fn next_tag(&mut self) -> SendmeTag {
1818 #![allow(clippy::identity_op)]
1819 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1820 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1821 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1822 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1823 self.counter += 1;
1824 self.counter_tag.into()
1825 }
1826 }
1827
1828 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1829 fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1830 self.next_tag()
1831 }
1832 fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1833 }
1834 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1835 fn decrypt_inbound(
1836 &mut self,
1837 _cmd: ChanCmd,
1838 _cell: &mut RelayCellBody,
1839 ) -> Option<SendmeTag> {
1840 if self.lasthop {
1841 Some(self.next_tag())
1842 } else {
1843 None
1844 }
1845 }
1846 }
1847 impl DummyCrypto {
1848 pub(crate) fn new(lasthop: bool) -> Self {
1849 DummyCrypto {
1850 counter_tag: [0; 20],
1851 counter: 0,
1852 lasthop,
1853 }
1854 }
1855 }
1856
1857 async fn newcirc_ext<R: Runtime>(
1860 rt: &R,
1861 unique_id: UniqId,
1862 chan: Arc<Channel>,
1863 hops: Vec<path::HopDetail>,
1864 next_msg_from: HopNum,
1865 params: CircParameters,
1866 ) -> (Arc<ClientCirc>, CircuitRxSender) {
1867 let circid = CircId::new(128).unwrap();
1868 let (_created_send, created_recv) = oneshot::channel();
1869 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1870
1871 let (pending, reactor) = PendingClientCirc::new(
1872 circid,
1873 chan,
1874 created_recv,
1875 circmsg_recv,
1876 unique_id,
1877 DynTimeProvider::new(rt.clone()),
1878 CircuitAccount::new_noop(),
1879 );
1880
1881 rt.spawn(async {
1882 let _ignore = reactor.run().await;
1883 })
1884 .unwrap();
1885
1886 let PendingClientCirc {
1887 circ,
1888 recvcreated: _,
1889 } = pending;
1890
1891 let relay_cell_format = RelayCellFormat::V0;
1893
1894 let last_hop_num = u8::try_from(hops.len() - 1).unwrap();
1895 for (idx, peer_id) in hops.into_iter().enumerate() {
1896 let (tx, rx) = oneshot::channel();
1897 let idx = idx as u8;
1898
1899 circ.command
1900 .unbounded_send(CtrlCmd::AddFakeHop {
1901 relay_cell_format,
1902 fwd_lasthop: idx == last_hop_num,
1903 rev_lasthop: idx == u8::from(next_msg_from),
1904 peer_id,
1905 params: params.clone(),
1906 done: tx,
1907 })
1908 .unwrap();
1909 rx.await.unwrap().unwrap();
1910 }
1911
1912 (circ, circmsg_send)
1913 }
1914
1915 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1918 let hops = std::iter::repeat_with(|| {
1919 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1920 .ed_identity([4; 32].into())
1921 .rsa_identity([5; 20].into())
1922 .build()
1923 .expect("Could not construct fake hop");
1924
1925 path::HopDetail::Relay(peer_id)
1926 })
1927 .take(3)
1928 .collect();
1929
1930 let unique_id = UniqId::new(23, 17);
1931 newcirc_ext(
1932 rt,
1933 unique_id,
1934 chan,
1935 hops,
1936 2.into(),
1937 CircParameters::default(),
1938 )
1939 .await
1940 }
1941
1942 fn hop_details(n: u8, start_idx: u8) -> Vec<path::HopDetail> {
1945 (0..n)
1946 .map(|idx| {
1947 let peer_id = tor_linkspec::OwnedChanTarget::builder()
1948 .ed_identity([idx + start_idx; 32].into())
1949 .rsa_identity([idx + start_idx + 1; 20].into())
1950 .build()
1951 .expect("Could not construct fake hop");
1952
1953 path::HopDetail::Relay(peer_id)
1954 })
1955 .collect()
1956 }
1957
1958 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1959 use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1960
1961 let (chan, mut rx, _sink) = working_fake_channel(rt);
1962 let (circ, mut sink) = newcirc(rt, chan).await;
1963 let circid = circ.peek_circid();
1964 let params = CircParameters::default();
1965
1966 let extend_fut = async move {
1967 let target = example_target();
1968 match handshake_type {
1969 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1970 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1971 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1972 };
1973 circ };
1975 let reply_fut = async move {
1976 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1979 assert_eq!(id, Some(circid));
1980 let rmsg = match chmsg {
1981 AnyChanMsg::RelayEarly(r) => {
1982 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1983 .unwrap()
1984 }
1985 other => panic!("{:?}", other),
1986 };
1987 let e2 = match rmsg.msg() {
1988 AnyRelayMsg::Extend2(e2) => e2,
1989 other => panic!("{:?}", other),
1990 };
1991 let mut rng = testing_rng();
1992 let reply = match handshake_type {
1993 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1994 HandshakeType::Ntor => {
1995 let (_keygen, reply) = NtorServer::server(
1996 &mut rng,
1997 &mut |_: &()| Some(()),
1998 &[example_ntor_key()],
1999 e2.handshake(),
2000 )
2001 .unwrap();
2002 reply
2003 }
2004 HandshakeType::NtorV3 => {
2005 let (_keygen, reply) = NtorV3Server::server(
2006 &mut rng,
2007 &mut |_: &[CircRequestExt]| Some(vec![]),
2008 &[example_ntor_v3_key()],
2009 e2.handshake(),
2010 )
2011 .unwrap();
2012 reply
2013 }
2014 };
2015
2016 let extended2 = relaymsg::Extended2::new(reply).into();
2017 sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
2018 (sink, rx) };
2020
2021 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
2022
2023 assert_eq!(circ.n_hops().unwrap(), 4);
2025
2026 {
2028 let path = circ.path_ref().unwrap();
2029 let path = path
2030 .all_hops()
2031 .filter_map(|hop| match hop {
2032 path::HopDetail::Relay(r) => Some(r),
2033 #[cfg(feature = "hs-common")]
2034 path::HopDetail::Virtual => None,
2035 })
2036 .collect::<Vec<_>>();
2037
2038 assert_eq!(path.len(), 4);
2039 use tor_linkspec::HasRelayIds;
2040 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
2041 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
2042 }
2043 {
2044 let path = circ.path_ref().unwrap();
2045 assert_eq!(path.n_hops(), 4);
2046 use tor_linkspec::HasRelayIds;
2047 assert_eq!(
2048 path.hops()[3].as_chan_target().unwrap().ed_identity(),
2049 example_target().ed_identity()
2050 );
2051 assert_ne!(
2052 path.hops()[0].as_chan_target().unwrap().ed_identity(),
2053 example_target().ed_identity()
2054 );
2055 }
2056 }
2057
2058 #[traced_test]
2059 #[test]
2060 fn test_extend_ntor() {
2061 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2062 test_extend(&rt, HandshakeType::Ntor).await;
2063 });
2064 }
2065
2066 #[traced_test]
2067 #[test]
2068 fn test_extend_ntor_v3() {
2069 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2070 test_extend(&rt, HandshakeType::NtorV3).await;
2071 });
2072 }
2073
2074 async fn bad_extend_test_impl<R: Runtime>(
2075 rt: &R,
2076 reply_hop: HopNum,
2077 bad_reply: ClientCircChanMsg,
2078 ) -> Error {
2079 let (chan, _rx, _sink) = working_fake_channel(rt);
2080 let hops = std::iter::repeat_with(|| {
2081 let peer_id = tor_linkspec::OwnedChanTarget::builder()
2082 .ed_identity([4; 32].into())
2083 .rsa_identity([5; 20].into())
2084 .build()
2085 .expect("Could not construct fake hop");
2086
2087 path::HopDetail::Relay(peer_id)
2088 })
2089 .take(3)
2090 .collect();
2091
2092 let unique_id = UniqId::new(23, 17);
2093 let (circ, mut sink) = newcirc_ext(
2094 rt,
2095 unique_id,
2096 chan,
2097 hops,
2098 reply_hop,
2099 CircParameters::default(),
2100 )
2101 .await;
2102 let params = CircParameters::default();
2103
2104 let target = example_target();
2105 #[allow(clippy::clone_on_copy)]
2106 let rtc = rt.clone();
2107 let sink_handle = rt
2108 .spawn_with_handle(async move {
2109 rtc.sleep(Duration::from_millis(100)).await;
2110 sink.send(bad_reply).await.unwrap();
2111 sink
2112 })
2113 .unwrap();
2114 let outcome = circ.extend_ntor(&target, params).await;
2115 let _sink = sink_handle.await;
2116
2117 assert_eq!(circ.n_hops().unwrap(), 3);
2118 assert!(outcome.is_err());
2119 outcome.unwrap_err()
2120 }
2121
2122 #[traced_test]
2123 #[test]
2124 fn bad_extend_wronghop() {
2125 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2126 let extended2 = relaymsg::Extended2::new(vec![]).into();
2127 let cc = rmsg_to_ccmsg(None, extended2);
2128
2129 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
2130 match error {
2135 Error::CircuitClosed => {}
2136 x => panic!("got other error: {}", x),
2137 }
2138 });
2139 }
2140
2141 #[traced_test]
2142 #[test]
2143 fn bad_extend_wrongtype() {
2144 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2145 let extended = relaymsg::Extended::new(vec![7; 200]).into();
2146 let cc = rmsg_to_ccmsg(None, extended);
2147
2148 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2149 match error {
2150 Error::BytesErr {
2151 err: tor_bytes::Error::InvalidMessage(_),
2152 object: "extended2 message",
2153 } => {}
2154 other => panic!("{:?}", other),
2155 }
2156 });
2157 }
2158
2159 #[traced_test]
2160 #[test]
2161 fn bad_extend_destroy() {
2162 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2163 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
2164 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2165 match error {
2166 Error::CircuitClosed => {}
2167 other => panic!("{:?}", other),
2168 }
2169 });
2170 }
2171
2172 #[traced_test]
2173 #[test]
2174 fn bad_extend_crypto() {
2175 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2176 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
2177 let cc = rmsg_to_ccmsg(None, extended2);
2178 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
2179 assert!(matches!(error, Error::BadCircHandshakeAuth));
2180 });
2181 }
2182
2183 #[traced_test]
2184 #[test]
2185 fn begindir() {
2186 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2187 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2188 let (circ, mut sink) = newcirc(&rt, chan).await;
2189 let circid = circ.peek_circid();
2190
2191 let begin_and_send_fut = async move {
2192 let mut stream = circ.begin_dir_stream().await.unwrap();
2195 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
2196 stream.flush().await.unwrap();
2197 let mut buf = [0_u8; 1024];
2198 let n = stream.read(&mut buf).await.unwrap();
2199 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
2200 let n = stream.read(&mut buf).await.unwrap();
2201 assert_eq!(n, 0);
2202 stream
2203 };
2204 let reply_fut = async move {
2205 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2208 assert_eq!(id, Some(circid));
2209 let rmsg = match chmsg {
2210 AnyChanMsg::Relay(r) => {
2211 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2212 .unwrap()
2213 }
2214 other => panic!("{:?}", other),
2215 };
2216 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2217 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
2218
2219 let connected = relaymsg::Connected::new_empty().into();
2221 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2222
2223 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2225 assert_eq!(id, Some(circid));
2226 let rmsg = match chmsg {
2227 AnyChanMsg::Relay(r) => {
2228 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2229 .unwrap()
2230 }
2231 other => panic!("{:?}", other),
2232 };
2233 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
2234 assert_eq!(streamid_2, streamid);
2235 if let AnyRelayMsg::Data(d) = rmsg {
2236 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
2237 } else {
2238 panic!();
2239 }
2240
2241 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
2243 .unwrap()
2244 .into();
2245 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2246
2247 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2249 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2250
2251 (rx, sink) };
2253
2254 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2255 });
2256 }
2257
2258 fn close_stream_helper(by_drop: bool) {
2260 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2261 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2262 let (circ, mut sink) = newcirc(&rt, chan).await;
2263
2264 let stream_fut = async move {
2265 let stream = circ
2266 .begin_stream("www.example.com", 80, None)
2267 .await
2268 .unwrap();
2269
2270 let (r, mut w) = stream.split();
2271 if by_drop {
2272 drop(r);
2274 drop(w);
2275 (None, circ) } else {
2277 w.close().await.unwrap();
2279 (Some(r), circ)
2280 }
2281 };
2282 let handler_fut = async {
2283 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2285 let rmsg = match msg {
2286 AnyChanMsg::Relay(r) => {
2287 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2288 .unwrap()
2289 }
2290 other => panic!("{:?}", other),
2291 };
2292 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2293 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2294
2295 let connected =
2297 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2298 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2299
2300 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2302 let rmsg = match msg {
2303 AnyChanMsg::Relay(r) => {
2304 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2305 .unwrap()
2306 }
2307 other => panic!("{:?}", other),
2308 };
2309 let (_, rmsg) = rmsg.into_streamid_and_msg();
2310 assert_eq!(rmsg.cmd(), RelayCmd::END);
2311
2312 (rx, sink) };
2314
2315 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2316 });
2317 }
2318
2319 #[traced_test]
2320 #[test]
2321 fn drop_stream() {
2322 close_stream_helper(true);
2323 }
2324
2325 #[traced_test]
2326 #[test]
2327 fn close_stream() {
2328 close_stream_helper(false);
2329 }
2330
2331 async fn setup_incoming_sendme_case<R: Runtime>(
2333 rt: &R,
2334 n_to_send: usize,
2335 ) -> (
2336 Arc<ClientCirc>,
2337 DataStream,
2338 CircuitRxSender,
2339 Option<StreamId>,
2340 usize,
2341 Receiver<AnyChanCell>,
2342 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2343 ) {
2344 let (chan, mut rx, sink2) = working_fake_channel(rt);
2345 let (circ, mut sink) = newcirc(rt, chan).await;
2346 let circid = circ.peek_circid();
2347
2348 let begin_and_send_fut = {
2349 let circ = circ.clone();
2350 async move {
2351 let mut stream = circ
2353 .begin_stream("www.example.com", 443, None)
2354 .await
2355 .unwrap();
2356 let junk = [0_u8; 1024];
2357 let mut remaining = n_to_send;
2358 while remaining > 0 {
2359 let n = std::cmp::min(remaining, junk.len());
2360 stream.write_all(&junk[..n]).await.unwrap();
2361 remaining -= n;
2362 }
2363 stream.flush().await.unwrap();
2364 stream
2365 }
2366 };
2367
2368 let receive_fut = async move {
2369 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2371 let rmsg = match chmsg {
2372 AnyChanMsg::Relay(r) => {
2373 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2374 .unwrap()
2375 }
2376 other => panic!("{:?}", other),
2377 };
2378 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2379 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2380 let connected = relaymsg::Connected::new_empty().into();
2382 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2383 let mut bytes_received = 0_usize;
2385 let mut cells_received = 0_usize;
2386 while bytes_received < n_to_send {
2387 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2389 assert_eq!(id, Some(circid));
2390
2391 let rmsg = match chmsg {
2392 AnyChanMsg::Relay(r) => {
2393 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2394 .unwrap()
2395 }
2396 other => panic!("{:?}", other),
2397 };
2398 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2399 assert_eq!(streamid2, streamid);
2400 if let AnyRelayMsg::Data(dat) = rmsg {
2401 cells_received += 1;
2402 bytes_received += dat.as_ref().len();
2403 } else {
2404 panic!();
2405 }
2406 }
2407
2408 (sink, streamid, cells_received, rx)
2409 };
2410
2411 let (stream, (sink, streamid, cells_received, rx)) =
2412 futures::join!(begin_and_send_fut, receive_fut);
2413
2414 (circ, stream, sink, streamid, cells_received, rx, sink2)
2415 }
2416
2417 #[traced_test]
2418 #[test]
2419 fn accept_valid_sendme() {
2420 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2421 let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2422 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2423
2424 assert_eq!(cells_received, 301);
2425
2426 {
2428 let (tx, rx) = oneshot::channel();
2429 circ.command
2430 .unbounded_send(CtrlCmd::QuerySendWindow {
2431 hop: 2.into(),
2432 leg: circ.unique_id(),
2433 done: tx,
2434 })
2435 .unwrap();
2436 let (window, tags) = rx.await.unwrap().unwrap();
2437 assert_eq!(window, 1000 - 301);
2438 assert_eq!(tags.len(), 3);
2439 assert_eq!(
2441 tags[0],
2442 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2443 );
2444 assert_eq!(
2446 tags[1],
2447 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2448 );
2449 assert_eq!(
2451 tags[2],
2452 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2453 );
2454 }
2455
2456 let reply_with_sendme_fut = async move {
2457 let c_sendme =
2459 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2460 .into();
2461 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2462
2463 let s_sendme = relaymsg::Sendme::new_empty().into();
2465 sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2466
2467 sink
2468 };
2469
2470 let _sink = reply_with_sendme_fut.await;
2471
2472 rt.advance_until_stalled().await;
2473
2474 {
2477 let (tx, rx) = oneshot::channel();
2478 circ.command
2479 .unbounded_send(CtrlCmd::QuerySendWindow {
2480 hop: 2.into(),
2481 leg: circ.unique_id(),
2482 done: tx,
2483 })
2484 .unwrap();
2485 let (window, _tags) = rx.await.unwrap().unwrap();
2486 assert_eq!(window, 1000 - 201);
2487 }
2488 });
2489 }
2490
2491 #[traced_test]
2492 #[test]
2493 fn invalid_circ_sendme() {
2494 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2495 let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2499 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2500
2501 let reply_with_sendme_fut = async move {
2502 let c_sendme =
2504 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2505 .into();
2506 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2507 sink
2508 };
2509
2510 let _sink = reply_with_sendme_fut.await;
2511
2512 rt.advance_until_stalled().await;
2514 assert!(circ.is_closing());
2515 });
2516 }
2517
2518 #[traced_test]
2519 #[test]
2520 fn test_busy_stream_fairness() {
2521 const N_STREAMS: usize = 3;
2523 const N_CELLS: usize = 20;
2525 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2528 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2535 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2536
2537 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2538 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2539 let (circ, mut sink) = newcirc(&rt, chan).await;
2540
2541 rt.spawn({
2547 let circ = circ.clone();
2550 async move {
2551 let mut clients = VecDeque::new();
2552 struct Client {
2553 stream: DataStream,
2554 to_write: &'static [u8],
2555 }
2556 for _ in 0..N_STREAMS {
2557 clients.push_back(Client {
2558 stream: circ
2559 .begin_stream("www.example.com", 80, None)
2560 .await
2561 .unwrap(),
2562 to_write: &[0_u8; N_BYTES][..],
2563 });
2564 }
2565 while let Some(mut client) = clients.pop_front() {
2566 if client.to_write.is_empty() {
2567 continue;
2569 }
2570 let written = client.stream.write(client.to_write).await.unwrap();
2571 client.to_write = &client.to_write[written..];
2572 clients.push_back(client);
2573 }
2574 }
2575 })
2576 .unwrap();
2577
2578 let channel_handler_fut = async {
2579 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2580 let mut total_bytes_received = 0;
2581
2582 loop {
2583 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2584 let rmsg = match msg {
2585 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2586 RelayCellFormat::V0,
2587 r.into_relay_body(),
2588 )
2589 .unwrap(),
2590 other => panic!("Unexpected chanmsg: {other:?}"),
2591 };
2592 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2593 match rmsg.cmd() {
2594 RelayCmd::BEGIN => {
2595 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2597 assert_eq!(prev, None);
2598 let connected = relaymsg::Connected::new_with_addr(
2600 "10.0.0.1".parse().unwrap(),
2601 1234,
2602 )
2603 .into();
2604 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2605 }
2606 RelayCmd::DATA => {
2607 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2608 let nbytes = data_msg.as_ref().len();
2609 total_bytes_received += nbytes;
2610 let streamid = streamid.unwrap();
2611 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2612 *stream_bytes += nbytes;
2613 if total_bytes_received >= N_BYTES {
2614 break;
2615 }
2616 }
2617 RelayCmd::END => {
2618 continue;
2623 }
2624 other => {
2625 panic!("Unexpected command {other:?}");
2626 }
2627 }
2628 }
2629
2630 (total_bytes_received, stream_bytes_received, rx, sink)
2633 };
2634
2635 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2636 channel_handler_fut.await;
2637 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2638 for (sid, stream_bytes) in stream_bytes_received {
2639 assert!(
2640 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2641 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2642 );
2643 }
2644 });
2645 }
2646
/// `extend_by_ed25519_id` defaults to true and can be switched off.
#[test]
fn basic_params() {
    use super::CircParameters;

    let mut p: CircParameters = Default::default();
    assert!(p.extend_by_ed25519_id);

    p.extend_by_ed25519_id = false;
    assert!(!p.extend_by_ed25519_id);
}
2656
/// An incoming-stream filter that accepts every request.
#[cfg(feature = "hs-service")]
struct AllowAllStreamsFilter;

#[cfg(feature = "hs-service")]
impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
    /// Accept the request regardless of its context or the circuit's state.
    fn disposition(
        &mut self,
        _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
        _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
    ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
        use crate::stream::IncomingStreamRequestDisposition as Disposition;

        Ok(Disposition::Accept)
    }
}
2669
/// Calling `allow_stream_requests` a second time on the same circuit must fail.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests_twice() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, _send) = newcirc(&rt, chan).await;

        let allowed_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];

        // The first request succeeds; keep its result alive.
        let _incoming = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // The second request must be refused.
        let incoming = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await;

        assert!(incoming.is_err());
    });
}
2699
/// An incoming BEGIN is accepted as a data stream that then receives data.
///
/// The "service" side accepts the first incoming stream, signals the "client"
/// side through a oneshot, and then reads `TEST_DATA` from the stream.  The
/// client side injects a BEGIN on stream 12, waits for that signal, and then
/// injects a DATA message carrying `TEST_DATA`.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";

        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Lets the service tell the client that the stream was accepted.
        let (tx, rx) = oneshot::channel();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            let stream = incoming.next().await.unwrap();
            let mut data_stream = stream
                .accept_data(relaymsg::Connected::new_empty())
                .await
                .unwrap();
            tx.send(()).unwrap();

            let mut buf = [0_u8; TEST_DATA.len()];
            data_stream.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, TEST_DATA);

            circ
        };

        let simulate_client = async move {
            // Wrap a relay message for stream 12 in a RELAY channel message.
            let relay_cell = move |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));
            send.send(ClientCircChanMsg::Relay(begin_msg))
                .await
                .unwrap();

            // Wait until the service has accepted the stream.
            rx.await.unwrap();

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2776
/// A stream ID can be accepted after an earlier request on it was rejected.
///
/// The client injects the same BEGIN (stream 12) `STREAM_COUNT` times,
/// waiting for the service's signal after each one.  The service rejects the
/// first request with END/INTERNAL and accepts the next, then reads
/// `TEST_DATA` from the accepted stream.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::msg::EndReason;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        // Lets the service tell the client that a request was handled.
        let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);

        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            for i in 0..STREAM_COUNT {
                let stream = incoming.next().await.unwrap();

                match i {
                    0 => {
                        // Reject the very first request.
                        stream
                            .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                            .await
                            .unwrap();
                        tx.send(()).await.unwrap();
                    }
                    _ => {
                        // Accept later requests and read the payload.
                        let mut data_stream = stream
                            .accept_data(relaymsg::Connected::new_empty())
                            .await
                            .unwrap();
                        tx.send(()).await.unwrap();

                        let mut buf = [0_u8; TEST_DATA.len()];
                        data_stream.read_exact(&mut buf).await.unwrap();
                        assert_eq!(&buf, TEST_DATA);
                    }
                }
            }

            circ
        };

        let simulate_client = async move {
            // Wrap a relay message for stream 12 in a RELAY channel message.
            let relay_cell = move |msg: AnyRelayMsg| {
                let body: BoxedCellBody = AnyRelayMsgOuter::new(StreamId::new(12), msg)
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
                chanmsg::Relay::from(body)
            };

            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_msg = relay_cell(AnyRelayMsg::Begin(begin));

            for _ in 0..STREAM_COUNT {
                send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                // Wait until the service has dealt with this request.
                rx.next().await.unwrap();
            }

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_msg = relay_cell(AnyRelayMsg::Data(data));
            send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2870
/// A BEGIN from a hop other than the allowed one must not yield a stream.
///
/// Stream requests are allowed from hop `EXPECTED_HOP` only, while the fake
/// circuit built by `newcirc` attributes inbound messages to a different hop.
/// After the BEGIN is injected, the incoming-stream source must end without
/// producing anything.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn incoming_stream_bad_hop() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const EXPECTED_HOP: u8 = 1;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut send) = newcirc(&rt, chan).await;

        let allowed_hop = (circ.unique_id(), EXPECTED_HOP.into()).into();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                allowed_hop,
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            assert!(incoming.next().await.is_none());
            circ
        };

        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let outer = AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin));
            let body: BoxedCellBody = outer.encode(rfmt, &mut testing_rng()).unwrap();
            let begin_msg = chanmsg::Relay::from(body);

            send.send(ClientCircChanMsg::Relay(begin_msg))
                .await
                .unwrap();

            send
        };

        let (_circ, _send) = futures::join!(simulate_service, simulate_client);
    });
}
2921
/// Checks that linking is refused, with an "invalid circuits" error, both for
/// the tunnel built by `setup_bad_conflux_tunnel` and for one built with
/// default `CircParameters`.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_circ_validation() {
    use std::error::Error as _;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let default_params = CircParameters::default();

        // Both tunnels are fully set up before any handshake result is awaited.
        let invalid_tunnels = [
            setup_bad_conflux_tunnel(&rt).await,
            setup_conflux_tunnel(&rt, true, default_params).await,
        ];

        for TestTunnelCtx {
            tunnel: _tunnel,
            circs: _circs,
            conflux_link_rx,
        } in invalid_tunnels
        {
            // The link request itself must come back as an error...
            let conflux_hs_err = conflux_link_rx.await.unwrap().unwrap_err();
            // ...whose source explains that the circuits were rejected.
            let err_src = conflux_hs_err.source().unwrap();
            let rendered = err_src.to_string();

            assert!(rendered.contains("one more more conflux circuits are invalid"));
        }
    });
}
2953
/// Test-side handles for one mock circuit created by `setup_conflux_tunnel`.
#[derive(Debug)]
// Not every test reads every field.
#[allow(unused)]
#[cfg(feature = "conflux")]
struct TestCircuitCtx {
    /// Receiving end of the fake channel (second value returned by
    /// `working_fake_channel`); tests read the cells emitted for this
    /// circuit from here.
    chan_rx: Receiver<AnyChanCell>,
    /// Third value returned by `working_fake_channel`.
    ///
    /// NOTE(review): presumably feeds cells into the fake channel's inbound
    /// side -- confirm against `working_fake_channel`.
    chan_tx: Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
    /// Sink returned by `newcirc_ext`; tests use it to deliver messages to
    /// the circuit.
    circ_tx: CircuitRxSender,
    /// The unique identifier reported by the circuit (`unique_id()`).
    unique_id: UniqId,
}
2966
/// Everything `setup_conflux_tunnel` hands back to a test.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct TestTunnelCtx {
    /// The circuit that was asked to link the other one (the first circuit
    /// built by `setup_conflux_tunnel`).
    tunnel: Arc<ClientCirc>,
    /// Test-side handles for the circuits, in creation order.
    circs: Vec<TestCircuitCtx>,
    /// Receives the answer to the `CtrlMsg::LinkCircuits` request.
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
}
2974
/// Reads the next cell from `rx` and returns the CONFLUX_LINK message it
/// carries.
///
/// # Panics
///
/// Panics if `rx` yields nothing, if the cell is not a RELAY cell that decodes
/// as a single V0 relay message, if that message is not a CONFLUX_LINK, or if
/// the message carries a stream ID.
#[cfg(feature = "conflux")]
async fn await_link_payload(rx: &mut Receiver<AnyChanCell>) -> ConfluxLink {
    let cell = rx.next().await.unwrap();
    let (_id, chmsg) = cell.into_circid_and_msg();

    // Only RELAY cells are acceptable here.
    let relay = match chmsg {
        AnyChanMsg::Relay(relay) => relay,
        other => panic!("{:?}", other),
    };
    let decoded =
        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body()).unwrap();
    let (streamid, rmsg) = decoded.into_streamid_and_msg();

    let link = match rmsg {
        AnyRelayMsg::ConfluxLink(link) => link,
        _ => panic!("unexpected relay message {rmsg:?}"),
    };

    // The LINK message must not be associated with any stream.
    assert!(streamid.is_none());

    link
}
2998
/// Builds two mock circuits and asks the first one to link the second.
///
/// The second circuit's reactor is shut down with
/// `CtrlCmd::ShutdownAndReturnCircuit`, and the circuit it returns is passed
/// to the first circuit in a `CtrlMsg::LinkCircuits` request. The answer to
/// that request is *not* awaited here; it is returned as `conflux_link_rx`.
///
/// * `same_hops`: when `true` the second circuit reuses the first circuit's
///   hop details, otherwise it gets `hop_details(3, 10)`.
/// * `params`: circuit parameters used for both circuits.
#[cfg(feature = "conflux")]
async fn setup_conflux_tunnel(
    rt: &MockRuntime,
    same_hops: bool,
    params: CircParameters,
) -> TestTunnelCtx {
    let first_path = hop_details(3, 0);
    let second_path = if same_hops {
        first_path.clone()
    } else {
        hop_details(3, 10)
    };

    // First circuit: this one will own the tunnel.
    let (chan1, rx1, chan_sink1) = working_fake_channel(rt);
    let first_id = UniqId::new(1, 3);
    let (circ1, sink1) =
        newcirc_ext(rt, first_id, chan1, first_path, 2.into(), params.clone()).await;

    // Second circuit: this one gets detached from its own reactor below.
    let (chan2, rx2, chan_sink2) = working_fake_channel(rt);
    let second_id = UniqId::new(2, 4);
    let (circ2, sink2) = newcirc_ext(rt, second_id, chan2, second_path, 2.into(), params).await;

    // Stop the second reactor and take the circuit it hands back.
    let (detach_tx, detach_rx) = oneshot::channel();
    let shutdown = CtrlCmd::ShutdownAndReturnCircuit { answer: detach_tx };
    circ2.command.unbounded_send(shutdown).unwrap();

    let circuit = detach_rx.await.unwrap().unwrap();
    rt.advance_until_stalled().await;
    assert!(circ2.is_closing());

    // Ask the first circuit to link the detached one.
    let (conflux_link_tx, conflux_link_rx) = oneshot::channel();
    let link_request = CtrlMsg::LinkCircuits {
        circuits: vec![circuit],
        answer: conflux_link_tx,
    };
    circ1.control.unbounded_send(link_request).unwrap();

    let legs = vec![
        TestCircuitCtx {
            chan_rx: rx1,
            chan_tx: chan_sink1,
            circ_tx: sink1,
            unique_id: circ1.unique_id(),
        },
        TestCircuitCtx {
            chan_rx: rx2,
            chan_tx: chan_sink2,
            circ_tx: sink2,
            unique_id: circ2.unique_id(),
        },
    ];

    TestTunnelCtx {
        tunnel: circ1,
        circs: legs,
        conflux_link_rx,
    }
}
3069
/// Sets up a conflux tunnel whose two circuits share the same hop details,
/// using `CircParameters::new(true, build_cc_vegas_params())`.
#[cfg(feature = "conflux")]
async fn setup_good_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let params = CircParameters::new(true, build_cc_vegas_params());
    setup_conflux_tunnel(rt, /* same_hops */ true, params).await
}
3081
/// Sets up a conflux tunnel whose two circuits have different hop details,
/// using `CircParameters::new(true, build_cc_vegas_params())`.
#[cfg(feature = "conflux")]
async fn setup_bad_conflux_tunnel(rt: &MockRuntime) -> TestTunnelCtx {
    let params = CircParameters::new(true, build_cc_vegas_params());
    setup_conflux_tunnel(rt, /* same_hops */ false, params).await
}
3091
/// Checks that a circuit starts closing when it receives a CONFLUX_LINKED
/// message although no conflux handshake was started on it.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn reject_conflux_linked_before_hs() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut circ_sink) = newcirc(&rt, chan).await;

        // Fabricate a LINKED message with a freshly generated nonce.
        let payload =
            V1LinkPayload::new(V1Nonce::new(&mut testing_rng()), V1DesiredUx::NO_OPINION);
        let unsolicited = rmsg_to_ccmsg(None, relaymsg::ConfluxLinked::new(payload).into());
        circ_sink.send(unsolicited).await.unwrap();

        // Give the mock runtime a chance to process the message.
        rt.advance_until_stalled().await;
        assert!(circ.is_closing());
    });
}
3110
/// Checks the handshake results when only one leg answers the CONFLUX_LINK:
/// the first result must be `Ok` and the second a
/// `ConfluxHandshakeError::Timeout`.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_hs_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let TestTunnelCtx {
            tunnel: _tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt).await;

        let [mut answering_leg, _silent_leg]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Read the LINK sent on the first leg and echo its payload back in a
        // LINKED message.
        let link = await_link_payload(&mut answering_leg.chan_rx).await;
        let reply = rmsg_to_ccmsg(
            None,
            relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
        );
        answering_leg.circ_tx.send(reply).await.unwrap();

        // Let a minute of mock time pass without answering on the other leg.
        rt.advance_by(Duration::from_secs(60)).await;

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        let [res1, res2]: [StdResult<(), ConfluxHandshakeError>; 2] =
            hs_results.try_into().unwrap();

        assert!(res1.is_ok());

        let err = res2.unwrap_err();
        assert!(matches!(err, ConfluxHandshakeError::Timeout), "{err:?}");
    });
}
3150
/// Checks that an invalid response during the conflux handshake is reported
/// as a `ConfluxHandshakeError::Link(Error::CircProto(_))` with the expected
/// message, and that the tunnel starts closing.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_hs() {
    use crate::util::err::ConfluxHandshakeError;

    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // A payload with a freshly generated nonce, i.e. not the one taken
        // from the tunnel's own LINK message.
        let nonce = V1Nonce::new(&mut testing_rng());
        let bad_link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);

        let linked_wrong_nonce = relaymsg::ConfluxLinked::new(bad_link_payload.clone()).into();
        let link_from_exit = relaymsg::ConfluxLink::new(bad_link_payload).into();
        let early_switch = relaymsg::ConfluxSwitch::new(0).into();

        // Each case pairs a bogus response with the protocol error it should trigger.
        let cases = [
            (
                rmsg_to_ccmsg(None, linked_wrong_nonce),
                "Received CONFLUX_LINKED cell with mismatched nonce",
            ),
            (
                rmsg_to_ccmsg(None, link_from_exit),
                "Unexpected CONFLUX_LINK cell from hop #3 on client circuit",
            ),
            (
                rmsg_to_ccmsg(None, early_switch),
                "Received CONFLUX_SWITCH on unlinked circuit?!",
            ),
        ];

        for (bad_cell, expected_err) in cases {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;

            let [_circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            // Deliver the bogus response on the second leg.
            circ2.circ_tx.send(bad_cell).await.unwrap();

            // Exactly one handshake result is expected here.
            let hs_results = conflux_link_rx.await.unwrap().unwrap();
            let [res2]: [StdResult<(), ConfluxHandshakeError>; 1] =
                hs_results.try_into().unwrap();

            match res2.unwrap_err() {
                ConfluxHandshakeError::Link(Error::CircProto(e)) => {
                    assert_eq!(e, expected_err);
                }
                e => panic!("unexpected error: {e:?}"),
            }

            assert!(tunnel.is_closing());
        }
    });
}
3217
/// Checks that a plain circuit (built with `newcirc`, no conflux handshake)
/// starts closing when it receives a LINKED, LINK or SWITCH conflux message.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn unexpected_conflux_cell() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let nonce = V1Nonce::new(&mut testing_rng());
        let link_payload = V1LinkPayload::new(nonce, V1DesiredUx::NO_OPINION);

        let linked = relaymsg::ConfluxLinked::new(link_payload.clone()).into();
        let link = relaymsg::ConfluxLink::new(link_payload.clone()).into();
        let switch = relaymsg::ConfluxSwitch::new(0).into();

        let unexpected_cells = [
            rmsg_to_ccmsg(None, linked),
            rmsg_to_ccmsg(None, link),
            rmsg_to_ccmsg(None, switch),
        ];

        // Every message gets a brand new circuit of its own.
        for cell in unexpected_cells {
            let (chan, _rx, _sink) = working_fake_channel(&rt);
            let (circ, mut circ_sink) = newcirc(&rt, chan).await;

            circ_sink.send(cell).await.unwrap();
            rt.advance_until_stalled().await;

            assert!(circ.is_closing());
        }
    });
}
3251
/// Checks that the tunnel starts closing when a leg receives a second
/// CONFLUX_LINKED message after both legs have already been sent one.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_linked() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The handshake result is not needed: its receiver is dropped right here.
        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx: _,
        } = setup_good_conflux_tunnel(&rt).await;

        let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        let link = await_link_payload(&mut circ1.chan_rx).await;

        // One LINKED per leg, echoing the payload of the LINK read above.
        for leg in [&mut circ1, &mut circ2] {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            leg.circ_tx
                .send(rmsg_to_ccmsg(None, linked))
                .await
                .unwrap();
        }

        // A second LINKED on the second leg.
        let duplicate = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
        circ2
            .circ_tx
            .send(rmsg_to_ccmsg(None, duplicate))
            .await
            .unwrap();

        rt.advance_until_stalled().await;

        assert!(tunnel.is_closing());
    });
}
3296
/// Checks that, after a successful conflux handshake, sending the listed
/// CONFLUX_SWITCH messages to both legs makes the tunnel start closing.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn conflux_bad_switch() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // The SWITCH messages under test (currently only a seqno of 0).
        let bad_switch = [relaymsg::ConfluxSwitch::new(0)];

        for bad_cell in bad_switch {
            let TestTunnelCtx {
                tunnel,
                circs,
                conflux_link_rx,
            } = setup_good_conflux_tunnel(&rt).await;

            let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

            let link = await_link_payload(&mut circ1.chan_rx).await;

            // Complete the handshake by answering with LINKED on both legs.
            for circ_sink in [&mut circ1.circ_tx, &mut circ2.circ_tx] {
                let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
                circ_sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
            }

            let hs_results = conflux_link_rx.await.unwrap().unwrap();
            assert!(!hs_results.iter().any(|res| res.is_err()));

            // Now deliver the SWITCH on both legs.
            for circ_sink in [&mut circ1.circ_tx, &mut circ2.circ_tx] {
                let switch_msg = rmsg_to_ccmsg(None, bad_cell.clone().into());
                circ_sink.send(switch_msg).await.unwrap();
            }

            rt.advance_until_stalled().await;
            assert!(tunnel.is_closing());
        }
    });
}
3352
/// One participant of a conflux test, as dispatched by `run_conflux_endpoint`.
///
/// `I` is the iterator type of the per-leg RTT delays carried by the relay
/// state.
#[cfg(feature = "conflux")]
#[derive(Debug)]
enum ConfluxTestEndpoint<I: Iterator<Item = Option<Duration>>> {
    /// A mock exit for one leg; run by `run_mock_conflux_exit`.
    Relay(ConfluxExitState<I>),
    /// The client side; run by `run_conflux_client`.
    Client {
        /// Receives the result of the conflux handshake.
        conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
        /// The tunnel the client opens its stream on.
        tunnel: Arc<ClientCirc>,
        /// Bytes the client writes to the stream.
        send_data: Vec<u8>,
        /// Bytes the client expects to read back from the stream.
        recv_data: Vec<u8>,
    },
}
3373
/// What an endpoint task returns when it finishes.
///
/// The fields are not necessarily read; returning them lets the caller hold
/// on to them until it drops the results.
#[allow(unused, clippy::large_enum_variant)]
#[derive(Debug)]
#[cfg(feature = "conflux")]
enum ConfluxEndpointResult {
    /// Returned by `run_conflux_client`.
    Circuit {
        /// The tunnel the client used.
        tunnel: Arc<ClientCirc>,
        /// The stream the client opened on the tunnel.
        stream: DataStream,
    },
    /// Returned by `run_mock_conflux_exit`.
    Relay {
        /// The test-side handles of the leg the mock exit was serving.
        circ: TestCircuitCtx,
    },
}
3388
/// State of the single test stream, as tracked by `run_mock_conflux_exit`.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxStreamState {
    /// Payload bytes of the DATA messages received so far.
    data_recvd: Vec<u8>,
    /// Number of payload bytes after which the mock exit stops reading.
    expected_data_len: usize,
    /// Whether a BEGIN has been received (a second one is a test failure).
    begin_recvd: bool,
    /// Whether an END has been received (a second one is a test failure).
    end_recvd: bool,
    /// Whether the mock exit has sent an END of its own.
    end_sent: bool,
}
3404
/// A CONFLUX_SWITCH message that a mock exit expects to receive on its leg.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ExpectedSwitch {
    /// Number of cells received on the leg at the time the SWITCH is seen;
    /// the count includes the SWITCH cell itself.
    cells_so_far: usize,
    /// The sequence number the SWITCH is expected to carry.
    seqno: u32,
}
3416
/// Everything `run_mock_conflux_exit` needs to play the exit on one leg.
#[derive(Debug)]
#[cfg(feature = "conflux")]
struct ConfluxExitState<I: Iterator<Item = Option<Duration>>> {
    /// The mock runtime, behind an async mutex; used to advance mock time.
    runtime: Arc<AsyncMutex<MockRuntime>>,
    /// The client tunnel; queried for the SENDME tags of this leg.
    tunnel: Arc<ClientCirc>,
    /// Test-side handles of the leg this exit is serving.
    circ: TestCircuitCtx,
    /// Delays to apply (by advancing mock time) before each SENDME is sent;
    /// a `None` item, or an exhausted iterator, means no delay.
    rtt_delays: I,
    /// State of the test stream.
    stream_state: Arc<Mutex<ConfluxStreamState>>,
    /// The SWITCH messages expected on this leg, in order of arrival.
    expect_switch: Vec<ExpectedSwitch>,
    /// Resolves when the exit holding the matching sender is done; makes this
    /// exit stop reading.
    done_rx: oneshot::Receiver<()>,
    /// Signalled when this exit is done.
    done_tx: oneshot::Sender<()>,
    /// Whether this exit sends the END message if none has been received by
    /// the time it stops reading.
    is_sending_leg: bool,
}
3444
/// Plays the exit side of a successful conflux handshake on one leg.
///
/// Waits for the CONFLUX_LINK on `rx`, optionally advances the mock clock by
/// `init_rtt_delay`, answers on `sink` with a CONFLUX_LINKED echoing the LINK
/// payload, and finally checks that the next cell on `rx` is a
/// CONFLUX_LINKED_ACK.
///
/// # Panics
///
/// Panics if any of the expected cells is missing or of the wrong kind.
#[cfg(feature = "conflux")]
async fn good_exit_handshake(
    runtime: &Arc<AsyncMutex<MockRuntime>>,
    init_rtt_delay: Option<Duration>,
    rx: &mut Receiver<ChanCell<AnyChanMsg>>,
    sink: &mut CircuitRxSender,
) {
    let link = await_link_payload(rx).await;

    // Delay the reply, if asked to, by moving the mock clock forward.
    if let Some(delay) = init_rtt_delay {
        runtime.lock().await.advance_by(delay).await;
    }

    let reply = rmsg_to_ccmsg(
        None,
        relaymsg::ConfluxLinked::new(link.payload().clone()).into(),
    );
    sink.send(reply).await.unwrap();

    // The client must acknowledge the LINKED.
    let ack_cell = rx.next().await.unwrap();
    let (_id, chmsg) = ack_cell.into_circid_and_msg();
    let relay = match chmsg {
        AnyChanMsg::Relay(relay) => relay,
        other => panic!("{other:?}"),
    };
    let decoded =
        AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, relay.into_relay_body()).unwrap();
    let (_streamid, ack) = decoded.into_streamid_and_msg();

    assert!(matches!(ack, AnyRelayMsg::ConfluxLinkedAck(_)));
}
3478
/// Plays the exit side of one conflux leg until the test stream is finished.
///
/// The loop keeps reading cells from this leg's channel until one of the
/// following happens:
///
/// * the shared `stream_state` holds `expected_data_len` bytes of data,
/// * an END message is received on this leg, or
/// * the other leg signals through `done_rx` that it is done.
///
/// Per message received:
///
/// * BEGIN: recorded, and answered with CONNECTED on the same stream.
/// * DATA: appended to the shared state; every 31st DATA cell on this leg is
///   answered with a SENDME (see the note in the code).
/// * CONFLUX_SWITCH: checked against the next entry of `expect_switch`.
/// * END: recorded, and the loop stops.
///
/// Afterwards, if this is the sending leg and no END was received, an END
/// (reason DONE) is sent. The other leg is then notified through `done_tx`.
///
/// # Panics
///
/// Panics on any unexpected cell or message (a second BEGIN or END, a SWITCH
/// that was not expected or does not match, any other message type), and if
/// some expected SWITCH messages were never received.
#[cfg(feature = "conflux")]
async fn run_mock_conflux_exit<I: Iterator<Item = Option<Duration>>>(
    state: ConfluxExitState<I>,
) -> ConfluxEndpointResult {
    let ConfluxExitState {
        runtime,
        tunnel,
        mut circ,
        mut rtt_delays,
        stream_state,
        mut expect_switch,
        done_tx,
        mut done_rx,
        is_sending_leg,
    } = state;

    let stream_len = stream_state.lock().unwrap().expected_data_len;
    // Number of DATA cells received on this leg (drives the SENDME cadence).
    let mut data_cells_received = 0_usize;
    // Number of cells of any kind received on this leg.
    let mut cell_count = 0_usize;
    // SENDME tags obtained from the client reactor and not yet used.
    let mut tags = vec![];
    // Stream ID of the test stream, learned from the first message carrying one.
    let mut streamid = None;

    while stream_state.lock().unwrap().data_recvd.len() < stream_len {
        use futures::select;

        // Wait for the next cell, or for the other leg to tell us to stop.
        let res = select! {
            res = circ.chan_rx.next() => {
                res.unwrap()
            },
            res = done_rx => {
                res.unwrap();
                break;
            }
        };

        let (_id, chmsg) = res.into_circid_and_msg();
        cell_count += 1;
        let rmsg = match chmsg {
            AnyChanMsg::Relay(r) => {
                AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
                    .unwrap()
            }
            other => panic!("{:?}", other),
        };
        let (new_streamid, rmsg) = rmsg.into_streamid_and_msg();
        if streamid.is_none() {
            streamid = new_streamid;
        }

        let begin_recvd = stream_state.lock().unwrap().begin_recvd;
        let end_recvd = stream_state.lock().unwrap().end_recvd;
        match rmsg {
            AnyRelayMsg::Begin(_) if begin_recvd => {
                panic!("client tried to open two streams?!");
            }
            AnyRelayMsg::Begin(_) if !begin_recvd => {
                stream_state.lock().unwrap().begin_recvd = true;
                // Accept the stream.
                let connected = relaymsg::Connected::new_empty().into();
                circ.circ_tx
                    .send(rmsg_to_ccmsg(streamid, connected))
                    .await
                    .unwrap();
            }
            AnyRelayMsg::End(_) if !end_recvd => {
                stream_state.lock().unwrap().end_recvd = true;
                break;
            }
            AnyRelayMsg::End(_) if end_recvd => {
                panic!("received two END cells for the same stream?!");
            }
            AnyRelayMsg::ConfluxSwitch(cell) => {
                // Fail with a message that names the leg, rather than with an
                // out-of-bounds panic, if no SWITCH was expected at this point.
                assert!(
                    !expect_switch.is_empty(),
                    "unexpected SWITCH (seqno {}) as cell #{cell_count} on leg {}",
                    cell.seqno(),
                    circ.unique_id
                );
                let expected = expect_switch.remove(0);

                assert_eq!(expected.cells_so_far, cell_count);
                assert_eq!(expected.seqno, cell.seqno());

                continue;
            }
            AnyRelayMsg::Data(dat) => {
                data_cells_received += 1;
                stream_state
                    .lock()
                    .unwrap()
                    .data_recvd
                    .extend_from_slice(dat.as_ref());

                // NOTE(review): 31 is presumably the SENDME increment
                // configured by `build_cc_vegas_params` -- confirm.
                let is_next_cell_sendme = data_cells_received % 31 == 0;
                if is_next_cell_sendme {
                    if tags.is_empty() {
                        // Let the client side make progress, then ask its
                        // reactor for the tags it recorded for this leg.
                        runtime.lock().await.advance_until_stalled().await;
                        let (tx, rx) = oneshot::channel();
                        tunnel
                            .command
                            .unbounded_send(CtrlCmd::QuerySendWindow {
                                hop: 2.into(),
                                leg: circ.unique_id,
                                done: tx,
                            })
                            .unwrap();

                        let (_window, new_tags) = rx.await.unwrap().unwrap();
                        tags = new_tags;
                    }

                    // A SENDME cannot be built without a tag to acknowledge.
                    assert!(
                        !tags.is_empty(),
                        "no SENDME tag available on leg {} after {data_cells_received} DATA cells",
                        circ.unique_id
                    );
                    let tag = tags.remove(0);

                    // Simulate the round-trip time for this SENDME, if any.
                    if let Some(rtt_delay) = rtt_delays.next().flatten() {
                        runtime.lock().await.advance_by(rtt_delay).await;
                    }
                    let sendme = relaymsg::Sendme::from(tag).into();

                    circ.circ_tx
                        .send(rmsg_to_ccmsg(None, sendme))
                        .await
                        .unwrap();
                }
            }
            _ => panic!("unexpected message {rmsg:?} on leg {}", circ.unique_id),
        }
    }

    let end_recvd = stream_state.lock().unwrap().end_recvd;

    // Close the stream from our side if the client has not done so already.
    if is_sending_leg && !end_recvd {
        let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
        circ.circ_tx
            .send(rmsg_to_ccmsg(streamid, end))
            .await
            .unwrap();
        stream_state.lock().unwrap().end_sent = true;
    }

    // Tell the other leg we are done; it may already be gone, which is fine.
    let _ = done_tx.send(());

    // Every SWITCH we were told to expect must have arrived.
    assert!(
        expect_switch.is_empty(),
        "expect_switch = {expect_switch:?}"
    );

    ConfluxEndpointResult::Relay { circ }
}
3643
/// Plays the client side of a conflux test.
///
/// Waits for the handshake to complete (two results are expected), opens a
/// stream to `www.example.com:443`, writes and flushes `send_data`, then reads
/// until end-of-stream and checks that exactly `recv_data` was received.
///
/// # Panics
///
/// Panics if the handshake failed, if any stream operation fails, or if the
/// received bytes differ from `recv_data`.
#[cfg(feature = "conflux")]
async fn run_conflux_client(
    tunnel: Arc<ClientCirc>,
    conflux_link_rx: oneshot::Receiver<Result<ConfluxHandshakeResult>>,
    send_data: Vec<u8>,
    recv_data: Vec<u8>,
) -> ConfluxEndpointResult {
    // The handshake must be over before the tunnel is used.
    let hs_results = conflux_link_rx.await.unwrap().unwrap();
    assert_eq!(hs_results.len(), 2);

    let stream_request = tunnel.begin_stream("www.example.com", 443, None);
    let mut stream = stream_request.await.unwrap();

    // Upload everything.
    stream.write_all(&send_data).await.unwrap();
    stream.flush().await.unwrap();

    // Download until the stream ends, then compare with what was expected.
    let mut received: Vec<u8> = Vec::new();
    let received_len = stream.read_to_end(&mut received).await.unwrap();
    assert_eq!(received_len, recv_data.len());
    assert_eq!(recv_data, received);

    ConfluxEndpointResult::Circuit { tunnel, stream }
}
3675
/// Runs `endpoint` to completion: a client through `run_conflux_client`, a
/// relay through `run_mock_conflux_exit`.
#[cfg(feature = "conflux")]
async fn run_conflux_endpoint<I: Iterator<Item = Option<Duration>>>(
    endpoint: ConfluxTestEndpoint<I>,
) -> ConfluxEndpointResult {
    match endpoint {
        ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            send_data,
            recv_data,
        } => {
            let client = run_conflux_client(tunnel, conflux_link_rx, send_data, recv_data);
            client.await
        }
        ConfluxTestEndpoint::Relay(exit_state) => run_mock_conflux_exit(exit_state).await,
    }
}
3690
/// End-to-end conflux test: a client uploads `NUM_CELLS * CELL_SIZE` bytes
/// over a two-leg tunnel while a mock exit runs on each leg.
///
/// Checks that a BEGIN was received, that every expected CONFLUX_SWITCH was
/// seen (inside the exit tasks), and that the bytes received by the exits are,
/// as a multiset, exactly the bytes the client sent.
#[traced_test]
#[test]
#[cfg(feature = "conflux")]
fn multipath_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        // Size of the upload, expressed as a number of cells of CELL_SIZE bytes.
        const NUM_CELLS: usize = 300;
        const CELL_SIZE: usize = 498;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt).await;
        let [circ1, circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // The payload: the byte values 0..=254, repeated as often as needed.
        let payload_len = NUM_CELLS * CELL_SIZE;
        let mut send_data: Vec<u8> = (0..255_u8).cycle().take(payload_len).collect();

        let stream_state = Arc::new(Mutex::new(ConfluxStreamState {
            data_recvd: vec![],
            expected_data_len: send_data.len(),
            begin_recvd: false,
            end_recvd: false,
            end_sent: false,
        }));

        let mut tasks = vec![];

        // Each exit gets the sender of one channel and the receiver of the
        // other, so that either one can tell its peer that it is done.
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        // Per-leg RTT schedule, in milliseconds. The first entry of each is
        // consumed by the handshake, the following ones by the SENDMEs.
        let circ1_rtt_delays = [
            100_u64, 500, 700, 900, 1100, 1300, 1500, 1700, 1900, 2100,
        ]
        .map(|ms| Some(Duration::from_millis(ms)))
        .into_iter();

        let circ2_rtt_delays = [
            200_u64, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000,
        ]
        .map(|ms| Some(Duration::from_millis(ms)))
        .into_iter();

        // NOTE(review): the expected SWITCH positions below presumably follow
        // from the payload size and the RTT schedules above; re-derive them if
        // either changes.
        let first_leg_switch = vec![ExpectedSwitch {
            cells_so_far: 126,
            seqno: 124,
        }];
        let second_leg_switch = vec![ExpectedSwitch {
            cells_so_far: 1,
            seqno: 125,
        }];

        // (circuit, done sender, done receiver, expected switches, delays, sends END?)
        let relays = [
            (circ1, tx1, rx2, first_leg_switch, circ1_rtt_delays, true),
            (circ2, tx2, rx1, second_leg_switch, circ2_rtt_delays, false),
        ];

        let relay_runtime = Arc::new(AsyncMutex::new(rt.clone()));
        for (mut circ, done_tx, done_rx, expect_switch, mut rtt_delays, is_sending_leg) in relays
        {
            let leg = circ.unique_id;

            // Finish the handshake on this leg before spawning its exit task.
            let init_rtt_delay = rtt_delays.next().flatten();
            good_exit_handshake(
                &relay_runtime,
                init_rtt_delay,
                &mut circ.chan_rx,
                &mut circ.circ_tx,
            )
            .await;

            let exit_state = ConfluxExitState {
                runtime: Arc::clone(&relay_runtime),
                tunnel: Arc::clone(&tunnel),
                circ,
                rtt_delays,
                stream_state: Arc::clone(&stream_state),
                expect_switch,
                done_tx,
                done_rx,
                is_sending_leg,
            };
            let relay = ConfluxTestEndpoint::Relay(exit_state);

            tasks.push(rt.spawn_join(format!("relay task {leg}"), run_conflux_endpoint(relay)));
        }

        // The client uploads the payload and expects to read nothing back.
        let client = ConfluxTestEndpoint::Client {
            tunnel,
            conflux_link_rx,
            send_data: send_data.clone(),
            recv_data: vec![],
        };
        tasks.push(rt.spawn_join("client task".to_string(), run_conflux_endpoint(client)));

        // Keep the task results around until the checks below are done.
        let _sinks = futures::future::join_all(tasks).await;

        let mut final_state = stream_state.lock().unwrap();
        assert!(final_state.begin_recvd);

        // Both sides are sorted first, so this compares the received bytes as
        // a multiset and says nothing about the order they arrived in.
        final_state.data_recvd.sort();
        send_data.sort();
        assert_eq!(final_state.data_recvd, send_data);
    });
}
3865
/// Checks that `allow_stream_requests` is refused on a tunnel with two linked
/// legs, with an error whose source mentions the number of legs.
#[traced_test]
#[test]
#[cfg(all(feature = "conflux", feature = "hs-service"))]
fn conflux_incoming_stream() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        use std::error::Error as _;

        // The hop that stream requests would be accepted from.
        const EXPECTED_HOP: u8 = 1;

        let TestTunnelCtx {
            tunnel,
            circs,
            conflux_link_rx,
        } = setup_good_conflux_tunnel(&rt).await;

        let [mut circ1, mut circ2]: [TestCircuitCtx; 2] = circs.try_into().unwrap();

        // Complete the handshake by answering with LINKED on both legs.
        let link = await_link_payload(&mut circ1.chan_rx).await;
        for circ_sink in [&mut circ1.circ_tx, &mut circ2.circ_tx] {
            let linked = relaymsg::ConfluxLinked::new(link.payload().clone()).into();
            circ_sink.send(rmsg_to_ccmsg(None, linked)).await.unwrap();
        }

        let hs_results = conflux_link_rx.await.unwrap().unwrap();
        assert!(!hs_results.iter().any(|res| res.is_err()));

        // The success value is discarded before `unwrap_err` is called on the result.
        let request_outcome = tunnel
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                (tunnel.unique_id(), EXPECTED_HOP.into()).into(),
                AllowAllStreamsFilter,
            )
            .await
            .map(|_| ());
        let err = request_outcome.unwrap_err();

        let err_src = err.source().unwrap();
        let rendered = err_src.to_string();
        assert!(rendered.contains("Cannot allow stream requests on tunnel with 2 legs"));
    });
}
3913
3914 }