1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
48
49use build::CircuitBuilder;
50use mgr::{AbstractCirc, AbstractCircBuilder};
51use tor_basic_utils::retry::RetryDelay;
52use tor_chanmgr::ChanMgr;
53use tor_error::{error_report, warn_report};
54use tor_guardmgr::RetireCircuits;
55use tor_linkspec::ChanTarget;
56use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
57use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
58use tor_rtcompat::Runtime;
59
60#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
61use tor_linkspec::IntoOwnedChanTarget;
62
63use futures::task::SpawnExt;
64use futures::StreamExt;
65use std::sync::{Arc, Mutex, Weak};
66use std::time::{Duration, Instant};
67use tracing::{debug, info, trace, warn};
68
69#[cfg(feature = "testing")]
70pub use config::test_config::TestConfig;
71
72pub mod build;
73mod config;
74mod err;
75#[cfg(feature = "hs-common")]
76pub mod hspool;
77mod impls;
78pub mod isolation;
79mod mgr;
80#[cfg(test)]
81mod mocks;
82mod preemptive;
83pub mod timeouts;
84mod usage;
85
86cfg_if::cfg_if! {
88 if #[cfg(feature = "experimental-api")] {
89 pub mod path;
90 } else {
91 pub(crate) mod path;
92 }
93}
94
95pub use err::Error;
96pub use isolation::IsolationToken;
97use tor_guardmgr::fallback::FallbackList;
98pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
99pub use usage::{TargetPort, TargetPorts};
100
101pub use config::{
102 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
103 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
104};
105
106use crate::isolation::StreamIsolation;
107use crate::mgr::CircProvenance;
108use crate::preemptive::PreemptiveCircuitPredictor;
109use usage::TargetCircUsage;
110
111use safelog::sensitive as sv;
112#[cfg(feature = "geoip")]
113use tor_geoip::CountryCode;
114pub use tor_guardmgr::{ExternalActivity, FirstHopId};
115use tor_persist::StateMgr;
116use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
117
118#[cfg(feature = "hs-common")]
119use crate::hspool::{HsCircKind, HsCircStemKind};
120#[cfg(all(feature = "vanguards", feature = "hs-common"))]
121use tor_guardmgr::vanguards::VanguardMgr;
122
/// Convenience alias for a `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
125
/// Dynamic storage handle for a persisted `ParetoTimeoutState`.
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
128
/// Key passed to the state manager's `create_handle` when obtaining the
/// storage handle that is given to the circuit builder.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
131
/// Borrowed directory information handed to the circuit manager.
///
/// Every data-carrying variant only holds a reference, so the type is `Copy`.
/// `From` conversions from `&FallbackList` and `&NetDir` are provided.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DirInfo<'a> {
    /// A list of fallbacks (`FallbackList`) is available.
    Fallbacks(&'a FallbackList),
    /// A network directory (`NetDir`) is available.
    Directory(&'a NetDir),
    /// No directory information is supplied.
    Nothing,
}
150
151impl<'a> From<&'a FallbackList> for DirInfo<'a> {
152 fn from(v: &'a FallbackList) -> DirInfo<'a> {
153 DirInfo::Fallbacks(v)
154 }
155}
156impl<'a> From<&'a NetDir> for DirInfo<'a> {
157 fn from(v: &'a NetDir) -> DirInfo<'a> {
158 DirInfo::Directory(v)
159 }
160}
161impl<'a> DirInfo<'a> {
162 fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
164 use tor_netdir::params::NetParameters;
165 let defaults = NetParameters::default();
168 let net_params = match self {
169 DirInfo::Directory(d) => d.params(),
170 _ => &defaults,
171 };
172 match usage {
173 #[cfg(feature = "hs-common")]
174 TargetCircUsage::HsCircBase { .. } => {
175 build::onion_circparams_from_netparams(net_params)
176 }
177 _ => build::exit_circparams_from_netparams(net_params),
178 }
179 }
180}
181
/// Public circuit-manager handle.
///
/// This is a thin wrapper around a shared (`Arc`) `CircMgrInner` that is
/// specialised to the real `build::CircuitBuilder`; the methods on this type
/// forward to the inner value.
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
192
193impl<R: Runtime> CircMgr<R> {
194 pub fn new<SM, CFG: CircMgrConfig>(
200 config: &CFG,
201 storage: SM,
202 runtime: &R,
203 chanmgr: Arc<ChanMgr<R>>,
204 guardmgr: &tor_guardmgr::GuardMgr<R>,
205 ) -> Result<Self>
206 where
207 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
208 {
209 Ok(Self(Arc::new(CircMgrInner::new(
210 config, storage, runtime, chanmgr, guardmgr,
211 )?)))
212 }
213
214 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
217 self.0.get_or_launch_dir(netdir).await
218 }
219
220 pub async fn get_or_launch_exit(
226 &self,
227 netdir: DirInfo<'_>, ports: &[TargetPort],
229 isolation: StreamIsolation,
230 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
233 ) -> Result<Arc<ClientCirc>> {
234 self.0
235 .get_or_launch_exit(
236 netdir,
237 ports,
238 isolation,
239 #[cfg(feature = "geoip")]
240 country_code,
241 )
242 .await
243 }
244
245 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
250 #[cfg(feature = "specific-relay")]
251 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
252 &self,
253 target: T,
254 ) -> Result<Arc<ClientCirc>> {
255 self.0.get_or_launch_dir_specific(target).await
256 }
257
258 pub fn launch_background_tasks<D, S>(
264 self: &Arc<Self>,
265 runtime: &R,
266 dir_provider: &Arc<D>,
267 state_mgr: S,
268 ) -> Result<Vec<TaskHandle>>
269 where
270 D: NetDirProvider + 'static + ?Sized,
271 S: StateMgr + std::marker::Send + 'static,
272 {
273 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
274 }
275
276 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
282 self.0.netdir_is_sufficient(netdir)
283 }
284
285 pub fn retire_circ(&self, circ_id: &UniqId) {
288 self.0.retire_circ(circ_id);
289 }
290
291 pub fn note_external_failure(
295 &self,
296 target: &impl ChanTarget,
297 external_failure: ExternalActivity,
298 ) {
299 self.0.note_external_failure(target, external_failure);
300 }
301
302 pub fn note_external_success(
305 &self,
306 target: &impl ChanTarget,
307 external_activity: ExternalActivity,
308 ) {
309 self.0.note_external_success(target, external_activity);
310 }
311
312 pub fn skew_events(&self) -> ClockSkewEvents {
320 self.0.skew_events()
321 }
322
323 pub fn reconfigure<CFG: CircMgrConfig>(
329 &self,
330 new_config: &CFG,
331 how: tor_config::Reconfigure,
332 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
333 self.0.reconfigure(new_config, how)
334 }
335
336 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
362 self.0.estimate_timeout(timeout_action)
363 }
364
365 #[cfg(feature = "experimental-api")]
368 pub fn builder(&self) -> &CircuitBuilder<R> {
369 CircMgrInner::builder(&self.0)
370 }
371}
372
/// Internal circuit manager, generic over the circuit builder `B`.
///
/// Both fields are reference-counted, so the derived `Clone` shares them
/// with the original.
#[derive(Clone)]
pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
    /// The underlying abstract circuit manager, which owns the builder.
    mgr: Arc<mgr::AbstractCircMgr<B, R>>,
    /// Predictor consulted when building circuits preemptively; it is told
    /// about requested exit ports in `get_or_launch_exit`.
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}
381
382impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
383 #[allow(clippy::unnecessary_wraps)]
389 pub(crate) fn new<SM, CFG: CircMgrConfig>(
390 config: &CFG,
391 storage: SM,
392 runtime: &R,
393 chanmgr: Arc<ChanMgr<R>>,
394 guardmgr: &tor_guardmgr::GuardMgr<R>,
395 ) -> Result<Self>
396 where
397 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
398 {
399 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
400 let vanguardmgr = {
401 let has_onion_svc = false;
406 VanguardMgr::new(
407 config.vanguard_config(),
408 runtime.clone(),
409 storage.clone(),
410 has_onion_svc,
411 )?
412 };
413
414 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
415
416 let builder = build::CircuitBuilder::new(
417 runtime.clone(),
418 chanmgr,
419 config.path_rules().clone(),
420 storage_handle,
421 guardmgr.clone(),
422 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423 vanguardmgr,
424 );
425
426 Ok(Self::new_generic(config, runtime, guardmgr, builder))
427 }
428}
429
430impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
431 pub(crate) fn new_generic<CFG: CircMgrConfig>(
433 config: &CFG,
434 runtime: &R,
435 guardmgr: &tor_guardmgr::GuardMgr<R>,
436 builder: B,
437 ) -> Self {
438 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
439 config.preemptive_circuits().clone(),
440 )));
441
442 guardmgr.set_filter(config.path_rules().build_guard_filter());
443
444 let mgr =
445 mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
446
447 CircMgrInner {
448 mgr: Arc::new(mgr),
449 predictor: preemptive,
450 }
451 }
452
453 pub(crate) fn launch_background_tasks<D, S>(
459 self: &Arc<Self>,
460 runtime: &R,
461 dir_provider: &Arc<D>,
462 state_mgr: S,
463 ) -> Result<Vec<TaskHandle>>
464 where
465 D: NetDirProvider + 'static + ?Sized,
466 S: StateMgr + std::marker::Send + 'static,
467 {
468 let mut ret = vec![];
469
470 runtime
471 .spawn(Self::keep_circmgr_params_updated(
472 dir_provider.events(),
473 Arc::downgrade(self),
474 Arc::downgrade(dir_provider),
475 ))
476 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
477
478 let (sched, handle) = TaskSchedule::new(runtime.clone());
479 ret.push(handle);
480
481 runtime
482 .spawn(Self::update_persistent_state(
483 sched,
484 Arc::downgrade(self),
485 state_mgr,
486 ))
487 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
488
489 let (sched, handle) = TaskSchedule::new(runtime.clone());
490 ret.push(handle);
491
492 runtime
493 .spawn(Self::continually_launch_timeout_testing_circuits(
494 sched,
495 Arc::downgrade(self),
496 Arc::downgrade(dir_provider),
497 ))
498 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
499
500 let (sched, handle) = TaskSchedule::new(runtime.clone());
501 ret.push(handle);
502
503 runtime
504 .spawn(Self::continually_preemptively_build_circuits(
505 sched,
506 Arc::downgrade(self),
507 Arc::downgrade(dir_provider),
508 ))
509 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
510
511 self.mgr
512 .peek_builder()
513 .guardmgr()
514 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
515
516 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
517 {
518 let () = self
519 .mgr
520 .peek_builder()
521 .vanguardmgr()
522 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
523 }
524
525 Ok(ret)
526 }
527
528 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
531 self.expire_circuits();
532 let usage = TargetCircUsage::Dir;
533 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
534 }
535
536 pub(crate) async fn get_or_launch_exit(
542 &self,
543 netdir: DirInfo<'_>, ports: &[TargetPort],
545 isolation: StreamIsolation,
546 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
549 ) -> Result<Arc<B::Circ>> {
550 self.expire_circuits();
551 let time = Instant::now();
552 {
553 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
554 if ports.is_empty() {
555 predictive.note_usage(None, time);
556 } else {
557 for port in ports.iter() {
558 predictive.note_usage(Some(*port), time);
559 }
560 }
561 }
562 let require_stability = ports.iter().any(|p| {
563 self.mgr
564 .peek_builder()
565 .path_config()
566 .long_lived_ports
567 .contains(&p.port)
568 });
569 let ports = ports.iter().map(Clone::clone).collect();
570 #[cfg(not(feature = "geoip"))]
571 let country_code = None;
572 let usage = TargetCircUsage::Exit {
573 ports,
574 isolation,
575 country_code,
576 require_stability,
577 };
578 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
579 }
580
581 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
586 #[cfg(feature = "specific-relay")]
587 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
588 &self,
589 target: T,
590 ) -> Result<Arc<B::Circ>> {
591 self.expire_circuits();
592 let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
593 self.mgr
594 .get_or_launch(&usage, DirInfo::Nothing)
595 .await
596 .map(|(c, _)| c)
597 }
598
599 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
605 &self,
606 new_config: &CFG,
607 how: tor_config::Reconfigure,
608 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
609 let old_path_rules = self.mgr.peek_builder().path_config();
610 let predictor = self.predictor.lock().expect("poisoned lock");
611 let preemptive_circuits = predictor.config();
612 if preemptive_circuits.initial_predicted_ports
613 != new_config.preemptive_circuits().initial_predicted_ports
614 {
615 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
617 }
618
619 if how == tor_config::Reconfigure::CheckAllOrNothing {
620 return Ok(RetireCircuits::None);
621 }
622
623 let retire_because_of_guardmgr =
624 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
625
626 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
627 let retire_because_of_vanguardmgr = self
628 .mgr
629 .peek_builder()
630 .vanguardmgr()
631 .reconfigure(new_config.vanguard_config())?;
632
633 let new_reachable = &new_config.path_rules().reachable_addrs;
634 if new_reachable != &old_path_rules.reachable_addrs {
635 let filter = new_config.path_rules().build_guard_filter();
636 self.mgr.peek_builder().guardmgr().set_filter(filter);
637 }
638
639 let discard_all_circuits = !new_config
640 .path_rules()
641 .at_least_as_permissive_as(&old_path_rules)
642 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
643
644 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645 let discard_all_circuits = discard_all_circuits
646 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
647
648 self.mgr
649 .peek_builder()
650 .set_path_config(new_config.path_rules().clone());
651 self.mgr
652 .set_circuit_timing(new_config.circuit_timing().clone());
653 predictor.set_config(new_config.preemptive_circuits().clone());
654
655 if discard_all_circuits {
656 info!("Path configuration has become more restrictive: retiring existing circuits.");
660 self.retire_all_circuits();
661 return Ok(RetireCircuits::All);
662 }
663 Ok(RetireCircuits::None)
664 }
665
666 async fn keep_circmgr_params_updated<D>(
674 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
675 circmgr: Weak<Self>,
676 dirmgr: Weak<D>,
677 ) where
678 D: NetDirProvider + 'static + ?Sized,
679 {
680 use DirEvent::*;
681 while let Some(event) = events.next().await {
682 if matches!(event, NewConsensus) {
683 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
684 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
685 cm.update_network_parameters(netdir.params());
686 }
687 } else {
688 debug!("Circmgr or dirmgr has disappeared; task exiting.");
689 break;
690 }
691 }
692 }
693 }
694
695 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
698 self.mgr.update_network_parameters(p);
699 self.mgr.peek_builder().update_network_parameters(p);
700 }
701
702 async fn continually_launch_timeout_testing_circuits<D>(
709 mut sched: TaskSchedule<R>,
710 circmgr: Weak<Self>,
711 dirmgr: Weak<D>,
712 ) where
713 D: NetDirProvider + 'static + ?Sized,
714 {
715 while sched.next().await.is_some() {
716 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
717 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
718 if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
719 warn_report!(e, "Problem launching a timeout testing circuit");
720 }
721 let delay = netdir
722 .params()
723 .cbt_testing_delay
724 .try_into()
725 .expect("Out-of-bounds value from BoundedInt32");
726
727 drop((cm, dm));
728 sched.fire_in(delay);
729 } else {
730 let _ = dm.events().next().await;
733 sched.fire();
734 }
735 } else {
736 return;
737 }
738 }
739 }
740
741 fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
749 if !self.mgr.peek_builder().learning_timeouts() {
750 return Ok(());
751 }
752 self.expire_circuits();
755 let max_circs: u64 = netdir
756 .params()
757 .cbt_max_open_circuits_for_testing
758 .try_into()
759 .expect("Out-of-bounds result from BoundedInt32");
760 if (self.mgr.n_circs() as u64) < max_circs {
761 let usage = TargetCircUsage::TimeoutTesting;
763 let dirinfo = netdir.into();
764 let mgr = Arc::clone(&self.mgr);
765 debug!("Launching a circuit to test build times.");
766 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
767 drop(receiver);
770 }
771
772 Ok(())
773 }
774
775 #[allow(clippy::cognitive_complexity)] async fn update_persistent_state<S>(
783 mut sched: TaskSchedule<R>,
784 circmgr: Weak<Self>,
785 statemgr: S,
786 ) where
787 S: StateMgr + std::marker::Send,
788 {
789 while sched.next().await.is_some() {
790 if let Some(circmgr) = Weak::upgrade(&circmgr) {
791 use tor_persist::LockStatus::*;
792
793 match statemgr.try_lock() {
794 Err(e) => {
795 error_report!(e, "Problem with state lock file");
796 break;
797 }
798 Ok(NewlyAcquired) => {
799 info!("We now own the lock on our state files.");
800 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
801 error_report!(e, "Unable to upgrade to owned state files");
802 break;
803 }
804 }
805 Ok(AlreadyHeld) => {
806 if let Err(e) = circmgr.store_persistent_state() {
807 error_report!(e, "Unable to flush circmgr state");
808 break;
809 }
810 }
811 Ok(NoLock) => {
812 if let Err(e) = circmgr.reload_persistent_state() {
813 error_report!(e, "Unable to reload circmgr state");
814 break;
815 }
816 }
817 }
818 } else {
819 debug!("Circmgr has disappeared; task exiting.");
820 return;
821 }
822 sched.fire_in(Duration::from_secs(60));
829 }
830
831 debug!("State update task exiting (potentially due to handle drop).");
832 }
833
834 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
838 self.mgr.peek_builder().upgrade_to_owned_state()?;
839 Ok(())
840 }
841
842 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
847 self.mgr.peek_builder().reload_state()?;
848 Ok(())
849 }
850
851 async fn continually_preemptively_build_circuits<D>(
863 mut sched: TaskSchedule<R>,
864 circmgr: Weak<Self>,
865 dirmgr: Weak<D>,
866 ) where
867 D: NetDirProvider + 'static + ?Sized,
868 {
869 let base_delay = Duration::from_secs(10);
870 let mut retry = RetryDelay::from_duration(base_delay);
871
872 while sched.next().await.is_some() {
873 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
874 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
875 let result = cm
876 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
877 .await;
878
879 let delay = match result {
880 Ok(()) => {
881 retry.reset();
882 base_delay
883 }
884 Err(_) => retry.next_delay(&mut rand::rng()),
885 };
886
887 sched.fire_in(delay);
888 } else {
889 let _ = dm.events().next().await;
892 sched.fire();
893 }
894 } else {
895 return;
896 }
897 }
898 }
899
900 #[allow(clippy::cognitive_complexity)]
908 async fn launch_circuits_preemptively(
909 &self,
910 netdir: DirInfo<'_>,
911 ) -> std::result::Result<(), err::PreemptiveCircError> {
912 trace!("Checking preemptive circuit predictions.");
913 let (circs, threshold) = {
914 let path_config = self.mgr.peek_builder().path_config();
915 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
916 let threshold = preemptive.config().disable_at_threshold;
917 (preemptive.predict(&path_config), threshold)
918 };
919
920 if self.mgr.n_circs() >= threshold {
921 return Ok(());
922 }
923 let mut n_created = 0_usize;
924 let mut n_errors = 0_usize;
925
926 let futures = circs
927 .iter()
928 .map(|usage| self.mgr.get_or_launch(usage, netdir));
929 let results = futures::future::join_all(futures).await;
930 for (i, result) in results.into_iter().enumerate() {
931 match result {
932 Ok((_, CircProvenance::NewlyCreated)) => {
933 debug!("Preeemptive circuit was created for {:?}", circs[i]);
934 n_created += 1;
935 }
936 Ok((_, CircProvenance::Preexisting)) => {
937 trace!("Circuit already existed created for {:?}", circs[i]);
938 }
939 Err(e) => {
940 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
941 n_errors += 1;
942 }
943 }
944 }
945
946 if n_created > 0 || n_errors == 0 {
947 Ok(())
951 } else {
952 Err(err::PreemptiveCircError)
955 }
956 }
957
958 #[cfg(feature = "hs-common")]
975 pub(crate) async fn launch_hs_unmanaged<T>(
976 &self,
977 planned_target: Option<T>,
978 dir: &NetDir,
979 stem_kind: HsCircStemKind,
980 circ_kind: Option<HsCircKind>,
981 ) -> Result<Arc<B::Circ>>
982 where
983 T: IntoOwnedChanTarget,
984 {
985 let usage = TargetCircUsage::HsCircBase {
986 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
987 stem_kind,
988 circ_kind,
989 };
990 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
991 Ok(client_circ)
992 }
993
994 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1000 self.mgr
1001 .peek_builder()
1002 .guardmgr()
1003 .netdir_is_sufficient(netdir)
1004 }
1005
1006 pub(crate) fn estimate_timeout(
1008 &self,
1009 timeout_action: &timeouts::Action,
1010 ) -> std::time::Duration {
1011 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1012 timeout
1013 }
1014
1015 pub(crate) fn builder(&self) -> &B {
1017 self.mgr.peek_builder()
1018 }
1019
1020 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1025 self.mgr.peek_builder().save_state()
1026 }
1027
1028 fn expire_circuits(&self) {
1033 let now = self.mgr.peek_runtime().now();
1039 self.mgr.expire_circs(now);
1040 }
1041
1042 pub(crate) fn retire_all_circuits(&self) {
1050 self.mgr.retire_all_circuits();
1051 }
1052
1053 pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
1056 let _ = self.mgr.take_circ(circ_id);
1057 }
1058
1059 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1067 self.mgr.peek_builder().guardmgr().skew_events()
1068 }
1069
1070 pub(crate) fn note_external_failure(
1074 &self,
1075 target: &impl ChanTarget,
1076 external_failure: ExternalActivity,
1077 ) {
1078 self.mgr
1079 .peek_builder()
1080 .guardmgr()
1081 .note_external_failure(target, external_failure);
1082 }
1083
1084 pub(crate) fn note_external_success(
1087 &self,
1088 target: &impl ChanTarget,
1089 external_activity: ExternalActivity,
1090 ) {
1091 self.mgr
1092 .peek_builder()
1093 .guardmgr()
1094 .note_external_success(target, external_activity);
1095 }
1096}
1097
1098impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1099 fn drop(&mut self) {
1100 match self.store_persistent_state() {
1101 Ok(true) => info!("Flushed persistent state at exit."),
1102 Ok(false) => debug!("Lock not held; no state to flush."),
1103 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1104 }
1105 }
1106}
1107
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use mocks::FakeBuilder;
    use tor_guardmgr::GuardMgr;
    use tor_linkspec::OwnedChanTarget;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_persist::TestingStateMgr;

    use super::*;

    /// `DirInfo::circ_params` must fall back to default parameters when only
    /// fallbacks are known, and must read the consensus parameters when a
    /// netdir is given.
    #[test]
    fn get_params() {
        use tor_netdir::{MdReceiver, PartialNetDir};
        use tor_netdoc::doc::netstatus::NetParams;
        let fb = FallbackList::from([]);
        let di: DirInfo<'_> = (&fb).into();

        // With fallbacks only, the defaults apply.
        let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(!p1.extend_by_ed25519_id);

        // With a netdir, the parameters set in the consensus are used.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);

        // Same again with a much larger "circwindow" value: the call must
        // still succeed and the ed25519 flag must still be picked up.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100_000);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);
    }

    /// Build a `CircMgrInner` around a `FakeBuilder`, with its background
    /// tasks already launched against a `TestNetDirProvider`.
    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
        let config = crate::config::test_config::TestConfig::default();
        let statemgr = TestingStateMgr::new();
        let guardmgr =
            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
        let builder = FakeBuilder::new(
            &runtime,
            statemgr.clone(),
            &tor_guardmgr::TestConfig::default(),
        );
        let circmgr = Arc::new(CircMgrInner::new_generic(
            &config, &runtime, &guardmgr, builder,
        ));
        let netdir = Arc::new(TestNetDirProvider::new());
        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
            .expect("launch CircMgrInner background tasks");
        circmgr
    }

    /// `launch_hs_unmanaged` must complete successfully on the mock runtime.
    #[test]
    #[cfg(feature = "hs-common")]
    fn test_launch_hs_unmanaged() {
        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
            let circmgr = make_circmgr(runtime.clone());
            let netdir = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap();

            // Run the launch in its own task and collect the result over a
            // oneshot channel.
            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
            runtime.spawn_identified("launch_hs_unamanged", async move {
                ret_tx
                    .send(
                        circmgr
                            .launch_hs_unmanaged::<OwnedChanTarget>(
                                None,
                                &netdir,
                                HsCircStemKind::Naive,
                                None,
                            )
                            .await,
                    )
                    .unwrap();
            });
            // Advance mock time so the spawned task can make progress.
            runtime.advance_by(Duration::from_millis(60)).await;
            ret_rx.await.unwrap().unwrap();
        });
    }
}