1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::CircuitBuilder;
51use mgr::{AbstractCirc, AbstractCircBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_error::{error_report, warn_report};
55use tor_guardmgr::RetireCircuits;
56use tor_linkspec::ChanTarget;
57use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
58use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
59use tor_rtcompat::Runtime;
60
61#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
62use tor_linkspec::IntoOwnedChanTarget;
63
64use futures::task::SpawnExt;
65use futures::StreamExt;
66use std::sync::{Arc, Mutex, Weak};
67use std::time::{Duration, Instant};
68use tracing::{debug, info, trace, warn};
69
70#[cfg(feature = "testing")]
71pub use config::test_config::TestConfig;
72
73pub mod build;
74mod config;
75mod err;
76#[cfg(feature = "hs-common")]
77pub mod hspool;
78mod impls;
79pub mod isolation;
80mod mgr;
81#[cfg(test)]
82mod mocks;
83mod preemptive;
84pub mod timeouts;
85mod usage;
86
// The `path` module is always compiled, but it is only part of the public
// API when the `experimental-api` feature is enabled; otherwise it is
// visible within this crate only.
cfg_if::cfg_if! {
    if #[cfg(feature = "experimental-api")] {
        pub mod path;
    } else {
        pub(crate) mod path;
    }
}
95
96pub use err::Error;
97pub use isolation::IsolationToken;
98use tor_guardmgr::fallback::FallbackList;
99pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
100pub use usage::{TargetPort, TargetPorts};
101
102pub use config::{
103 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
104 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
105};
106
107use crate::isolation::StreamIsolation;
108use crate::mgr::CircProvenance;
109use crate::preemptive::PreemptiveCircuitPredictor;
110use usage::TargetCircUsage;
111
112use safelog::sensitive as sv;
113#[cfg(feature = "geoip")]
114use tor_geoip::CountryCode;
115pub use tor_guardmgr::{ExternalActivity, FirstHopId};
116use tor_persist::StateMgr;
117use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
118
119#[cfg(feature = "hs-common")]
120use crate::hspool::{HsCircKind, HsCircStemKind};
121#[cfg(all(feature = "vanguards", feature = "hs-common"))]
122use tor_guardmgr::vanguards::VanguardMgr;
123
/// A `Result` type whose error is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
126
/// Type alias for the dynamic storage handle that holds a
/// [`ParetoTimeoutState`](timeouts::pareto::ParetoTimeoutState).
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
129
/// Key passed to the state manager when creating the storage handle for
/// circuit timeout state.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
132
/// The directory information available when asking for a circuit.
///
/// A `DirInfo` borrows either a list of fallbacks or a network directory,
/// or carries no directory information at all. It can be created from a
/// `&FallbackList` or a `&NetDir` via `From`/`Into`.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DirInfo<'a> {
    /// A borrowed list of fallbacks.
    Fallbacks(&'a FallbackList),
    /// A borrowed network directory.
    Directory(&'a NetDir),
    /// No directory information.
    ///
    /// In this file it is passed when the caller names the specific relay
    /// to use for a directory circuit.
    Nothing,
}
151
152impl<'a> From<&'a FallbackList> for DirInfo<'a> {
153 fn from(v: &'a FallbackList) -> DirInfo<'a> {
154 DirInfo::Fallbacks(v)
155 }
156}
157impl<'a> From<&'a NetDir> for DirInfo<'a> {
158 fn from(v: &'a NetDir) -> DirInfo<'a> {
159 DirInfo::Directory(v)
160 }
161}
162impl<'a> DirInfo<'a> {
163 fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
165 use tor_netdir::params::NetParameters;
166 let defaults = NetParameters::default();
169 let net_params = match self {
170 DirInfo::Directory(d) => d.params(),
171 _ => &defaults,
172 };
173 match usage {
174 #[cfg(feature = "hs-common")]
175 TargetCircUsage::HsCircBase { .. } => {
176 build::onion_circparams_from_netparams(net_params)
177 }
178 _ => build::exit_circparams_from_netparams(net_params),
179 }
180 }
181}
182
/// A circuit manager: the public handle through which circuits are obtained.
///
/// This is a thin wrapper around a shared `CircMgrInner` that is specialized
/// to the real [`CircuitBuilder`](build::CircuitBuilder); its methods
/// delegate to that inner object.
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
193
194impl<R: Runtime> CircMgr<R> {
195 pub fn new<SM, CFG: CircMgrConfig>(
201 config: &CFG,
202 storage: SM,
203 runtime: &R,
204 chanmgr: Arc<ChanMgr<R>>,
205 guardmgr: &tor_guardmgr::GuardMgr<R>,
206 ) -> Result<Self>
207 where
208 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
209 {
210 Ok(Self(Arc::new(CircMgrInner::new(
211 config, storage, runtime, chanmgr, guardmgr,
212 )?)))
213 }
214
215 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
218 self.0.get_or_launch_dir(netdir).await
219 }
220
221 pub async fn get_or_launch_exit(
227 &self,
228 netdir: DirInfo<'_>, ports: &[TargetPort],
230 isolation: StreamIsolation,
231 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
234 ) -> Result<Arc<ClientCirc>> {
235 self.0
236 .get_or_launch_exit(
237 netdir,
238 ports,
239 isolation,
240 #[cfg(feature = "geoip")]
241 country_code,
242 )
243 .await
244 }
245
246 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
251 #[cfg(feature = "specific-relay")]
252 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
253 &self,
254 target: T,
255 ) -> Result<Arc<ClientCirc>> {
256 self.0.get_or_launch_dir_specific(target).await
257 }
258
259 pub fn launch_background_tasks<D, S>(
265 self: &Arc<Self>,
266 runtime: &R,
267 dir_provider: &Arc<D>,
268 state_mgr: S,
269 ) -> Result<Vec<TaskHandle>>
270 where
271 D: NetDirProvider + 'static + ?Sized,
272 S: StateMgr + std::marker::Send + 'static,
273 {
274 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
275 }
276
277 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
283 self.0.netdir_is_sufficient(netdir)
284 }
285
286 pub fn retire_circ(&self, circ_id: &UniqId) {
289 self.0.retire_circ(circ_id);
290 }
291
292 pub fn note_external_failure(
296 &self,
297 target: &impl ChanTarget,
298 external_failure: ExternalActivity,
299 ) {
300 self.0.note_external_failure(target, external_failure);
301 }
302
303 pub fn note_external_success(
306 &self,
307 target: &impl ChanTarget,
308 external_activity: ExternalActivity,
309 ) {
310 self.0.note_external_success(target, external_activity);
311 }
312
313 pub fn skew_events(&self) -> ClockSkewEvents {
321 self.0.skew_events()
322 }
323
324 pub fn reconfigure<CFG: CircMgrConfig>(
330 &self,
331 new_config: &CFG,
332 how: tor_config::Reconfigure,
333 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
334 self.0.reconfigure(new_config, how)
335 }
336
337 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
363 self.0.estimate_timeout(timeout_action)
364 }
365
366 #[cfg(feature = "experimental-api")]
369 pub fn builder(&self) -> &CircuitBuilder<R> {
370 CircMgrInner::builder(&self.0)
371 }
372}
373
/// The shared state behind a [`CircMgr`], generic over the circuit builder `B`.
#[derive(Clone)]
pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
    /// The underlying abstract circuit manager.
    mgr: Arc<mgr::AbstractCircMgr<B, R>>,
    /// Predictor consulted when building circuits preemptively; protected
    /// by a mutex.
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}
382
383impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
384 #[allow(clippy::unnecessary_wraps)]
390 pub(crate) fn new<SM, CFG: CircMgrConfig>(
391 config: &CFG,
392 storage: SM,
393 runtime: &R,
394 chanmgr: Arc<ChanMgr<R>>,
395 guardmgr: &tor_guardmgr::GuardMgr<R>,
396 ) -> Result<Self>
397 where
398 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
399 {
400 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
401 let vanguardmgr = {
402 let has_onion_svc = false;
407 VanguardMgr::new(
408 config.vanguard_config(),
409 runtime.clone(),
410 storage.clone(),
411 has_onion_svc,
412 )?
413 };
414
415 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
416
417 let builder = build::CircuitBuilder::new(
418 runtime.clone(),
419 chanmgr,
420 config.path_rules().clone(),
421 storage_handle,
422 guardmgr.clone(),
423 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
424 vanguardmgr,
425 );
426
427 Ok(Self::new_generic(config, runtime, guardmgr, builder))
428 }
429}
430
431impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
432 pub(crate) fn new_generic<CFG: CircMgrConfig>(
434 config: &CFG,
435 runtime: &R,
436 guardmgr: &tor_guardmgr::GuardMgr<R>,
437 builder: B,
438 ) -> Self {
439 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
440 config.preemptive_circuits().clone(),
441 )));
442
443 guardmgr.set_filter(config.path_rules().build_guard_filter());
444
445 let mgr =
446 mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
447
448 CircMgrInner {
449 mgr: Arc::new(mgr),
450 predictor: preemptive,
451 }
452 }
453
454 pub(crate) fn launch_background_tasks<D, S>(
460 self: &Arc<Self>,
461 runtime: &R,
462 dir_provider: &Arc<D>,
463 state_mgr: S,
464 ) -> Result<Vec<TaskHandle>>
465 where
466 D: NetDirProvider + 'static + ?Sized,
467 S: StateMgr + std::marker::Send + 'static,
468 {
469 let mut ret = vec![];
470
471 runtime
472 .spawn(Self::keep_circmgr_params_updated(
473 dir_provider.events(),
474 Arc::downgrade(self),
475 Arc::downgrade(dir_provider),
476 ))
477 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
478
479 let (sched, handle) = TaskSchedule::new(runtime.clone());
480 ret.push(handle);
481
482 runtime
483 .spawn(Self::update_persistent_state(
484 sched,
485 Arc::downgrade(self),
486 state_mgr,
487 ))
488 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
489
490 let (sched, handle) = TaskSchedule::new(runtime.clone());
491 ret.push(handle);
492
493 runtime
494 .spawn(Self::continually_launch_timeout_testing_circuits(
495 sched,
496 Arc::downgrade(self),
497 Arc::downgrade(dir_provider),
498 ))
499 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
500
501 let (sched, handle) = TaskSchedule::new(runtime.clone());
502 ret.push(handle);
503
504 runtime
505 .spawn(Self::continually_preemptively_build_circuits(
506 sched,
507 Arc::downgrade(self),
508 Arc::downgrade(dir_provider),
509 ))
510 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
511
512 self.mgr
513 .peek_builder()
514 .guardmgr()
515 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
516
517 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
518 {
519 let () = self
520 .mgr
521 .peek_builder()
522 .vanguardmgr()
523 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
524 }
525
526 Ok(ret)
527 }
528
529 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
532 self.expire_circuits();
533 let usage = TargetCircUsage::Dir;
534 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
535 }
536
537 pub(crate) async fn get_or_launch_exit(
543 &self,
544 netdir: DirInfo<'_>, ports: &[TargetPort],
546 isolation: StreamIsolation,
547 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
550 ) -> Result<Arc<B::Circ>> {
551 self.expire_circuits();
552 let time = Instant::now();
553 {
554 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
555 if ports.is_empty() {
556 predictive.note_usage(None, time);
557 } else {
558 for port in ports.iter() {
559 predictive.note_usage(Some(*port), time);
560 }
561 }
562 }
563 let require_stability = ports.iter().any(|p| {
564 self.mgr
565 .peek_builder()
566 .path_config()
567 .long_lived_ports
568 .contains(&p.port)
569 });
570 let ports = ports.iter().map(Clone::clone).collect();
571 #[cfg(not(feature = "geoip"))]
572 let country_code = None;
573 let usage = TargetCircUsage::Exit {
574 ports,
575 isolation,
576 country_code,
577 require_stability,
578 };
579 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
580 }
581
582 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
587 #[cfg(feature = "specific-relay")]
588 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
589 &self,
590 target: T,
591 ) -> Result<Arc<B::Circ>> {
592 self.expire_circuits();
593 let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
594 self.mgr
595 .get_or_launch(&usage, DirInfo::Nothing)
596 .await
597 .map(|(c, _)| c)
598 }
599
600 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
606 &self,
607 new_config: &CFG,
608 how: tor_config::Reconfigure,
609 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
610 let old_path_rules = self.mgr.peek_builder().path_config();
611 let predictor = self.predictor.lock().expect("poisoned lock");
612 let preemptive_circuits = predictor.config();
613 if preemptive_circuits.initial_predicted_ports
614 != new_config.preemptive_circuits().initial_predicted_ports
615 {
616 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
618 }
619
620 if how == tor_config::Reconfigure::CheckAllOrNothing {
621 return Ok(RetireCircuits::None);
622 }
623
624 let retire_because_of_guardmgr =
625 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
626
627 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
628 let retire_because_of_vanguardmgr = self
629 .mgr
630 .peek_builder()
631 .vanguardmgr()
632 .reconfigure(new_config.vanguard_config())?;
633
634 let new_reachable = &new_config.path_rules().reachable_addrs;
635 if new_reachable != &old_path_rules.reachable_addrs {
636 let filter = new_config.path_rules().build_guard_filter();
637 self.mgr.peek_builder().guardmgr().set_filter(filter);
638 }
639
640 let discard_all_circuits = !new_config
641 .path_rules()
642 .at_least_as_permissive_as(&old_path_rules)
643 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
644
645 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
646 let discard_all_circuits = discard_all_circuits
647 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
648
649 self.mgr
650 .peek_builder()
651 .set_path_config(new_config.path_rules().clone());
652 self.mgr
653 .set_circuit_timing(new_config.circuit_timing().clone());
654 predictor.set_config(new_config.preemptive_circuits().clone());
655
656 if discard_all_circuits {
657 info!("Path configuration has become more restrictive: retiring existing circuits.");
661 self.retire_all_circuits();
662 return Ok(RetireCircuits::All);
663 }
664 Ok(RetireCircuits::None)
665 }
666
667 async fn keep_circmgr_params_updated<D>(
675 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
676 circmgr: Weak<Self>,
677 dirmgr: Weak<D>,
678 ) where
679 D: NetDirProvider + 'static + ?Sized,
680 {
681 use DirEvent::*;
682 while let Some(event) = events.next().await {
683 if matches!(event, NewConsensus) {
684 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
685 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
686 cm.update_network_parameters(netdir.params());
687 }
688 } else {
689 debug!("Circmgr or dirmgr has disappeared; task exiting.");
690 break;
691 }
692 }
693 }
694 }
695
    /// Pass the network parameters `p` to the circuit manager and then to
    /// its builder.
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
        self.mgr.update_network_parameters(p);
        self.mgr.peek_builder().update_network_parameters(p);
    }
