1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
48
49use build::CircuitBuilder;
50use mgr::{AbstractCirc, AbstractCircBuilder};
51use tor_basic_utils::retry::RetryDelay;
52use tor_chanmgr::ChanMgr;
53use tor_error::{error_report, warn_report};
54use tor_guardmgr::RetireCircuits;
55use tor_linkspec::ChanTarget;
56use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
57use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
58use tor_rtcompat::Runtime;
59
60#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
61use tor_linkspec::IntoOwnedChanTarget;
62
63use futures::task::SpawnExt;
64use futures::StreamExt;
65use std::sync::{Arc, Mutex, Weak};
66use std::time::{Duration, Instant};
67use tracing::{debug, info, trace, warn};
68
69#[cfg(feature = "testing")]
70pub use config::test_config::TestConfig;
71
72pub mod build;
73mod config;
74mod err;
75#[cfg(feature = "hs-common")]
76pub mod hspool;
77mod impls;
78pub mod isolation;
79mod mgr;
80#[cfg(test)]
81mod mocks;
82mod preemptive;
83pub mod timeouts;
84mod usage;
85
86cfg_if::cfg_if! {
88 if #[cfg(feature = "experimental-api")] {
89 pub mod path;
90 } else {
91 pub(crate) mod path;
92 }
93}
94
95pub use err::Error;
96pub use isolation::IsolationToken;
97use tor_guardmgr::fallback::FallbackList;
98pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
99pub use usage::{TargetPort, TargetPorts};
100
101pub use config::{
102 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
103 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
104};
105
106use crate::isolation::StreamIsolation;
107use crate::mgr::CircProvenance;
108use crate::preemptive::PreemptiveCircuitPredictor;
109use usage::TargetCircUsage;
110
111use safelog::sensitive as sv;
112#[cfg(feature = "geoip")]
113use tor_geoip::CountryCode;
114pub use tor_guardmgr::{ExternalActivity, FirstHopId};
115use tor_persist::StateMgr;
116use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
117
118#[cfg(feature = "hs-common")]
119use crate::hspool::{HsCircKind, HsCircStemKind};
120#[cfg(all(feature = "vanguards", feature = "hs-common"))]
121use tor_guardmgr::vanguards::VanguardMgr;
122
123pub type Result<T> = std::result::Result<T, Error>;
125
126type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
128
129const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
131
132#[derive(Debug, Copy, Clone)]
140#[non_exhaustive]
141pub enum DirInfo<'a> {
142 Fallbacks(&'a FallbackList),
144 Directory(&'a NetDir),
146 Nothing,
149}
150
151impl<'a> From<&'a FallbackList> for DirInfo<'a> {
152 fn from(v: &'a FallbackList) -> DirInfo<'a> {
153 DirInfo::Fallbacks(v)
154 }
155}
156impl<'a> From<&'a NetDir> for DirInfo<'a> {
157 fn from(v: &'a NetDir) -> DirInfo<'a> {
158 DirInfo::Directory(v)
159 }
160}
161impl<'a> DirInfo<'a> {
162 fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
164 use tor_netdir::params::NetParameters;
165 let defaults = NetParameters::default();
168 let net_params = match self {
169 DirInfo::Directory(d) => d.params(),
170 _ => &defaults,
171 };
172 match usage {
173 #[cfg(feature = "hs-common")]
174 TargetCircUsage::HsCircBase { .. } => {
175 build::onion_circparams_from_netparams(net_params)
176 }
177 _ => build::exit_circparams_from_netparams(net_params),
178 }
179 }
180}
181
182pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
192
193impl<R: Runtime> CircMgr<R> {
194 pub fn new<SM, CFG: CircMgrConfig>(
200 config: &CFG,
201 storage: SM,
202 runtime: &R,
203 chanmgr: Arc<ChanMgr<R>>,
204 guardmgr: &tor_guardmgr::GuardMgr<R>,
205 ) -> Result<Self>
206 where
207 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
208 {
209 Ok(Self(Arc::new(CircMgrInner::new(
210 config, storage, runtime, chanmgr, guardmgr,
211 )?)))
212 }
213
214 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
217 self.0.get_or_launch_dir(netdir).await
218 }
219
220 pub async fn get_or_launch_exit(
226 &self,
227 netdir: DirInfo<'_>, ports: &[TargetPort],
229 isolation: StreamIsolation,
230 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
233 ) -> Result<Arc<ClientCirc>> {
234 self.0
235 .get_or_launch_exit(
236 netdir,
237 ports,
238 isolation,
239 #[cfg(feature = "geoip")]
240 country_code,
241 )
242 .await
243 }
244
245 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
250 #[cfg(feature = "specific-relay")]
251 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
252 &self,
253 target: T,
254 ) -> Result<Arc<ClientCirc>> {
255 self.0.get_or_launch_dir_specific(target).await
256 }
257
258 pub fn launch_background_tasks<D, S>(
264 self: &Arc<Self>,
265 runtime: &R,
266 dir_provider: &Arc<D>,
267 state_mgr: S,
268 ) -> Result<Vec<TaskHandle>>
269 where
270 D: NetDirProvider + 'static + ?Sized,
271 S: StateMgr + std::marker::Send + 'static,
272 {
273 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
274 }
275
276 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
282 self.0.netdir_is_sufficient(netdir)
283 }
284
285 pub fn retire_circ(&self, circ_id: &UniqId) {
288 self.0.retire_circ(circ_id);
289 }
290
291 pub fn note_external_failure(
295 &self,
296 target: &impl ChanTarget,
297 external_failure: ExternalActivity,
298 ) {
299 self.0.note_external_failure(target, external_failure);
300 }
301
302 pub fn note_external_success(
305 &self,
306 target: &impl ChanTarget,
307 external_activity: ExternalActivity,
308 ) {
309 self.0.note_external_success(target, external_activity);
310 }
311
312 pub fn skew_events(&self) -> ClockSkewEvents {
320 self.0.skew_events()
321 }
322
323 pub fn reconfigure<CFG: CircMgrConfig>(
329 &self,
330 new_config: &CFG,
331 how: tor_config::Reconfigure,
332 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
333 self.0.reconfigure(new_config, how)
334 }
335
336 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
362 self.0.estimate_timeout(timeout_action)
363 }
364
365 #[cfg(feature = "experimental-api")]
368 pub fn builder(&self) -> &CircuitBuilder<R> {
369 CircMgrInner::builder(&self.0)
370 }
371}
372
373#[derive(Clone)]
375pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
376 mgr: Arc<mgr::AbstractCircMgr<B, R>>,
378 predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
380}
381
382impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
383 #[allow(clippy::unnecessary_wraps)]
389 pub(crate) fn new<SM, CFG: CircMgrConfig>(
390 config: &CFG,
391 storage: SM,
392 runtime: &R,
393 chanmgr: Arc<ChanMgr<R>>,
394 guardmgr: &tor_guardmgr::GuardMgr<R>,
395 ) -> Result<Self>
396 where
397 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
398 {
399 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
400 let vanguardmgr = {
401 let has_onion_svc = false;
406 VanguardMgr::new(
407 config.vanguard_config(),
408 runtime.clone(),
409 storage.clone(),
410 has_onion_svc,
411 )?
412 };
413
414 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
415
416 let builder = build::CircuitBuilder::new(
417 runtime.clone(),
418 chanmgr,
419 config.path_rules().clone(),
420 storage_handle,
421 guardmgr.clone(),
422 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423 vanguardmgr,
424 );
425
426 Ok(Self::new_generic(config, runtime, guardmgr, builder))
427 }
428}
429
430impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
431 pub(crate) fn new_generic<CFG: CircMgrConfig>(
433 config: &CFG,
434 runtime: &R,
435 guardmgr: &tor_guardmgr::GuardMgr<R>,
436 builder: B,
437 ) -> Self {
438 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
439 config.preemptive_circuits().clone(),
440 )));
441
442 guardmgr.set_filter(config.path_rules().build_guard_filter());
443
444 let mgr =
445 mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
446
447 CircMgrInner {
448 mgr: Arc::new(mgr),
449 predictor: preemptive,
450 }
451 }
452
453 pub(crate) fn launch_background_tasks<D, S>(
459 self: &Arc<Self>,
460 runtime: &R,
461 dir_provider: &Arc<D>,
462 state_mgr: S,
463 ) -> Result<Vec<TaskHandle>>
464 where
465 D: NetDirProvider + 'static + ?Sized,
466 S: StateMgr + std::marker::Send + 'static,
467 {
468 let mut ret = vec![];
469
470 runtime
471 .spawn(Self::keep_circmgr_params_updated(
472 dir_provider.events(),
473 Arc::downgrade(self),
474 Arc::downgrade(dir_provider),
475 ))
476 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
477
478 let (sched, handle) = TaskSchedule::new(runtime.clone());
479 ret.push(handle);
480
481 runtime
482 .spawn(Self::update_persistent_state(
483 sched,
484 Arc::downgrade(self),
485 state_mgr,
486 ))
487 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
488
489 let (sched, handle) = TaskSchedule::new(runtime.clone());
490 ret.push(handle);
491
492 runtime
493 .spawn(Self::continually_launch_timeout_testing_circuits(
494 sched,
495 Arc::downgrade(self),
496 Arc::downgrade(dir_provider),
497 ))
498 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
499
500 let (sched, handle) = TaskSchedule::new(runtime.clone());
501 ret.push(handle);
502
503 runtime
504 .spawn(Self::continually_preemptively_build_circuits(
505 sched,
506 Arc::downgrade(self),
507 Arc::downgrade(dir_provider),
508 ))
509 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
510
511 self.mgr
512 .peek_builder()
513 .guardmgr()
514 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
515
516 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
517 {
518 let () = self
519 .mgr
520 .peek_builder()
521 .vanguardmgr()
522 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
523 }
524
525 Ok(ret)
526 }
527
528 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
531 self.expire_circuits();
532 let usage = TargetCircUsage::Dir;
533 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
534 }
535
536 pub(crate) async fn get_or_launch_exit(
542 &self,
543 netdir: DirInfo<'_>, ports: &[TargetPort],
545 isolation: StreamIsolation,
546 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
549 ) -> Result<Arc<B::Circ>> {
550 self.expire_circuits();
551 let time = Instant::now();
552 {
553 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
554 if ports.is_empty() {
555 predictive.note_usage(None, time);
556 } else {
557 for port in ports.iter() {
558 predictive.note_usage(Some(*port), time);
559 }
560 }
561 }
562 let require_stability = ports.iter().any(|p| {
563 self.mgr
564 .peek_builder()
565 .path_config()
566 .long_lived_ports
567 .contains(&p.port)
568 });
569 let ports = ports.iter().map(Clone::clone).collect();
570 #[cfg(not(feature = "geoip"))]
571 let country_code = None;
572 let usage = TargetCircUsage::Exit {
573 ports,
574 isolation,
575 country_code,
576 require_stability,
577 };
578 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
579 }
580
581 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
586 #[cfg(feature = "specific-relay")]
587 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
588 &self,
589 target: T,
590 ) -> Result<Arc<B::Circ>> {
591 self.expire_circuits();
592 let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
593 self.mgr
594 .get_or_launch(&usage, DirInfo::Nothing)
595 .await
596 .map(|(c, _)| c)
597 }
598
599 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
605 &self,
606 new_config: &CFG,
607 how: tor_config::Reconfigure,
608 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
609 let old_path_rules = self.mgr.peek_builder().path_config();
610 let predictor = self.predictor.lock().expect("poisoned lock");
611 let preemptive_circuits = predictor.config();
612 if preemptive_circuits.initial_predicted_ports
613 != new_config.preemptive_circuits().initial_predicted_ports
614 {
615 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
617 }
618
619 if how == tor_config::Reconfigure::CheckAllOrNothing {
620 return Ok(RetireCircuits::None);
621 }
622
623 let retire_because_of_guardmgr =
624 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
625
626 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
627 let retire_because_of_vanguardmgr = self
628 .mgr
629 .peek_builder()
630 .vanguardmgr()
631 .reconfigure(new_config.vanguard_config())?;
632
633 let new_reachable = &new_config.path_rules().reachable_addrs;
634 if new_reachable != &old_path_rules.reachable_addrs {
635 let filter = new_config.path_rules().build_guard_filter();
636 self.mgr.peek_builder().guardmgr().set_filter(filter);
637 }
638
639 let discard_all_circuits = !new_config
640 .path_rules()
641 .at_least_as_permissive_as(&old_path_rules)
642 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
643
644 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645 let discard_all_circuits = discard_all_circuits
646 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
647
648 self.mgr
649 .peek_builder()
650 .set_path_config(new_config.path_rules().clone());
651 self.mgr
652 .set_circuit_timing(new_config.circuit_timing().clone());
653 predictor.set_config(new_config.preemptive_circuits().clone());
654
655 if discard_all_circuits {
656 info!("Path configuration has become more restrictive: retiring existing circuits.");
660 self.retire_all_circuits();
661 return Ok(RetireCircuits::All);
662 }
663 Ok(RetireCircuits::None)
664 }
665
666 async fn keep_circmgr_params_updated<D>(
674 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
675 circmgr: Weak<Self>,
676 dirmgr: Weak<D>,
677 ) where
678 D: NetDirProvider + 'static + ?Sized,
679 {
680 use DirEvent::*;
681 while let Some(event) = events.next().await {
682 if matches!(event, NewConsensus) {
683 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
684 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
685 cm.update_network_parameters(netdir.params());
686 }
687 } else {
688 debug!("Circmgr or dirmgr has disappeared; task exiting.");
689 break;
690 }
691 }
692 }
693 }
694
695 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
698 self.mgr.update_network_parameters(p);
699 self.mgr.peek_builder().update_network_parameters(p);
700 }
701
702 async fn continually_launch_timeout_testing_circuits<D>(
709 mut sched: TaskSchedule<R>,
710 circmgr: Weak<Self>,
711 dirmgr: Weak<D>,
712 ) where
713 D: NetDirProvider + 'static + ?Sized,
714 {
715 while sched.next().await.is_some() {
716 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
717 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
718 if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
719 warn_report!(e, "Problem launching a timeout testing circuit");
720 }
721 let delay = netdir
722 .params()
723 .cbt_testing_delay
724 .try_into()
725 .expect("Out-of-bounds value from BoundedInt32");
726
727 drop((cm, dm));
728 sched.fire_in(delay);
729 } else {
730 let _ = dm.events().next().await;
733 sched.fire();
734 }
735 } else {
736 return;
737 }
738 }
739 }
740
741 fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
749 if !self.mgr.peek_builder().learning_timeouts() {
750 return Ok(());
751 }
752 self.expire_circuits();
755 let max_circs: u64 = netdir
756 .params()
757 .cbt_max_open_circuits_for_testing
758 .try_into()
759 .expect("Out-of-bounds result from BoundedInt32");
760 if (self.mgr.n_circs() as u64) < max_circs {
761 let usage = TargetCircUsage::TimeoutTesting;
763 let dirinfo = netdir.into();
764 let mgr = Arc::clone(&self.mgr);
765 debug!("Launching a circuit to test build times.");
766 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
767 drop(receiver);
770 }
771
772 Ok(())
773 }
774
775 async fn update_persistent_state<S>(
782 mut sched: TaskSchedule<R>,
783 circmgr: Weak<Self>,
784 statemgr: S,
785 ) where
786 S: StateMgr + std::marker::Send,
787 {
788 while sched.next().await.is_some() {
789 if let Some(circmgr) = Weak::upgrade(&circmgr) {
790 use tor_persist::LockStatus::*;
791
792 match statemgr.try_lock() {
793 Err(e) => {
794 error_report!(e, "Problem with state lock file");
795 break;
796 }
797 Ok(NewlyAcquired) => {
798 info!("We now own the lock on our state files.");
799 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
800 error_report!(e, "Unable to upgrade to owned state files");
801 break;
802 }
803 }
804 Ok(AlreadyHeld) => {
805 if let Err(e) = circmgr.store_persistent_state() {
806 error_report!(e, "Unable to flush circmgr state");
807 break;
808 }
809 }
810 Ok(NoLock) => {
811 if let Err(e) = circmgr.reload_persistent_state() {
812 error_report!(e, "Unable to reload circmgr state");
813 break;
814 }
815 }
816 }
817 } else {
818 debug!("Circmgr has disappeared; task exiting.");
819 return;
820 }
821 sched.fire_in(Duration::from_secs(60));
828 }
829
830 debug!("State update task exiting (potentially due to handle drop).");
831 }
832
833 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
837 self.mgr.peek_builder().upgrade_to_owned_state()?;
838 Ok(())
839 }
840
841 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
846 self.mgr.peek_builder().reload_state()?;
847 Ok(())
848 }
849
850 async fn continually_preemptively_build_circuits<D>(
862 mut sched: TaskSchedule<R>,
863 circmgr: Weak<Self>,
864 dirmgr: Weak<D>,
865 ) where
866 D: NetDirProvider + 'static + ?Sized,
867 {
868 let base_delay = Duration::from_secs(10);
869 let mut retry = RetryDelay::from_duration(base_delay);
870
871 while sched.next().await.is_some() {
872 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
873 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
874 let result = cm
875 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
876 .await;
877
878 let delay = match result {
879 Ok(()) => {
880 retry.reset();
881 base_delay
882 }
883 Err(_) => retry.next_delay(&mut rand::rng()),
884 };
885
886 sched.fire_in(delay);
887 } else {
888 let _ = dm.events().next().await;
891 sched.fire();
892 }
893 } else {
894 return;
895 }
896 }
897 }
898
899 async fn launch_circuits_preemptively(
907 &self,
908 netdir: DirInfo<'_>,
909 ) -> std::result::Result<(), err::PreemptiveCircError> {
910 trace!("Checking preemptive circuit predictions.");
911 let (circs, threshold) = {
912 let path_config = self.mgr.peek_builder().path_config();
913 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
914 let threshold = preemptive.config().disable_at_threshold;
915 (preemptive.predict(&path_config), threshold)
916 };
917
918 if self.mgr.n_circs() >= threshold {
919 return Ok(());
920 }
921 let mut n_created = 0_usize;
922 let mut n_errors = 0_usize;
923
924 let futures = circs
925 .iter()
926 .map(|usage| self.mgr.get_or_launch(usage, netdir));
927 let results = futures::future::join_all(futures).await;
928 for (i, result) in results.into_iter().enumerate() {
929 match result {
930 Ok((_, CircProvenance::NewlyCreated)) => {
931 debug!("Preeemptive circuit was created for {:?}", circs[i]);
932 n_created += 1;
933 }
934 Ok((_, CircProvenance::Preexisting)) => {
935 trace!("Circuit already existed created for {:?}", circs[i]);
936 }
937 Err(e) => {
938 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
939 n_errors += 1;
940 }
941 }
942 }
943
944 if n_created > 0 || n_errors == 0 {
945 Ok(())
949 } else {
950 Err(err::PreemptiveCircError)
953 }
954 }
955
956 #[cfg(feature = "hs-common")]
973 pub(crate) async fn launch_hs_unmanaged<T>(
974 &self,
975 planned_target: Option<T>,
976 dir: &NetDir,
977 stem_kind: HsCircStemKind,
978 circ_kind: Option<HsCircKind>,
979 ) -> Result<Arc<B::Circ>>
980 where
981 T: IntoOwnedChanTarget,
982 {
983 let usage = TargetCircUsage::HsCircBase {
984 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
985 stem_kind,
986 circ_kind,
987 };
988 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
989 Ok(client_circ)
990 }
991
992 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
998 self.mgr
999 .peek_builder()
1000 .guardmgr()
1001 .netdir_is_sufficient(netdir)
1002 }
1003
1004 pub(crate) fn estimate_timeout(
1006 &self,
1007 timeout_action: &timeouts::Action,
1008 ) -> std::time::Duration {
1009 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1010 timeout
1011 }
1012
1013 pub(crate) fn builder(&self) -> &B {
1015 self.mgr.peek_builder()
1016 }
1017
1018 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1023 self.mgr.peek_builder().save_state()
1024 }
1025
1026 fn expire_circuits(&self) {
1031 let now = self.mgr.peek_runtime().now();
1037 self.mgr.expire_circs(now);
1038 }
1039
1040 pub(crate) fn retire_all_circuits(&self) {
1048 self.mgr.retire_all_circuits();
1049 }
1050
1051 pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
1054 let _ = self.mgr.take_circ(circ_id);
1055 }
1056
1057 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1065 self.mgr.peek_builder().guardmgr().skew_events()
1066 }
1067
1068 pub(crate) fn note_external_failure(
1072 &self,
1073 target: &impl ChanTarget,
1074 external_failure: ExternalActivity,
1075 ) {
1076 self.mgr
1077 .peek_builder()
1078 .guardmgr()
1079 .note_external_failure(target, external_failure);
1080 }
1081
1082 pub(crate) fn note_external_success(
1085 &self,
1086 target: &impl ChanTarget,
1087 external_activity: ExternalActivity,
1088 ) {
1089 self.mgr
1090 .peek_builder()
1091 .guardmgr()
1092 .note_external_success(target, external_activity);
1093 }
1094}
1095
1096impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1097 fn drop(&mut self) {
1098 match self.store_persistent_state() {
1099 Ok(true) => info!("Flushed persistent state at exit."),
1100 Ok(false) => debug!("Lock not held; no state to flush."),
1101 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1102 }
1103 }
1104}
1105
1106#[cfg(test)]
1107mod test {
1108 #![allow(clippy::bool_assert_comparison)]
1110 #![allow(clippy::clone_on_copy)]
1111 #![allow(clippy::dbg_macro)]
1112 #![allow(clippy::mixed_attributes_style)]
1113 #![allow(clippy::print_stderr)]
1114 #![allow(clippy::print_stdout)]
1115 #![allow(clippy::single_char_pattern)]
1116 #![allow(clippy::unwrap_used)]
1117 #![allow(clippy::unchecked_duration_subtraction)]
1118 #![allow(clippy::useless_vec)]
1119 #![allow(clippy::needless_pass_by_value)]
1120 use mocks::FakeBuilder;
1122 use tor_guardmgr::GuardMgr;
1123 use tor_linkspec::OwnedChanTarget;
1124 use tor_netdir::testprovider::TestNetDirProvider;
1125 use tor_persist::TestingStateMgr;
1126
1127 use super::*;
1128
1129 #[test]
1130 fn get_params() {
1131 use tor_netdir::{MdReceiver, PartialNetDir};
1132 use tor_netdoc::doc::netstatus::NetParams;
1133 let fb = FallbackList::from([]);
1135 let di: DirInfo<'_> = (&fb).into();
1136
1137 let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1138 assert!(!p1.extend_by_ed25519_id);
1139
1140 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1142 let mut params = NetParams::default();
1143 params.set("circwindow".into(), 100);
1144 params.set("ExtendByEd25519ID".into(), 1);
1145 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1146 for m in microdescs {
1147 dir.add_microdesc(m);
1148 }
1149 let netdir = dir.unwrap_if_sufficient().unwrap();
1150 let di: DirInfo<'_> = (&netdir).into();
1151 let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1152 assert!(p2.extend_by_ed25519_id);
1153
1154 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1156 let mut params = NetParams::default();
1157 params.set("circwindow".into(), 100_000);
1158 params.set("ExtendByEd25519ID".into(), 1);
1159 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1160 for m in microdescs {
1161 dir.add_microdesc(m);
1162 }
1163 let netdir = dir.unwrap_if_sufficient().unwrap();
1164 let di: DirInfo<'_> = (&netdir).into();
1165 let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1166 assert!(p2.extend_by_ed25519_id);
1167 }
1168
1169 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1170 let config = crate::config::test_config::TestConfig::default();
1171 let statemgr = TestingStateMgr::new();
1172 let guardmgr =
1173 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1174 let builder = FakeBuilder::new(
1175 &runtime,
1176 statemgr.clone(),
1177 &tor_guardmgr::TestConfig::default(),
1178 );
1179 let circmgr = Arc::new(CircMgrInner::new_generic(
1180 &config, &runtime, &guardmgr, builder,
1181 ));
1182 let netdir = Arc::new(TestNetDirProvider::new());
1183 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1184 .expect("launch CircMgrInner background tasks");
1185 circmgr
1186 }
1187
1188 #[test]
1189 #[cfg(feature = "hs-common")]
1190 fn test_launch_hs_unmanaged() {
1191 tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1192 let circmgr = make_circmgr(runtime.clone());
1193 let netdir = tor_netdir::testnet::construct_netdir()
1194 .unwrap_if_sufficient()
1195 .unwrap();
1196
1197 let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1198 runtime.spawn_identified("launch_hs_unamanged", async move {
1199 ret_tx
1200 .send(
1201 circmgr
1202 .launch_hs_unmanaged::<OwnedChanTarget>(
1203 None,
1204 &netdir,
1205 HsCircStemKind::Naive,
1206 None,
1207 )
1208 .await,
1209 )
1210 .unwrap();
1211 });
1212 runtime.advance_by(Duration::from_millis(60)).await;
1213 ret_rx.await.unwrap().unwrap();
1214 });
1215 }
1216}