// Submodules of the client-circuit implementation.

// Glob-imported further down in this file (`celltypes::*`).
pub(crate) mod celltypes;
// NOTE(review): contents are not referenced in this file — see the module itself.
pub(crate) mod halfcirc;

// `handshake` is part of the public API only when the "hs-common" feature is
// enabled; otherwise it is visible to this crate alone.
#[cfg(feature = "hs-common")]
pub mod handshake;
#[cfg(not(feature = "hs-common"))]
pub(crate) mod handshake;

// Provides `Path`, `PathEntry` (both re-exported below) and `HopDetail`.
pub(super) mod path;
// Provides `UniqId` (re-exported below).
pub(crate) mod unique_id;
48
49use crate::channel::Channel;
50use crate::congestion::params::CongestionControlParams;
51use crate::crypto::cell::HopNum;
52use crate::crypto::handshake::ntor_v3::NtorV3PublicKey;
53use crate::memquota::{CircuitAccount, SpecificAccount as _};
54use crate::stream::{
55 AnyCmdChecker, DataCmdChecker, DataStream, ResolveCmdChecker, ResolveStream, StreamParameters,
56 StreamReader,
57};
58use crate::tunnel::circuit::celltypes::*;
59use crate::tunnel::reactor::CtrlCmd;
60use crate::tunnel::reactor::{
61 CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
62};
63use crate::tunnel::{LegId, StreamTarget, TargetHop};
64use crate::util::skew::ClockSkew;
65use crate::{Error, ResolveError, Result};
66use educe::Educe;
67use path::HopDetail;
68use tor_cell::{
69 chancell::CircId,
70 relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
71};
72
73use tor_error::{bad_api_usage, internal, into_internal};
74use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
75use tor_protover::named;
76
77pub use crate::crypto::binding::CircuitBinding;
78pub use crate::memquota::StreamAccount;
79pub use crate::tunnel::circuit::unique_id::UniqId;
80
81#[cfg(feature = "hs-service")]
82use {
83 crate::stream::{IncomingCmdChecker, IncomingStream},
84 crate::tunnel::reactor::StreamReqInfo,
85};
86
87use futures::channel::mpsc;
88use oneshot_fused_workaround as oneshot;
89
90use crate::congestion::sendme::StreamRecvWindow;
91use crate::DynTimeProvider;
92use futures::FutureExt as _;
93use std::collections::HashMap;
94use std::net::IpAddr;
95use std::sync::{Arc, Mutex};
96use tor_memquota::mq_queue::{self, ChannelSpec as _, MpscSpec};
97
98use crate::crypto::handshake::ntor::NtorPublicKey;
99
100pub use path::{Path, PathEntry};
101
/// Buffer size handed to `MpscSpec::new` when `ClientCirc::begin_stream_impl`
/// builds the queue whose sender is stored in the stream's `StreamTarget` and
/// whose receiver is passed to the reactor in `CtrlMsg::BeginStream`.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
104
105#[cfg(feature = "send-control-msg")]
106use {crate::tunnel::msghandler::UserMsgHandler, crate::tunnel::reactor::MetaCellHandler};
107
108pub use crate::tunnel::reactor::syncview::ClientCircSyncView;
109#[cfg(feature = "send-control-msg")]
110#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
111pub use {crate::tunnel::msghandler::MsgHandler, crate::tunnel::reactor::MetaCellDisposition};
112
/// Sending half of a memory-quota-tracked MPSC queue carrying items of type `T`.
pub(crate) type StreamMpscSender<T> = mq_queue::Sender<T, MpscSpec>;
/// Receiving half of a memory-quota-tracked MPSC queue carrying items of type `T`.
pub(crate) type StreamMpscReceiver<T> = mq_queue::Receiver<T, MpscSpec>;

/// Sending half of the queue of `ClientCircChanMsg`s destined for a circuit.
pub(crate) type CircuitRxSender = mq_queue::Sender<ClientCircChanMsg, MpscSpec>;
/// Receiving half of the queue of `ClientCircChanMsg`s; this is the `input`
/// argument of `PendingClientCirc::new`, which forwards it to `Reactor::new`.
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<ClientCircChanMsg, MpscSpec>;
122
/// Client-side handle to a circuit.
///
/// Requests are sent to the circuit's reactor over the `control` and `command`
/// queues; path and binding information is read from the shared `mutable` state.
#[derive(Debug)]
pub struct ClientCirc {
    /// Shared mutable state of the tunnel; queried using `unique_id` as the key.
    mutable: Arc<TunnelMutableState>,
    /// Identifier of this circuit; also the lookup key into `mutable`.
    unique_id: UniqId,
    /// Queue on which `CtrlMsg` requests are sent to the reactor.
    pub(super) control: mpsc::UnboundedSender<CtrlMsg>,
    /// Queue on which `CtrlCmd` requests are sent to the reactor.
    pub(super) command: mpsc::UnboundedSender<CtrlCmd>,
    /// Cloneable future obtained from `Reactor::new`; `wait_for_close` resolves
    /// when it does. Only read when the "experimental-api" feature is enabled,
    /// hence the conditional `dead_code` allowance.
    #[cfg_attr(not(feature = "experimental-api"), allow(dead_code))]
    reactor_closed_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
    /// Circuit ID passed to `PendingClientCirc::new`; kept in test builds only.
    #[cfg(test)]
    circid: CircId,
    /// Memory-quota account of this circuit; parent of per-stream accounts.
    memquota: CircuitAccount,
    /// Time provider (cloned from the channel) used when creating stream queues.
    time_provider: DynTimeProvider,
}
187
/// Mutable state shared between a tunnel's circuit handles and its reactor.
///
/// A mutex-protected map from each circuit's `UniqId` to its `LegId` and its
/// per-circuit `MutableState`.
#[derive(Debug, Default)]
pub(super) struct TunnelMutableState(Mutex<HashMap<UniqId, (LegId, Arc<MutableState>)>>);
209
210impl TunnelMutableState {
211 pub(super) fn insert(&self, unique_id: UniqId, leg: LegId, mutable: Arc<MutableState>) {
213 #[allow(unused)] let state = self
215 .0
216 .lock()
217 .expect("lock poisoned")
218 .insert(unique_id, (leg, mutable));
219
220 debug_assert!(state.is_none());
221 }
222
223 pub(super) fn remove(&self, unique_id: UniqId) {
225 #[allow(unused)] let state = self.0.lock().expect("lock poisoned").remove(&unique_id);
227
228 debug_assert!(state.is_some());
229 }
230
231 fn path_ref(&self, unique_id: UniqId) -> Result<Arc<Path>> {
235 let lock = self.0.lock().expect("lock poisoned");
236 let (_leg, mutable) = lock
237 .get(&unique_id)
238 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
239
240 Ok(mutable.path())
241 }
242
243 fn first_hop(&self, unique_id: UniqId) -> Result<Option<OwnedChanTarget>> {
248 let lock = self.0.lock().expect("lock poisoned");
249 let (_leg, mutable) = lock
250 .get(&unique_id)
251 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
252
253 let first_hop = mutable.first_hop().map(|first_hop| match first_hop {
254 path::HopDetail::Relay(r) => r,
255 #[cfg(feature = "hs-common")]
256 path::HopDetail::Virtual => {
257 panic!("somehow made a circuit with a virtual first hop.")
258 }
259 });
260
261 Ok(first_hop)
262 }
263
264 fn last_hop_num(&self, unique_id: UniqId) -> Result<Option<HopNum>> {
270 let lock = self.0.lock().expect("lock poisoned");
271 let (_leg, mutable) = lock
272 .get(&unique_id)
273 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
274
275 Ok(mutable.last_hop_num())
276 }
277
278 fn n_hops(&self, unique_id: UniqId) -> Result<usize> {
282 let lock = self.0.lock().expect("lock poisoned");
283 let (_leg, mutable) = lock
284 .get(&unique_id)
285 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
286
287 Ok(mutable.n_hops())
288 }
289
290 fn binding_key(&self, unique_id: UniqId, hop: HopNum) -> Result<Option<CircuitBinding>> {
293 let lock = self.0.lock().expect("lock poisoned");
294 let (_leg, mutable) = lock
295 .get(&unique_id)
296 .ok_or_else(|| bad_api_usage!("no circuit with unique ID {unique_id}"))?;
297
298 Ok(mutable.binding_key(hop))
299 }
300}
301
/// Mutable state of a single circuit: a mutex-protected `CircuitState`.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct MutableState(Mutex<CircuitState>);
306
307impl MutableState {
308 pub(super) fn add_hop(&self, peer_id: HopDetail, binding: Option<CircuitBinding>) {
310 let mut mutable = self.0.lock().expect("poisoned lock");
311 Arc::make_mut(&mut mutable.path).push_hop(peer_id);
312 mutable.binding.push(binding);
313 }
314
315 pub(super) fn path(&self) -> Arc<path::Path> {
317 let mutable = self.0.lock().expect("poisoned lock");
318 Arc::clone(&mutable.path)
319 }
320
321 pub(super) fn binding_key(&self, hop: HopNum) -> Option<CircuitBinding> {
324 let mutable = self.0.lock().expect("poisoned lock");
325
326 mutable.binding.get::<usize>(hop.into()).cloned().flatten()
327 }
330
331 fn first_hop(&self) -> Option<HopDetail> {
333 let mutable = self.0.lock().expect("poisoned lock");
334 mutable.path.first_hop()
335 }
336
337 fn last_hop_num(&self) -> Option<HopNum> {
344 let mutable = self.0.lock().expect("poisoned lock");
345 mutable.path.last_hop_num()
346 }
347
348 fn n_hops(&self) -> usize {
355 let mutable = self.0.lock().expect("poisoned lock");
356 mutable.path.n_hops()
357 }
358}
359
/// The data protected by `MutableState`'s mutex.
#[derive(Educe, Default)]
#[educe(Debug)]
pub(super) struct CircuitState {
    /// Path of the circuit. Held in an `Arc` so that `MutableState::path` can
    /// hand out cheap shared copies; `add_hop` updates it via `Arc::make_mut`.
    path: Arc<path::Path>,