702
703 async fn continually_launch_timeout_testing_circuits<D>(
710 mut sched: TaskSchedule<R>,
711 circmgr: Weak<Self>,
712 dirmgr: Weak<D>,
713 ) where
714 D: NetDirProvider + 'static + ?Sized,
715 {
716 while sched.next().await.is_some() {
717 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
718 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
719 if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
720 warn_report!(e, "Problem launching a timeout testing circuit");
721 }
722 let delay = netdir
723 .params()
724 .cbt_testing_delay
725 .try_into()
726 .expect("Out-of-bounds value from BoundedInt32");
727
728 drop((cm, dm));
729 sched.fire_in(delay);
730 } else {
731 let _ = dm.events().next().await;
734 sched.fire();
735 }
736 } else {
737 return;
738 }
739 }
740 }
741
742 fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
750 if !self.mgr.peek_builder().learning_timeouts() {
751 return Ok(());
752 }
753 self.expire_circuits();
756 let max_circs: u64 = netdir
757 .params()
758 .cbt_max_open_circuits_for_testing
759 .try_into()
760 .expect("Out-of-bounds result from BoundedInt32");
761 if (self.mgr.n_circs() as u64) < max_circs {
762 let usage = TargetCircUsage::TimeoutTesting;
764 let dirinfo = netdir.into();
765 let mgr = Arc::clone(&self.mgr);
766 debug!("Launching a circuit to test build times.");
767 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
768 drop(receiver);
771 }
772
773 Ok(())
774 }
775
776 #[allow(clippy::cognitive_complexity)] async fn update_persistent_state<S>(
784 mut sched: TaskSchedule<R>,
785 circmgr: Weak<Self>,
786 statemgr: S,
787 ) where
788 S: StateMgr + std::marker::Send,
789 {
790 while sched.next().await.is_some() {
791 if let Some(circmgr) = Weak::upgrade(&circmgr) {
792 use tor_persist::LockStatus::*;
793
794 match statemgr.try_lock() {
795 Err(e) => {
796 error_report!(e, "Problem with state lock file");
797 break;
798 }
799 Ok(NewlyAcquired) => {
800 info!("We now own the lock on our state files.");
801 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
802 error_report!(e, "Unable to upgrade to owned state files");
803 break;
804 }
805 }
806 Ok(AlreadyHeld) => {
807 if let Err(e) = circmgr.store_persistent_state() {
808 error_report!(e, "Unable to flush circmgr state");
809 break;
810 }
811 }
812 Ok(NoLock) => {
813 if let Err(e) = circmgr.reload_persistent_state() {
814 error_report!(e, "Unable to reload circmgr state");
815 break;
816 }
817 }
818 }
819 } else {
820 debug!("Circmgr has disappeared; task exiting.");
821 return;
822 }
823 sched.fire_in(Duration::from_secs(60));
830 }
831
832 debug!("State update task exiting (potentially due to handle drop).");
833 }
834
    /// Ask the builder to upgrade to owned state.
    ///
    /// Called by the persistent-state task when the state lock has just
    /// been acquired.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the builder.
    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
        self.mgr.peek_builder().upgrade_to_owned_state()?;
        Ok(())
    }
