1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::TunnelBuilder;
51use mgr::{AbstractTunnel, AbstractTunnelBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_dircommon::fallback::FallbackList;
55use tor_error::{error_report, warn_report};
56use tor_guardmgr::RetireCircuits;
57use tor_linkspec::ChanTarget;
58use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
59use tor_proto::circuit::UniqId;
60use tor_proto::client::circuit::CircParameters;
61use tor_rtcompat::Runtime;
62
63#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
64use tor_linkspec::IntoOwnedChanTarget;
65
66use futures::StreamExt;
67use futures::task::SpawnExt;
68use std::sync::{Arc, Mutex, Weak};
69use std::time::{Duration, Instant};
70use tracing::{debug, info, trace, warn};
71
72#[cfg(feature = "testing")]
73pub use config::test_config::TestConfig;
74
75pub mod build;
76mod config;
77mod err;
78#[cfg(feature = "hs-common")]
79pub mod hspool;
80mod impls;
81pub mod isolation;
82mod mgr;
83#[cfg(test)]
84mod mocks;
85mod preemptive;
86pub mod timeouts;
87mod tunnel;
88mod usage;
89
90cfg_if::cfg_if! {
92 if #[cfg(feature = "experimental-api")] {
93 pub mod path;
94 } else {
95 pub(crate) mod path;
96 }
97}
98
99pub use err::Error;
100pub use isolation::IsolationToken;
101pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
102pub use tunnel::{
103 ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
104 ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
105 ServiceOnionServiceIntroTunnel,
106};
107#[cfg(feature = "conflux")]
108#[cfg_attr(docsrs, doc(cfg(feature = "conflux")))]
109pub use tunnel::{
110 ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
111 ServiceMultiPathOnionServiceDataTunnel,
112};
113pub use usage::{TargetPort, TargetPorts};
114
115pub use config::{
116 CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
117 PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
118};
119
120use crate::isolation::StreamIsolation;
121use crate::mgr::TunnelProvenance;
122use crate::preemptive::PreemptiveCircuitPredictor;
123use usage::TargetTunnelUsage;
124
125use safelog::sensitive as sv;
126#[cfg(feature = "geoip")]
127use tor_geoip::CountryCode;
128pub use tor_guardmgr::{ExternalActivity, FirstHopId};
129use tor_persist::StateMgr;
130use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
131
132#[cfg(feature = "hs-common")]
133use crate::hspool::{HsCircKind, HsCircStemKind};
134#[cfg(all(feature = "vanguards", feature = "hs-common"))]
135use tor_guardmgr::vanguards::VanguardMgr;
136
/// A `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Storage handle type for the persisted Pareto timeout-estimator state.
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;