    /// One entry per hop, pushed by `add_hop` and indexed by hop number in
    /// `binding_key`. Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    binding: Vec<Option<CircuitBinding>>,
}
381
/// A circuit whose first hop has not been created yet.
///
/// Consumed by one of the `create_firsthop*` methods, which return the
/// underlying `ClientCirc` on success.
pub struct PendingClientCirc {
    /// Receiver for the `CreateResponse`; handed to the reactor in `CtrlMsg::Create`.
    recvcreated: oneshot::Receiver<CreateResponse>,
    /// The circuit handle that the `create_firsthop*` methods return.
    circ: Arc<ClientCirc>,
}
393
/// Parameters used when creating or extending a circuit.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// When `false`, `extend_ntor` and `extend_ntor_v3` strip link specifiers
    /// of type `ED25519ID` from the extend request.
    pub extend_by_ed25519_id: bool,
    /// Congestion-control parameters for the circuit.
    pub ccontrol: CongestionControlParams,
}
404
/// Test-only default: extend by Ed25519 ID and use the fixed congestion-control
/// parameters from the congestion test utilities.
#[cfg(test)]
impl std::default::Default for CircParameters {
    fn default() -> Self {
        let ccontrol = crate::congestion::test_utils::params::build_cc_fixed_params();
        Self::new(true, ccontrol)
    }
}
414
415impl CircParameters {
416 pub fn new(extend_by_ed25519_id: bool, ccontrol: CongestionControlParams) -> Self {
418 Self {
419 extend_by_ed25519_id,
420 ccontrol,
421 }
422 }
423}
424
425impl ClientCirc {
426 pub fn first_hop(&self) -> Result<OwnedChanTarget> {
434 Ok(self
435 .mutable
436 .first_hop(self.unique_id)
437 .map_err(|_| Error::CircuitClosed)?
438 .expect("called first_hop on an un-constructed circuit"))
439 }
440
441 pub fn last_hop_num(&self) -> Result<HopNum> {
451 Ok(self
452 .mutable
453 .last_hop_num(self.unique_id)?
454 .ok_or_else(|| internal!("no last hop index"))?)
455 }
456
457 pub fn path_ref(&self) -> Result<Arc<Path>> {
462 self.mutable
463 .path_ref(self.unique_id)
464 .map_err(|_| Error::CircuitClosed)
465 }
466
467 pub async fn first_hop_clock_skew(&self) -> Result<ClockSkew> {
471 let (tx, rx) = oneshot::channel();
472
473 self.control
474 .unbounded_send(CtrlMsg::FirstHopClockSkew { answer: tx })
475 .map_err(|_| Error::CircuitClosed)?;
476
477 Ok(rx.await.map_err(|_| Error::CircuitClosed)??)
478 }
479
    /// Return a reference to this circuit's memory-quota account.
    pub fn mq_account(&self) -> &CircuitAccount {
        &self.memquota
    }
484
485 pub fn binding_key(&self, hop: HopNum) -> Result<Option<CircuitBinding>> {
493 self.mutable
494 .binding_key(self.unique_id, hop)
495 .map_err(|_| Error::CircuitClosed)
496 }
497
498 #[cfg(feature = "send-control-msg")]
576 pub async fn start_conversation(
577 &self,
578 msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
579 reply_handler: impl MsgHandler + Send + 'static,
580 hop_num: HopNum,
581 ) -> Result<Conversation<'_>> {
582 let handler = Box::new(UserMsgHandler::new(hop_num, reply_handler));
583 let conversation = Conversation(self);
584 conversation.send_internal(msg, Some(handler)).await?;
585 Ok(conversation)
586 }
587
588 #[cfg(feature = "send-control-msg")]
594 pub async fn send_raw_msg(
595 &self,
596 msg: tor_cell::relaycell::msg::AnyRelayMsg,
597 hop_num: HopNum,
598 ) -> Result<()> {
599 let (sender, receiver) = oneshot::channel();
600 let ctrl_msg = CtrlMsg::SendMsg {
601 hop_num,
602 msg,
603 sender,
604 };
605 self.control
606 .unbounded_send(ctrl_msg)
607 .map_err(|_| Error::CircuitClosed)?;
608
609 receiver.await.map_err(|_| Error::CircuitClosed)?
610 }
611
612 #[cfg(feature = "hs-service")]
641 pub async fn allow_stream_requests(
642 self: &Arc<ClientCirc>,
643 allow_commands: &[tor_cell::relaycell::RelayCmd],
644 hop_num: HopNum,
645 filter: impl crate::stream::IncomingStreamRequestFilter,
646 ) -> Result<impl futures::Stream<Item = IncomingStream>> {
647 use futures::stream::StreamExt;
648
649 use crate::tunnel::HopLocation;
650
651 const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
653
654 let time_prov = self.time_provider.clone();
655 let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
656 let (incoming_sender, incoming_receiver) =
657 MpscSpec::new(INCOMING_BUFFER).new_mq(time_prov, self.memquota.as_raw_account())?;
658 let (tx, rx) = oneshot::channel();
659
660 self.command
661 .unbounded_send(CtrlCmd::AwaitStreamRequest {
662 cmd_checker,
663 incoming_sender,
664 hop_num,
665 done: tx,
666 filter: Box::new(filter),
667 })
668 .map_err(|_| Error::CircuitClosed)?;
669
670 rx.await.map_err(|_| Error::CircuitClosed)??;
672
673 let allowed_hop_num = hop_num;
677
678 let circ = Arc::clone(self);
679 Ok(incoming_receiver.map(move |req_ctx| {
680 let StreamReqInfo {
681 req,
682 stream_id,
683 hop_num,
684 leg,
685 receiver,
686 msg_tx,
687 memquota,
688 relay_cell_format,
689 } = req_ctx;
690
691 assert_eq!(allowed_hop_num, hop_num);
696
697 let target = StreamTarget {
704 circ: Arc::clone(&circ),
705 tx: msg_tx,
706 hop: HopLocation::Hop((leg, hop_num)),
707 stream_id,
708 relay_cell_format,
709 };
710
711 let reader = StreamReader {
712 target: target.clone(),
713 receiver,
714 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
715 ended: false,
716 };
717
718 IncomingStream::new(req, target, reader, memquota)
719 }))
720 }
721
722 pub async fn extend<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
725 where
726 Tg: CircTarget,
727 {
728 if target
740 .protovers()
741 .supports_named_subver(named::RELAY_NTORV3)
742 {
743 self.extend_ntor_v3(target, params).await
744 } else {
745 self.extend_ntor(target, params).await
746 }
747 }
748
749 pub async fn extend_ntor<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
752 where
753 Tg: CircTarget,
754 {
755 let key = NtorPublicKey {
756 id: *target
757 .rsa_identity()
758 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
759 pk: *target.ntor_onion_key(),
760 };
761 let mut linkspecs = target
762 .linkspecs()
763 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
764 if !params.extend_by_ed25519_id {
765 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
766 }
767
768 let (tx, rx) = oneshot::channel();
769
770 let peer_id = OwnedChanTarget::from_chan_target(target);
771 self.control
772 .unbounded_send(CtrlMsg::ExtendNtor {
773 peer_id,
774 public_key: key,
775 linkspecs,
776 params,
777 done: tx,
778 })
779 .map_err(|_| Error::CircuitClosed)?;
780
781 rx.await.map_err(|_| Error::CircuitClosed)??;
782
783 Ok(())
784 }
785
786 pub async fn extend_ntor_v3<Tg>(&self, target: &Tg, params: CircParameters) -> Result<()>
789 where
790 Tg: CircTarget,
791 {
792 let key = NtorV3PublicKey {
793 id: *target
794 .ed_identity()
795 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
796 pk: *target.ntor_onion_key(),
797 };
798 let mut linkspecs = target
799 .linkspecs()
800 .map_err(into_internal!("Could not encode linkspecs for extend_ntor"))?;
801 if !params.extend_by_ed25519_id {
802 linkspecs.retain(|ls| ls.lstype() != LinkSpecType::ED25519ID);
803 }
804
805 let (tx, rx) = oneshot::channel();
806
807 let peer_id = OwnedChanTarget::from_chan_target(target);
808 self.control
809 .unbounded_send(CtrlMsg::ExtendNtorV3 {
810 peer_id,
811 public_key: key,
812 linkspecs,
813 params,
814 done: tx,
815 })
816 .map_err(|_| Error::CircuitClosed)?;
817
818 rx.await.map_err(|_| Error::CircuitClosed)??;
819
820 Ok(())
821 }
822
823 #[cfg(feature = "hs-common")]
849 pub async fn extend_virtual(
850 &self,
851 protocol: handshake::RelayProtocol,
852 role: handshake::HandshakeRole,
853 seed: impl handshake::KeyGenerator,
854 params: CircParameters,
855 ) -> Result<()> {
856 use self::handshake::BoxedClientLayer;
857
858 let protocol = handshake::RelayCryptLayerProtocol::from(protocol);
859 let relay_cell_format = protocol.relay_cell_format();
860
861 let BoxedClientLayer { fwd, back, binding } =
862 protocol.construct_client_layers(role, seed)?;
863
864 let (tx, rx) = oneshot::channel();
865 let message = CtrlCmd::ExtendVirtual {
866 relay_cell_format,
867 cell_crypto: (fwd, back, binding),
868 params,
869 done: tx,
870 };
871
872 self.command
873 .unbounded_send(message)
874 .map_err(|_| Error::CircuitClosed)?;
875
876 rx.await.map_err(|_| Error::CircuitClosed)?
877 }
878
879 async fn begin_stream_impl(
887 self: &Arc<ClientCirc>,
888 begin_msg: AnyRelayMsg,
889 cmd_checker: AnyCmdChecker,
890 ) -> Result<(StreamReader, StreamTarget, StreamAccount)> {
891 let hop = TargetHop::LastHop;
894
895 let time_prov = self.time_provider.clone();
896
897 let memquota = StreamAccount::new(self.mq_account())?;
898 let (sender, receiver) = MpscSpec::new(STREAM_READER_BUFFER)
899 .new_mq(time_prov.clone(), memquota.as_raw_account())?;
900 let (tx, rx) = oneshot::channel();
901 let (msg_tx, msg_rx) =
902 MpscSpec::new(CIRCUIT_BUFFER_SIZE).new_mq(time_prov, memquota.as_raw_account())?;
903
904 self.control
905 .unbounded_send(CtrlMsg::BeginStream {
906 hop,
907 message: begin_msg,
908 sender,
909 rx: msg_rx,
910 done: tx,
911 cmd_checker,
912 })
913 .map_err(|_| Error::CircuitClosed)?;
914
915 let (stream_id, hop, relay_cell_format) = rx.await.map_err(|_| Error::CircuitClosed)??;
916
917 let target = StreamTarget {
918 circ: self.clone(),
919 tx: msg_tx,
920 hop,
921 stream_id,
922 relay_cell_format,
923 };
924
925 let reader = StreamReader {
926 target: target.clone(),
927 receiver,
928 recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
929 ended: false,
930 };
931
932 Ok((reader, target, memquota))
933 }
934
935 async fn begin_data_stream(
938 self: &Arc<ClientCirc>,
939 msg: AnyRelayMsg,
940 optimistic: bool,
941 ) -> Result<DataStream> {
942 let (reader, target, memquota) = self
943 .begin_stream_impl(msg, DataCmdChecker::new_any())
944 .await?;
945 let mut stream = DataStream::new(reader, target, memquota);
946 if !optimistic {
947 stream.wait_for_connection().await?;
948 }
949 Ok(stream)
950 }
951
952 pub async fn begin_stream(
958 self: &Arc<ClientCirc>,
959 target: &str,
960 port: u16,
961 parameters: Option<StreamParameters>,
962 ) -> Result<DataStream> {
963 let parameters = parameters.unwrap_or_default();
964 let begin_flags = parameters.begin_flags();
965 let optimistic = parameters.is_optimistic();
966 let target = if parameters.suppressing_hostname() {
967 ""
968 } else {
969 target
970 };
971 let beginmsg = Begin::new(target, port, begin_flags)
972 .map_err(|e| Error::from_cell_enc(e, "begin message"))?;
973 self.begin_data_stream(beginmsg.into(), optimistic).await
974 }
975
976 pub async fn begin_dir_stream(self: Arc<ClientCirc>) -> Result<DataStream> {
979 self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
984 .await
985 }
986
987 pub async fn resolve(self: &Arc<ClientCirc>, hostname: &str) -> Result<Vec<IpAddr>> {
993 let resolve_msg = Resolve::new(hostname);
994
995 let resolved_msg = self.try_resolve(resolve_msg).await?;
996
997 resolved_msg
998 .into_answers()
999 .into_iter()
1000 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1001 Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
1002 Ok(_) => None,
1003 Err(e) => Some(Err(e)),
1004 })
1005 .collect()
1006 }
1007
1008 pub async fn resolve_ptr(self: &Arc<ClientCirc>, addr: IpAddr) -> Result<Vec<String>> {
1014 let resolve_ptr_msg = Resolve::new_reverse(&addr);
1015
1016 let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
1017
1018 resolved_msg
1019 .into_answers()
1020 .into_iter()
1021 .filter_map(|(val, _)| match resolvedval_to_result(val) {
1022 Ok(ResolvedVal::Hostname(v)) => Some(
1023 String::from_utf8(v)
1024 .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
1025 ),
1026 Ok(_) => None,
1027 Err(e) => Some(Err(e)),
1028 })
1029 .collect()
1030 }
1031
1032 async fn try_resolve(self: &Arc<ClientCirc>, msg: Resolve) -> Result<Resolved> {
1035 let (reader, _target, memquota) = self
1036 .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any())
1037 .await?;
1038 let mut resolve_stream = ResolveStream::new(reader, memquota);
1039 resolve_stream.read_msg().await
1040 }
1041
    /// Ask the reactor to shut this circuit down.
    ///
    /// Sends `CtrlCmd::Shutdown` on the command queue and returns immediately.
    pub fn terminate(&self) {
        // A failed send is deliberately ignored.
        let _ = self.command.unbounded_send(CtrlCmd::Shutdown);
    }
