1pub(crate) mod celltypes;
39pub(crate) mod halfcirc;
40
41#[cfg(feature = "hs-common")]
42pub mod handshake;
43#[cfg(not(feature = "hs-common"))]
44pub(crate) mod handshake;
45
46pub(super) mod path;
47pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56 StreamReader,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{LegId, StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use tor_cell::{
69 chancell::CircId,
70 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71};
72
73use tor_error::{bad_api_usage, internal, into_internal};
74use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75use tor_protover::named;
76
77pub use crate::crypto::binding::CircuitBinding;
78pub use crate::memquota::StreamAccount;
79pub use crate::tunnel::circuit::unique_id::UniqId;
80
81#[cfg(feature = "hs-service")]
82use {
83 crate::stream::{IncomingCmdChecker, IncomingStream},
84 crate::tunnel::reactor::StreamReqInfo,
85};
86
87use futures::channel::mpsc;
88use oneshot_fused_workaround as oneshot;
89
90use crate::congestion::sendme::StreamRecvWindow;
91use crate::DynTimeProvider;
92use futures::FutureExt as _;
93use std::collections::HashMap;
94use std::net::IpAddr;
95use std::sync::{Arc, Mutex};
96use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97
98use crate::crypto::handshake::ntor::NtorPublicKey;
99
100pub use path::{Path, PathEntry};
101
/// Buffer size handed to `MpscSpec::new` for the per-stream queue of
/// outbound messages created in `begin_stream_impl` (the queue whose sending
/// half ends up in the `StreamTarget`).
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104
105#[cfg(feature = "send-control-msg")]
106use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107
108pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109#[cfg(feature = "send-control-msg")]
110#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112
113pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
115pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;
117
118pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
120pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122
123#[derive(Debug)]
124pub struct ClientCirc {
167 mutable: Arc<TunnelMutableState>,
169 unique_id: UniqId,
171 pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
173 pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
175 #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
178 reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
179 #[cfg(test)]
181 circid: CircId,
182 memquota: CircuitAccount,
184 time_provider: DynTimeProvider,
186}
187
188#[derive(Debug, Default)]
208pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, (LegId, Arc<MutableState>)>>);
209
210impl TunnelMutableState {
211 pub(super) fn insert(&self, unique_id: UniqId, leg: LegId, mutable: Arc<MutableState>) {
213 #[allow(unused)] let state = self
215 .0
216 .lock()
217 .expect("lock poisoned")
218 .insert(unique_id, (leg, mutable));
219
220 debug_assert!(state.is_none());
221 }
222
223 pub(super) fn remove(&self, unique_id: UniqId) {
225 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228 debug_assert!(state.is_some());
229 }
230
231 fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235 let lock = self.0.lock().expect("lock poisoned");
236 let (_leg, mutable) = lock
237 .get(&unique_id)
238 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239
240 Ok(mutable.path())
241 }
242
243 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248 let lock = self.0.lock().expect("lock poisoned");
249 let (_leg, mutable) = lock
250 .get(&unique_id)
251 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252
253 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254 path::HopDetail::Relay(r) => r,
255 #[cfg(feature = "hs-common")]
256 path::HopDetail::Virtual => {
257 panic!("somehow made a circuit with a virtual first hop.")
258 }
259 });
260
261 Ok(first_hop)
262 }
263
264 fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270 let lock = self.0.lock().expect("lock poisoned");
271 let (_leg, mutable) = lock
272 .get(&unique_id)
273 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274
275 Ok(mutable.last_hop_num())
276 }
277
278 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282 let lock = self.0.lock().expect("lock poisoned");
283 let (_leg, mutable) = lock
284 .get(&unique_id)
285 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286
287 Ok(mutable.n_hops())
288 }
289
290 fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293 let lock = self.0.lock().expect("lock poisoned");
294 let (_leg, mutable) = lock
295 .get(&unique_id)
296 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298 Ok(mutable.binding_key(hop))
299 }
300}
301
302#[derive(Educe, Default)]
304#[educe(Debug)]
305pub(super) struct MutableState(Mutex<CircuitState>);
306
307impl MutableState {
308 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
310 let mut mutable = self.0.lock().expect("poisoned lock");
311 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
312 mutable.binding.push(binding);
313 }
314
315 pub(super) fn path(&self) -> Arc<path::Path> {
317 let mutable = self.0.lock().expect("poisoned lock");
318 Arc::clone(&mutable.path)
319 }
320
321 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
324 let mutable = self.0.lock().expect("poisoned lock");
325
326 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
327 }
330
331 fn first_hop(&self) -> Option<HopDetail> {
333 let mutable = self.0.lock().expect("poisoned lock");
334 mutable.path.first_hop()
335 }
336
337 fn last_hop_num(&self) -> Option<HopNum> {
344 let mutable = self.0.lock().expect("poisoned lock");
345 mutable.path.last_hop_num()
346 }
347
348 fn n_hops(&self) -> usize {
355 let mutable = self.0.lock().expect("poisoned lock");
356 mutable.path.n_hops()
357 }
358}
359
360#[derive(Educe, Default)]
362#[educe(Debug)]
363pub(super) struct CircuitState {
364 path: Arc<path::Path>,
370
371 #[educe(Debug(ignore))]
379 binding: Vec<Option<CircuitBinding>>,
380}
381
382pub struct PendingClientCirc {
387 recvcreated: oneshot::Receiver<CreateResponse>,
390 circ: Arc<ClientCirc>,
392}
393
394#[non_exhaustive]
396#[derive(Clone, Debug)]
397pub struct CircParameters {
398 pub extend_by_ed25519_id: bool,
401 pub ccontrol: CongestionControlParams,
403}
404
/// Test-only defaults: extend by Ed25519 identity, with the fixed
/// congestion-control parameters from the congestion test utilities.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();
        Self {
            extend_by_ed25519_id: true,
            ccontrol,
        }
    }
}
414
415impl CircParameters {
416 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
418 Self {
419 extend_by_ed25519_id,
420 ccontrol,
421 }
422 }
423}
424
425impl ClientCirc {
426 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
434 Ok(self
435 .mutable
436 .first_hop(self.unique_id)
437 .map_err(|_| Error::CircuitClosed)?
438 .expect("called first_hop on an un-constructed circuit"))
439 }
440
441 pub fn last_hop_num(&self) -> Result<HopNum> {
451 Ok(self
452 .mutable
453 .last_hop_num(self.unique_id)?
454 .ok_or_else(|| internal!("no last hop index"))?)
455 }
456
457 pub fn path_ref(&self) -> Result<Arc<Path>> {
462 self.mutable
463 .path_ref(self.unique_id)
464 .map_err(|_| Error::CircuitClosed)
465 }
466
467 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
471 let (tx, rx) = oneshot::channel();
472
473 self.control
474 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
475 .map_err(|_| Error::CircuitClosed)?;
476
477 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
478 }
479
480 pub fn mq_account(&self) -> &CircuitAccount {
482 &self.memquota
483 }
484
485 pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
493 self.mutable
494 .binding_key(self.unique_id, hop)
495 .map_err(|_| Error::CircuitClosed)
496 }
497
/// Start a [`Conversation`] on this circuit.
///
/// `reply_handler` is wrapped in a `UserMsgHandler` for `hop_num` and handed
/// to the reactor together with the optional first message `msg`. The
/// returned `Conversation` can then be used to send further messages.
#[cfg(feature = "send-control-msg")]
pub async fn start_conversation(
    &self,
    msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
    reply_handler: impl MsgHandler + Send + 'static,
    hop_num: HopNum,
) -> Result<Conversation<'_>> {
    let conversation = Conversation(self);
    let handler: Box<dyn MetaCellHandler + Send + 'static> =
        Box::new(UserMsgHandler::new(hop_num, reply_handler));

    conversation.send_internal(msg, Some(handler)).await?;

    Ok(conversation)
}
587
/// Ask the reactor to send `msg` to hop `hop_num`, and wait for its verdict.
///
/// Returns [`Error::CircuitClosed`] if the request cannot be delivered or
/// the reactor drops the reply channel; otherwise returns whatever result
/// the reactor reported.
#[cfg(feature = "send-control-msg")]
pub async fn send_raw_msg(
    &self,
    msg: tor_cell::relaycell::msg::AnyRelayMsg,
    hop_num: HopNum,
) -> Result<()> {
    let (sender, receiver) = oneshot::channel();

    self.control
        .unbounded_send(CtrlMsg::SendMsg {
            hop_num,
            msg,
            sender,
        })
        .map_err(|_| Error::CircuitClosed)?;

    match receiver.await {
        Ok(outcome) => outcome,
        Err(_) => Err(Error::CircuitClosed),
    }
}
611
/// Ask the reactor to start accepting incoming stream requests from
/// `hop_num`, and return them as a stream of [`IncomingStream`]s.
///
/// `allow_commands` is used to build the command checker for the requests,
/// and `filter` is boxed and passed to the reactor with it.
///
/// # Errors
///
/// Fails if the incoming-request queue cannot be created, if the reactor is
/// gone ([`Error::CircuitClosed`]), or if the reactor rejects the request.
///
/// # Panics
///
/// The returned stream panics if the reactor delivers a request whose hop
/// differs from `hop_num`.
#[cfg(feature = "hs-service")]
pub async fn allow_stream_requests(
    self: &Arc<ClientCirc>,
    allow_commands: &[tor_cell::relaycell::RelayCmd],
    hop_num: HopNum,
    filter: impl crate::stream::IncomingStreamRequestFilter,
) -> Result<impl futures::Stream<Item = IncomingStream>> {
    use crate::tunnel::HopLocation;
    use futures::stream::StreamExt;

    /// Buffer size for the queue of not-yet-consumed incoming requests.
    const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;

    let time_prov = self.time_provider.clone();
    let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
    let (incoming_sender, incoming_receiver) =
        MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
    let (done, registered) = oneshot::channel();

    let request = CtrlCmd::AwaitStreamRequest {
        cmd_checker,
        incoming_sender,
        hop_num,
        done,
        filter: Box::new(filter),
    };
    self.command
        .unbounded_send(request)
        .map_err(|_| Error::CircuitClosed)?;

    // Wait until the reactor has accepted (or refused) the registration.
    registered.await.map_err(|_| Error::CircuitClosed)??;

    let allowed_hop_num = hop_num;
    let circ = Arc::clone(self);

    let incoming = incoming_receiver.map(
        move |StreamReqInfo {
                  req,
                  stream_id,
                  hop_num,
                  leg,
                  receiver,
                  msg_tx,
                  memquota,
                  relay_cell_format,
              }| {
            assert_eq!(allowed_hop_num, hop_num);

            let target = StreamTarget {
                circ: Arc::clone(&circ),
                tx: msg_tx,
                hop: HopLocation::Hop((leg, hop_num)),
                stream_id,
                relay_cell_format,
            };

            let reader = StreamReader {
                target: target.clone(),
                receiver,
                recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
                ended: false,
            };

            IncomingStream::new(req, target, reader, memquota)
        },
    );

    Ok(incoming)
}
721
722 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
725 where
726 Tg: CircTarget,
727 {
728 if target
740 .protovers()
741 .supports_named_subver(named::RELAY_NTORV3)
742 {
743 self.extend_ntor_v3(target, params).await
744 } else {
745 self.extend_ntor(target, params).await
746 }
747 }
748
749 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
752 where
753 Tg: CircTarget,
754 {
755 let key = NtorPublicKey {
756 id: *target
757 .rsa_identity()
758 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
759 pk: *target.ntor_onion_key(),
760 };
761 let mut linkspecs = target
762 .linkspecs()
763 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
764 if !params.extend_by_ed25519_id {
765 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
766 }
767
768 let (tx, rx) = oneshot::channel();
769
770 let peer_id = OwnedChanTarget::from_chan_target(target);
771 self.control
772 .unbounded_send(CtrlMsg::ExtendNtor {
773 peer_id,
774 public_key: key,
775 linkspecs,
776 params,
777 done: tx,
778 })
779 .map_err(|_| Error::CircuitClosed)?;
780
781 rx.await.map_err(|_| Error::CircuitClosed)??;
782
783 Ok(())
784 }
785
786 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
789 where
790 Tg: CircTarget,
791 {
792 let key = NtorV3PublicKey {
793 id: *target
794 .ed_identity()
795 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
796 pk: *target.ntor_onion_key(),
797 };
798 let mut linkspecs = target
799 .linkspecs()
800 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
801 if !params.extend_by_ed25519_id {
802 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
803 }
804
805 let (tx, rx) = oneshot::channel();
806
807 let peer_id = OwnedChanTarget::from_chan_target(target);
808 self.control
809 .unbounded_send(CtrlMsg::ExtendNtorV3 {
810 peer_id,
811 public_key: key,
812 linkspecs,
813 params,
814 done: tx,
815 })
816 .map_err(|_| Error::CircuitClosed)?;
817
818 rx.await.map_err(|_| Error::CircuitClosed)??;
819
820 Ok(())
821 }
822
/// Add a "virtual" hop to this circuit: one whose cryptographic layers are
/// derived locally from `seed` instead of through a handshake sent over the
/// circuit.
///
/// `protocol` and `role` select how the layers are built. The resulting
/// layers, relay cell format and `params` are then handed to the reactor.
///
/// Returns [`Error::CircuitClosed`] if the reactor is gone; otherwise
/// returns the result the reactor reported.
#[cfg(feature = "hs-common")]
pub async fn extend_virtual(
    &self,
    protocol: handshake::RelayProtocol,
    role: handshake::HandshakeRole,
    seed: impl handshake::KeyGenerator,
    params: CircParameters,
) -> Result<()> {
    use self::handshake::BoxedClientLayer;

    let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
    // Read the cell format before the layers are constructed from `protocol`.
    let relay_cell_format = protocol.relay_cell_format();
    let layers: BoxedClientLayer = protocol.construct_client_layers(role, seed)?;

    let (done, outcome_rx) = oneshot::channel();
    let command = CtrlCmd::ExtendVirtual {
        relay_cell_format,
        cell_crypto: (layers.fwd, layers.back, layers.binding),
        params,
        done,
    };

    if self.command.unbounded_send(command).is_err() {
        return Err(Error::CircuitClosed);
    }

    match outcome_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(Error::CircuitClosed),
    }
}
878
879 async fn begin_stream_impl(
887 self: &Arc<ClientCirc>,
888 begin_msg: AnyRelayMsg,
889 cmd_checker: AnyCmdChecker,
890 ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
891 let hop = TargetHop::LastHop;
894
895 let time_prov = self.time_provider.clone();
896
897 let memquota = StreamAccount::new(self.mq_account())?;
898 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
899 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
900 let (tx, rx) = oneshot::channel();
901 let (msg_tx, msg_rx) =
902 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
903
904 self.control
905 .unbounded_send(CtrlMsg::BeginStream {
906 hop,
907 message: begin_msg,
908 sender,
909 rx: msg_rx,
910 done: tx,
911 cmd_checker,
912 })
913 .map_err(|_| Error::CircuitClosed)?;
914
915 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
916
917 let target = StreamTarget {
918 circ: self.clone(),
919 tx: msg_tx,
920 hop,
921 stream_id,
922 relay_cell_format,
923 };
924
925 let reader = StreamReader {
926 target: target.clone(),
927 receiver,
928 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
929 ended: false,
930 };
931
932 Ok((reader, target, memquota))
933 }
934
935 async fn begin_data_stream(
938 self: &Arc<ClientCirc>,
939 msg: AnyRelayMsg,
940 optimistic: bool,
941 ) -> Result<DataStream> {
942 let (reader, target, memquota) = self
943 .begin_stream_impl(msg, DataCmdChecker::new_any())
944 .await?;
945 let mut stream = DataStream::new(reader, target, memquota);
946 if !optimistic {
947 stream.wait_for_connection().await?;
948 }
949 Ok(stream)
950 }
951
952 pub async fn begin_stream(
958 self: &Arc<ClientCirc>,
959 target: &str,
960 port: u16,
961 parameters: Option<StreamParameters>,
962 ) -> Result<DataStream> {
963 let parameters = parameters.unwrap_or_default();
964 let begin_flags = parameters.begin_flags();
965 let optimistic = parameters.is_optimistic();
966 let target = if parameters.suppressing_hostname() {
967 ""
968 } else {
969 target
970 };
971 let beginmsg = Begin::new(target, port, begin_flags)
972 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
973 self.begin_data_stream(beginmsg.into(), optimistic).await
974 }
975
976 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
979 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
984 .await
985 }
986
987 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
993 let resolve_msg = Resolve::new(hostname);
994
995 let resolved_msg = self.try_resolve(resolve_msg).await?;
996
997 resolved_msg
998 .into_answers()
999 .into_iter()
1000 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1001 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1002 Ok(_) => None,
1003 Err(e) => Some(Err(e)),
1004 })
1005 .collect()
1006 }
1007
1008 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1014 let resolve_ptr_msg = Resolve::new_reverse(&addr);
1015
1016 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1017
1018 resolved_msg
1019 .into_answers()
1020 .into_iter()
1021 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1022 Ok(ResolvedVal::Hostname(v)) => Some(
1023 String::from_utf8(v)
1024 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1025 ),
1026 Ok(_) => None,
1027 Err(e) => Some(Err(e)),
1028 })
1029 .collect()
1030 }
1031
1032 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1035 let (reader, _target, memquota) = self
1036 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1037 .await?;
1038 let mut resolve_stream = ResolveStream::new(reader, memquota);
1039 resolve_stream.read_msg().await
1040 }
1041
1042 pub fn terminate(&self) {
1053 let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
1054 }
1055
1056 pub(crate) fn protocol_error(&self) {
1064 self.terminate();
1065 }
1066
1067 pub fn is_closing(&self) -> bool {
1069 self.control.is_closed()
1070 }
1071
1072 pub fn unique_id(&self) -> UniqId {
1074 self.unique_id
1075 }
1076
1077 pub fn n_hops(&self) -> Result<usize> {
1084 self.mutable
1085 .n_hops(self.unique_id)
1086 .map_err(|_| Error::CircuitClosed)
1087 }
1088
/// Return a future that completes when the `reactor_closed_rx` receiver
/// completes.
///
/// The future owns a clone of the shared receiver, so it does not borrow
/// from `self`.
#[cfg(feature = "experimental-api")]
pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
    let closed = self.reactor_closed_rx.clone();
    closed.map(|_| ())
}
1099}
1100
/// Handle for an exchange of relay messages over a [`ClientCirc`], as
/// returned by `ClientCirc::start_conversation`.
///
/// It borrows the circuit for as long as it lives.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);