/// Key under which circuit timeout data is kept in persistent state.
///
/// The circuit manager passes this key to the state manager's
/// `create_handle` when it is constructed.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
145
/// What a caller knows about the network when it asks for a tunnel.
///
/// A `DirInfo` borrows either a fallback list or a network directory, or
/// carries no directory information at all.  Only the
/// [`Directory`](DirInfo::Directory) variant supplies network parameters;
/// for the others, circuit parameters are computed from the defaults.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DirInfo<'a> {
    /// A borrowed list of fallback directories.
    Fallbacks(&'a FallbackList),
    /// A borrowed network directory.
    Directory(&'a NetDir),
    /// No directory information.
    ///
    /// Within this file it is used when building a directory tunnel to a
    /// relay that the caller has specified explicitly.
    Nothing,
}
164
165impl<'a> From<&'a FallbackList> for DirInfo<'a> {
166 fn from(v: &'a FallbackList) -> DirInfo<'a> {
167 DirInfo::Fallbacks(v)
168 }
169}
170impl<'a> From<&'a NetDir> for DirInfo<'a> {
171 fn from(v: &'a NetDir) -> DirInfo<'a> {
172 DirInfo::Directory(v)
173 }
174}
175impl<'a> DirInfo<'a> {
176 fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
178 use tor_netdir::params::NetParameters;
179 let defaults = NetParameters::default();
182 let net_params = match self {
183 DirInfo::Directory(d) => d.params(),
184 _ => &defaults,
185 };
186 match usage {
187 #[cfg(feature = "hs-common")]
188 TargetTunnelUsage::HsCircBase { .. } => {
189 build::onion_circparams_from_netparams(net_params)
190 }
191 _ => build::exit_circparams_from_netparams(net_params),
192 }
193 }
194}
195
/// The public circuit manager handle.
///
/// This is a thin wrapper around a reference-counted `CircMgrInner` that
/// is specialised to the real `TunnelBuilder`; its methods delegate to that
/// inner object.
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
206
207impl<R: Runtime> CircMgr<R> {
208 pub fn new<SM, CFG: CircMgrConfig>(
214 config: &CFG,
215 storage: SM,
216 runtime: &R,
217 chanmgr: Arc<ChanMgr<R>>,
218 guardmgr: &tor_guardmgr::GuardMgr<R>,
219 ) -> Result<Self>
220 where
221 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
222 {
223 Ok(Self(Arc::new(CircMgrInner::new(
224 config, storage, runtime, chanmgr, guardmgr,
225 )?)))
226 }
227
228 pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
231 let tunnel = self.0.get_or_launch_dir(netdir).await?;
232 Ok(tunnel.into())
233 }
234
235 pub async fn get_or_launch_exit(
241 &self,
242 netdir: DirInfo<'_>, ports: &[TargetPort],
244 isolation: StreamIsolation,
245 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
248 ) -> Result<ClientDataTunnel> {
249 let tunnel = self
250 .0
251 .get_or_launch_exit(
252 netdir,
253 ports,
254 isolation,
255 #[cfg(feature = "geoip")]
256 country_code,
257 )
258 .await?;
259 Ok(tunnel.into())
260 }
261
262 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
267 #[cfg(feature = "specific-relay")]
268 pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
269 &self,
270 target: T,
271 ) -> Result<ClientDirTunnel> {
272 let tunnel = self.0.get_or_launch_dir_specific(target).await?;
273 Ok(tunnel.into())
274 }
275
276 pub fn launch_background_tasks<D, S>(
282 self: &Arc<Self>,
283 runtime: &R,
284 dir_provider: &Arc<D>,
285 state_mgr: S,
286 ) -> Result<Vec<TaskHandle>>
287 where
288 D: NetDirProvider + 'static + ?Sized,
289 S: StateMgr + std::marker::Send + 'static,
290 {
291 CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
292 }
293
294 pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
300 self.0.netdir_is_sufficient(netdir)
301 }
302
303 pub fn retire_circ(&self, circ_id: &UniqId) {
306 self.0.retire_circ(circ_id);
307 }
308
309 pub fn note_external_failure(
313 &self,
314 target: &impl ChanTarget,
315 external_failure: ExternalActivity,
316 ) {
317 self.0.note_external_failure(target, external_failure);
318 }
319
320 pub fn note_external_success(
323 &self,
324 target: &impl ChanTarget,
325 external_activity: ExternalActivity,
326 ) {
327 self.0.note_external_success(target, external_activity);
328 }
329
330 pub fn skew_events(&self) -> ClockSkewEvents {
338 self.0.skew_events()
339 }
340
341 pub fn reconfigure<CFG: CircMgrConfig>(
347 &self,
348 new_config: &CFG,
349 how: tor_config::Reconfigure,
350 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
351 self.0.reconfigure(new_config, how)
352 }
353
354 pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
380 self.0.estimate_timeout(timeout_action)
381 }
382
383 #[cfg(feature = "experimental-api")]
386 pub fn builder(&self) -> &TunnelBuilder<R> {
387 CircMgrInner::builder(&self.0)
388 }
389}
390
/// The object that implements `CircMgr`.
///
/// It is generic over the tunnel builder `B`, which lets the tests in this
/// file substitute a fake builder for the real one.
#[derive(Clone)]
pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
    /// The underlying tunnel manager, which also owns the builder.
    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
    /// Predictor that records port usage and decides which circuits to
    /// build preemptively.
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}
399
400impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
401 #[allow(clippy::unnecessary_wraps)]
407 pub(crate) fn new<SM, CFG: CircMgrConfig>(
408 config: &CFG,
409 storage: SM,
410 runtime: &R,
411 chanmgr: Arc<ChanMgr<R>>,
412 guardmgr: &tor_guardmgr::GuardMgr<R>,
413 ) -> Result<Self>
414 where
415 SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
416 {
417 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
418 let vanguardmgr = {
419 let has_onion_svc = false;
424 VanguardMgr::new(
425 config.vanguard_config(),
426 runtime.clone(),
427 storage.clone(),
428 has_onion_svc,
429 )?
430 };
431
432 let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
433
434 let builder = build::TunnelBuilder::new(
435 runtime.clone(),
436 chanmgr,
437 config.path_rules().clone(),
438 storage_handle,
439 guardmgr.clone(),
440 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
441 vanguardmgr,
442 );
443
444 Ok(Self::new_generic(config, runtime, guardmgr, builder))
445 }
446}
447
448impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
449 pub(crate) fn new_generic<CFG: CircMgrConfig>(
451 config: &CFG,
452 runtime: &R,
453 guardmgr: &tor_guardmgr::GuardMgr<R>,
454 builder: B,
455 ) -> Self {
456 let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
457 config.preemptive_circuits().clone(),
458 )));
459
460 guardmgr.set_filter(config.path_rules().build_guard_filter());
461
462 let mgr =
463 mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
464
465 CircMgrInner {
466 mgr: Arc::new(mgr),
467 predictor: preemptive,
468 }
469 }
470
471 pub(crate) fn launch_background_tasks<D, S>(
477 self: &Arc<Self>,
478 runtime: &R,
479 dir_provider: &Arc<D>,
480 state_mgr: S,
481 ) -> Result<Vec<TaskHandle>>
482 where
483 D: NetDirProvider + 'static + ?Sized,
484 S: StateMgr + std::marker::Send + 'static,
485 {
486 let mut ret = vec![];
487
488 runtime
489 .spawn(Self::keep_circmgr_params_updated(
490 dir_provider.events(),
491 Arc::downgrade(self),
492 Arc::downgrade(dir_provider),
493 ))
494 .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
495
496 let (sched, handle) = TaskSchedule::new(runtime.clone());
497 ret.push(handle);
498
499 runtime
500 .spawn(Self::update_persistent_state(
501 sched,
502 Arc::downgrade(self),
503 state_mgr,
504 ))
505 .map_err(|e| Error::from_spawn("persistent state updater", e))?;
506
507 let (sched, handle) = TaskSchedule::new(runtime.clone());
508 ret.push(handle);
509
510 runtime
511 .spawn(Self::continually_launch_timeout_testing_circuits(
512 sched,
513 Arc::downgrade(self),
514 Arc::downgrade(dir_provider),
515 ))
516 .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
517
518 let (sched, handle) = TaskSchedule::new(runtime.clone());
519 ret.push(handle);
520
521 runtime
522 .spawn(Self::continually_preemptively_build_circuits(
523 sched,
524 Arc::downgrade(self),
525 Arc::downgrade(dir_provider),
526 ))
527 .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
528
529 self.mgr
530 .peek_builder()
531 .guardmgr()
532 .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
533
534 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
535 {
536 let () = self
537 .mgr
538 .peek_builder()
539 .vanguardmgr()
540 .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
541 }
542
543 Ok(ret)
544 }
545
546 pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
549 self.expire_circuits();
550 let usage = TargetTunnelUsage::Dir;
551 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
552 }
553
554 pub(crate) async fn get_or_launch_exit(
560 &self,
561 netdir: DirInfo<'_>, ports: &[TargetPort],
563 isolation: StreamIsolation,
564 #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
567 ) -> Result<Arc<B::Tunnel>> {
568 self.expire_circuits();
569 let time = Instant::now();
570 {
571 let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
572 if ports.is_empty() {
573 predictive.note_usage(None, time);
574 } else {
575 for port in ports.iter() {
576 predictive.note_usage(Some(*port), time);
577 }
578 }
579 }
580 let require_stability = ports.iter().any(|p| {
581 self.mgr
582 .peek_builder()
583 .path_config()
584 .long_lived_ports
585 .contains(&p.port)
586 });
587 let ports = ports.iter().map(Clone::clone).collect();
588 #[cfg(not(feature = "geoip"))]
589 let country_code = None;
590 let usage = TargetTunnelUsage::Exit {
591 ports,
592 isolation,
593 country_code,
594 require_stability,
595 };
596 self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
597 }
598
599 #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
604 #[cfg(feature = "specific-relay")]
605 pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
606 &self,
607 target: T,
608 ) -> Result<Arc<B::Tunnel>> {
609 self.expire_circuits();
610 let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
611 self.mgr
612 .get_or_launch(&usage, DirInfo::Nothing)
613 .await
614 .map(|(c, _)| c)
615 }
616
617 pub(crate) fn reconfigure<CFG: CircMgrConfig>(
623 &self,
624 new_config: &CFG,
625 how: tor_config::Reconfigure,
626 ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
627 let old_path_rules = self.mgr.peek_builder().path_config();
628 let predictor = self.predictor.lock().expect("poisoned lock");
629 let preemptive_circuits = predictor.config();
630 if preemptive_circuits.initial_predicted_ports
631 != new_config.preemptive_circuits().initial_predicted_ports
632 {
633 how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
635 }
636
637 if how == tor_config::Reconfigure::CheckAllOrNothing {
638 return Ok(RetireCircuits::None);
639 }
640
641 let retire_because_of_guardmgr =
642 self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
643
644 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645 let retire_because_of_vanguardmgr = self
646 .mgr
647 .peek_builder()
648 .vanguardmgr()
649 .reconfigure(new_config.vanguard_config())?;
650
651 let new_reachable = &new_config.path_rules().reachable_addrs;
652 if new_reachable != &old_path_rules.reachable_addrs {
653 let filter = new_config.path_rules().build_guard_filter();
654 self.mgr.peek_builder().guardmgr().set_filter(filter);
655 }
656
657 let discard_all_circuits = !new_config
658 .path_rules()
659 .at_least_as_permissive_as(&old_path_rules)
660 || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
661
662 #[cfg(all(feature = "vanguards", feature = "hs-common"))]
663 let discard_all_circuits = discard_all_circuits
664 || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
665
666 self.mgr
667 .peek_builder()
668 .set_path_config(new_config.path_rules().clone());
669 self.mgr
670 .set_circuit_timing(new_config.circuit_timing().clone());
671 predictor.set_config(new_config.preemptive_circuits().clone());
672
673 if discard_all_circuits {
674 info!("Path configuration has become more restrictive: retiring existing circuits.");
678 self.retire_all_circuits();
679 return Ok(RetireCircuits::All);
680 }
681 Ok(RetireCircuits::None)
682 }
683
684 async fn keep_circmgr_params_updated<D>(
692 mut events: impl futures::Stream<Item = DirEvent> + Unpin,
693 circmgr: Weak<Self>,
694 dirmgr: Weak<D>,
695 ) where
696 D: NetDirProvider + 'static + ?Sized,
697 {
698 use DirEvent::*;
699 while let Some(event) = events.next().await {
700 if matches!(event, NewConsensus) {
701 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
702 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
703 cm.update_network_parameters(netdir.params());
704 }
705 } else {
706 debug!("Circmgr or dirmgr has disappeared; task exiting.");
707 break;
708 }
709 }
710 }
711 }
712
713 fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
716 self.mgr.update_network_parameters(p);
717 self.mgr.peek_builder().update_network_parameters(p);
718 }
719
720 async fn continually_launch_timeout_testing_circuits<D>(
727 mut sched: TaskSchedule<R>,
728 circmgr: Weak<Self>,
729 dirmgr: Weak<D>,
730 ) where
731 D: NetDirProvider + 'static + ?Sized,
732 {
733 while sched.next().await.is_some() {
734 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
735 if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
736 if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
737 warn_report!(e, "Problem launching a timeout testing circuit");
738 }
739 let delay = netdir
740 .params()
741 .cbt_testing_delay
742 .try_into()
743 .expect("Out-of-bounds value from BoundedInt32");
744
745 drop((cm, dm));
746 sched.fire_in(delay);
747 } else {
748 let _ = dm.events().next().await;
751 sched.fire();
752 }
753 } else {
754 return;
755 }
756 }
757 }
758
759 fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
767 if !self.mgr.peek_builder().learning_timeouts() {
768 return Ok(());
769 }
770 self.expire_circuits();
773 let max_circs: u64 = netdir
774 .params()
775 .cbt_max_open_circuits_for_testing
776 .try_into()
777 .expect("Out-of-bounds result from BoundedInt32");
778 if (self.mgr.n_tunnels() as u64) < max_circs {
779 let usage = TargetTunnelUsage::TimeoutTesting;
781 let dirinfo = netdir.into();
782 let mgr = Arc::clone(&self.mgr);
783 debug!("Launching a circuit to test build times.");
784 let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
785 drop(receiver);
788 }
789
790 Ok(())
791 }
792
793 #[allow(clippy::cognitive_complexity)] async fn update_persistent_state<S>(
801 mut sched: TaskSchedule<R>,
802 circmgr: Weak<Self>,
803 statemgr: S,
804 ) where
805 S: StateMgr + std::marker::Send,
806 {
807 while sched.next().await.is_some() {
808 if let Some(circmgr) = Weak::upgrade(&circmgr) {
809 use tor_persist::LockStatus::*;
810
811 match statemgr.try_lock() {
812 Err(e) => {
813 error_report!(e, "Problem with state lock file");
814 break;
815 }
816 Ok(NewlyAcquired) => {
817 info!("We now own the lock on our state files.");
818 if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
819 error_report!(e, "Unable to upgrade to owned state files");
820 break;
821 }
822 }
823 Ok(AlreadyHeld) => {
824 if let Err(e) = circmgr.store_persistent_state() {
825 error_report!(e, "Unable to flush circmgr state");
826 break;
827 }
828 }
829 Ok(NoLock) => {
830 if let Err(e) = circmgr.reload_persistent_state() {
831 error_report!(e, "Unable to reload circmgr state");
832 break;
833 }
834 }
835 }
836 } else {
837 debug!("Circmgr has disappeared; task exiting.");
838 return;
839 }
840 sched.fire_in(Duration::from_secs(60));
847 }
848
849 debug!("State update task exiting (potentially due to handle drop).");
850 }
851
852 pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
856 self.mgr.peek_builder().upgrade_to_owned_state()?;
857 Ok(())
858 }
859
860 pub(crate) fn reload_persistent_state(&self) -> Result<()> {
865 self.mgr.peek_builder().reload_state()?;
866 Ok(())
867 }
868
869 async fn continually_preemptively_build_circuits<D>(
881 mut sched: TaskSchedule<R>,
882 circmgr: Weak<Self>,
883 dirmgr: Weak<D>,
884 ) where
885 D: NetDirProvider + 'static + ?Sized,
886 {
887 let base_delay = Duration::from_secs(10);
888 let mut retry = RetryDelay::from_duration(base_delay);
889
890 while sched.next().await.is_some() {
891 if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
892 if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
893 let result = cm
894 .launch_circuits_preemptively(DirInfo::Directory(&netdir))
895 .await;
896
897 let delay = match result {
898 Ok(()) => {
899 retry.reset();
900 base_delay
901 }
902 Err(_) => retry.next_delay(&mut rand::rng()),
903 };
904
905 sched.fire_in(delay);
906 } else {
907 let _ = dm.events().next().await;
910 sched.fire();
911 }
912 } else {
913 return;
914 }
915 }
916 }
917
918 #[allow(clippy::cognitive_complexity)]
926 async fn launch_circuits_preemptively(
927 &self,
928 netdir: DirInfo<'_>,
929 ) -> std::result::Result<(), err::PreemptiveCircError> {
930 trace!("Checking preemptive circuit predictions.");
931 let (circs, threshold) = {
932 let path_config = self.mgr.peek_builder().path_config();
933 let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
934 let threshold = preemptive.config().disable_at_threshold;
935 (preemptive.predict(&path_config), threshold)
936 };
937
938 if self.mgr.n_tunnels() >= threshold {
939 return Ok(());
940 }
941 let mut n_created = 0_usize;
942 let mut n_errors = 0_usize;
943
944 let futures = circs
945 .iter()
946 .map(|usage| self.mgr.get_or_launch(usage, netdir));
947 let results = futures::future::join_all(futures).await;
948 for (i, result) in results.into_iter().enumerate() {
949 match result {
950 Ok((_, TunnelProvenance::NewlyCreated)) => {
951 debug!("Preeemptive circuit was created for {:?}", circs[i]);
952 n_created += 1;
953 }
954 Ok((_, TunnelProvenance::Preexisting)) => {
955 trace!("Circuit already existed created for {:?}", circs[i]);
956 }
957 Err(e) => {
958 warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
959 n_errors += 1;
960 }
961 }
962 }
963
964 if n_created > 0 || n_errors == 0 {
965 Ok(())
969 } else {
970 Err(err::PreemptiveCircError)
973 }
974 }
975
976 #[cfg(feature = "hs-common")]
993 pub(crate) async fn launch_hs_unmanaged<T>(
994 &self,
995 planned_target: Option<T>,
996 dir: &NetDir,
997 stem_kind: HsCircStemKind,
998 circ_kind: Option<HsCircKind>,
999 ) -> Result<B::Tunnel>
1000 where
1001 T: IntoOwnedChanTarget,
1002 {
1003 let usage = TargetTunnelUsage::HsCircBase {
1004 compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1005 stem_kind,
1006 circ_kind,
1007 };
1008 let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1009 Ok(client_circ)
1010 }
1011
1012 pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1018 self.mgr
1019 .peek_builder()
1020 .guardmgr()
1021 .netdir_is_sufficient(netdir)
1022 }
1023
1024 pub(crate) fn estimate_timeout(
1026 &self,
1027 timeout_action: &timeouts::Action,
1028 ) -> std::time::Duration {
1029 let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1030 timeout
1031 }
1032
    /// Return a reference to the tunnel builder owned by the tunnel manager.
    pub(crate) fn builder(&self) -> &B {
        self.mgr.peek_builder()
    }