1055
    /// Called when a protocol error is detected on this circuit.
    ///
    /// Currently this does nothing beyond calling `terminate`.
    pub(crate) fn protocol_error(&self) {
        self.terminate();
    }
1066
    /// Return true if the control queue to the reactor reports itself closed.
    pub fn is_closing(&self) -> bool {
        self.control.is_closed()
    }
1071
    /// Return the unique identifier of this circuit.
    pub fn unique_id(&self) -> UniqId {
        self.unique_id
    }
1076
1077 pub fn n_hops(&self) -> Result<usize> {
1084 self.mutable
1085 .n_hops(self.unique_id)
1086 .map_err(|_| Error::CircuitClosed)
1087 }
1088
1089 #[cfg(feature = "experimental-api")]
1096 pub fn wait_for_close(&self) -> impl futures::Future<Output = ()> + Send + Sync + 'static {
1097 self.reactor_closed_rx.clone().map(|_| ())
1098 }
1099}
1100
/// Handle returned by `ClientCirc::start_conversation`.
///
/// Borrows the circuit it was started on and sends further messages through it.
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
pub struct Conversation<'r>(&'r ClientCirc);
1111
#[cfg(feature = "send-control-msg")]
#[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))]
impl Conversation<'_> {
    /// Send `msg` on the circuit without installing a new handler.
    pub async fn send_message(&self, msg: tor_cell::relaycell::msg::AnyRelayMsg) -> Result<()> {
        let outgoing = Some(msg);
        self.send_internal(outgoing, None).await
    }

    /// Ask the reactor to send `msg` (if any) and install `handler` (if any).
    ///
    /// The message is wrapped in an `AnyRelayMsgOuter` with no stream ID.
    ///
    /// Returns `Error::CircuitClosed` if the request cannot be sent or the
    /// reactor drops the reply channel; otherwise returns the reactor's result.
    pub(crate) async fn send_internal(
        &self,
        msg: Option<tor_cell::relaycell::msg::AnyRelayMsg>,
        handler: Option<Box<dyn MetaCellHandler + Send + 'static>>,
    ) -> Result<()> {
        let msg = msg.map(|inner| tor_cell::relaycell::AnyRelayMsgOuter::new(None, inner));
        let (sender, reply) = oneshot::channel();

        self.0
            .control
            .unbounded_send(CtrlMsg::SendMsgAndInstallHandler {
                msg,
                handler,
                sender,
            })
            .map_err(|_| Error::CircuitClosed)?;

        match reply.await {
            Ok(outcome) => outcome,
            Err(_) => Err(Error::CircuitClosed),
        }
    }
}
1147
1148impl PendingClientCirc {
1149 pub(crate) fn new(
1155 id: CircId,
1156 channel: Arc<Channel>,
1157 createdreceiver: oneshot::Receiver<CreateResponse>,
1158 input: CircuitRxReceiver,
1159 unique_id: UniqId,
1160 runtime: DynTimeProvider,
1161 memquota: CircuitAccount,
1162 ) -> (PendingClientCirc, crate::tunnel::reactor::Reactor) {
1163 let time_provider = channel.time_provider().clone();
1164 let (reactor, control_tx, command_tx, reactor_closed_rx, mutable) =
1165 Reactor::new(channel, id, unique_id, input, runtime, memquota.clone());
1166
1167 let circuit = ClientCirc {
1168 mutable,
1169 unique_id,
1170 control: control_tx,
1171 command: command_tx,
1172 reactor_closed_rx: reactor_closed_rx.shared(),
1173 #[cfg(test)]
1174 circid: id,
1175 memquota,
1176 time_provider,
1177 };
1178
1179 let pending = PendingClientCirc {
1180 recvcreated: createdreceiver,
1181 circ: Arc::new(circuit),
1182 };
1183 (pending, reactor)
1184 }
1185
    /// Return the unique identifier of the circuit being created, without
    /// consuming `self`.
    pub fn peek_unique_id(&self) -> UniqId {
        self.circ.unique_id
    }
1190
1191 pub async fn create_firsthop_fast(self, params: CircParameters) -> Result<Arc<ClientCirc>> {
1198 let (tx, rx) = oneshot::channel();
1199 self.circ
1200 .control
1201 .unbounded_send(CtrlMsg::Create {
1202 recv_created: self.recvcreated,
1203 handshake: CircuitHandshake::CreateFast,
1204 params,
1205 done: tx,
1206 })
1207 .map_err(|_| Error::CircuitClosed)?;
1208
1209 rx.await.map_err(|_| Error::CircuitClosed)??;
1210
1211 Ok(self.circ)
1212 }
1213
1214 pub async fn create_firsthop<Tg>(
1219 self,
1220 target: &Tg,
1221 params: CircParameters,
1222 ) -> Result<Arc<ClientCirc>>
1223 where
1224 Tg: tor_linkspec::CircTarget,
1225 {
1226 if target
1228 .protovers()
1229 .supports_named_subver(named::RELAY_NTORV3)
1230 {
1231 self.create_firsthop_ntor_v3(target, params).await
1232 } else {
1233 self.create_firsthop_ntor(target, params).await
1234 }
1235 }
1236
1237 pub async fn create_firsthop_ntor<Tg>(
1242 self,
1243 target: &Tg,
1244 params: CircParameters,
1245 ) -> Result<Arc<ClientCirc>>
1246 where
1247 Tg: tor_linkspec::CircTarget,
1248 {
1249 let (tx, rx) = oneshot::channel();
1250
1251 self.circ
1252 .control
1253 .unbounded_send(CtrlMsg::Create {
1254 recv_created: self.recvcreated,
1255 handshake: CircuitHandshake::Ntor {
1256 public_key: NtorPublicKey {
1257 id: *target
1258 .rsa_identity()
1259 .ok_or(Error::MissingId(RelayIdType::Rsa))?,
1260 pk: *target.ntor_onion_key(),
1261 },
1262 ed_identity: *target
1263 .ed_identity()
1264 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1265 },
1266 params,
1267 done: tx,
1268 })
1269 .map_err(|_| Error::CircuitClosed)?;
1270
1271 rx.await.map_err(|_| Error::CircuitClosed)??;
1272
1273 Ok(self.circ)
1274 }
1275
1276 pub async fn create_firsthop_ntor_v3<Tg>(
1285 self,
1286 target: &Tg,
1287 params: CircParameters,
1288 ) -> Result<Arc<ClientCirc>>
1289 where
1290 Tg: tor_linkspec::CircTarget,
1291 {
1292 let (tx, rx) = oneshot::channel();
1293
1294 self.circ
1295 .control
1296 .unbounded_send(CtrlMsg::Create {
1297 recv_created: self.recvcreated,
1298 handshake: CircuitHandshake::NtorV3 {
1299 public_key: NtorV3PublicKey {
1300 id: *target
1301 .ed_identity()
1302 .ok_or(Error::MissingId(RelayIdType::Ed25519))?,
1303 pk: *target.ntor_onion_key(),
1304 },
1305 },
1306 params,
1307 done: tx,
1308 })
1309 .map_err(|_| Error::CircuitClosed)?;
1310
1311 rx.await.map_err(|_| Error::CircuitClosed)??;
1312
1313 Ok(self.circ)
1314 }
1315}
1316
1317fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
1320 match val {
1321 ResolvedVal::TransientError => Err(Error::ResolveError(ResolveError::Transient)),
1322 ResolvedVal::NontransientError => Err(Error::ResolveError(ResolveError::Nontransient)),
1323 ResolvedVal::Unrecognized(_, _) => Err(Error::ResolveError(ResolveError::Unrecognized)),
1324 _ => Ok(val),
1325 }
1326}
1327
1328#[cfg(test)]
1329pub(crate) mod test {
1330 #![allow(clippy::bool_assert_comparison)]
1332 #![allow(clippy::clone_on_copy)]
1333 #![allow(clippy::dbg_macro)]
1334 #![allow(clippy::mixed_attributes_style)]
1335 #![allow(clippy::print_stderr)]
1336 #![allow(clippy::print_stdout)]
1337 #![allow(clippy::single_char_pattern)]
1338 #![allow(clippy::unwrap_used)]
1339 #![allow(clippy::unchecked_duration_subtraction)]
1340 #![allow(clippy::useless_vec)]
1341 #![allow(clippy::needless_pass_by_value)]
1342 use super::*;
1345 use crate::channel::OpenChanCellS2C;
1346 use crate::channel::{test::new_reactor, CodecError};
1347 use crate::congestion::test_utils::params::build_cc_vegas_params;
1348 use crate::crypto::cell::RelayCellBody;
1349 use crate::crypto::handshake::ntor_v3::NtorV3Server;
1350 #[cfg(feature = "hs-service")]
1351 use crate::stream::IncomingStreamRequestFilter;
1352 use chanmsg::{AnyChanMsg, Created2, CreatedFast};
1353 use futures::channel::mpsc::{Receiver, Sender};
1354 use futures::io::{AsyncReadExt, AsyncWriteExt};
1355 use futures::sink::SinkExt;
1356 use futures::stream::StreamExt;
1357 use futures::task::SpawnExt;
1358 use hex_literal::hex;
1359 use std::collections::{HashMap, VecDeque};
1360 use std::fmt::Debug;
1361 use std::time::Duration;
1362 use tor_basic_utils::test_rng::testing_rng;
1363 use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody, ChanCmd};
1364 use tor_cell::relaycell::extend::{self as extend_ext, CircRequestExt, CircResponseExt};
1365 use tor_cell::relaycell::msg::SendmeTag;
1366 use tor_cell::relaycell::{
1367 msg as relaymsg, AnyRelayMsgOuter, RelayCellFormat, RelayCmd, RelayMsg as _, StreamId,
1368 };
1369 use tor_linkspec::OwnedCircTarget;
1370 use tor_memquota::HasMemoryCost;
1371 use tor_rtcompat::Runtime;
1372 use tracing::trace;
1373 use tracing_test::traced_test;
1374
    impl PendingClientCirc {
        /// Test helper: return the circuit ID of the circuit being created.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circ.circid
        }
    }
