1use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::task::SpawnExt;
8use futures::Future;
9use oneshot_fused_workaround as oneshot;
10use std::sync::{
11 atomic::{AtomicU32, Ordering},
12 Arc,
13};
14use std::time::{Duration, Instant};
15use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
16use tor_error::into_internal;
17use tor_guardmgr::GuardStatus;
18use tor_linkspec::{ChanTarget, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
19use tor_netdir::params::NetParameters;
20use tor_proto::ccparams::{self, AlgorithmType};
21use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
22use tor_rtcompat::{Runtime, SleepProviderExt};
23use tor_units::Percentage;
24
25#[cfg(all(feature = "vanguards", feature = "hs-common"))]
26use tor_guardmgr::vanguards::VanguardMgr;
27
28mod guardstatus;
29
30pub(crate) use guardstatus::GuardStatusHandle;
31
32#[async_trait]
40pub(crate) trait Buildable: Sized {
41 async fn create_chantarget<RT: Runtime>(
47 chanmgr: &ChanMgr<RT>,
48 rt: &RT,
49 guard_status: &GuardStatusHandle,
50 ct: &OwnedChanTarget,
51 params: CircParameters,
52 usage: ChannelUsage,
53 ) -> Result<Arc<Self>>;
54
55 async fn create<RT: Runtime>(
58 chanmgr: &ChanMgr<RT>,
59 rt: &RT,
60 guard_status: &GuardStatusHandle,
61 ct: &OwnedCircTarget,
62 params: CircParameters,
63 usage: ChannelUsage,
64 ) -> Result<Arc<Self>>;
65
66 async fn extend<RT: Runtime>(
69 &self,
70 rt: &RT,
71 ct: &OwnedCircTarget,
72 params: CircParameters,
73 ) -> Result<()>;
74}
75
76async fn create_common<RT: Runtime, CT: ChanTarget>(
82 chanmgr: &ChanMgr<RT>,
83 rt: &RT,
84 target: &CT,
85 guard_status: &GuardStatusHandle,
86 usage: ChannelUsage,
87) -> Result<PendingClientCirc> {
88 let result = chanmgr.get_or_launch(target, usage).await;
90
91 let chan = match result {
93 Ok((chan, ChanProvenance::NewlyCreated)) => {
94 guard_status.skew(chan.clock_skew());
95 chan
96 }
97 Ok((chan, _)) => chan,
98 Err(cause) => {
99 if let Some(skew) = cause.clock_skew() {
100 guard_status.skew(skew);
101 }
102 return Err(Error::Channel {
103 peer: target.to_logged(),
104 cause,
105 });
106 }
107 };
108 let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol {
110 error,
111 peer: None, action: "initializing circuit",
113 unique_id: None,
114 })?;
115
116 rt.spawn(async {
117 let _ = reactor.run().await;
118 })
119 .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
120
121 Ok(pending_circ)
122}
123
124#[async_trait]
125impl Buildable for ClientCirc {
126 async fn create_chantarget<RT: Runtime>(
127 chanmgr: &ChanMgr<RT>,
128 rt: &RT,
129 guard_status: &GuardStatusHandle,
130 ct: &OwnedChanTarget,
131 params: CircParameters,
132 usage: ChannelUsage,
133 ) -> Result<Arc<Self>> {
134 let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
135 let unique_id = Some(circ.peek_unique_id());
136 circ.create_firsthop_fast(params)
137 .await
138 .map_err(|error| Error::Protocol {
139 peer: Some(ct.to_logged()),
140 error,
141 action: "running CREATE_FAST handshake",
142 unique_id,
143 })
144 }
145 async fn create<RT: Runtime>(
146 chanmgr: &ChanMgr<RT>,
147 rt: &RT,
148 guard_status: &GuardStatusHandle,
149 ct: &OwnedCircTarget,
150 params: CircParameters,
151 usage: ChannelUsage,
152 ) -> Result<Arc<Self>> {
153 let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
154 let unique_id = Some(circ.peek_unique_id());
155
156 let handshake_res = circ.create_firsthop(ct, params).await;
157
158 handshake_res.map_err(|error| Error::Protocol {
159 peer: Some(ct.to_logged()),
160 error,
161 action: "creating first hop",
162 unique_id,
163 })
164 }
165 async fn extend<RT: Runtime>(
166 &self,
167 _rt: &RT,
168 ct: &OwnedCircTarget,
169 params: CircParameters,
170 ) -> Result<()> {
171 let res = ClientCirc::extend(self, ct, params).await;
173
174 res.map_err(|error| Error::Protocol {
175 error,
176 peer: None,
180 action: "extending circuit",
181 unique_id: Some(self.unique_id()),
182 })
183 }
184}
185
186struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
194 runtime: R,
196 chanmgr: Arc<ChanMgr<R>>,
198 timeouts: timeouts::Estimator,
200 _phantom: std::marker::PhantomData<C>,
203}
204
205impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
206 fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
208 Builder {
209 runtime,
210 chanmgr,
211 timeouts,
212 _phantom: std::marker::PhantomData,
213 }
214 }
215
216 async fn build_notimeout(
225 self: Arc<Self>,
226 path: OwnedPath,
227 params: CircParameters,
228 start_time: Instant,
229 n_hops_built: Arc<AtomicU32>,
230 guard_status: Arc<GuardStatusHandle>,
231 usage: ChannelUsage,
232 ) -> Result<Arc<C>> {
233 match path {
234 OwnedPath::ChannelOnly(target) => {
235 guard_status.pending(GuardStatus::Failure);
237 let circ = C::create_chantarget(
238 &self.chanmgr,
239 &self.runtime,
240 &guard_status,
241 &target,
242 params,
243 usage,
244 )
245 .await?;
246 self.timeouts
247 .note_hop_completed(0, self.runtime.now() - start_time, true);
248 n_hops_built.fetch_add(1, Ordering::SeqCst);
249 Ok(circ)
250 }
251 OwnedPath::Normal(p) => {
252 assert!(!p.is_empty());
253 let n_hops = p.len() as u8;
254 guard_status.pending(GuardStatus::Failure);
256 let circ = C::create(
258 &self.chanmgr,
259 &self.runtime,
260 &guard_status,
261 &p[0],
262 params.clone(),
263 usage,
264 )
265 .await?;
266 self.timeouts
267 .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
268 guard_status.pending(GuardStatus::Indeterminate);
271 n_hops_built.fetch_add(1, Ordering::SeqCst);
272 let mut hop_num = 1;
273 for relay in p[1..].iter() {
274 circ.extend(&self.runtime, relay, params.clone()).await?;
276 n_hops_built.fetch_add(1, Ordering::SeqCst);
277 self.timeouts.note_hop_completed(
278 hop_num,
279 self.runtime.now() - start_time,
280 hop_num == (n_hops - 1),
281 );
282 hop_num += 1;
283 }
284 Ok(circ)
285 }
286 }
287 }
288
289 async fn build_owned(
291 self: &Arc<Self>,
292 path: OwnedPath,
293 params: &CircParameters,
294 guard_status: Arc<GuardStatusHandle>,
295 usage: ChannelUsage,
296 ) -> Result<Arc<C>> {
297 let action = Action::BuildCircuit { length: path.len() };
298 let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
299 let start_time = self.runtime.now();
300
301 let hops_built = Arc::new(AtomicU32::new(0));
305
306 let self_clone = Arc::clone(self);
307 let params = params.clone();
308
309 let circuit_future = self_clone.build_notimeout(
310 path,
311 params,
312 start_time,
313 Arc::clone(&hops_built),
314 guard_status,
315 usage,
316 );
317
318 match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
319 Ok(circuit) => Ok(circuit),
320 Err(Error::CircTimeout(unique_id)) => {
321 let n_built = hops_built.load(Ordering::SeqCst);
322 self.timeouts
323 .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
324 Err(Error::CircTimeout(unique_id))
325 }
326 Err(e) => Err(e),
327 }
328 }
329
330 pub(crate) fn runtime(&self) -> &R {
332 &self.runtime
333 }
334
335 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
337 &self.timeouts
338 }
339}
340
341pub struct CircuitBuilder<R: Runtime> {
349 builder: Arc<Builder<R, ClientCirc>>,
351 path_config: tor_config::MutCfg<crate::PathConfig>,
353 storage: crate::TimeoutStateHandle,
355 guardmgr: tor_guardmgr::GuardMgr<R>,
358 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
360 vanguardmgr: Arc<VanguardMgr<R>>,
361}
362
363impl<R: Runtime> CircuitBuilder<R> {
364 pub(crate) fn new(
368 runtime: R,
369 chanmgr: Arc<ChanMgr<R>>,
370 path_config: crate::PathConfig,
371 storage: crate::TimeoutStateHandle,
372 guardmgr: tor_guardmgr::GuardMgr<R>,
373 #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
374 ) -> Self {
375 let timeouts = timeouts::Estimator::from_storage(&storage);
376
377 CircuitBuilder {
378 builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
379 path_config: path_config.into(),
380 storage,
381 guardmgr,
382 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
383 vanguardmgr: Arc::new(vanguardmgr),
384 }
385 }
386
387 pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
389 self.path_config.get()
390 }
391
392 pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
394 self.path_config.replace(new_config);
395 }
396
397 pub(crate) fn save_state(&self) -> Result<bool> {
401 if !self.storage.can_store() {
402 return Ok(false);
403 }
404 self.builder.timeouts.save_state(&self.storage)?;
407 self.guardmgr.store_persistent_state()?;
408 Ok(true)
409 }
410
411 pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
414 self.builder
415 .timeouts
416 .upgrade_to_owning_storage(&self.storage);
417 self.guardmgr.upgrade_to_owned_persistent_state()?;
418 Ok(())
419 }
420
421 pub(crate) fn reload_state(&self) -> Result<()> {
423 if !self.storage.can_store() {
424 self.builder
425 .timeouts
426 .reload_readonly_from_storage(&self.storage);
427 }
428 self.guardmgr.reload_persistent_state()?;
429 Ok(())
430 }
431
432 pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
436 self.builder.timeouts.update_params(p);
437 }
438
439 pub(crate) async fn build_owned(
441 &self,
442 path: OwnedPath,
443 params: &CircParameters,
444 guard_status: Arc<GuardStatusHandle>,
445 usage: ChannelUsage,
446 ) -> Result<Arc<ClientCirc>> {
447 self.builder
448 .build_owned(path, params, guard_status, usage)
449 .await
450 }
451
452 pub async fn build(
459 &self,
460 path: &TorPath<'_>,
461 params: &CircParameters,
462 usage: ChannelUsage,
463 ) -> Result<Arc<ClientCirc>> {
464 let owned = path.try_into()?;
465 self.build_owned(owned, params, Arc::new(None.into()), usage)
466 .await
467 }
468
469 pub(crate) fn learning_timeouts(&self) -> bool {
471 self.builder.timeouts.learning_timeouts()
472 }
473
474 pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
476 &self.guardmgr
477 }
478
479 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
481 pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
482 &self.vanguardmgr
483 }
484
485 pub(crate) fn runtime(&self) -> &R {
487 self.builder.runtime()
488 }
489
490 pub(crate) fn estimator(&self) -> &timeouts::Estimator {
492 self.builder.estimator()
493 }
494}
495
/// Build a Vegas congestion-control algorithm description from the network
/// parameters `inp` and the supplied queue parameters.
///
/// # Panics
///
/// Panics if the Vegas parameter builder rejects the values.
#[cfg(feature = "flowctl-cc")]
fn build_cc_vegas(
    inp: &NetParameters,
    vegas_queue_params: ccparams::VegasQueueParams,
) -> ccparams::Algorithm {
    let full_min_pct = Percentage::new(inp.cc_cwnd_full_minpct.as_percent().get() as u32);

    let vegas = ccparams::VegasParamsBuilder::default()
        .cell_in_queue_params(vegas_queue_params)
        .ss_cwnd_max(inp.cc_ss_max.into())
        .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
        .cwnd_full_min_pct(full_min_pct)
        .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
        .build()
        .expect("Unable to build Vegas params from NetParams");

    ccparams::Algorithm::Vegas(vegas)
}
515
516fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
518 ccparams::Algorithm::FixedWindow(build_cc_fixedwindow_params(inp))
519}
520
521fn build_cc_fixedwindow_params(inp: &NetParameters) -> ccparams::FixedWindowParams {
524 ccparams::FixedWindowParamsBuilder::default()
525 .circ_window_start(inp.circuit_window.get() as u16)
526 .circ_window_min(inp.circuit_window.lower() as u16)
527 .circ_window_max(inp.circuit_window.upper() as u16)
528 .build()
529 .expect("Unable to build FixedWindow params from NetParams")
530}
531
532fn circparameters_from_netparameters(
534 inp: &NetParameters,
535 alg: ccparams::Algorithm,
536) -> Result<CircParameters> {
537 let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
538 .cwnd_init(inp.cc_cwnd_init.into())
539 .cwnd_inc_pct_ss(Percentage::new(
540 inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
541 ))
542 .cwnd_inc(inp.cc_cwnd_inc.into())
543 .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
544 .cwnd_min(inp.cc_cwnd_min.into())
545 .cwnd_max(inp.cc_cwnd_max.into())
546 .sendme_inc(inp.cc_sendme_inc.into())
547 .build()
548 .map_err(into_internal!(
549 "Unable to build CongestionWindow params from NetParams"
550 ))?;
551 let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
552 .ewma_cwnd_pct(Percentage::new(
553 inp.cc_ewma_cwnd_pct.as_percent().get() as u32
554 ))
555 .ewma_max(inp.cc_ewma_max.into())
556 .ewma_ss_max(inp.cc_ewma_ss.into())
557 .rtt_reset_pct(Percentage::new(
558 inp.cc_rtt_reset_pct.as_percent().get() as u32
559 ))
560 .build()
561 .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
562 let ccontrol = ccparams::CongestionControlParamsBuilder::default()
563 .alg(alg)
564 .fixed_window_params(build_cc_fixedwindow_params(inp))
565 .cwnd_params(cwnd_params)
566 .rtt_params(rtt_params)
567 .build()
568 .map_err(into_internal!(
569 "Unable to build CongestionControl params from NetParams"
570 ))?;
571 Ok(CircParameters::new(
572 inp.extend_by_ed25519_id.into(),
573 ccontrol,
574 ))
575}
576
577pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
580 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
581 #[cfg(feature = "flowctl-cc")]
582 AlgorithmType::VEGAS => {
583 if false {
588 build_cc_vegas(
589 inp,
590 (
591 inp.cc_vegas_alpha_exit.into(),
592 inp.cc_vegas_beta_exit.into(),
593 inp.cc_vegas_delta_exit.into(),
594 inp.cc_vegas_gamma_exit.into(),
595 inp.cc_vegas_sscap_exit.into(),
596 )
597 .into(),
598 )
599 } else {
600 build_cc_fixedwindow(inp)
601 }
602 }
603 _ => build_cc_fixedwindow(inp),
605 };
606 circparameters_from_netparameters(inp, alg)
607}
608
609pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
612 let alg = match AlgorithmType::from(inp.cc_alg.get()) {
613 #[cfg(feature = "flowctl-cc")]
614 AlgorithmType::VEGAS => {
615 if false {
620 build_cc_vegas(
621 inp,
622 (
623 inp.cc_vegas_alpha_onion.into(),
624 inp.cc_vegas_beta_onion.into(),
625 inp.cc_vegas_delta_onion.into(),
626 inp.cc_vegas_gamma_onion.into(),
627 inp.cc_vegas_sscap_onion.into(),
628 )
629 .into(),
630 )
631 } else {
632 build_cc_fixedwindow(inp)
633 }
634 }
635 _ => build_cc_fixedwindow(inp),
637 };
638 circparameters_from_netparameters(inp, alg)
639}
640
641async fn double_timeout<R, F, T>(
651 runtime: &R,
652 fut: F,
653 timeout: Duration,
654 abandon: Duration,
655) -> Result<T>
656where
657 R: Runtime,
658 F: Future<Output = Result<T>> + Send + 'static,
659 T: Send + 'static,
660{
661 let (snd, rcv) = oneshot::channel();
662 let rt = runtime.clone();
663 let inner_timeout_future = rt.timeout(abandon, fut);
666 let outer_timeout_future = rt.timeout(timeout, rcv);
667
668 runtime
669 .spawn(async move {
670 let result = inner_timeout_future.await;
671 let _ignore_cancelled_error = snd.send(result);
672 })
673 .map_err(|e| Error::from_spawn("circuit construction task", e))?;
674
675 let outcome = outer_timeout_future.await;
676 outcome
686 .map_err(|_| Error::CircTimeout(None))??
687 .map_err(|_| Error::CircTimeout(None))?
688}
689
690#[cfg(test)]
691mod test {
692 #![allow(clippy::bool_assert_comparison)]
694 #![allow(clippy::clone_on_copy)]
695 #![allow(clippy::dbg_macro)]
696 #![allow(clippy::mixed_attributes_style)]
697 #![allow(clippy::print_stderr)]
698 #![allow(clippy::print_stdout)]
699 #![allow(clippy::single_char_pattern)]
700 #![allow(clippy::unwrap_used)]
701 #![allow(clippy::unchecked_duration_subtraction)]
702 #![allow(clippy::useless_vec)]
703 #![allow(clippy::needless_pass_by_value)]
704 use super::*;
706 use crate::timeouts::TimeoutEstimator;
707 use futures::FutureExt;
708 use std::sync::Mutex;
709 use tor_chanmgr::ChannelConfig;
710 use tor_chanmgr::ChannelUsage as CU;
711 use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
712 use tor_llcrypto::pk::ed25519::Ed25519Identity;
713 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
714 use tor_proto::memquota::ToplevelAccount;
715 use tor_rtcompat::SleepProvider;
716 use tracing::trace;
717
718 fn gs() -> Arc<GuardStatusHandle> {
720 Arc::new(None.into())
721 }
722
/// Exercise `double_timeout` in four scenarios: an immediate result, a result
/// that arrives before the first timeout, one that arrives between the two
/// timeouts, and one that would arrive only after both.
#[test]
fn test_double_timeout() {
    let t1 = Duration::from_secs(1);
    let t10 = Duration::from_secs(10);

    /// Return true if `d1` lies within `[d2, d2 + 500ms]`.
    fn duration_close_to(d1: Duration, d2: Duration) -> bool {
        (d2..=d2 + Duration::from_millis(500)).contains(&d1)
    }

    tor_rtmock::MockRuntime::test_with_various(|rto| async move {
        // Scenario 1: the future is ready at once.
        let outcome = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
        assert!(matches!(outcome, Ok(3_u32)));

        trace!("acquiesce after test1");
        #[allow(clippy::clone_on_copy)]
        #[allow(deprecated)]
        let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

        // Scenario 2: the future finishes after 100ms, inside the 1s timeout.
        let task_rt = rt.clone();
        task_rt.block_advance("manually controlling advances");
        let outcome = rt
            .wait_for(double_timeout(
                &rt,
                async move {
                    let nap = task_rt.sleep(Duration::from_millis(100));
                    task_rt.allow_one_advance(Duration::from_millis(100));
                    nap.await;
                    Ok(4_u32)
                },
                t1,
                t10,
            ))
            .await;
        assert!(matches!(outcome, Ok(4_u32)));

        trace!("acquiesce after test2");
        #[allow(clippy::clone_on_copy)]
        #[allow(deprecated)]
        let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

        // Scenario 3: the future needs 2s, so the caller times out at 1s but
        // the task still runs to completion.
        let task_rt = rt.clone();
        let (done_tx, done_rx) = oneshot::channel();
        let start = rt.now();
        rt.block_advance("manually controlling advances");
        let outcome = rt
            .wait_for(double_timeout(
                &rt,
                async move {
                    let nap = task_rt.sleep(Duration::from_secs(2));
                    task_rt.allow_one_advance(Duration::from_secs(2));
                    nap.await;
                    done_tx.send(()).unwrap();
                    Ok(4_u32)
                },
                t1,
                t10,
            ))
            .await;
        assert!(matches!(outcome, Err(Error::CircTimeout(_))));
        let end = rt.now();
        assert!(duration_close_to(end - start, Duration::from_secs(1)));
        let waited = rt.wait_for(done_rx).await;
        assert_eq!(waited, Ok(()));

        trace!("acquiesce after test3");
        #[allow(clippy::clone_on_copy)]
        #[allow(deprecated)]
        let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

        // Scenario 4: the future needs 30s, so the caller times out at 1s and
        // the task is abandoned at 10s without ever signalling completion.
        let task_rt = rt.clone();
        rt.block_advance("manually controlling advances");
        let (done_tx, done_rx) = oneshot::channel();
        let start = rt.now();
        rt.allow_one_advance(Duration::from_secs(1));
        let outcome = rt
            .wait_for(double_timeout(
                &rt,
                async move {
                    task_rt.sleep(Duration::from_secs(30)).await;
                    done_tx.send(()).unwrap();
                    Ok(4_u32)
                },
                t1,
                t10,
            ))
            .await;
        assert!(matches!(outcome, Err(Error::CircTimeout(_))));
        let end = rt.now();
        rt.allow_one_advance(Duration::from_secs(9));
        let waited = rt.wait_for(done_rx).await;
        assert!(waited.is_err());
        let end2 = rt.now();
        assert!(duration_close_to(end - start, Duration::from_secs(1)));
        assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
    });
}
830
831 fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
840 let mut be = [0; 8];
841 be[..].copy_from_slice(&id.as_bytes()[0..8]);
842 let dur = u64::from_be_bytes(be);
843 be[..].copy_from_slice(&id.as_bytes()[8..16]);
844 let dur2 = u64::from_be_bytes(be);
845 (Duration::from_millis(dur), Duration::from_millis(dur2))
846 }
847 fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
856 let mut bytes = [0; 32];
857 let dur = (d1.as_millis() as u64).to_be_bytes();
858 bytes[0..8].copy_from_slice(&dur);
859 let dur = (d2.as_millis() as u64).to_be_bytes();
860 bytes[8..16].copy_from_slice(&dur);
861 bytes.into()
862 }
863
864 fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
867 let ed_id = ct
870 .identity(RelayIdType::Ed25519)
871 .expect("No ed25519 key was present for fake ChanTarget‽")
872 .try_into()
873 .expect("ChanTarget provided wrong key type");
874 timeouts_from_key(ed_id)
875 }
876
877 #[derive(Debug, Clone)]
879 struct FakeCirc {
880 hops: Vec<RelayIds>,
881 onehop: bool,
882 }
883 #[async_trait]
884 impl Buildable for Mutex<FakeCirc> {
885 async fn create_chantarget<RT: Runtime>(
886 _: &ChanMgr<RT>,
887 rt: &RT,
888 _guard_status: &GuardStatusHandle,
889 ct: &OwnedChanTarget,
890 _: CircParameters,
891 _usage: ChannelUsage,
892 ) -> Result<Arc<Self>> {
893 let (d1, d2) = timeouts_from_chantarget(ct);
894 rt.sleep(d1).await;
895 if !d2.is_zero() {
896 rt.allow_one_advance(d2);
897 }
898
899 let c = FakeCirc {
900 hops: vec![RelayIds::from_relay_ids(ct)],
901 onehop: true,
902 };
903 Ok(Arc::new(Mutex::new(c)))
904 }
905 async fn create<RT: Runtime>(
906 _: &ChanMgr<RT>,
907 rt: &RT,
908 _guard_status: &GuardStatusHandle,
909 ct: &OwnedCircTarget,
910 _: CircParameters,
911 _usage: ChannelUsage,
912 ) -> Result<Arc<Self>> {
913 let (d1, d2) = timeouts_from_chantarget(ct);
914 rt.sleep(d1).await;
915 if !d2.is_zero() {
916 rt.allow_one_advance(d2);
917 }
918
919 let c = FakeCirc {
920 hops: vec![RelayIds::from_relay_ids(ct)],
921 onehop: false,
922 };
923 Ok(Arc::new(Mutex::new(c)))
924 }
925 async fn extend<RT: Runtime>(
926 &self,
927 rt: &RT,
928 ct: &OwnedCircTarget,
929 _: CircParameters,
930 ) -> Result<()> {
931 let (d1, d2) = timeouts_from_chantarget(ct);
932 rt.sleep(d1).await;
933 if !d2.is_zero() {
934 rt.allow_one_advance(d2);
935 }
936
937 {
938 let mut c = self.lock().unwrap();
939 c.hops.push(RelayIds::from_relay_ids(ct));
940 }
941 Ok(())
942 }
943 }
944
945 struct TimeoutRecorder<R> {
947 runtime: R,
948 hist: Vec<(bool, u8, Duration)>,
949 on_timeout: Duration,
951 on_success: Duration,
953
954 snd_success: Option<oneshot::Sender<()>>,
955 rcv_success: Option<oneshot::Receiver<()>>,
956 }
957
958 impl<R> TimeoutRecorder<R> {
959 fn new(runtime: R) -> Self {
960 Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
961 }
962
963 fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
964 let (snd_success, rcv_success) = oneshot::channel();
965 Self {
966 runtime,
967 hist: Vec::new(),
968 on_timeout,
969 on_success,
970 rcv_success: Some(rcv_success),
971 snd_success: Some(snd_success),
972 }
973 }
974 }
975 impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
976 fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
977 if !is_last {
978 return;
979 }
980 let (rt, advance) = {
981 let mut this = self.lock().unwrap();
982 this.hist.push((true, hop, delay));
983 let _ = this.snd_success.take().unwrap().send(());
984 (this.runtime.clone(), this.on_success)
985 };
986 if !advance.is_zero() {
987 rt.allow_one_advance(advance);
988 }
989 }
990 fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
991 let (rt, advance) = {
992 let mut this = self.lock().unwrap();
993 this.hist.push((false, hop, delay));
994 (this.runtime.clone(), this.on_timeout)
995 };
996 if !advance.is_zero() {
997 rt.allow_one_advance(advance);
998 }
999 }
1000 fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
1001 (Duration::from_secs(3), Duration::from_secs(100))
1002 }
1003 fn learning_timeouts(&self) -> bool {
1004 false
1005 }
1006 fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
1007
1008 fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
1009 None
1010 }
1011 }
1012
1013 fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1015 let mut builder = OwnedCircTarget::builder();
1016 builder
1017 .chan_target()
1018 .ed_identity(id)
1019 .rsa_identity([0x20; 20].into());
1020 builder
1021 .ntor_onion_key([0x33; 32].into())
1022 .protocols("".parse().unwrap())
1023 .build()
1024 .unwrap()
1025 }
1026 fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1028 OwnedChanTarget::builder()
1029 .ed_identity(id)
1030 .rsa_identity([0x20; 20].into())
1031 .build()
1032 .unwrap()
1033 }
1034
1035 async fn run_builder_test(
1036 rt: tor_rtmock::MockRuntime,
1037 advance_initial: Duration,
1038 path: OwnedPath,
1039 advance_on_timeout: Option<(Duration, Duration)>,
1040 usage: ChannelUsage,
1041 ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1042 let chanmgr = Arc::new(ChanMgr::new(
1043 rt.clone(),
1044 &ChannelConfig::default(),
1045 Default::default(),
1046 &Default::default(),
1047 ToplevelAccount::new_noop(),
1048 ));
1049 let timeouts = match advance_on_timeout {
1051 Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1052 None => TimeoutRecorder::new(rt.clone()),
1053 };
1054 let timeouts = Arc::new(Mutex::new(timeouts));
1055 let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1056 rt.clone(),
1057 chanmgr,
1058 timeouts::Estimator::new(Arc::clone(&timeouts)),
1059 );
1060
1061 rt.block_advance("manually controlling advances");
1062 rt.allow_one_advance(advance_initial);
1063 let outcome = rt.spawn_join("build-owned", async move {
1064 let arcbuilder = Arc::new(builder);
1065 let params = exit_circparams_from_netparams(&NetParameters::default())?;
1066 arcbuilder.build_owned(path, ¶ms, gs(), usage).await
1067 });
1068
1069 if advance_on_timeout.is_some() {
1071 let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1072 rt.spawn_identified("receiver", async move {
1073 receiver.await.unwrap();
1074 });
1075 }
1076 rt.advance_until_stalled().await;
1077
1078 let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1079 let timeouts = timeouts.lock().unwrap().hist.clone();
1080
1081 (circ, timeouts)
1082 }
1083
/// A channel-only path yields a one-hop circuit and a single success record
/// for hop 0 after 100ms.
#[test]
fn build_onehop() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let delay = Duration::from_millis(100);
        let id_100ms = key_from_timeouts(delay, Duration::from_millis(0));
        let path = OwnedPath::ChannelOnly(chan_t(id_100ms));

        let (outcome, timeouts) = run_builder_test(rt, delay, path, None, CU::UserTraffic).await;

        let circ = outcome.unwrap();
        assert!(circ.onehop);
        assert!(matches!(
            circ.hops.as_slice(),
            [hop] if hop.same_relay_ids(&chan_t(id_100ms))
        ));

        // Exactly one entry: success, hop 0, after 100ms.
        assert_eq!(timeouts, vec![(true, 0, delay)]);
    });
}
1103
/// A three-hop path yields a three-hop circuit and a single success record
/// for hop 2 after 600ms in total.
#[test]
fn build_threehop() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let ms = Duration::from_millis;
        let id_100ms = key_from_timeouts(ms(100), ms(200));
        let id_200ms = key_from_timeouts(ms(200), ms(300));
        let id_300ms = key_from_timeouts(ms(300), ms(0));
        let ids = [id_100ms, id_200ms, id_300ms];
        let path = OwnedPath::Normal(ids.iter().map(|id| circ_t(*id)).collect());

        let (outcome, timeouts) =
            run_builder_test(rt, ms(100), path, None, CU::UserTraffic).await;

        let circ = outcome.unwrap();
        assert!(!circ.onehop);
        assert_eq!(circ.hops.len(), ids.len());
        for (hop, id) in circ.hops.iter().zip(ids) {
            assert!(hop.same_relay_ids(&chan_t(id)));
        }

        // Exactly one entry: success, hop 2, after 100 + 200 + 300 ms.
        assert_eq!(timeouts, vec![(true, 2, ms(600))]);
    });
}
1130
/// If the last hop takes an hour, the build times out and a single timeout
/// is recorded at the 3-second mark.
#[test]
fn build_huge_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let ms = Duration::from_millis;
        let id_100ms = key_from_timeouts(ms(100), ms(200));
        let id_200ms = key_from_timeouts(ms(200), ms(2700));
        let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));

        let hops = vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)];
        let path = OwnedPath::Normal(hops);

        let (outcome, timeouts) =
            run_builder_test(rt, ms(100), path, None, CU::UserTraffic).await;
        assert!(matches!(outcome, Err(Error::CircTimeout(_))));

        assert_eq!(timeouts.len(), 1);
        let (succeeded, _hop, elapsed) = timeouts[0];
        assert!(!succeeded);
        assert_eq!(elapsed, ms(3000));
    });
}
1154
/// If the last hop takes 3 seconds, the caller sees a timeout at the
/// 3-second mark, and a success for hop 2 is recorded afterwards.
#[test]
fn build_modest_timeout() {
    tor_rtmock::MockRuntime::test_with_various(|rt| async move {
        let ms = Duration::from_millis;
        let id_100ms = key_from_timeouts(ms(100), ms(200));
        let id_200ms = key_from_timeouts(ms(200), ms(2700));
        let id_3sec = key_from_timeouts(ms(3000), ms(0));

        // Advance by 4s after the timeout is recorded, and not at all after
        // the success.
        let timeout_advance = (ms(4000), Duration::from_secs(0));

        let hops = vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)];
        let path = OwnedPath::Normal(hops);

        let (outcome, timeouts) = run_builder_test(
            rt.clone(),
            ms(100),
            path,
            Some(timeout_advance),
            CU::UserTraffic,
        )
        .await;
        assert!(matches!(outcome, Err(Error::CircTimeout(_))));

        assert_eq!(timeouts.len(), 2);

        // First entry: the timeout at 3 seconds.
        let (first_ok, _first_hop, first_elapsed) = timeouts[0];
        assert!(!first_ok);
        assert_eq!(first_elapsed, ms(3000));

        // Second entry: the eventual success on the final hop.
        let (second_ok, second_hop, _second_elapsed) = timeouts[1];
        assert!(second_ok);
        assert_eq!(second_hop, 2);
    });
}
1191}