1037
1038 pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1043 self.mgr.peek_builder().save_state()
1044 }
1045
1046 fn expire_circuits(&self) {
1051 let now = self.mgr.peek_runtime().now();
1057 self.mgr.expire_tunnels(now);
1058 }
1059
    /// Retire every tunnel currently held by the tunnel manager.
    pub(crate) fn retire_all_circuits(&self) {
        self.mgr.retire_all_tunnels();
    }
1070
1071 pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1074 let _ = self.mgr.take_tunnel(circ_id);
1075 }
1076
1077 pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1085 self.mgr.peek_builder().guardmgr().skew_events()
1086 }
1087
1088 pub(crate) fn note_external_failure(
1092 &self,
1093 target: &impl ChanTarget,
1094 external_failure: ExternalActivity,
1095 ) {
1096 self.mgr
1097 .peek_builder()
1098 .guardmgr()
1099 .note_external_failure(target, external_failure);
1100 }
1101
1102 pub(crate) fn note_external_success(
1105 &self,
1106 target: &impl ChanTarget,
1107 external_activity: ExternalActivity,
1108 ) {
1109 self.mgr
1110 .peek_builder()
1111 .guardmgr()
1112 .note_external_success(target, external_activity);
1113 }
1114}
1115
1116impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1117 fn drop(&mut self) {
1118 match self.store_persistent_state() {
1119 Ok(true) => info!("Flushed persistent state at exit."),
1120 Ok(false) => debug!("Lock not held; no state to flush."),
1121 Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1122 }
1123 }
1124}
1125
1126#[cfg(test)]
1127mod test {
1128 #![allow(clippy::bool_assert_comparison)]
1130 #![allow(clippy::clone_on_copy)]
1131 #![allow(clippy::dbg_macro)]
1132 #![allow(clippy::mixed_attributes_style)]
1133 #![allow(clippy::print_stderr)]
1134 #![allow(clippy::print_stdout)]
1135 #![allow(clippy::single_char_pattern)]
1136 #![allow(clippy::unwrap_used)]
1137 #![allow(clippy::unchecked_duration_subtraction)]
1138 #![allow(clippy::useless_vec)]
1139 #![allow(clippy::needless_pass_by_value)]
1140 use mocks::FakeBuilder;
1142 use tor_guardmgr::GuardMgr;
1143 use tor_linkspec::OwnedChanTarget;
1144 use tor_netdir::testprovider::TestNetDirProvider;
1145 use tor_persist::TestingStateMgr;
1146
1147 use super::*;
1148
1149 #[test]
1150 fn get_params() {
1151 use tor_netdir::{MdReceiver, PartialNetDir};
1152 use tor_netdoc::doc::netstatus::NetParams;
1153 let fb = FallbackList::from([]);
1155 let di: DirInfo<'_> = (&fb).into();
1156
1157 let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1158 assert!(!p1.extend_by_ed25519_id);
1159
1160 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1162 let mut params = NetParams::default();
1163 params.set("circwindow".into(), 100);
1164 params.set("ExtendByEd25519ID".into(), 1);
1165 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1166 for m in microdescs {
1167 dir.add_microdesc(m);
1168 }
1169 let netdir = dir.unwrap_if_sufficient().unwrap();
1170 let di: DirInfo<'_> = (&netdir).into();
1171 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1172 assert!(p2.extend_by_ed25519_id);
1173
1174 let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1176 let mut params = NetParams::default();
1177 params.set("circwindow".into(), 100_000);
1178 params.set("ExtendByEd25519ID".into(), 1);
1179 let mut dir = PartialNetDir::new(consensus, Some(¶ms));
1180 for m in microdescs {
1181 dir.add_microdesc(m);
1182 }
1183 let netdir = dir.unwrap_if_sufficient().unwrap();
1184 let di: DirInfo<'_> = (&netdir).into();
1185 let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1186 assert!(p2.extend_by_ed25519_id);
1187 }
1188
1189 fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1190 let config = crate::config::test_config::TestConfig::default();
1191 let statemgr = TestingStateMgr::new();
1192 let guardmgr =
1193 GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1194 let builder = FakeBuilder::new(
1195 &runtime,
1196 statemgr.clone(),
1197 &tor_guardmgr::TestConfig::default(),
1198 );
1199 let circmgr = Arc::new(CircMgrInner::new_generic(
1200 &config, &runtime, &guardmgr, builder,
1201 ));
1202 let netdir = Arc::new(TestNetDirProvider::new());
1203 CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1204 .expect("launch CircMgrInner background tasks");
1205 circmgr
1206 }
1207
1208 #[test]
1209 #[cfg(feature = "hs-common")]
1210 fn test_launch_hs_unmanaged() {
1211 tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1212 let circmgr = make_circmgr(runtime.clone());
1213 let netdir = tor_netdir::testnet::construct_netdir()
1214 .unwrap_if_sufficient()
1215 .unwrap();
1216
1217 let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1218 runtime.spawn_identified("launch_hs_unamanged", async move {
1219 ret_tx
1220 .send(
1221 circmgr
1222 .launch_hs_unmanaged::<OwnedChanTarget>(
1223 None,
1224 &netdir,
1225 HsCircStemKind::Naive,
1226 None,
1227 )
1228 .await,
1229 )
1230 .unwrap();
1231 });
1232 runtime.advance_by(Duration::from_millis(60)).await;
1233 ret_rx.await.unwrap().unwrap();
1234 });
1235 }
1236}