1381
    impl ClientCirc {
        /// Test helper: return the circuit ID of this circuit.
        pub(crate) fn peek_circid(&self) -> CircId {
            self.circid
        }
    }
1388
1389 fn rmsg_to_ccmsg(id: Option<StreamId>, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg {
1390 let rfmt = RelayCellFormat::V0;
1392 let body: BoxedCellBody = AnyRelayMsgOuter::new(id, msg)
1393 .encode(rfmt, &mut testing_rng())
1394 .unwrap();
1395 let chanmsg = chanmsg::Relay::from(body);
1396 ClientCircChanMsg::Relay(chanmsg)
1397 }
1398
    /// Secret key given to the example ntor / ntor v3 relay keys.
    const EXAMPLE_SK: [u8; 32] =
        hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803");
    /// Public key given to the example relay keys; also the ntor onion key of
    /// `example_target`.
    const EXAMPLE_PK: [u8; 32] =
        hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253");
    /// Ed25519 identity of the example relay.
    const EXAMPLE_ED_ID: [u8; 32] = [6; 32];
    /// RSA identity of the example relay.
    const EXAMPLE_RSA_ID: [u8; 20] = [10; 20];
1406
    /// Test helper: build a stream queue pair of the given `buffer` size by
    /// delegating to `crate::fake_mpsc`.
    #[cfg(test)]
    pub(crate) fn fake_mpsc<T: HasMemoryCost + Debug + Send>(
        buffer: usize,
    ) -> (StreamMpscSender<T>, StreamMpscReceiver<T>) {
        crate::fake_mpsc(buffer)
    }
1414
1415 fn example_target() -> OwnedCircTarget {
1417 let mut builder = OwnedCircTarget::builder();
1418 builder
1419 .chan_target()
1420 .ed_identity(EXAMPLE_ED_ID.into())
1421 .rsa_identity(EXAMPLE_RSA_ID.into());
1422 builder
1423 .ntor_onion_key(EXAMPLE_PK.into())
1424 .protocols("FlowCtrl=1".parse().unwrap())
1425 .build()
1426 .unwrap()
1427 }
1428 fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
1429 crate::crypto::handshake::ntor::NtorSecretKey::new(
1430 EXAMPLE_SK.into(),
1431 EXAMPLE_PK.into(),
1432 EXAMPLE_RSA_ID.into(),
1433 )
1434 }
1435 fn example_ntor_v3_key() -> crate::crypto::handshake::ntor_v3::NtorV3SecretKey {
1436 crate::crypto::handshake::ntor_v3::NtorV3SecretKey::new(
1437 EXAMPLE_SK.into(),
1438 EXAMPLE_PK.into(),
1439 EXAMPLE_ED_ID.into(),
1440 )
1441 }
1442
1443 fn working_fake_channel<R: Runtime>(
1444 rt: &R,
1445 ) -> (
1446 Arc<Channel>,
1447 Receiver<AnyChanCell>,
1448 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
1449 ) {
1450 let (channel, chan_reactor, rx, tx) = new_reactor(rt.clone());
1451 rt.spawn(async {
1452 let _ignore = chan_reactor.run().await;
1453 })
1454 .unwrap();
1455 (channel, rx, tx)
1456 }
1457
    /// Which first-hop handshake `test_create` should exercise.
    #[derive(Copy, Clone)]
    enum HandshakeType {
        /// Use `create_firsthop_fast`.
        Fast,
        /// Use `create_firsthop_ntor`.
        Ntor,
        /// Use `create_firsthop_ntor_v3`.
        NtorV3,
    }