842
    /// Ask the builder to reload its state.
    ///
    /// Called by the persistent-state task when the state lock is not held.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the builder.
    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
        self.mgr.peek_builder().reload_state()?;
        Ok(())
    }
851
852 async fn continually_preemptively_build_circuits<D>(
864 mut sched: TaskSchedule<R>,
865 circmgr: Weak<Self>,
866 dirmgr: Weak<D>,
867 ) where
868 D: NetDirProvider + 'static + ?Sized,
869 {
870 let base_delay = Duration::from_secs(10);
871 let mut retry = RetryDelay::from_duration(base_delay);
872
873 while sched.next().await.is_some() {
874 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
875 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
876 let result = cm
877 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
878 .await;
879
880 let delay = match result {
881 Ok(()) => {
882 retry.reset();
883 base_delay
884 }
885 Err(_) => retry.next_delay(&mut rand::rng()),
886 };
887
888 sched.fire_in(delay);
889 } else {
890 let _ = dm.events().next().await;
893 sched.fire();
894 }
895 } else {
896 return;
897 }
898 }
899 }
900
901 #[allow(clippy::cognitive_complexity)]
909 async fn launch_circuits_preemptively(
910 &self,
911 netdir: DirInfo<'_>,
912 ) -> std::result::Result<(), err::PreemptiveCircError> {
913 trace!("Checking preemptive circuit predictions.");
914 let (circs, threshold) = {
915 let path_config = self.mgr.peek_builder().path_config();
916 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
917 let threshold = preemptive.config().disable_at_threshold;
918 (preemptive.predict(&path_config), threshold)
919 };
920
921 if self.mgr.n_circs() >= threshold {
922 return Ok(());
923 }
924 let mut n_created = 0_usize;
925 let mut n_errors = 0_usize;
926
927 let futures = circs
928 .iter()
929 .map(|usage| self.mgr.get_or_launch(usage, netdir));
930 let results = futures::future::join_all(futures).await;
931 for (i, result) in results.into_iter().enumerate() {
932 match result {
933 Ok((_, CircProvenance::NewlyCreated)) => {
934 debug!("Preeemptive circuit was created for {:?}", circs[i]);
935 n_created += 1;
936 }
937 Ok((_, CircProvenance::Preexisting)) => {
938 trace!("Circuit already existed created for {:?}", circs[i]);
939 }
940 Err(e) => {
941 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
942 n_errors += 1;
943 }
944 }
945 }
946
947 if n_created > 0 || n_errors == 0 {
948 Ok(())
952 } else {
953 Err(err::PreemptiveCircError)
956 }
957 }
958
959 #[cfg(feature = "hs-common")]
976 pub(crate) async fn launch_hs_unmanaged<T>(
977 &self,
978 planned_target: Option<T>,
979 dir: &NetDir,
980 stem_kind: HsCircStemKind,
981 circ_kind: Option<HsCircKind>,
982 ) -> Result<Arc<B::Circ>>
983 where
984 T: IntoOwnedChanTarget,
985 {
986 let usage = TargetCircUsage::HsCircBase {
987 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
988 stem_kind,
989 circ_kind,
990 };
991 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
992 Ok(client_circ)
993 }
994
    /// Return true if the guard manager considers `netdir` sufficient.
    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
        self.mgr
            .peek_builder()
            .guardmgr()
            .netdir_is_sufficient(netdir)
    }