#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send `msg` as part of this conversation, without installing a new
    /// handler.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let no_handler = None;
        self.send_internal(Some(msg), no_handler).await
    }

    /// Ask the reactor to send `msg` (if any) and install `handler` (if any).
    ///
    /// The message is wrapped with no stream ID before being handed over.
    /// Returns [`Error::CircuitClosed`] if the reactor is gone; otherwise
    /// returns the result the reactor reported.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let Conversation(circ) = self;
        let (sender, receiver) = oneshot::channel();

        let request = CtrlMsg::SendMsgAndInstallHandler {
            msg: msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner)),
            handler,
            sender,
        };

        if circ.control.unbounded_send(request).is_err() {
            return Err(Error::CircuitClosed);
        }

        match receiver.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1147
1148impl PendingClientCirc {
1149 pub(crate) fn new(
1155 id: CircId,
1156 channel: Arc<Channel>,
1157 createdreceiver: oneshot::Receiver<CreateResponse>,
1158 input: CircuitRxReceiver,
1159 unique_id: UniqId,
1160 runtime: DynTimeProvider,
1161 memquota: CircuitAccount,
1162 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1163 let time_provider = channel.time_provider().clone();
1164 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1165 Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1166
1167 let circuit = ClientCirc {
1168 mutable,
1169 unique_id,
1170 control: control_tx,
1171 command: command_tx,
1172 reactor_closed_rx: reactor_closed_rx.shared(),
1173 #[cfg(test)]
1174 circid: id,
1175 memquota,
1176 time_provider,
1177 };
1178
1179 let pending = PendingClientCirc {
1180 recvcreated: createdreceiver,
1181 circ: Arc::new(circuit),
1182 };
1183 (pending, reactor)
1184 }
1185
1186 pub fn peek_unique_id(&self) -> UniqId {
1188 self.circ.unique_id
1189 }
1190
1191 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1198 let (tx, rx) = oneshot::channel();
1199 self.circ
1200 .control
1201 .unbounded_send(CtrlMsg::Create {
1202 recv_created: self.recvcreated,
1203 handshake: CircuitHandshake::CreateFast,
1204 params,
1205 done: tx,
1206 })
1207 .map_err(|_| Error::CircuitClosed)?;
1208
1209 rx.await.map_err(|_| Error::CircuitClosed)??;
1210
1211 Ok(self.circ)
1212 }
1213
1214 pub async fn create_firsthop<Tg>(
1219 self,
1220 target: &Tg,
1221 params: CircParameters,
1222 ) -> Result<Arc<ClientCirc>>
1223 where
1224 Tg: tor_linkspec::CircTarget,
1225 {
1226 if target
1228 .protovers()
1229 .supports_named_subver(named::RELAY_NTORV3)
1230 {
1231 self.create_firsthop_ntor_v3(target, params).await
1232 } else {
1233 self.create_firsthop_ntor(target, params).await
1234 }
1235 }
1236
1237 pub async fn create_firsthop_ntor<Tg>(
1242 self,
1243 target: &Tg,
1244 params: CircParameters,
1245 ) -> Result<Arc<ClientCirc>>
1246 where
1247 Tg: tor_linkspec::CircTarget,
1248 {
1249 let (tx, rx) = oneshot::channel();
1250
1251 self.circ
1252 .control
1253 .unbounded_send(CtrlMsg::Create {
1254 recv_created: self.recvcreated,
1255 handshake: CircuitHandshake::Ntor {
1256 public_key: NtorPublicKey {
1257 id: *target
1258 .rsa_identity()
1259 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1260 pk: *target.ntor_onion_key(),
1261 },
1262 ed_identity: *target
1263 .ed_identity()
1264 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1265 },
1266 params,
1267 done: tx,
1268 })
1269 .map_err(|_| Error::CircuitClosed)?;
1270
1271 rx.await.map_err(|_| Error::CircuitClosed)??;
1272
1273 Ok(self.circ)
1274 }
1275
1276 pub async fn create_firsthop_ntor_v3<Tg>(
1285 self,
1286 target: &Tg,
1287 params: CircParameters,
1288 ) -> Result<Arc<ClientCirc>>
1289 where
1290 Tg: tor_linkspec::CircTarget,
1291 {
1292 let (tx, rx) = oneshot::channel();
1293
1294 self.circ
1295 .control
1296 .unbounded_send(CtrlMsg::Create {
1297 recv_created: self.recvcreated,
1298 handshake: CircuitHandshake::NtorV3 {
1299 public_key: NtorV3PublicKey {
1300 id: *target
1301 .ed_identity()
1302 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1303 pk: *target.ntor_onion_key(),
1304 },
1305 },
1306 params,
1307 done: tx,
1308 })
1309 .map_err(|_| Error::CircuitClosed)?;
1310
1311 rx.await.map_err(|_| Error::CircuitClosed)??;
1312
1313 Ok(self.circ)
1314 }
1315}
1316
1317fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1320 match val {
1321 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1322 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1323 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1324 _ => Ok(val),
1325 }
1326}
1327
1328#[cfg(test)]
1329pub(crate) mod test {
1330 #![allow(clippy::bool_assert_comparison)]
1332 #![allow(clippy::clone_on_copy)]
1333 #![allow(clippy::dbg_macro)]
1334 #![allow(clippy::mixed_attributes_style)]
1335 #![allow(clippy::print_stderr)]
1336 #![allow(clippy::print_stdout)]
1337 #![allow(clippy::single_char_pattern)]
1338 #![allow(clippy::unwrap_used)]
1339 #![allow(clippy::unchecked_duration_subtraction)]
1340 #![allow(clippy::useless_vec)]
1341 #![allow(clippy::needless_pass_by_value)]
1342 use super::*;
1345 use crate::channel::OpenChanCellS2C;
1346 use crate::channel::{test::new_reactor, CodecError};
1347 use crate::congestion::test_utils::params::build_cc_vegas_params;
1348 use crate::crypto::cell::RelayCellBody;
1349 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1350 #[cfg(feature = "hs-service")]
1351 use crate::stream::IncomingStreamRequestFilter;
1352 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1353 use futures::channel::mpsc::{Receiver, Sender};
1354 use futures::io::{AsyncReadExt, AsyncWriteExt};
1355 use futures::sink::SinkExt;
1356 use futures::stream::StreamExt;
1357 use futures::task::SpawnExt;
1358 use hex_literal::hex;
1359 use std::collections::{HashMap, VecDeque};
1360 use std::fmt::Debug;
1361 use std::time::Duration;
1362 use tor_basic_utils::test_rng::testing_rng;
1363 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCmd};
1364 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1365 use tor_cell::relaycell::msg::SendmeTag;
1366 use tor_cell::relaycell::{
1367 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1368 };
1369 use tor_linkspec::OwnedCircTarget;
1370 use tor_memquota::HasMemoryCost;
1371 use tor_rtcompat::Runtime;
1372 use tracing::trace;
1373 use tracing_test::traced_test;
1374
1375 impl PendingClientCirc {
1376 pub(crate) fn peek_circid(&self) -> CircId {
1378 self.circ.circid
1379 }
1380 }
1381
1382 impl ClientCirc {
1383 pub(crate) fn peek_circid(&self) -> CircId {
1385 self.circid
1386 }
1387 }
1388
1389 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1390 let rfmt = RelayCellFormat::V0;
1392 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1393 .encode(rfmt, &mut testing_rng())
1394 .unwrap();
1395 let chanmsg = chanmsg::Relay::from(body);
1396 ClientCircChanMsg::Relay(chanmsg)
1397 }
1398
1399 const EXAMPLE_SK: [u8; 32] =
1401 hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
1402 const EXAMPLE_PK: [u8; 32] =
1403 hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
1404 const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
1405 const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1406
/// Build a queue pair of the stream MPSC types for use in tests, by
/// delegating to the crate-level `fake_mpsc` with the given `buffer` size.
#[cfg(test)]
pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
    buffer: usize,
) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
    let queue = crate::fake_mpsc(buffer);
    queue
}
1414
1415 fn example_target() -> OwnedCircTarget {
1417 let mut builder = OwnedCircTarget::builder();
1418 builder
1419 .chan_target()
1420 .ed_identity(EXAMPLE_ED_ID.into())
1421 .rsa_identity(EXAMPLE_RSA_ID.into());
1422 builder
1423 .ntor_onion_key(EXAMPLE_PK.into())
1424 .protocols("FlowCtrl=1".parse().unwrap())
1425 .build()
1426 .unwrap()
1427 }
1428 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1429 crate::crypto::handshake::ntor::NtorSecretKey::new(
1430 EXAMPLE_SK.into(),
1431 EXAMPLE_PK.into(),
1432 EXAMPLE_RSA_ID.into(),
1433 )
1434 }
1435 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1436 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1437 EXAMPLE_SK.into(),
1438 EXAMPLE_PK.into(),
1439 EXAMPLE_ED_ID.into(),
1440 )
1441 }
1442
1443 fn working_fake_channel<R: Runtime>(
1444 rt: &R,
1445 ) -> (
1446 Arc<Channel>,
1447 Receiver<AnyChanCell>,
1448 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1449 ) {
1450 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1451 rt.spawn(async {
1452 let _ignore = chan_reactor.run().await;
1453 })
1454 .unwrap();
1455 (channel, rx, tx)
1456 }
1457
/// Which circuit-creation handshake `test_create` should exercise.
#[derive(Copy, Clone)]
enum HandshakeType {
    /// The CREATE_FAST handshake.
    Fast,
    /// The ntor handshake.
    Ntor,
    /// The ntor v3 handshake.
    NtorV3,
}
1465
1466 async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
1467 use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
1471
1472 let (chan, mut rx, _sink) = working_fake_channel(rt);
1473 let circid = CircId::new(128).unwrap();
1474 let (created_send, created_recv) = oneshot::channel();
1475 let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
1476 let unique_id = UniqId::new(23, 17);
1477
1478 let (pending, reactor) = PendingClientCirc::new(
1479 circid,
1480 chan,
1481 created_recv,
1482 circmsg_recv,
1483 unique_id,
1484 DynTimeProvider::new(rt.clone()),
1485 CircuitAccount::new_noop(),
1486 );
1487
1488 rt.spawn(async {
1489 let _ignore = reactor.run().await;
1490 })
1491 .unwrap();
1492
1493 let simulate_relay_fut = async move {
1495 let mut rng = testing_rng();
1496 let create_cell = rx.next().await.unwrap();
1497 assert_eq!(create_cell.circid(), Some(circid));
1498 let reply = match handshake_type {
1499 HandshakeType::Fast => {
1500 let cf = match create_cell.msg() {
1501 AnyChanMsg::CreateFast(cf) => cf,
1502 other => panic!("{:?}", other),
1503 };
1504 let (_, rep) = CreateFastServer::server(
1505 &mut rng,
1506 &mut |_: &()| Some(()),
1507 &[()],
1508 cf.handshake(),
1509 )
1510 .unwrap();
1511 CreateResponse::CreatedFast(CreatedFast::new(rep))
1512 }
1513 HandshakeType::Ntor => {
1514 let c2 = match create_cell.msg() {
1515 AnyChanMsg::Create2(c2) => c2,
1516 other => panic!("{:?}", other),
1517 };
1518 let (_, rep) = NtorServer::server(
1519 &mut rng,
1520 &mut |_: &()| Some(()),
1521 &[example_ntor_key()],
1522 c2.body(),
1523 )
1524 .unwrap();
1525 CreateResponse::Created2(Created2::new(rep))
1526 }
1527 HandshakeType::NtorV3 => {
1528 let c2 = match create_cell.msg() {
1529 AnyChanMsg::Create2(c2) => c2,
1530 other => panic!("{:?}", other),
1531 };
1532 let mut reply_fn = if with_cc {
1533 |client_exts: &[CircRequestExt]| {
1534 let _ = client_exts
1535 .iter()
1536 .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
1537 .expect("Client failed to request CC");
1538 Some(vec![CircResponseExt::CcResponse(
1541 extend_ext::CcResponse::new(31),
1542 )])
1543 }
1544 } else {
1545 |_: &_| Some(vec![])
1546 };
1547 let (_, rep) = NtorV3Server::server(
1548 &mut rng,
1549 &mut reply_fn,
1550 &[example_ntor_v3_key()],
1551 c2.body(),
1552 )
1553 .unwrap();
1554 CreateResponse::Created2(Created2::new(rep))
1555 }
1556 };
1557 created_send.send(reply).unwrap();
1558 };
1559 let client_fut = async move {
1561 let target = example_target();
1562 let params = CircParameters::default();
1563 let ret = match handshake_type {
1564 HandshakeType::Fast => {
1565 trace!("doing fast create");
1566 pending.create_firsthop_fast(params).await
1567 }
1568 HandshakeType::Ntor => {
1569 trace!("doing ntor create");
1570 pending.create_firsthop_ntor(&target, params).await
1571 }
1572 HandshakeType::NtorV3 => {
1573 let params = if with_cc {
1574 CircParameters::new(true, build_cc_vegas_params())
1576 } else {
1577 params
1578 };
1579 trace!("doing ntor_v3 create");
1580 pending.create_firsthop_ntor_v3(&target, params).await
1581 }
1582 };
1583 trace!("create done: result {:?}", ret);
1584 ret
1585 };
1586
1587 let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
1588
1589 let _circ = circ.unwrap();
1590
1591 assert_eq!(_circ.n_hops().unwrap(), 1);
1593 }
1594
1595 #[traced_test]
1596 #[test]
1597 fn test_create_fast() {
1598 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1599 test_create(&rt, HandshakeType::Fast, false).await;
1600 });
1601 }
1602 #[traced_test]
1603 #[test]
1604 fn test_create_ntor() {
1605 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1606 test_create(&rt, HandshakeType::Ntor, false).await;
1607 });
1608 }
1609 #[traced_test]
1610 #[test]
1611 fn test_create_ntor_v3() {
1612 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1613 test_create(&rt, HandshakeType::NtorV3, false).await;
1614 });
1615 }
1616 #[traced_test]
1617 #[test]
1618 #[cfg(feature = "flowctl-cc")]
1619 fn test_create_ntor_v3_with_cc() {
1620 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1621 test_create(&rt, HandshakeType::NtorV3, true).await;
1622 });
1623 }
1624
1625 pub(crate) struct DummyCrypto {
1628 counter_tag: [u8; 20],
1629 counter: u32,
1630 lasthop: bool,
1631 }
1632 impl DummyCrypto {
1633 fn next_tag(&mut self) -> SendmeTag {
1634 #![allow(clippy::identity_op)]
1635 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1636 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1637 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1638 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1639 self.counter += 1;
1640 self.counter_tag.into()
1641 }
1642 }
1643
1644 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1645 fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1646 self.next_tag()
1647 }
1648 fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1649 }
1650 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1651 fn decrypt_inbound(
1652 &mut self,
1653 _cmd: ChanCmd,
1654 _cell: &mut RelayCellBody,
1655 ) -> Option<SendmeTag> {
1656 if self.lasthop {
1657 Some(self.next_tag())
1658 } else {
1659 None
1660 }
1661 }
1662 }
1663 impl DummyCrypto {
1664 pub(crate) fn new(lasthop: bool) -> Self {
1665 DummyCrypto {
1666 counter_tag: [0; 20],
1667 counter: 0,
1668 lasthop,
1669 }
1670 }
1671 }
1672
1673 async fn newcirc_ext<R: Runtime>(
1676 rt: &R,
1677 chan: Arc<Channel>,
1678 next_msg_from: HopNum,
1679 ) -> (Arc<ClientCirc>, CircuitRxSender) {
1680 let circid = CircId::new(128).unwrap();
1681 let (_created_send, created_recv) = oneshot::channel();
1682 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1683 let unique_id = UniqId::new(23, 17);
1684
1685 let (pending, reactor) = PendingClientCirc::new(
1686 circid,
1687 chan,
1688 created_recv,
1689 circmsg_recv,
1690 unique_id,
1691 DynTimeProvider::new(rt.clone()),
1692 CircuitAccount::new_noop(),
1693 );
1694
1695 rt.spawn(async {
1696 let _ignore = reactor.run().await;
1697 })
1698 .unwrap();
1699
1700 let PendingClientCirc {
1701 circ,
1702 recvcreated: _,
1703 } = pending;
1704
1705 let relay_cell_format = RelayCellFormat::V0;
1707 for idx in 0_u8..3 {
1708 let params = CircParameters::default();
1709 let (tx, rx) = oneshot::channel();
1710 circ.command
1711 .unbounded_send(CtrlCmd::AddFakeHop {
1712 relay_cell_format,
1713 fwd_lasthop: idx == 2,
1714 rev_lasthop: idx == u8::from(next_msg_from),
1715 params,
1716 done: tx,
1717 })
1718 .unwrap();
1719 rx.await.unwrap().unwrap();
1720 }
1721
1722 (circ, circmsg_send)
1723 }
1724
1725 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1728 newcirc_ext(rt, chan, 2.into()).await
1729 }
1730
1731 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1732 use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1733
1734 let (chan, mut rx, _sink) = working_fake_channel(rt);
1735 let (circ, mut sink) = newcirc(rt, chan).await;
1736 let circid = circ.peek_circid();
1737 let params = CircParameters::default();
1738
1739 let extend_fut = async move {
1740 let target = example_target();
1741 match handshake_type {
1742 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1743 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1744 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1745 };
1746 circ };
1748 let reply_fut = async move {
1749 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1752 assert_eq!(id, Some(circid));
1753 let rmsg = match chmsg {
1754 AnyChanMsg::RelayEarly(r) => {
1755 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1756 .unwrap()
1757 }
1758 other => panic!("{:?}", other),
1759 };
1760 let e2 = match rmsg.msg() {
1761 AnyRelayMsg::Extend2(e2) => e2,
1762 other => panic!("{:?}", other),
1763 };
1764 let mut rng = testing_rng();
1765 let reply = match handshake_type {
1766 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1767 HandshakeType::Ntor => {
1768 let (_keygen, reply) = NtorServer::server(
1769 &mut rng,
1770 &mut |_: &()| Some(()),
1771 &[example_ntor_key()],
1772 e2.handshake(),
1773 )
1774 .unwrap();
1775 reply
1776 }
1777 HandshakeType::NtorV3 => {
1778 let (_keygen, reply) = NtorV3Server::server(
1779 &mut rng,
1780 &mut |_: &[CircRequestExt]| Some(vec![]),
1781 &[example_ntor_v3_key()],
1782 e2.handshake(),
1783 )
1784 .unwrap();
1785 reply
1786 }
1787 };
1788
1789 let extended2 = relaymsg::Extended2::new(reply).into();
1790 sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1791 (sink, rx) };
1793
1794 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1795
1796 assert_eq!(circ.n_hops().unwrap(), 4);
1798
1799 {
1801 let path = circ.path_ref().unwrap();
1802 let path = path
1803 .all_hops()
1804 .filter_map(|hop| match hop {
1805 path::HopDetail::Relay(r) => Some(r),
1806 #[cfg(feature = "hs-common")]
1807 path::HopDetail::Virtual => None,
1808 })
1809 .collect::<Vec<_>>();
1810
1811 assert_eq!(path.len(), 4);
1812 use tor_linkspec::HasRelayIds;
1813 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1814 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1815 }
1816 {
1817 let path = circ.path_ref().unwrap();
1818 assert_eq!(path.n_hops(), 4);
1819 use tor_linkspec::HasRelayIds;
1820 assert_eq!(
1821 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1822 example_target().ed_identity()
1823 );
1824 assert_ne!(
1825 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1826 example_target().ed_identity()
1827 );
1828 }
1829 }
1830
1831 #[traced_test]
1832 #[test]
1833 fn test_extend_ntor() {
1834 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1835 test_extend(&rt, HandshakeType::Ntor).await;
1836 });
1837 }
1838
1839 #[traced_test]
1840 #[test]
1841 fn test_extend_ntor_v3() {
1842 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1843 test_extend(&rt, HandshakeType::NtorV3).await;
1844 });
1845 }
1846
1847 async fn bad_extend_test_impl<R: Runtime>(
1848 rt: &R,
1849 reply_hop: HopNum,
1850 bad_reply: ClientCircChanMsg,
1851 ) -> Error {
1852 let (chan, _rx, _sink) = working_fake_channel(rt);
1853 let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
1854 let params = CircParameters::default();
1855
1856 let target = example_target();
1857 #[allow(clippy::clone_on_copy)]
1858 let rtc = rt.clone();
1859 let sink_handle = rt
1860 .spawn_with_handle(async move {
1861 rtc.sleep(Duration::from_millis(100)).await;
1862 sink.send(bad_reply).await.unwrap();
1863 sink
1864 })
1865 .unwrap();
1866 let outcome = circ.extend_ntor(&target, params).await;
1867 let _sink = sink_handle.await;
1868
1869 assert_eq!(circ.n_hops().unwrap(), 3);
1870 assert!(outcome.is_err());
1871 outcome.unwrap_err()
1872 }
1873
1874 #[traced_test]
1875 #[test]
1876 fn bad_extend_wronghop() {
1877 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1878 let extended2 = relaymsg::Extended2::new(vec![]).into();
1879 let cc = rmsg_to_ccmsg(None, extended2);
1880
1881 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1882 match error {
1887 Error::CircuitClosed => {}
1888 x => panic!("got other error: {}", x),
1889 }
1890 });
1891 }
1892
1893 #[traced_test]
1894 #[test]
1895 fn bad_extend_wrongtype() {
1896 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1897 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1898 let cc = rmsg_to_ccmsg(None, extended);
1899
1900 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1901 match error {
1902 Error::BytesErr {
1903 err: tor_bytes::Error::InvalidMessage(_),
1904 object: "extended2 message",
1905 } => {}
1906 other => panic!("{:?}", other),
1907 }
1908 });
1909 }
1910
1911 #[traced_test]
1912 #[test]
1913 fn bad_extend_destroy() {
1914 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1915 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1916 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1917 match error {
1918 Error::CircuitClosed => {}
1919 other => panic!("{:?}", other),
1920 }
1921 });
1922 }
1923
1924 #[traced_test]
1925 #[test]
1926 fn bad_extend_crypto() {
1927 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1928 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1929 let cc = rmsg_to_ccmsg(None, extended2);
1930 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1931 assert!(matches!(error, Error::BadCircHandshakeAuth));
1932 });
1933 }
1934
1935 #[traced_test]
1936 #[test]
1937 fn begindir() {
1938 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1939 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1940 let (circ, mut sink) = newcirc(&rt, chan).await;
1941 let circid = circ.peek_circid();
1942
1943 let begin_and_send_fut = async move {
1944 let mut stream = circ.begin_dir_stream().await.unwrap();
1947 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1948 stream.flush().await.unwrap();
1949 let mut buf = [0_u8; 1024];
1950 let n = stream.read(&mut buf).await.unwrap();
1951 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1952 let n = stream.read(&mut buf).await.unwrap();
1953 assert_eq!(n, 0);
1954 stream
1955 };
1956 let reply_fut = async move {
1957 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1960 assert_eq!(id, Some(circid));
1961 let rmsg = match chmsg {
1962 AnyChanMsg::Relay(r) => {
1963 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1964 .unwrap()
1965 }
1966 other => panic!("{:?}", other),
1967 };
1968 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1969 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
1970
1971 let connected = relaymsg::Connected::new_empty().into();
1973 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1974
1975 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1977 assert_eq!(id, Some(circid));
1978 let rmsg = match chmsg {
1979 AnyChanMsg::Relay(r) => {
1980 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1981 .unwrap()
1982 }
1983 other => panic!("{:?}", other),
1984 };
1985 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1986 assert_eq!(streamid_2, streamid);
1987 if let AnyRelayMsg::Data(d) = rmsg {
1988 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1989 } else {
1990 panic!();
1991 }
1992
1993 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1995 .unwrap()
1996 .into();
1997 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
1998
1999 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2001 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2002
2003 (rx, sink) };
2005
2006 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2007 });
2008 }
2009
2010 fn close_stream_helper(by_drop: bool) {
2012 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2013 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2014 let (circ, mut sink) = newcirc(&rt, chan).await;
2015
2016 let stream_fut = async move {
2017 let stream = circ
2018 .begin_stream("www.example.com", 80, None)
2019 .await
2020 .unwrap();
2021
2022 let (r, mut w) = stream.split();
2023 if by_drop {
2024 drop(r);
2026 drop(w);
2027 (None, circ) } else {
2029 w.close().await.unwrap();
2031 (Some(r), circ)
2032 }
2033 };
2034 let handler_fut = async {
2035 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2037 let rmsg = match msg {
2038 AnyChanMsg::Relay(r) => {
2039 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2040 .unwrap()
2041 }
2042 other => panic!("{:?}", other),
2043 };
2044 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2045 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2046
2047 let connected =
2049 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2050 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2051
2052 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2054 let rmsg = match msg {
2055 AnyChanMsg::Relay(r) => {
2056 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2057 .unwrap()
2058 }
2059 other => panic!("{:?}", other),
2060 };
2061 let (_, rmsg) = rmsg.into_streamid_and_msg();
2062 assert_eq!(rmsg.cmd(), RelayCmd::END);
2063
2064 (rx, sink) };
2066
2067 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2068 });
2069 }
2070
2071 #[traced_test]
2072 #[test]
2073 fn drop_stream() {
2074 close_stream_helper(true);
2075 }
2076
2077 #[traced_test]
2078 #[test]
2079 fn close_stream() {
2080 close_stream_helper(false);
2081 }
2082
2083 async fn setup_incoming_sendme_case<R: Runtime>(
2085 rt: &R,
2086 n_to_send: usize,
2087 ) -> (
2088 Arc<ClientCirc>,
2089 DataStream,
2090 CircuitRxSender,
2091 Option<StreamId>,
2092 usize,
2093 Receiver<AnyChanCell>,
2094 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2095 ) {
2096 let (chan, mut rx, sink2) = working_fake_channel(rt);
2097 let (circ, mut sink) = newcirc(rt, chan).await;
2098 let circid = circ.peek_circid();
2099
2100 let begin_and_send_fut = {
2101 let circ = circ.clone();
2102 async move {
2103 let mut stream = circ
2105 .begin_stream("www.example.com", 443, None)
2106 .await
2107 .unwrap();
2108 let junk = [0_u8; 1024];
2109 let mut remaining = n_to_send;
2110 while remaining > 0 {
2111 let n = std::cmp::min(remaining, junk.len());
2112 stream.write_all(&junk[..n]).await.unwrap();
2113 remaining -= n;
2114 }
2115 stream.flush().await.unwrap();
2116 stream
2117 }
2118 };
2119
2120 let receive_fut = async move {
2121 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2123 let rmsg = match chmsg {
2124 AnyChanMsg::Relay(r) => {
2125 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2126 .unwrap()
2127 }
2128 other => panic!("{:?}", other),
2129 };
2130 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2131 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2132 let connected = relaymsg::Connected::new_empty().into();
2134 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2135 let mut bytes_received = 0_usize;
2137 let mut cells_received = 0_usize;
2138 while bytes_received < n_to_send {
2139 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2141 assert_eq!(id, Some(circid));
2142
2143 let rmsg = match chmsg {
2144 AnyChanMsg::Relay(r) => {
2145 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2146 .unwrap()
2147 }
2148 other => panic!("{:?}", other),
2149 };
2150 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2151 assert_eq!(streamid2, streamid);
2152 if let AnyRelayMsg::Data(dat) = rmsg {
2153 cells_received += 1;
2154 bytes_received += dat.as_ref().len();
2155 } else {
2156 panic!();
2157 }
2158 }
2159
2160 (sink, streamid, cells_received, rx)
2161 };
2162
2163 let (stream, (sink, streamid, cells_received, rx)) =
2164 futures::join!(begin_and_send_fut, receive_fut);
2165
2166 (circ, stream, sink, streamid, cells_received, rx, sink2)
2167 }
2168
2169 #[traced_test]
2170 #[test]
2171 fn accept_valid_sendme() {
2172 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2173 let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2174 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2175
2176 assert_eq!(cells_received, 301);
2177
2178 {
2180 let (tx, rx) = oneshot::channel();
2181 circ.command
2182 .unbounded_send(CtrlCmd::QuerySendWindow {
2183 hop: 2.into(),
2184 done: tx,
2185 })
2186 .unwrap();
2187 let (window, tags) = rx.await.unwrap().unwrap();
2188 assert_eq!(window, 1000 - 301);
2189 assert_eq!(tags.len(), 3);
2190 assert_eq!(
2192 tags[0],
2193 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2194 );
2195 assert_eq!(
2197 tags[1],
2198 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2199 );
2200 assert_eq!(
2202 tags[2],
2203 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2204 );
2205 }
2206
2207 let reply_with_sendme_fut = async move {
2208 let c_sendme =
2210 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2211 .into();
2212 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2213
2214 let s_sendme = relaymsg::Sendme::new_empty().into();
2216 sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2217
2218 sink
2219 };
2220
2221 let _sink = reply_with_sendme_fut.await;
2222
2223 rt.advance_until_stalled().await;
2224
2225 {
2228 let (tx, rx) = oneshot::channel();
2229 circ.command
2230 .unbounded_send(CtrlCmd::QuerySendWindow {
2231 hop: 2.into(),
2232 done: tx,
2233 })
2234 .unwrap();
2235 let (window, _tags) = rx.await.unwrap().unwrap();
2236 assert_eq!(window, 1000 - 201);
2237 }
2238 });
2239 }
2240
2241 #[traced_test]
2242 #[test]
2243 fn invalid_circ_sendme() {
2244 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2245 let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2249 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2250
2251 let reply_with_sendme_fut = async move {
2252 let c_sendme =
2254 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2255 .into();
2256 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2257 sink
2258 };
2259
2260 let _sink = reply_with_sendme_fut.await;
2261
2262 rt.advance_until_stalled().await;
2264 assert!(circ.is_closing());
2265 });
2266 }
2267
2268 #[traced_test]
2269 #[test]
2270 fn test_busy_stream_fairness() {
2271 const N_STREAMS: usize = 3;
2273 const N_CELLS: usize = 20;
2275 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2278 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2285 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2286
2287 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2288 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2289 let (circ, mut sink) = newcirc(&rt, chan).await;
2290
2291 rt.spawn({
2297 let circ = circ.clone();
2300 async move {
2301 let mut clients = VecDeque::new();
2302 struct Client {
2303 stream: DataStream,
2304 to_write: &'static [u8],
2305 }
2306 for _ in 0..N_STREAMS {
2307 clients.push_back(Client {
2308 stream: circ
2309 .begin_stream("www.example.com", 80, None)
2310 .await
2311 .unwrap(),
2312 to_write: &[0_u8; N_BYTES][..],
2313 });
2314 }
2315 while let Some(mut client) = clients.pop_front() {
2316 if client.to_write.is_empty() {
2317 continue;
2319 }
2320 let written = client.stream.write(client.to_write).await.unwrap();
2321 client.to_write = &client.to_write[written..];
2322 clients.push_back(client);
2323 }
2324 }
2325 })
2326 .unwrap();
2327
2328 let channel_handler_fut = async {
2329 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2330 let mut total_bytes_received = 0;
2331
2332 loop {
2333 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2334 let rmsg = match msg {
2335 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2336 RelayCellFormat::V0,
2337 r.into_relay_body(),
2338 )
2339 .unwrap(),
2340 other => panic!("Unexpected chanmsg: {other:?}"),
2341 };
2342 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2343 match rmsg.cmd() {
2344 RelayCmd::BEGIN => {
2345 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2347 assert_eq!(prev, None);
2348 let connected = relaymsg::Connected::new_with_addr(
2350 "10.0.0.1".parse().unwrap(),
2351 1234,
2352 )
2353 .into();
2354 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2355 }
2356 RelayCmd::DATA => {
2357 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2358 let nbytes = data_msg.as_ref().len();
2359 total_bytes_received += nbytes;
2360 let streamid = streamid.unwrap();
2361 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2362 *stream_bytes += nbytes;
2363 if total_bytes_received >= N_BYTES {
2364 break;
2365 }
2366 }
2367 RelayCmd::END => {
2368 continue;
2373 }
2374 other => {
2375 panic!("Unexpected command {other:?}");
2376 }
2377 }
2378 }
2379
2380 (total_bytes_received, stream_bytes_received, rx, sink)
2383 };
2384
2385 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2386 channel_handler_fut.await;
2387 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2388 for (sid, stream_bytes) in stream_bytes_received {
2389 assert!(
2390 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2391 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2392 );
2393 }
2394 });
2395 }
2396
2397 #[test]
2398 fn basic_params() {
2399 use super::CircParameters;
2400 let mut p = CircParameters::default();
2401 assert!(p.extend_by_ed25519_id);
2402
2403 p.extend_by_ed25519_id = false;
2404 assert!(!p.extend_by_ed25519_id);
2405 }
2406
2407 #[cfg(feature = "hs-service")]
2408 struct AllowAllStreamsFilter;
2409 #[cfg(feature = "hs-service")]
2410 impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
2411 fn disposition(
2412 &mut self,
2413 _ctx: &crate::stream::IncomingStreamRequestContext<'_>,
2414 _circ: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
2415 ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
2416 Ok(crate::stream::IncomingStreamRequestDisposition::Accept)
2417 }
2418 }
2419
2420 #[traced_test]
2421 #[test]
2422 #[cfg(feature = "hs-service")]
2423 fn allow_stream_requests_twice() {
2424 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2425 let (chan, _rx, _sink) = working_fake_channel(&rt);
2426 let (circ, _send) = newcirc(&rt, chan).await;
2427
2428 let _incoming = circ
2429 .allow_stream_requests(
2430 &[tor_cell::relaycell::RelayCmd::BEGIN],
2431 circ.last_hop_num().unwrap(),
2432 AllowAllStreamsFilter,
2433 )
2434 .await
2435 .unwrap();
2436
2437 let incoming = circ
2438 .allow_stream_requests(
2439 &[tor_cell::relaycell::RelayCmd::BEGIN],
2440 circ.last_hop_num().unwrap(),
2441 AllowAllStreamsFilter,
2442 )
2443 .await;
2444
2445 assert!(incoming.is_err());
2447 });
2448 }
2449
2450 #[traced_test]
2451 #[test]
2452 #[cfg(feature = "hs-service")]
2453 fn allow_stream_requests() {
2454 use tor_cell::relaycell::msg::BeginFlags;
2455
2456 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2457 const TEST_DATA: &[u8] = b"ping";
2458
2459 let (chan, _rx, _sink) = working_fake_channel(&rt);
2460 let (circ, mut send) = newcirc(&rt, chan).await;
2461
2462 let rfmt = RelayCellFormat::V0;
2463
2464 let (tx, rx) = oneshot::channel();
2466 let mut incoming = circ
2467 .allow_stream_requests(
2468 &[tor_cell::relaycell::RelayCmd::BEGIN],
2469 circ.last_hop_num().unwrap(),
2470 AllowAllStreamsFilter,
2471 )
2472 .await
2473 .unwrap();
2474
2475 let simulate_service = async move {
2476 let stream = incoming.next().await.unwrap();
2477 let mut data_stream = stream
2478 .accept_data(relaymsg::Connected::new_empty())
2479 .await
2480 .unwrap();
2481 tx.send(()).unwrap();
2483
2484 let mut buf = [0_u8; TEST_DATA.len()];
2486 data_stream.read_exact(&mut buf).await.unwrap();
2487 assert_eq!(&buf, TEST_DATA);
2488
2489 circ
2490 };
2491
2492 let simulate_client = async move {
2493 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2494 let body: BoxedCellBody =
2495 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2496 .encode(rfmt, &mut testing_rng())
2497 .unwrap();
2498 let begin_msg = chanmsg::Relay::from(body);
2499
2500 send.send(ClientCircChanMsg::Relay(begin_msg))
2502 .await
2503 .unwrap();
2504
2505 rx.await.unwrap();
2511 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2513 let body: BoxedCellBody =
2514 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2515 .encode(rfmt, &mut testing_rng())
2516 .unwrap();
2517 let data_msg = chanmsg::Relay::from(body);
2518
2519 send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2520 send
2521 };
2522
2523 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2524 });
2525 }
2526
2527 #[traced_test]
2528 #[test]
2529 #[cfg(feature = "hs-service")]
2530 fn accept_stream_after_reject() {
2531 use tor_cell::relaycell::msg::BeginFlags;
2532 use tor_cell::relaycell::msg::EndReason;
2533
2534 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2535 const TEST_DATA: &[u8] = b"ping";
2536 const STREAM_COUNT: usize = 2;
2537 let rfmt = RelayCellFormat::V0;
2538
2539 let (chan, _rx, _sink) = working_fake_channel(&rt);
2540 let (circ, mut send) = newcirc(&rt, chan).await;
2541
2542 let (mut tx, mut rx) = mpsc::channel(STREAM_COUNT);
2544
2545 let mut incoming = circ
2546 .allow_stream_requests(
2547 &[tor_cell::relaycell::RelayCmd::BEGIN],
2548 circ.last_hop_num().unwrap(),
2549 AllowAllStreamsFilter,
2550 )
2551 .await
2552 .unwrap();
2553
2554 let simulate_service = async move {
2555 for i in 0..STREAM_COUNT {
2557 let stream = incoming.next().await.unwrap();
2558
2559 if i == 0 {
2561 stream
2562 .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
2563 .await
2564 .unwrap();
2565 tx.send(()).await.unwrap();
2567 continue;
2568 }
2569
2570 let mut data_stream = stream
2571 .accept_data(relaymsg::Connected::new_empty())
2572 .await
2573 .unwrap();
2574 tx.send(()).await.unwrap();
2576
2577 let mut buf = [0_u8; TEST_DATA.len()];
2579 data_stream.read_exact(&mut buf).await.unwrap();
2580 assert_eq!(&buf, TEST_DATA);
2581 }
2582
2583 circ
2584 };
2585
2586 let simulate_client = async move {
2587 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2588 let body: BoxedCellBody =
2589 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2590 .encode(rfmt, &mut testing_rng())
2591 .unwrap();
2592 let begin_msg = chanmsg::Relay::from(body);
2593
2594 for _ in 0..STREAM_COUNT {
2597 send.send(ClientCircChanMsg::Relay(begin_msg.clone()))
2598 .await
2599 .unwrap();
2600
2601 rx.next().await.unwrap();
2603 }
2604
2605 let data = relaymsg::Data::new(TEST_DATA).unwrap();
2607 let body: BoxedCellBody =
2608 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
2609 .encode(rfmt, &mut testing_rng())
2610 .unwrap();
2611 let data_msg = chanmsg::Relay::from(body);
2612
2613 send.send(ClientCircChanMsg::Relay(data_msg)).await.unwrap();
2614 send
2615 };
2616
2617 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2618 });
2619 }
2620
2621 #[traced_test]
2622 #[test]
2623 #[cfg(feature = "hs-service")]
2624 fn incoming_stream_bad_hop() {
2625 use tor_cell::relaycell::msg::BeginFlags;
2626
2627 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2628 const EXPECTED_HOP: u8 = 1;
2630 let rfmt = RelayCellFormat::V0;
2631
2632 let (chan, _rx, _sink) = working_fake_channel(&rt);
2633 let (circ, mut send) = newcirc(&rt, chan).await;
2634
2635 let mut incoming = circ
2637 .allow_stream_requests(
2638 &[tor_cell::relaycell::RelayCmd::BEGIN],
2639 EXPECTED_HOP.into(),
2640 AllowAllStreamsFilter,
2641 )
2642 .await
2643 .unwrap();
2644
2645 let simulate_service = async move {
2646 assert!(incoming.next().await.is_none());
2649 circ
2650 };
2651
2652 let simulate_client = async move {
2653 let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
2654 let body: BoxedCellBody =
2655 AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
2656 .encode(rfmt, &mut testing_rng())
2657 .unwrap();
2658 let begin_msg = chanmsg::Relay::from(body);
2659
2660 send.send(ClientCircChanMsg::Relay(begin_msg))
2662 .await
2663 .unwrap();
2664
2665 send
2666 };
2667
2668 let (_circ, _send) = futures::join!(simulate_service, simulate_client);
2669 });
2670 }
2671}