1465
    /// Create a one-hop circuit over a fake channel using `handshake_type`,
    /// while a simulated relay answers the create cell, and check that the
    /// resulting circuit reports exactly one hop.
    ///
    /// `with_cc` only affects the ntor v3 case: the client then uses the Vegas
    /// congestion-control test parameters, and the simulated relay insists on
    /// seeing a congestion-control request before replying with a
    /// congestion-control response.
    async fn test_create<R: Runtime>(rt: &R, handshake_type: HandshakeType, with_cc: bool) {
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};

        // Keep `_sink` and `_circmsg_send` bound so those ends stay alive for
        // the duration of the test.
        let (chan, mut rx, _sink) = working_fake_channel(rt);
        let circid = CircId::new(128).unwrap();
        let (created_send, created_recv) = oneshot::channel();
        let (_circmsg_send, circmsg_recv) = fake_mpsc(64);
        let unique_id = UniqId::new(23, 17);

        let (pending, reactor) = PendingClientCirc::new(
            circid,
            chan,
            created_recv,
            circmsg_recv,
            unique_id,
            DynTimeProvider::new(rt.clone()),
            CircuitAccount::new_noop(),
        );

        // Run the circuit reactor in the background; its result is ignored.
        rt.spawn(async {
            let _ignore = reactor.run().await;
        })
        .unwrap();

        // Future that plays the relay: read the create cell from the channel,
        // run the matching server-side handshake, and send the reply.
        let simulate_relay_fut = async move {
            let mut rng = testing_rng();
            let create_cell = rx.next().await.unwrap();
            assert_eq!(create_cell.circid(), Some(circid));
            let reply = match handshake_type {
                HandshakeType::Fast => {
                    let cf = match create_cell.msg() {
                        AnyChanMsg::CreateFast(cf) => cf,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = CreateFastServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[()],
                        cf.handshake(),
                    )
                    .unwrap();
                    CreateResponse::CreatedFast(CreatedFast::new(rep))
                }
                HandshakeType::Ntor => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    let (_, rep) = NtorServer::server(
                        &mut rng,
                        &mut |_: &()| Some(()),
                        &[example_ntor_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
                HandshakeType::NtorV3 => {
                    let c2 = match create_cell.msg() {
                        AnyChanMsg::Create2(c2) => c2,
                        other => panic!("{:?}", other),
                    };
                    // Both branches are non-capturing closures, so the `if`
                    // can yield either one under a single type.
                    let mut reply_fn = if with_cc {
                        |client_exts: &[CircRequestExt]| {
                            // The client must have asked for congestion control.
                            let _ = client_exts
                                .iter()
                                .find(|e| matches!(e, CircRequestExt::CcRequest(_)))
                                .expect("Client failed to request CC");
                            Some(vec![CircResponseExt::CcResponse(
                                extend_ext::CcResponse::new(31),
                            )])
                        }
                    } else {
                        |_: &_| Some(vec![])
                    };
                    let (_, rep) = NtorV3Server::server(
                        &mut rng,
                        &mut reply_fn,
                        &[example_ntor_v3_key()],
                        c2.body(),
                    )
                    .unwrap();
                    CreateResponse::Created2(Created2::new(rep))
                }
            };
            created_send.send(reply).unwrap();
        };
        // Future that plays the client: run the requested create handshake.
        let client_fut = async move {
            let target = example_target();
            let params = CircParameters::default();
            let ret = match handshake_type {
                HandshakeType::Fast => {
                    trace!("doing fast create");
                    pending.create_firsthop_fast(params).await
                }
                HandshakeType::Ntor => {
                    trace!("doing ntor create");
                    pending.create_firsthop_ntor(&target, params).await
                }
                HandshakeType::NtorV3 => {
                    let params = if with_cc {
                        CircParameters::new(true, build_cc_vegas_params())
                    } else {
                        params
                    };
                    trace!("doing ntor_v3 create");
                    pending.create_firsthop_ntor_v3(&target, params).await
                }
            };
            trace!("create done: result {:?}", ret);
            ret
        };

        // Drive both sides concurrently until each has finished.
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);

        let _circ = circ.unwrap();

        // The new circuit must have exactly one hop.
        assert_eq!(_circ.n_hops().unwrap(), 1);
    }
1594
    /// Run `test_create` with the `Fast` handshake on every runtime.
    #[traced_test]
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Fast, false).await;
        });
    }
    /// Run `test_create` with the `Ntor` handshake on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::Ntor, false).await;
        });
    }
    /// Run `test_create` with the `NtorV3` handshake, without congestion
    /// control, on every runtime.
    #[traced_test]
    #[test]
    fn test_create_ntor_v3() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            test_create(&rt, HandshakeType::NtorV3, false).await;
        });
    }