1006
1007 pub(crate) fn estimate_timeout(
1009 &self,
1010 timeout_action: &timeouts::Action,
1011 ) -> std::time::Duration {
1012 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1013 timeout
1014 }
1015
    /// Return a reference to the circuit builder held by the underlying manager.
    pub(crate) fn builder(&self) -> &B {
        self.mgr.peek_builder()
    }
1020
    /// Ask the builder to save its state.
    ///
    /// The boolean comes straight from the builder's `save_state`; the `Drop`
    /// implementation treats `false` as "lock not held, nothing flushed".
    ///
    /// # Errors
    ///
    /// Returns any error reported by the builder.
    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
        self.mgr.peek_builder().save_state()
    }
1028
1029 fn expire_circuits(&self) {
1034 let now = self.mgr.peek_runtime().now();
1040 self.mgr.expire_circs(now);
1041 }
1042
    /// Retire every circuit held by the underlying manager.
    pub(crate) fn retire_all_circuits(&self) {
        self.mgr.retire_all_circuits();
    }
1053
1054 pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
1057 let _ = self.mgr.take_circ(circ_id);
1058 }
1059
    /// Return the [`ClockSkewEvents`] handle provided by the guard manager.
    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
        self.mgr.peek_builder().guardmgr().skew_events()
    }
