1use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::task::SpawnExt;
8use futures::Future;
9use oneshot_fused_workaround as oneshot;
10use std::sync::{
11 atomic::{AtomicU32, Ordering},
12 Arc,
13};
14use std::time::{Duration, Instant};
15use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
16use tor_error::into_internal;
17use tor_guardmgr::GuardStatus;
18use tor_linkspec::{ChanTarget, CircTarget, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
19use tor_netdir::params::NetParameters;
20use tor_proto::ccparams::{self, AlgorithmType};
21use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
22use tor_protover::named::FLOWCTRL_CC;
23use tor_protover::Protocols;
24use tor_rtcompat::{Runtime, SleepProviderExt};
25use tor_units::Percentage;
26
27#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28use tor_guardmgr::vanguards::VanguardMgr;
29
30mod guardstatus;
31
32pub(crate) use guardstatus::GuardStatusHandle;
33
34#[async_trait]
42pub(crate) trait Buildable: Sized {
43 async fn create_chantarget<RT: Runtime>(
49 chanmgr: &ChanMgr<RT>,
50 rt: &RT,
51 guard_status: &GuardStatusHandle,
52 ct: &OwnedChanTarget,
53 params: CircParameters,
54 usage: ChannelUsage,
55 ) -> Result<Arc<Self>>;
56
57 async fn create<RT: Runtime>(
60 chanmgr: &ChanMgr<RT>,
61 rt: &RT,
62 guard_status: &GuardStatusHandle,
63 ct: &OwnedCircTarget,
64 params: CircParameters,
65 usage: ChannelUsage,
66 ) -> Result<Arc<Self>>;
67
68 async fn extend<RT: Runtime>(
71 &self,
72 rt: &RT,
73 ct: &OwnedCircTarget,
74 params: CircParameters,
75 ) -> Result<()>;
76}
77
78async fn create_common<RT: Runtime, CT: ChanTarget>(
84 chanmgr: &ChanMgr<RT>,
85 rt: &RT,
86 target: &CT,
87 guard_status: &GuardStatusHandle,
88 usage: ChannelUsage,
89) -> Result<PendingClientCirc> {
90 let result = chanmgr.get_or_launch(target, usage).await;
92
93 let chan = match result {
95 Ok((chan, ChanProvenance::NewlyCreated)) => {
96 guard_status.skew(chan.clock_skew());
97 chan
98 }
99 Ok((chan, _)) => chan,
100 Err(cause) => {
101 if let Some(skew) = cause.clock_skew() {
102 guard_status.skew(skew);
103 }
104 return Err(Error::Channel {
105 peer: target.to_logged(),
106 cause,
107 });
108 }
109 };
110 let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol {
112 error,
113 peer: None, action: "initializing circuit",
115 unique_id: None,
116 })?;
117
118 rt.spawn(async {
119 let _ = reactor.run().await;
120 })
121 .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
122
123 Ok(pending_circ)
124}
125
126#[async_trait]
127impl Buildable for ClientCirc {
128 async fn create_chantarget<RT: Runtime>(
129 chanmgr: &ChanMgr<RT>,
130 rt: &RT,
131 guard_status: &GuardStatusHandle,
132 ct: &OwnedChanTarget,
133 params: CircParameters,
134 usage: ChannelUsage,
135 ) -> Result<Arc<Self>> {
136 let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
137 let unique_id = Some(circ.peek_unique_id());
138 circ.create_firsthop_fast(params)
139 .await
140 .map_err(|error| Error::Protocol {
141 peer: Some(ct.to_logged()),
142 error,
143 action: "running CREATE_FAST handshake",
144 unique_id,
145 })
146 }
147 async fn create<RT: Runtime>(
148 chanmgr: &ChanMgr<RT>,
149 rt: &RT,
150 guard_status: &GuardStatusHandle,
151 ct: &OwnedCircTarget,
152 params: CircParameters,
153 usage: ChannelUsage,
154 ) -> Result<Arc<Self>> {
155 let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
156 let unique_id = Some(circ.peek_unique_id());
157
158 let handshake_res = circ.create_firsthop(ct, params).await;
159
160 handshake_res.map_err(|error| Error::Protocol {
161 peer: Some(ct.to_logged()),
162 error,
163 action: "creating first hop",
164 unique_id,
165 })
166 }
167 async fn extend<RT: Runtime>(
168 &self,
169 _rt: &RT,
170 ct: &OwnedCircTarget,
171 params: CircParameters,
172 ) -> Result<()> {
173 let res = ClientCirc::extend(self, ct, params).await;
175
176 res.map_err(|error| Error::Protocol {
177 error,
178 peer: None,
182 action: "extending circuit",
183 unique_id: Some(self.unique_id()),
184 })
185 }
186}
187
188struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
196 runtime: R,
198 chanmgr: Arc<ChanMgr<R>>,
200 timeouts: timeouts::Estimator,
202 _phantom: std::marker::PhantomData<C>,
205}
206
207impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
208 fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
210 Builder {
211 runtime,
212 chanmgr,
213 timeouts,
214 _phantom: std::marker::PhantomData,
215 }
216 }
217
218 fn apply_protovers_to_circparams(params: &mut CircParameters, protocols: &Protocols) {
221 if !protocols.supports_named_subver(FLOWCTRL_CC) {
224 params.ccontrol.use_fallback_alg();
225 }
226 }
227
228 async fn build_notimeout(
237 self: Arc<Self>,
238 path: OwnedPath,
239 params: CircParameters,
240 start_time: Instant,
241 n_hops_built: Arc<AtomicU32>,
242 guard_status: Arc<GuardStatusHandle>,
243 usage: ChannelUsage,
244 ) -> Result<Arc<C>> {
245 match path {
246 OwnedPath::ChannelOnly(target) => {
247 guard_status.pending(GuardStatus::Failure);
249 let circ = C::create_chantarget(
250 &self.chanmgr,
251 &self.runtime,
252 &guard_status,
253 &target,
254 params,
255 usage,
256 )
257 .await?;
258 self.timeouts
259 .note_hop_completed(0, self.runtime.now() - start_time, true);
260 n_hops_built.fetch_add(1, Ordering::SeqCst);
261 Ok(circ)
262 }
263 OwnedPath::Normal(p) => {
264 assert!(!p.is_empty());
265 let n_hops = p.len() as u8;
266 guard_status.pending(GuardStatus::Failure);
268 let mut first_hop_params = params.clone();
270 Self::apply_protovers_to_circparams(&mut first_hop_params, p[0].protovers());
271 let circ = C::create(
272 &self.chanmgr,
273 &self.runtime,
274 &guard_status,
275 &p[0],
276 first_hop_params,
277 usage,
278 )
279 .await?;
280 self.timeouts
281 .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
282 guard_status.pending(GuardStatus::Indeterminate);
285 n_hops_built.fetch_add(1, Ordering::SeqCst);
286 let mut hop_num = 1;
287 for relay in p[1..].iter() {
288 let mut hop_params = params.clone();
290 Self::apply_protovers_to_circparams(&mut hop_params, relay.protovers());
291 circ.extend(&self.runtime, relay, hop_params).await?;
292 n_hops_built.fetch_add(1, Ordering::SeqCst);
293 self.timeouts.note_hop_completed(
294 hop_num,
295 self.runtime.now() - start_time,
296 hop_num == (n_hops - 1),
297 );
298 hop_num += 1;
299 }
300 Ok(circ)
301 }
302 }
303 }
304
305 async fn build_owned(
307 self: &Arc<Self>,
308 path: OwnedPath,
309 params: &CircParameters,
310 guard_status: Arc<GuardStatusHandle>,
311 usage: ChannelUsage,
312 ) -> Result<Arc<C>> {
313 let action = Action::BuildCircuit { length: path.len() };
314 let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
315 let start_time = self.runtime.now();
316
317 let hops_built = Arc::new(AtomicU32::new(0));
321
322 let self_clone = Arc::clone(self);
323 let params = params.clone();
324
325 let circuit_future = self_clone.build_notimeout(
326 path,
327 params,
328 start_time,
329 Arc::clone(&hops_built),
330 guard_status,
331 usage,
332 );
333
334 match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
335 Ok(circuit) => Ok(circuit),
336 Err(Error::CircTimeout(unique_id)) => {
337 let n_built = hops_built.load(Ordering::SeqCst);
338 self.timeouts
339 .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
340 Err(Error::CircTimeout(unique_id))
341 }
342 Err(e) => Err(e),
343 }
344 }
345
346 pub(crate) fn runtime(&self) -> &R {
348 &self.runtime
349 }
350
351 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
353 &self.timeouts
354 }
355}
356
357pub struct CircuitBuilder<R: Runtime> {
365 builder: Arc<Builder<R, ClientCirc>>,
367 path_config: tor_config::MutCfg<crate::PathConfig>,
369 storage: crate::TimeoutStateHandle,
371 guardmgr: tor_guardmgr::GuardMgr<R>,
374 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
376 vanguardmgr: Arc<VanguardMgr<R>>,
377}
378
379impl<R: Runtime> CircuitBuilder<R> {
380 pub(crate) fn new(
384 runtime: R,
385 chanmgr: Arc<ChanMgr<R>>,
386 path_config: crate::PathConfig,
387 storage: crate::TimeoutStateHandle,
388 guardmgr: tor_guardmgr::GuardMgr<R>,
389 #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
390 ) -> Self {
391 let timeouts = timeouts::Estimator::from_storage(&storage);
392
393 CircuitBuilder {
394 builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
395 path_config: path_config.into(),
396 storage,
397 guardmgr,
398 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
399 vanguardmgr: Arc::new(vanguardmgr),
400 }
401 }
402
403 pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
405 self.path_config.get()
406 }
407
408 pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
410 self.path_config.replace(new_config);
411 }
412
413 pub(crate) fn save_state(&self) -> Result<bool> {
417 if !self.storage.can_store() {
418 return Ok(false);
419 }
420 self.builder.timeouts.save_state(&self.storage)?;
423 self.guardmgr.store_persistent_state()?;
424 Ok(true)
425 }
426
427 pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
430 self.builder
431 .timeouts
432 .upgrade_to_owning_storage(&self.storage);
433 self.guardmgr.upgrade_to_owned_persistent_state()?;
434 Ok(())
435 }
436
437 pub(crate) fn reload_state(&self) -> Result<()> {
439 if !self.storage.can_store() {
440 self.builder
441 .timeouts
442 .reload_readonly_from_storage(&self.storage);
443 }
444 self.guardmgr.reload_persistent_state()?;
445 Ok(())
446 }
447
448 pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
452 self.builder.timeouts.update_params(p);
453 }
454
455 pub(crate) async fn build_owned(
457 &self,
458 path: OwnedPath,
459 params: &CircParameters,
460 guard_status: Arc<GuardStatusHandle>,
461 usage: ChannelUsage,
462 ) -> Result<Arc<ClientCirc>> {
463 self.builder
464 .build_owned(path, params, guard_status, usage)
465 .await
466 }
467
468 pub async fn build(
475 &self,
476 path: &TorPath<'_>,
477 params: &CircParameters,
478 usage: ChannelUsage,
479 ) -> Result<Arc<ClientCirc>> {
480 let owned = path.try_into()?;
481 self.build_owned(owned, params, Arc::new(None.into()), usage)
482 .await
483 }
484
485 pub(crate) fn learning_timeouts(&self) -> bool {
487 self.builder.timeouts.learning_timeouts()
488 }
489
490 pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
492 &self.guardmgr
493 }
494
495 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
497 pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
498 &self.vanguardmgr
499 }
500
501 pub(crate) fn runtime(&self) -> &R {
503 self.builder.runtime()
504 }
505
506 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
508 self.builder.estimator()
509 }
510}
511
512#[cfg(feature = "flowctl-cc")]
514fn build_cc_vegas(
515 inp: &NetParameters,
516 vegas_queue_params: ccparams::VegasQueueParams,
517) -> ccparams::Algorithm {
518 ccparams::Algorithm::Vegas(
519 ccparams::VegasParamsBuilder::default()
520 .cell_in_queue_params(vegas_queue_params)
521 .ss_cwnd_max(inp.cc_ss_max.into())
522 .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
523 .cwnd_full_min_pct(Percentage::new(
524 inp.cc_cwnd_full_minpct.as_percent().get() as u32
525 ))
526 .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
527 .build()
528 .expect("Unable to build Vegas params from NetParams"),
529 )
530}
531
532fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
534 ccparams::Algorithm::FixedWindow(
535 ccparams::FixedWindowParamsBuilder::default()
536 .circ_window_start(inp.circuit_window.get() as u16)
537 .circ_window_min(inp.circuit_window.lower() as u16)
538 .circ_window_max(inp.circuit_window.upper() as u16)
539 .build()
540 .expect("Unable to build FixedWindow params from NetParams"),
541 )
542}
543
544fn circparameters_from_netparameters(
546 inp: &NetParameters,
547 alg: ccparams::Algorithm,
548) -> Result<CircParameters> {
549 let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
550 .cwnd_init(inp.cc_cwnd_init.into())
551 .cwnd_inc_pct_ss(Percentage::new(
552 inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
553 ))
554 .cwnd_inc(inp.cc_cwnd_inc.into())
555 .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
556 .cwnd_min(inp.cc_cwnd_min.into())
557 .cwnd_max(inp.cc_cwnd_max.into())
558 .sendme_inc(inp.cc_sendme_inc.into())
559 .build()
560 .map_err(into_internal!(
561 "Unable to build CongestionWindow params from NetParams"
562 ))?;
563 let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
564 .ewma_cwnd_pct(Percentage::new(
565 inp.cc_ewma_cwnd_pct.as_percent().get() as u32
566 ))
567 .ewma_max(inp.cc_ewma_max.into())
568 .ewma_ss_max(inp.cc_ewma_ss.into())
569 .rtt_reset_pct(Percentage::new(
570 inp.cc_rtt_reset_pct.as_percent().get() as u32
571 ))
572 .build()
573 .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
574 let ccontrol = ccparams::CongestionControlParamsBuilder::default()
575 .alg(alg)
576 .fallback_alg(build_cc_fixedwindow(inp))
577 .cwnd_params(cwnd_params)
578 .rtt_params(rtt_params)
579 .build()
580 .map_err(into_internal!(
581 "Unable to build CongestionControl params from NetParams"
582 ))?;
583 Ok(CircParameters::new(
584 inp.extend_by_ed25519_id.into(),
585 ccontrol,
586 ))
587}
588
589pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
592 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
593 #[cfg(feature = "flowctl-cc")]
594 AlgorithmType::VEGAS => {
595 if false {
600 build_cc_vegas(
601 inp,
602 (
603 inp.cc_vegas_alpha_exit.into(),
604 inp.cc_vegas_beta_exit.into(),
605 inp.cc_vegas_delta_exit.into(),
606 inp.cc_vegas_gamma_exit.into(),
607 inp.cc_vegas_sscap_exit.into(),
608 )
609 .into(),
610 )
611 } else {
612 build_cc_fixedwindow(inp)
613 }
614 }
615 _ => build_cc_fixedwindow(inp),
617 };
618 circparameters_from_netparameters(inp, alg)
619}
620
621pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
624 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
625 #[cfg(feature = "flowctl-cc")]
626 AlgorithmType::VEGAS => {
627 if false {
632 build_cc_vegas(
633 inp,
634 (
635 inp.cc_vegas_alpha_onion.into(),
636 inp.cc_vegas_beta_onion.into(),
637 inp.cc_vegas_delta_onion.into(),
638 inp.cc_vegas_gamma_onion.into(),
639 inp.cc_vegas_sscap_onion.into(),
640 )
641 .into(),
642 )
643 } else {
644 build_cc_fixedwindow(inp)
645 }
646 }
647 _ => build_cc_fixedwindow(inp),
649 };
650 circparameters_from_netparameters(inp, alg)
651}
652
653async fn double_timeout<R, F, T>(
663 runtime: &R,
664 fut: F,
665 timeout: Duration,
666 abandon: Duration,
667) -> Result<T>
668where
669 R: Runtime,
670 F: Future<Output = Result<T>> + Send + 'static,
671 T: Send + 'static,
672{
673 let (snd, rcv) = oneshot::channel();
674 let rt = runtime.clone();
675 let inner_timeout_future = rt.timeout(abandon, fut);
678 let outer_timeout_future = rt.timeout(timeout, rcv);
679
680 runtime
681 .spawn(async move {
682 let result = inner_timeout_future.await;
683 let _ignore_cancelled_error = snd.send(result);
684 })
685 .map_err(|e| Error::from_spawn("circuit construction task", e))?;
686
687 let outcome = outer_timeout_future.await;
688 outcome
698 .map_err(|_| Error::CircTimeout(None))??
699 .map_err(|_| Error::CircTimeout(None))?
700}
701
702#[cfg(test)]
703mod test {
704 #![allow(clippy::bool_assert_comparison)]
706 #![allow(clippy::clone_on_copy)]
707 #![allow(clippy::dbg_macro)]
708 #![allow(clippy::mixed_attributes_style)]
709 #![allow(clippy::print_stderr)]
710 #![allow(clippy::print_stdout)]
711 #![allow(clippy::single_char_pattern)]
712 #![allow(clippy::unwrap_used)]
713 #![allow(clippy::unchecked_duration_subtraction)]
714 #![allow(clippy::useless_vec)]
715 #![allow(clippy::needless_pass_by_value)]
716 use super::*;
718 use crate::timeouts::TimeoutEstimator;
719 use futures::FutureExt;
720 use std::sync::Mutex;
721 use tor_chanmgr::ChannelConfig;
722 use tor_chanmgr::ChannelUsage as CU;
723 use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
724 use tor_llcrypto::pk::ed25519::Ed25519Identity;
725 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
726 use tor_proto::memquota::ToplevelAccount;
727 use tor_rtcompat::SleepProvider;
728 use tracing::trace;
729
730 fn gs() -> Arc<GuardStatusHandle> {
732 Arc::new(None.into())
733 }
734
735 #[test]
736 fn test_double_timeout() {
738 let t1 = Duration::from_secs(1);
739 let t10 = Duration::from_secs(10);
740 fn duration_close_to(d1: Duration, d2: Duration) -> bool {
742 d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
743 }
744
745 tor_rtmock::MockRuntime::test_with_various(|rto| async move {
746 let x = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
748 assert!(x.is_ok());
749 assert_eq!(x.unwrap(), 3_u32);
750
751 trace!("acquiesce after test1");
752 #[allow(clippy::clone_on_copy)]
753 #[allow(deprecated)] let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
755
756 let rt_clone = rt.clone();
758 rt_clone.block_advance("manually controlling advances");
760 let x = rt
761 .wait_for(double_timeout(
762 &rt,
763 async move {
764 let sl = rt_clone.sleep(Duration::from_millis(100));
765 rt_clone.allow_one_advance(Duration::from_millis(100));
766 sl.await;
767 Ok(4_u32)
768 },
769 t1,
770 t10,
771 ))
772 .await;
773 assert!(x.is_ok());
774 assert_eq!(x.unwrap(), 4_u32);
775
776 trace!("acquiesce after test2");
777 #[allow(clippy::clone_on_copy)]
778 #[allow(deprecated)] let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
780
781 let rt_clone = rt.clone();
784 let (snd, rcv) = oneshot::channel();
785 let start = rt.now();
786 rt.block_advance("manually controlling advances");
787 let x = rt
788 .wait_for(double_timeout(
789 &rt,
790 async move {
791 let sl = rt_clone.sleep(Duration::from_secs(2));
792 rt_clone.allow_one_advance(Duration::from_secs(2));
793 sl.await;
794 snd.send(()).unwrap();
795 Ok(4_u32)
796 },
797 t1,
798 t10,
799 ))
800 .await;
801 assert!(matches!(x, Err(Error::CircTimeout(_))));
802 let end = rt.now();
803 assert!(duration_close_to(end - start, Duration::from_secs(1)));
804 let waited = rt.wait_for(rcv).await;
805 assert_eq!(waited, Ok(()));
806
807 trace!("acquiesce after test3");
808 #[allow(clippy::clone_on_copy)]
809 #[allow(deprecated)] let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
811
812 let rt_clone = rt.clone();
814 rt.block_advance("manually controlling advances");
815 let (snd, rcv) = oneshot::channel();
816 let start = rt.now();
817 rt.allow_one_advance(Duration::from_secs(1));
819 let x = rt
820 .wait_for(double_timeout(
821 &rt,
822 async move {
823 rt_clone.sleep(Duration::from_secs(30)).await;
824 snd.send(()).unwrap();
825 Ok(4_u32)
826 },
827 t1,
828 t10,
829 ))
830 .await;
831 assert!(matches!(x, Err(Error::CircTimeout(_))));
832 let end = rt.now();
833 rt.allow_one_advance(Duration::from_secs(9));
835 let waited = rt.wait_for(rcv).await;
836 assert!(waited.is_err());
837 let end2 = rt.now();
838 assert!(duration_close_to(end - start, Duration::from_secs(1)));
839 assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
840 });
841 }
842
843 fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
852 let mut be = [0; 8];
853 be[..].copy_from_slice(&id.as_bytes()[0..8]);
854 let dur = u64::from_be_bytes(be);
855 be[..].copy_from_slice(&id.as_bytes()[8..16]);
856 let dur2 = u64::from_be_bytes(be);
857 (Duration::from_millis(dur), Duration::from_millis(dur2))
858 }
859 fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
868 let mut bytes = [0; 32];
869 let dur = (d1.as_millis() as u64).to_be_bytes();
870 bytes[0..8].copy_from_slice(&dur);
871 let dur = (d2.as_millis() as u64).to_be_bytes();
872 bytes[8..16].copy_from_slice(&dur);
873 bytes.into()
874 }
875
876 fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
879 let ed_id = ct
882 .identity(RelayIdType::Ed25519)
883 .expect("No ed25519 key was present for fake ChanTarget‽")
884 .try_into()
885 .expect("ChanTarget provided wrong key type");
886 timeouts_from_key(ed_id)
887 }
888
889 #[derive(Debug, Clone)]
891 struct FakeCirc {
892 hops: Vec<RelayIds>,
893 onehop: bool,
894 }
895 #[async_trait]
896 impl Buildable for Mutex<FakeCirc> {
897 async fn create_chantarget<RT: Runtime>(
898 _: &ChanMgr<RT>,
899 rt: &RT,
900 _guard_status: &GuardStatusHandle,
901 ct: &OwnedChanTarget,
902 _: CircParameters,
903 _usage: ChannelUsage,
904 ) -> Result<Arc<Self>> {
905 let (d1, d2) = timeouts_from_chantarget(ct);
906 rt.sleep(d1).await;
907 if !d2.is_zero() {
908 rt.allow_one_advance(d2);
909 }
910
911 let c = FakeCirc {
912 hops: vec![RelayIds::from_relay_ids(ct)],
913 onehop: true,
914 };
915 Ok(Arc::new(Mutex::new(c)))
916 }
917 async fn create<RT: Runtime>(
918 _: &ChanMgr<RT>,
919 rt: &RT,
920 _guard_status: &GuardStatusHandle,
921 ct: &OwnedCircTarget,
922 _: CircParameters,
923 _usage: ChannelUsage,
924 ) -> Result<Arc<Self>> {
925 let (d1, d2) = timeouts_from_chantarget(ct);
926 rt.sleep(d1).await;
927 if !d2.is_zero() {
928 rt.allow_one_advance(d2);
929 }
930
931 let c = FakeCirc {
932 hops: vec![RelayIds::from_relay_ids(ct)],
933 onehop: false,
934 };
935 Ok(Arc::new(Mutex::new(c)))
936 }
937 async fn extend<RT: Runtime>(
938 &self,
939 rt: &RT,
940 ct: &OwnedCircTarget,
941 _: CircParameters,
942 ) -> Result<()> {
943 let (d1, d2) = timeouts_from_chantarget(ct);
944 rt.sleep(d1).await;
945 if !d2.is_zero() {
946 rt.allow_one_advance(d2);
947 }
948
949 {
950 let mut c = self.lock().unwrap();
951 c.hops.push(RelayIds::from_relay_ids(ct));
952 }
953 Ok(())
954 }
955 }
956
957 struct TimeoutRecorder<R> {
959 runtime: R,
960 hist: Vec<(bool, u8, Duration)>,
961 on_timeout: Duration,
963 on_success: Duration,
965
966 snd_success: Option<oneshot::Sender<()>>,
967 rcv_success: Option<oneshot::Receiver<()>>,
968 }
969
970 impl<R> TimeoutRecorder<R> {
971 fn new(runtime: R) -> Self {
972 Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
973 }
974
975 fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
976 let (snd_success, rcv_success) = oneshot::channel();
977 Self {
978 runtime,
979 hist: Vec::new(),
980 on_timeout,
981 on_success,
982 rcv_success: Some(rcv_success),
983 snd_success: Some(snd_success),
984 }
985 }
986 }
987 impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
988 fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
989 if !is_last {
990 return;
991 }
992 let (rt, advance) = {
993 let mut this = self.lock().unwrap();
994 this.hist.push((true, hop, delay));
995 let _ = this.snd_success.take().unwrap().send(());
996 (this.runtime.clone(), this.on_success)
997 };
998 if !advance.is_zero() {
999 rt.allow_one_advance(advance);
1000 }
1001 }
1002 fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
1003 let (rt, advance) = {
1004 let mut this = self.lock().unwrap();
1005 this.hist.push((false, hop, delay));
1006 (this.runtime.clone(), this.on_timeout)
1007 };
1008 if !advance.is_zero() {
1009 rt.allow_one_advance(advance);
1010 }
1011 }
1012 fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
1013 (Duration::from_secs(3), Duration::from_secs(100))
1014 }
1015 fn learning_timeouts(&self) -> bool {
1016 false
1017 }
1018 fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
1019
1020 fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
1021 None
1022 }
1023 }
1024
1025 fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1027 let mut builder = OwnedCircTarget::builder();
1028 builder
1029 .chan_target()
1030 .ed_identity(id)
1031 .rsa_identity([0x20; 20].into());
1032 builder
1033 .ntor_onion_key([0x33; 32].into())
1034 .protocols("".parse().unwrap())
1035 .build()
1036 .unwrap()
1037 }
1038 fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1040 OwnedChanTarget::builder()
1041 .ed_identity(id)
1042 .rsa_identity([0x20; 20].into())
1043 .build()
1044 .unwrap()
1045 }
1046
1047 async fn run_builder_test(
1048 rt: tor_rtmock::MockRuntime,
1049 advance_initial: Duration,
1050 path: OwnedPath,
1051 advance_on_timeout: Option<(Duration, Duration)>,
1052 usage: ChannelUsage,
1053 ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1054 let chanmgr = Arc::new(ChanMgr::new(
1055 rt.clone(),
1056 &ChannelConfig::default(),
1057 Default::default(),
1058 &Default::default(),
1059 ToplevelAccount::new_noop(),
1060 ));
1061 let timeouts = match advance_on_timeout {
1063 Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1064 None => TimeoutRecorder::new(rt.clone()),
1065 };
1066 let timeouts = Arc::new(Mutex::new(timeouts));
1067 let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1068 rt.clone(),
1069 chanmgr,
1070 timeouts::Estimator::new(Arc::clone(&timeouts)),
1071 );
1072
1073 rt.block_advance("manually controlling advances");
1074 rt.allow_one_advance(advance_initial);
1075 let outcome = rt.spawn_join("build-owned", async move {
1076 let arcbuilder = Arc::new(builder);
1077 let params = exit_circparams_from_netparams(&NetParameters::default())?;
1078 arcbuilder.build_owned(path, ¶ms, gs(), usage).await
1079 });
1080
1081 if advance_on_timeout.is_some() {
1083 let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1084 rt.spawn_identified("receiver", async move {
1085 receiver.await.unwrap();
1086 });
1087 }
1088 rt.advance_until_stalled().await;
1089
1090 let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1091 let timeouts = timeouts.lock().unwrap().hist.clone();
1092
1093 (circ, timeouts)
1094 }
1095
1096 #[test]
1097 fn build_onehop() {
1098 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1099 let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
1100 let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
1101
1102 let (outcome, timeouts) =
1103 run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1104 let circ = outcome.unwrap();
1105 assert!(circ.onehop);
1106 assert_eq!(circ.hops.len(), 1);
1107 assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1108
1109 assert_eq!(timeouts.len(), 1);
1110 assert!(timeouts[0].0); assert_eq!(timeouts[0].1, 0); assert_eq!(timeouts[0].2, Duration::from_millis(100));
1113 });
1114 }
1115
1116 #[test]
1117 fn build_threehop() {
1118 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1119 let id_100ms =
1120 key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1121 let id_200ms =
1122 key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
1123 let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
1124 let path =
1125 OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
1126
1127 let (outcome, timeouts) =
1128 run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1129 let circ = outcome.unwrap();
1130 assert!(!circ.onehop);
1131 assert_eq!(circ.hops.len(), 3);
1132 assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1133 assert!(circ.hops[1].same_relay_ids(&chan_t(id_200ms)));
1134 assert!(circ.hops[2].same_relay_ids(&chan_t(id_300ms)));
1135
1136 assert_eq!(timeouts.len(), 1);
1137 assert!(timeouts[0].0); assert_eq!(timeouts[0].1, 2); assert_eq!(timeouts[0].2, Duration::from_millis(600));
1140 });
1141 }
1142
1143 #[test]
1144 fn build_huge_timeout() {
1145 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1146 let id_100ms =
1147 key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1148 let id_200ms =
1149 key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1150 let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
1151
1152 let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
1153
1154 let (outcome, timeouts) =
1155 run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1156 assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1157
1158 assert_eq!(timeouts.len(), 1);
1159 assert!(!timeouts[0].0); assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1164 });
1165 }
1166
1167 #[test]
1168 fn build_modest_timeout() {
1169 tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1170 let id_100ms =
1171 key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1172 let id_200ms =
1173 key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1174 let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));
1175
1176 let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));
1177
1178 let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);
1179
1180 let (outcome, timeouts) = run_builder_test(
1181 rt.clone(),
1182 Duration::from_millis(100),
1183 path,
1184 Some(timeout_advance),
1185 CU::UserTraffic,
1186 )
1187 .await;
1188 assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1189
1190 assert_eq!(timeouts.len(), 2);
1191 assert!(!timeouts[0].0); assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1196
1197 assert!(timeouts[1].0); assert_eq!(timeouts[1].1, 2); });
1202 }
1203}