/// Drive the shared `test_create` scenario with `HandshakeType::NtorV3` and
/// congestion control requested.
///
/// Only built when the `flowctl-cc` feature is enabled.
#[traced_test]
#[test]
#[cfg(feature = "flowctl-cc")]
fn test_create_ntor_v3_with_cc() {
    tor_rtcompat::test_with_all_runtimes!(|runtime| async move {
        let with_cc = true;
        test_create(&runtime, HandshakeType::NtorV3, with_cc).await;
    });
}
1624
/// Stand-in crypto layer for tests.
///
/// Its layer implementations never modify the cell body; the only thing it
/// produces is a sequence of counter-derived tags.
pub(crate) struct DummyCrypto {
    /// Tag buffer; `next_tag` overwrites its first four bytes from `counter`.
    counter_tag: [u8; 20],
    /// Number of tags handed out so far.
    counter: u32,
    /// Whether `decrypt_inbound` yields a tag (`Some`) or nothing (`None`).
    lasthop: bool,
}
1632 impl DummyCrypto {
1633 fn next_tag(&mut self) -> SendmeTag {
1634 #![allow(clippy::identity_op)]
1635 self.counter_tag[0] = ((self.counter >> 0) & 255) as u8;
1636 self.counter_tag[1] = ((self.counter >> 8) & 255) as u8;
1637 self.counter_tag[2] = ((self.counter >> 16) & 255) as u8;
1638 self.counter_tag[3] = ((self.counter >> 24) & 255) as u8;
1639 self.counter += 1;
1640 self.counter_tag.into()
1641 }
1642 }
1643
1644 impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
1645 fn originate_for(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) -> SendmeTag {
1646 self.next_tag()
1647 }
1648 fn encrypt_outbound(&mut self, _cmd: ChanCmd, _cell: &mut RelayCellBody) {}
1649 }
1650 impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
1651 fn decrypt_inbound(
1652 &mut self,
1653 _cmd: ChanCmd,
1654 _cell: &mut RelayCellBody,
1655 ) -> Option<SendmeTag> {
1656 if self.lasthop {
1657 Some(self.next_tag())
1658 } else {
1659 None
1660 }
1661 }
1662 }
1663 impl DummyCrypto {
1664 pub(crate) fn new(lasthop: bool) -> Self {
1665 DummyCrypto {
1666 counter_tag: [0; 20],
1667 counter: 0,
1668 lasthop,
1669 }
1670 }
1671 }
1672
1673 async fn newcirc_ext<R: Runtime>(
1676 rt: &R,
1677 chan: Arc<Channel>,
1678 next_msg_from: HopNum,
1679 ) -> (Arc<ClientCirc>, CircuitRxSender) {
1680 let circid = CircId::new(128).unwrap();
1681 let (_created_send, created_recv) = oneshot::channel();
1682 let (circmsg_send, circmsg_recv) = fake_mpsc(64);
1683 let unique_id = UniqId::new(23, 17);
1684
1685 let (pending, reactor) = PendingClientCirc::new(
1686 circid,
1687 chan,
1688 created_recv,
1689 circmsg_recv,
1690 unique_id,
1691 DynTimeProvider::new(rt.clone()),
1692 CircuitAccount::new_noop(),
1693 );
1694
1695 rt.spawn(async {
1696 let _ignore = reactor.run().await;
1697 })
1698 .unwrap();
1699
1700 let PendingClientCirc {
1701 circ,
1702 recvcreated: _,
1703 } = pending;
1704
1705 let relay_cell_format = RelayCellFormat::V0;
1707 for idx in 0_u8..3 {
1708 let params = CircParameters::default();
1709 let (tx, rx) = oneshot::channel();
1710 circ.command
1711 .unbounded_send(CtrlCmd::AddFakeHop {
1712 relay_cell_format,
1713 fwd_lasthop: idx == 2,
1714 rev_lasthop: idx == u8::from(next_msg_from),
1715 params,
1716 done: tx,
1717 })
1718 .unwrap();
1719 rx.await.unwrap().unwrap();
1720 }
1721
1722 (circ, circmsg_send)
1723 }
1724
1725 async fn newcirc<R: Runtime>(rt: &R, chan: Arc<Channel>) -> (Arc<ClientCirc>, CircuitRxSender) {
1728 newcirc_ext(rt, chan, 2.into()).await
1729 }
1730
1731 async fn test_extend<R: Runtime>(rt: &R, handshake_type: HandshakeType) {
1732 use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
1733
1734 let (chan, mut rx, _sink) = working_fake_channel(rt);
1735 let (circ, mut sink) = newcirc(rt, chan).await;
1736 let circid = circ.peek_circid();
1737 let params = CircParameters::default();
1738
1739 let extend_fut = async move {
1740 let target = example_target();
1741 match handshake_type {
1742 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1743 HandshakeType::Ntor => circ.extend_ntor(&target, params).await.unwrap(),
1744 HandshakeType::NtorV3 => circ.extend_ntor_v3(&target, params).await.unwrap(),
1745 };
1746 circ };
1748 let reply_fut = async move {
1749 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1752 assert_eq!(id, Some(circid));
1753 let rmsg = match chmsg {
1754 AnyChanMsg::RelayEarly(r) => {
1755 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1756 .unwrap()
1757 }
1758 other => panic!("{:?}", other),
1759 };
1760 let e2 = match rmsg.msg() {
1761 AnyRelayMsg::Extend2(e2) => e2,
1762 other => panic!("{:?}", other),
1763 };
1764 let mut rng = testing_rng();
1765 let reply = match handshake_type {
1766 HandshakeType::Fast => panic!("Can't extend with Fast handshake"),
1767 HandshakeType::Ntor => {
1768 let (_keygen, reply) = NtorServer::server(
1769 &mut rng,
1770 &mut |_: &()| Some(()),
1771 &[example_ntor_key()],
1772 e2.handshake(),
1773 )
1774 .unwrap();
1775 reply
1776 }
1777 HandshakeType::NtorV3 => {
1778 let (_keygen, reply) = NtorV3Server::server(
1779 &mut rng,
1780 &mut |_: &[CircRequestExt]| Some(vec![]),
1781 &[example_ntor_v3_key()],
1782 e2.handshake(),
1783 )
1784 .unwrap();
1785 reply
1786 }
1787 };
1788
1789 let extended2 = relaymsg::Extended2::new(reply).into();
1790 sink.send(rmsg_to_ccmsg(None, extended2)).await.unwrap();
1791 (sink, rx) };
1793
1794 let (circ, (_sink, _rx)) = futures::join!(extend_fut, reply_fut);
1795
1796 assert_eq!(circ.n_hops().unwrap(), 4);
1798
1799 {
1801 let path = circ
1802 .path_ref()
1803 .unwrap()
1804 .all_hops()
1805 .into_iter()
1806 .filter_map(|hop| match hop {
1807 path::HopDetail::Relay(r) => Some(r),
1808 #[cfg(feature = "hs-common")]
1809 path::HopDetail::Virtual => None,
1810 })
1811 .collect::<Vec<_>>();
1812
1813 assert_eq!(path.len(), 4);
1814 use tor_linkspec::HasRelayIds;
1815 assert_eq!(path[3].ed_identity(), example_target().ed_identity());
1816 assert_ne!(path[0].ed_identity(), example_target().ed_identity());
1817 }
1818 {
1819 let path = circ.path_ref().unwrap();
1820 assert_eq!(path.n_hops(), 4);
1821 use tor_linkspec::HasRelayIds;
1822 assert_eq!(
1823 path.hops()[3].as_chan_target().unwrap().ed_identity(),
1824 example_target().ed_identity()
1825 );
1826 assert_ne!(
1827 path.hops()[0].as_chan_target().unwrap().ed_identity(),
1828 example_target().ed_identity()
1829 );
1830 }
1831 }
1832
1833 #[traced_test]
1834 #[test]
1835 fn test_extend_ntor() {
1836 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1837 test_extend(&rt, HandshakeType::Ntor).await;
1838 });
1839 }
1840
1841 #[traced_test]
1842 #[test]
1843 fn test_extend_ntor_v3() {
1844 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1845 test_extend(&rt, HandshakeType::NtorV3).await;
1846 });
1847 }
1848
1849 async fn bad_extend_test_impl<R: Runtime>(
1850 rt: &R,
1851 reply_hop: HopNum,
1852 bad_reply: ClientCircChanMsg,
1853 ) -> Error {
1854 let (chan, _rx, _sink) = working_fake_channel(rt);
1855 let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
1856 let params = CircParameters::default();
1857
1858 let target = example_target();
1859 #[allow(clippy::clone_on_copy)]
1860 let rtc = rt.clone();
1861 let sink_handle = rt
1862 .spawn_with_handle(async move {
1863 rtc.sleep(Duration::from_millis(100)).await;
1864 sink.send(bad_reply).await.unwrap();
1865 sink
1866 })
1867 .unwrap();
1868 let outcome = circ.extend_ntor(&target, params).await;
1869 let _sink = sink_handle.await;
1870
1871 assert_eq!(circ.n_hops().unwrap(), 3);
1872 assert!(outcome.is_err());
1873 outcome.unwrap_err()
1874 }
1875
1876 #[traced_test]
1877 #[test]
1878 fn bad_extend_wronghop() {
1879 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1880 let extended2 = relaymsg::Extended2::new(vec![]).into();
1881 let cc = rmsg_to_ccmsg(None, extended2);
1882
1883 let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
1884 match error {
1889 Error::CircuitClosed => {}
1890 x => panic!("got other error: {}", x),
1891 }
1892 });
1893 }
1894
1895 #[traced_test]
1896 #[test]
1897 fn bad_extend_wrongtype() {
1898 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1899 let extended = relaymsg::Extended::new(vec![7; 200]).into();
1900 let cc = rmsg_to_ccmsg(None, extended);
1901
1902 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1903 match error {
1904 Error::BytesErr {
1905 err: tor_bytes::Error::InvalidMessage(_),
1906 object: "extended2 message",
1907 } => {}
1908 other => panic!("{:?}", other),
1909 }
1910 });
1911 }
1912
1913 #[traced_test]
1914 #[test]
1915 fn bad_extend_destroy() {
1916 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1917 let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
1918 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1919 match error {
1920 Error::CircuitClosed => {}
1921 other => panic!("{:?}", other),
1922 }
1923 });
1924 }
1925
1926 #[traced_test]
1927 #[test]
1928 fn bad_extend_crypto() {
1929 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1930 let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
1931 let cc = rmsg_to_ccmsg(None, extended2);
1932 let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
1933 assert!(matches!(error, Error::BadCircHandshakeAuth));
1934 });
1935 }
1936
1937 #[traced_test]
1938 #[test]
1939 fn begindir() {
1940 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
1941 let (chan, mut rx, _sink) = working_fake_channel(&rt);
1942 let (circ, mut sink) = newcirc(&rt, chan).await;
1943 let circid = circ.peek_circid();
1944
1945 let begin_and_send_fut = async move {
1946 let mut stream = circ.begin_dir_stream().await.unwrap();
1949 stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
1950 stream.flush().await.unwrap();
1951 let mut buf = [0_u8; 1024];
1952 let n = stream.read(&mut buf).await.unwrap();
1953 assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
1954 let n = stream.read(&mut buf).await.unwrap();
1955 assert_eq!(n, 0);
1956 stream
1957 };
1958 let reply_fut = async move {
1959 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1962 assert_eq!(id, Some(circid));
1963 let rmsg = match chmsg {
1964 AnyChanMsg::Relay(r) => {
1965 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1966 .unwrap()
1967 }
1968 other => panic!("{:?}", other),
1969 };
1970 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
1971 assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
1972
1973 let connected = relaymsg::Connected::new_empty().into();
1975 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
1976
1977 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
1979 assert_eq!(id, Some(circid));
1980 let rmsg = match chmsg {
1981 AnyChanMsg::Relay(r) => {
1982 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
1983 .unwrap()
1984 }
1985 other => panic!("{:?}", other),
1986 };
1987 let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
1988 assert_eq!(streamid_2, streamid);
1989 if let AnyRelayMsg::Data(d) = rmsg {
1990 assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
1991 } else {
1992 panic!();
1993 }
1994
1995 let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
1997 .unwrap()
1998 .into();
1999 sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
2000
2001 let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
2003 sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
2004
2005 (rx, sink) };
2007
2008 let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
2009 });
2010 }
2011
2012 fn close_stream_helper(by_drop: bool) {
2014 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2015 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2016 let (circ, mut sink) = newcirc(&rt, chan).await;
2017
2018 let stream_fut = async move {
2019 let stream = circ
2020 .begin_stream("www.example.com", 80, None)
2021 .await
2022 .unwrap();
2023
2024 let (r, mut w) = stream.split();
2025 if by_drop {
2026 drop(r);
2028 drop(w);
2029 (None, circ) } else {
2031 w.close().await.unwrap();
2033 (Some(r), circ)
2034 }
2035 };
2036 let handler_fut = async {
2037 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2039 let rmsg = match msg {
2040 AnyChanMsg::Relay(r) => {
2041 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2042 .unwrap()
2043 }
2044 other => panic!("{:?}", other),
2045 };
2046 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2047 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
2048
2049 let connected =
2051 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
2052 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2053
2054 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2056 let rmsg = match msg {
2057 AnyChanMsg::Relay(r) => {
2058 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2059 .unwrap()
2060 }
2061 other => panic!("{:?}", other),
2062 };
2063 let (_, rmsg) = rmsg.into_streamid_and_msg();
2064 assert_eq!(rmsg.cmd(), RelayCmd::END);
2065
2066 (rx, sink) };
2068
2069 let ((_opt_reader, _circ), (_rx, _sink)) = futures::join!(stream_fut, handler_fut);
2070 });
2071 }
2072
2073 #[traced_test]
2074 #[test]
2075 fn drop_stream() {
2076 close_stream_helper(true);
2077 }
2078
2079 #[traced_test]
2080 #[test]
2081 fn close_stream() {
2082 close_stream_helper(false);
2083 }
2084
2085 async fn setup_incoming_sendme_case<R: Runtime>(
2087 rt: &R,
2088 n_to_send: usize,
2089 ) -> (
2090 Arc<ClientCirc>,
2091 DataStream,
2092 CircuitRxSender,
2093 Option<StreamId>,
2094 usize,
2095 Receiver<AnyChanCell>,
2096 Sender<std::result::Result<OpenChanCellS2C, CodecError>>,
2097 ) {
2098 let (chan, mut rx, sink2) = working_fake_channel(rt);
2099 let (circ, mut sink) = newcirc(rt, chan).await;
2100 let circid = circ.peek_circid();
2101
2102 let begin_and_send_fut = {
2103 let circ = circ.clone();
2104 async move {
2105 let mut stream = circ
2107 .begin_stream("www.example.com", 443, None)
2108 .await
2109 .unwrap();
2110 let junk = [0_u8; 1024];
2111 let mut remaining = n_to_send;
2112 while remaining > 0 {
2113 let n = std::cmp::min(remaining, junk.len());
2114 stream.write_all(&junk[..n]).await.unwrap();
2115 remaining -= n;
2116 }
2117 stream.flush().await.unwrap();
2118 stream
2119 }
2120 };
2121
2122 let receive_fut = async move {
2123 let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2125 let rmsg = match chmsg {
2126 AnyChanMsg::Relay(r) => {
2127 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2128 .unwrap()
2129 }
2130 other => panic!("{:?}", other),
2131 };
2132 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2133 assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
2134 let connected = relaymsg::Connected::new_empty().into();
2136 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2137 let mut bytes_received = 0_usize;
2139 let mut cells_received = 0_usize;
2140 while bytes_received < n_to_send {
2141 let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
2143 assert_eq!(id, Some(circid));
2144
2145 let rmsg = match chmsg {
2146 AnyChanMsg::Relay(r) => {
2147 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
2148 .unwrap()
2149 }
2150 other => panic!("{:?}", other),
2151 };
2152 let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
2153 assert_eq!(streamid2, streamid);
2154 if let AnyRelayMsg::Data(dat) = rmsg {
2155 cells_received += 1;
2156 bytes_received += dat.as_ref().len();
2157 } else {
2158 panic!();
2159 }
2160 }
2161
2162 (sink, streamid, cells_received, rx)
2163 };
2164
2165 let (stream, (sink, streamid, cells_received, rx)) =
2166 futures::join!(begin_and_send_fut, receive_fut);
2167
2168 (circ, stream, sink, streamid, cells_received, rx, sink2)
2169 }
2170
2171 #[traced_test]
2172 #[test]
2173 fn accept_valid_sendme() {
2174 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2175 let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
2176 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2177
2178 assert_eq!(cells_received, 301);
2179
2180 {
2182 let (tx, rx) = oneshot::channel();
2183 circ.command
2184 .unbounded_send(CtrlCmd::QuerySendWindow {
2185 hop: 2.into(),
2186 done: tx,
2187 })
2188 .unwrap();
2189 let (window, tags) = rx.await.unwrap().unwrap();
2190 assert_eq!(window, 1000 - 301);
2191 assert_eq!(tags.len(), 3);
2192 assert_eq!(
2194 tags[0],
2195 SendmeTag::from(hex!("6400000000000000000000000000000000000000"))
2196 );
2197 assert_eq!(
2199 tags[1],
2200 SendmeTag::from(hex!("c800000000000000000000000000000000000000"))
2201 );
2202 assert_eq!(
2204 tags[2],
2205 SendmeTag::from(hex!("2c01000000000000000000000000000000000000"))
2206 );
2207 }
2208
2209 let reply_with_sendme_fut = async move {
2210 let c_sendme =
2212 relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
2213 .into();
2214 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2215
2216 let s_sendme = relaymsg::Sendme::new_empty().into();
2218 sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
2219
2220 sink
2221 };
2222
2223 let _sink = reply_with_sendme_fut.await;
2224
2225 rt.advance_until_stalled().await;
2226
2227 {
2230 let (tx, rx) = oneshot::channel();
2231 circ.command
2232 .unbounded_send(CtrlCmd::QuerySendWindow {
2233 hop: 2.into(),
2234 done: tx,
2235 })
2236 .unwrap();
2237 let (window, _tags) = rx.await.unwrap().unwrap();
2238 assert_eq!(window, 1000 - 201);
2239 }
2240 });
2241 }
2242
2243 #[traced_test]
2244 #[test]
2245 fn invalid_circ_sendme() {
2246 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
2247 let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
2251 setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
2252
2253 let reply_with_sendme_fut = async move {
2254 let c_sendme =
2256 relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
2257 .into();
2258 sink.send(rmsg_to_ccmsg(None, c_sendme)).await.unwrap();
2259 sink
2260 };
2261
2262 let _sink = reply_with_sendme_fut.await;
2263
2264 rt.advance_until_stalled().await;
2266 assert!(circ.is_closing());
2267 });
2268 }
2269
2270 #[traced_test]
2271 #[test]
2272 fn test_busy_stream_fairness() {
2273 const N_STREAMS: usize = 3;
2275 const N_CELLS: usize = 20;
2277 const N_BYTES: usize = relaymsg::Data::MAXLEN_V0 * N_CELLS;
2280 const MIN_EXPECTED_BYTES_PER_STREAM: usize =
2287 N_BYTES / N_STREAMS - relaymsg::Data::MAXLEN_V0;
2288
2289 tor_rtcompat::test_with_all_runtimes!(|rt| async move {
2290 let (chan, mut rx, _sink) = working_fake_channel(&rt);
2291 let (circ, mut sink) = newcirc(&rt, chan).await;
2292
2293 rt.spawn({
2299 let circ = circ.clone();
2302 async move {
2303 let mut clients = VecDeque::new();
2304 struct Client {
2305 stream: DataStream,
2306 to_write: &'static [u8],
2307 }
2308 for _ in 0..N_STREAMS {
2309 clients.push_back(Client {
2310 stream: circ
2311 .begin_stream("www.example.com", 80, None)
2312 .await
2313 .unwrap(),
2314 to_write: &[0_u8; N_BYTES][..],
2315 });
2316 }
2317 while let Some(mut client) = clients.pop_front() {
2318 if client.to_write.is_empty() {
2319 continue;
2321 }
2322 let written = client.stream.write(client.to_write).await.unwrap();
2323 client.to_write = &client.to_write[written..];
2324 clients.push_back(client);
2325 }
2326 }
2327 })
2328 .unwrap();
2329
2330 let channel_handler_fut = async {
2331 let mut stream_bytes_received = HashMap::<StreamId, usize>::new();
2332 let mut total_bytes_received = 0;
2333
2334 loop {
2335 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
2336 let rmsg = match msg {
2337 AnyChanMsg::Relay(r) => AnyRelayMsgOuter::decode_singleton(
2338 RelayCellFormat::V0,
2339 r.into_relay_body(),
2340 )
2341 .unwrap(),
2342 other => panic!("Unexpected chanmsg: {other:?}"),
2343 };
2344 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
2345 match rmsg.cmd() {
2346 RelayCmd::BEGIN => {
2347 let prev = stream_bytes_received.insert(streamid.unwrap(), 0);
2349 assert_eq!(prev, None);
2350 let connected = relaymsg::Connected::new_with_addr(
2352 "10.0.0.1".parse().unwrap(),
2353 1234,
2354 )
2355 .into();
2356 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
2357 }
2358 RelayCmd::DATA => {
2359 let data_msg = relaymsg::Data::try_from(rmsg).unwrap();
2360 let nbytes = data_msg.as_ref().len();
2361 total_bytes_received += nbytes;
2362 let streamid = streamid.unwrap();
2363 let stream_bytes = stream_bytes_received.get_mut(&streamid).unwrap();
2364 *stream_bytes += nbytes;
2365 if total_bytes_received >= N_BYTES {
2366 break;
2367 }
2368 }
2369 RelayCmd::END => {
2370 continue;
2375 }
2376 other => {
2377 panic!("Unexpected command {other:?}");
2378 }
2379 }
2380 }
2381
2382 (total_bytes_received, stream_bytes_received, rx, sink)
2385 };
2386
2387 let (total_bytes_received, stream_bytes_received, _rx, _sink) =
2388 channel_handler_fut.await;
2389 assert_eq!(stream_bytes_received.len(), N_STREAMS);
2390 for (sid, stream_bytes) in stream_bytes_received {
2391 assert!(
2392 stream_bytes >= MIN_EXPECTED_BYTES_PER_STREAM,
2393 "Only {stream_bytes} of {total_bytes_received} bytes received from {N_STREAMS} came from {sid:?}; expected at least {MIN_EXPECTED_BYTES_PER_STREAM}"
2394 );
2395 }
2396 });
2397 }
2398
/// `CircParameters::default()` has `extend_by_ed25519_id` set, and the field
/// can be cleared.
#[test]
fn basic_params() {
    use super::CircParameters;

    let mut p: CircParameters = Default::default();
    // Default value is `true`.
    assert!(p.extend_by_ed25519_id);

    // The field is writable and reads back the new value.
    p.extend_by_ed25519_id = false;
    assert!(!p.extend_by_ed25519_id);
}
2408
/// Incoming-stream filter for tests that accepts every request.
#[cfg(feature = "hs-service")]
struct AllowAllStreamsFilter;