1070
    /// Forward an externally observed failure of `target` to the guard manager.
    pub(crate) fn note_external_failure(
        &self,
        target: &impl ChanTarget,
        external_failure: ExternalActivity,
    ) {
        self.mgr
            .peek_builder()
            .guardmgr()
            .note_external_failure(target, external_failure);
    }
1084
    /// Forward an externally observed success of `target` to the guard manager.
    pub(crate) fn note_external_success(
        &self,
        target: &impl ChanTarget,
        external_activity: ExternalActivity,
    ) {
        self.mgr
            .peek_builder()
            .guardmgr()
            .note_external_success(target, external_activity);
    }
1097}
1098
impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
    /// Try to store persistent state when a `CircMgrInner` is dropped.
    ///
    /// The outcome is only logged; a failure is reported but otherwise
    /// ignored, since `drop` cannot return an error.
    fn drop(&mut self) {
        match self.store_persistent_state() {
            Ok(true) => info!("Flushed persistent state at exit."),
            Ok(false) => debug!("Lock not held; no state to flush."),
            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
        }
    }
}
1108
1109#[cfg(test)]
1110mod test {
1111 #![allow(clippy::bool_assert_comparison)]
1113 #![allow(clippy::clone_on_copy)]
1114 #![allow(clippy::dbg_macro)]
1115 #![allow(clippy::mixed_attributes_style)]
1116 #![allow(clippy::print_stderr)]
1117 #![allow(clippy::print_stdout)]
1118 #![allow(clippy::single_char_pattern)]
1119 #![allow(clippy::unwrap_used)]
1120 #![allow(clippy::unchecked_duration_subtraction)]
1121 #![allow(clippy::useless_vec)]
1122 #![allow(clippy::needless_pass_by_value)]
1123 use mocks::FakeBuilder;
1125 use tor_guardmgr::GuardMgr;
1126 use tor_linkspec::OwnedChanTarget;
1127 use tor_netdir::testprovider::TestNetDirProvider;
1128 use tor_persist::TestingStateMgr;
1129
1130 use super::*;
1131
1132 #[test]
1133 fn get_params() {
1134 use tor_netdir::{MdReceiver, PartialNetDir};
1135 use tor_netdoc::doc::netstatus::NetParams;
1136 let fb = FallbackList::from([]);
1138 let di: DirInfo<'_> = (&fb).into();
1139
1140 let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1141 assert!(!p1.extend_by_ed25519_id);
1142
1143 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1145 let mut params = NetParams::default();
1146 params.set("circwindow".into(), 100);
1147 params.set("ExtendByEd25519ID".into(), 1);
1148 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1149 for m in microdescs {
1150 dir.add_microdesc(m);
1151 }
1152 let netdir = dir.unwrap_if_sufficient().unwrap();
1153 let di: DirInfo<'_> = (&netdir).into();
1154 let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1155 assert!(p2.extend_by_ed25519_id);
1156
1157 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1159 let mut params = NetParams::default();
1160 params.set("circwindow".into(), 100_000);
1161 params.set("ExtendByEd25519ID".into(), 1);
1162 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1163 for m in microdescs {
1164 dir.add_microdesc(m);
1165 }
1166 let netdir = dir.unwrap_if_sufficient().unwrap();
1167 let di: DirInfo<'_> = (&netdir).into();
1168 let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1169 assert!(p2.extend_by_ed25519_id);
1170 }
1171
1172 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1173 let config = crate::config::test_config::TestConfig::default();
1174 let statemgr = TestingStateMgr::new();
1175 let guardmgr =
1176 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1177 let builder = FakeBuilder::new(
1178 &runtime,
1179 statemgr.clone(),
1180 &tor_guardmgr::TestConfig::default(),
1181 );
1182 let circmgr = Arc::new(CircMgrInner::new_generic(
1183 &config, &runtime, &guardmgr, builder,
1184 ));
1185 let netdir = Arc::new(TestNetDirProvider::new());
1186 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1187 .expect("launch CircMgrInner background tasks");
1188 circmgr
1189 }
1190
    /// Check that `launch_hs_unmanaged` completes successfully against the
    /// fake builder and a test network directory.
    #[test]
    #[cfg(feature = "hs-common")]
    fn test_launch_hs_unmanaged() {
        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
            let circmgr = make_circmgr(runtime.clone());
            let netdir = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap();

            // Run the launch in its own task and hand the result back over a
            // oneshot channel.
            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
            runtime.spawn_identified("launch_hs_unamanged", async move {
                ret_tx
                    .send(
                        circmgr
                            .launch_hs_unmanaged::<OwnedChanTarget>(
                                None,
                                &netdir,
                                HsCircStemKind::Naive,
                                None,
                            )
                            .await,
                    )
                    .unwrap();
            });
            // Advance the mock clock before awaiting the result.
            runtime.advance_by(Duration::from_millis(60)).await;
            ret_rx.await.unwrap().unwrap();
        });
    }
1219}