#[cfg(feature = "hs-service")]
impl IncomingStreamRequestFilter for AllowAllStreamsFilter {
    /// Always answers `Accept`, whatever the request or circuit state.
    fn disposition(
        &mut self,
        _: &crate::stream::IncomingStreamRequestContext<'_>,
        _: &crate::tunnel::reactor::syncview::ClientCircSyncView<'_>,
    ) -> Result<crate::stream::IncomingStreamRequestDisposition> {
        use crate::stream::IncomingStreamRequestDisposition as Disposition;
        Ok(Disposition::Accept)
    }
}
2421
/// Calling `allow_stream_requests` a second time on the same circuit must
/// return an error.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests_twice() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, _send) = newcirc(&rt, chan).await;

        let allowed_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];

        // The first registration succeeds and is kept alive.
        let _first_registration = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop_num().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        // The second registration must be refused.
        let incoming = circ
            .allow_stream_requests(
                &allowed_cmds,
                circ.last_hop_num().unwrap(),
                AllowAllStreamsFilter,
            )
            .await;

        assert!(incoming.is_err());
    });
}
2451
/// An incoming BEGIN is surfaced as a stream request; once it is accepted,
/// data sent on that stream id reaches the accepting side.
///
/// The "service" future accepts the request and reads `TEST_DATA`. The
/// "client" future injects BEGIN, waits until the service reports that it
/// has accepted, and then injects a DATA message.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn allow_stream_requests() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut circ_tx) = newcirc(&rt, chan).await;

        let rfmt = RelayCellFormat::V0;

        // Signals from the service to the client that the stream is accepted.
        let (accepted_tx, accepted_rx) = oneshot::channel();
        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop_num().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            let request = incoming.next().await.unwrap();
            let mut data_stream = request
                .accept_data(relaymsg::Connected::new_empty())
                .await
                .unwrap();
            accepted_tx.send(()).unwrap();

            let mut buf = [0_u8; TEST_DATA.len()];
            data_stream.read_exact(&mut buf).await.unwrap();
            assert_eq!(&buf, TEST_DATA);

            circ
        };

        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            circ_tx
                .send(ClientCircChanMsg::Relay(chanmsg::Relay::from(begin_body)))
                .await
                .unwrap();

            // Do not send data until the service has accepted the stream.
            accepted_rx.await.unwrap();

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            circ_tx
                .send(ClientCircChanMsg::Relay(chanmsg::Relay::from(data_body)))
                .await
                .unwrap();
            circ_tx
        };

        let (_circ, _circ_tx) = futures::join!(simulate_service, simulate_client);
    });
}
2528
/// After a stream request has been rejected, a later request with the same
/// stream id can still be accepted and carry data.
///
/// The "service" future rejects the first request and accepts the second.
/// The "client" future sends the same BEGIN twice, waiting for the service's
/// acknowledgement each time, and then sends one DATA message.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn accept_stream_after_reject() {
    use tor_cell::relaycell::msg::BeginFlags;
    use tor_cell::relaycell::msg::EndReason;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const TEST_DATA: &[u8] = b"ping";
        const STREAM_COUNT: usize = 2;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut circ_tx) = newcirc(&rt, chan).await;

        // One acknowledgement per handled request flows service -> client.
        let (mut ack_tx, mut ack_rx) = mpsc::channel(STREAM_COUNT);

        let mut incoming = circ
            .allow_stream_requests(
                &[tor_cell::relaycell::RelayCmd::BEGIN],
                circ.last_hop_num().unwrap(),
                AllowAllStreamsFilter,
            )
            .await
            .unwrap();

        let simulate_service = async move {
            for i in 0..STREAM_COUNT {
                let request = incoming.next().await.unwrap();

                if i == 0 {
                    // First request: turn it down.
                    request
                        .reject(relaymsg::End::new_with_reason(EndReason::INTERNAL))
                        .await
                        .unwrap();
                    ack_tx.send(()).await.unwrap();
                } else {
                    // Later request: accept it and expect the test payload.
                    let mut data_stream = request
                        .accept_data(relaymsg::Connected::new_empty())
                        .await
                        .unwrap();
                    ack_tx.send(()).await.unwrap();

                    let mut buf = [0_u8; TEST_DATA.len()];
                    data_stream.read_exact(&mut buf).await.unwrap();
                    assert_eq!(&buf, TEST_DATA);
                }
            }

            circ
        };

        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            let begin_msg = chanmsg::Relay::from(begin_body);

            for _ in 0..STREAM_COUNT {
                circ_tx
                    .send(ClientCircChanMsg::Relay(begin_msg.clone()))
                    .await
                    .unwrap();

                // Wait until the service has dealt with this request.
                ack_rx.next().await.unwrap();
            }

            let data = relaymsg::Data::new(TEST_DATA).unwrap();
            let data_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Data(data))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();
            circ_tx
                .send(ClientCircChanMsg::Relay(chanmsg::Relay::from(data_body)))
                .await
                .unwrap();
            circ_tx
        };

        let (_circ, _circ_tx) = futures::join!(simulate_service, simulate_client);
    });
}
2622
/// Stream requests are only allowed from hop `EXPECTED_HOP`; a BEGIN injected
/// into a circuit built by `newcirc` must not be delivered, and the
/// incoming-request stream must end instead.
#[traced_test]
#[test]
#[cfg(feature = "hs-service")]
fn incoming_stream_bad_hop() {
    use tor_cell::relaycell::msg::BeginFlags;

    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        const EXPECTED_HOP: u8 = 1;
        let rfmt = RelayCellFormat::V0;

        let (chan, _rx, _sink) = working_fake_channel(&rt);
        let (circ, mut circ_tx) = newcirc(&rt, chan).await;

        let allowed_cmds = [tor_cell::relaycell::RelayCmd::BEGIN];
        let mut incoming = circ
            .allow_stream_requests(&allowed_cmds, EXPECTED_HOP.into(), AllowAllStreamsFilter)
            .await
            .unwrap();

        let simulate_service = async move {
            // No request is ever yielded; the stream of requests just ends.
            assert!(incoming.next().await.is_none());
            circ
        };

        let simulate_client = async move {
            let begin = Begin::new("localhost", 80, BeginFlags::IPV6_OKAY).unwrap();
            let begin_body: BoxedCellBody =
                AnyRelayMsgOuter::new(StreamId::new(12), AnyRelayMsg::Begin(begin))
                    .encode(rfmt, &mut testing_rng())
                    .unwrap();

            circ_tx
                .send(ClientCircChanMsg::Relay(chanmsg::Relay::from(begin_body)))
                .await
                .unwrap();

            circ_tx
        };

        let (_circ, _circ_tx) = futures::join!(simulate_service, simulate_client);
    });
}
2673}