tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
45
46// TODO #1645 (either remove this, or decide to have it everywhere)
47#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
48
49use build::CircuitBuilder;
50use mgr::{AbstractCirc, AbstractCircBuilder};
51use tor_basic_utils::retry::RetryDelay;
52use tor_chanmgr::ChanMgr;
53use tor_error::{error_report, warn_report};
54use tor_guardmgr::RetireCircuits;
55use tor_linkspec::ChanTarget;
56use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
57use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
58use tor_rtcompat::Runtime;
59
60#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
61use tor_linkspec::IntoOwnedChanTarget;
62
63use futures::task::SpawnExt;
64use futures::StreamExt;
65use std::sync::{Arc, Mutex, Weak};
66use std::time::{Duration, Instant};
67use tracing::{debug, info, trace, warn};
68
69#[cfg(feature = "testing")]
70pub use config::test_config::TestConfig;
71
72pub mod build;
73mod config;
74mod err;
75#[cfg(feature = "hs-common")]
76pub mod hspool;
77mod impls;
78pub mod isolation;
79mod mgr;
80#[cfg(test)]
81mod mocks;
82mod preemptive;
83pub mod timeouts;
84mod usage;
85
86// Can't apply `visibility` to modules.
87cfg_if::cfg_if! {
88    if #[cfg(feature = "experimental-api")] {
89        pub mod path;
90    } else {
91        pub(crate) mod path;
92    }
93}
94
95pub use err::Error;
96pub use isolation::IsolationToken;
97use tor_guardmgr::fallback::FallbackList;
98pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
99pub use usage::{TargetPort, TargetPorts};
100
101pub use config::{
102    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
103    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
104};
105
106use crate::isolation::StreamIsolation;
107use crate::mgr::CircProvenance;
108use crate::preemptive::PreemptiveCircuitPredictor;
109use usage::TargetCircUsage;
110
111use safelog::sensitive as sv;
112#[cfg(feature = "geoip")]
113use tor_geoip::CountryCode;
114pub use tor_guardmgr::{ExternalActivity, FirstHopId};
115use tor_persist::StateMgr;
116use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
117
118#[cfg(feature = "hs-common")]
119use crate::hspool::{HsCircKind, HsCircStemKind};
120#[cfg(all(feature = "vanguards", feature = "hs-common"))]
121use tor_guardmgr::vanguards::VanguardMgr;
122
123/// A Result type as returned from this crate.
124pub type Result<T> = std::result::Result<T, Error>;
125
126/// Type alias for dynamic StorageHandle that can handle our timeout state.
127type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
128
129/// Key used to load timeout state information.
130const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
131
132/// Represents what we know about the Tor network.
133///
134/// This can either be a complete directory, or a list of fallbacks.
135///
136/// Not every DirInfo can be used to build every kind of circuit:
137/// if you try to build a path with an inadequate DirInfo, you'll get a
138/// NeedConsensus error.
139#[derive(Debug, Copy, Clone)]
140#[non_exhaustive]
141pub enum DirInfo<'a> {
142    /// A list of fallbacks, for use when we don't know a network directory.
143    Fallbacks(&'a FallbackList),
144    /// A complete network directory
145    Directory(&'a NetDir),
146    /// No information: we can only build one-hop paths: and that, only if the
147    /// guard manager knows some guards or fallbacks.
148    Nothing,
149}
150
151impl<'a> From<&'a FallbackList> for DirInfo<'a> {
152    fn from(v: &'a FallbackList) -> DirInfo<'a> {
153        DirInfo::Fallbacks(v)
154    }
155}
156impl<'a> From<&'a NetDir> for DirInfo<'a> {
157    fn from(v: &'a NetDir) -> DirInfo<'a> {
158        DirInfo::Directory(v)
159    }
160}
161impl<'a> DirInfo<'a> {
162    /// Return a set of circuit parameters for this DirInfo.
163    fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
164        use tor_netdir::params::NetParameters;
165        // We use a common function for both cases here to be sure that
166        // we look at the defaults from NetParameters code.
167        let defaults = NetParameters::default();
168        let net_params = match self {
169            DirInfo::Directory(d) => d.params(),
170            _ => &defaults,
171        };
172        match usage {
173            #[cfg(feature = "hs-common")]
174            TargetCircUsage::HsCircBase { .. } => {
175                build::onion_circparams_from_netparams(net_params)
176            }
177            _ => build::exit_circparams_from_netparams(net_params),
178        }
179    }
180}
181
182/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
183/// when they're suitable, and launching them if they don't already exist.
184///
185/// Right now, its notion of "suitable" is quite rudimentary: it just
186/// believes in two kinds of circuits: Exit circuits, and directory
187/// circuits.  Exit circuits are ones that were created to connect to
188/// a set of ports; directory circuits were made to talk to directory caches.
189///
190/// This is a "handle"; clones of it share state.
191pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
192
193impl<R: Runtime> CircMgr<R> {
194    /// Construct a new circuit manager.
195    ///
196    /// # Usage note
197    ///
198    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
199    pub fn new<SM, CFG: CircMgrConfig>(
200        config: &CFG,
201        storage: SM,
202        runtime: &R,
203        chanmgr: Arc<ChanMgr<R>>,
204        guardmgr: &tor_guardmgr::GuardMgr<R>,
205    ) -> Result<Self>
206    where
207        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
208    {
209        Ok(Self(Arc::new(CircMgrInner::new(
210            config, storage, runtime, chanmgr, guardmgr,
211        )?)))
212    }
213
214    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
215    /// launching it if necessary.
216    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
217        self.0.get_or_launch_dir(netdir).await
218    }
219
220    /// Return a circuit suitable for exiting to all of the provided
221    /// `ports`, launching it if necessary.
222    ///
223    /// If the list of ports is empty, then the chosen circuit will
224    /// still end at _some_ exit.
225    pub async fn get_or_launch_exit(
226        &self,
227        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
228        ports: &[TargetPort],
229        isolation: StreamIsolation,
230        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
231        //             additive. The function should be refactored to be builder-like.
232        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
233    ) -> Result<Arc<ClientCirc>> {
234        self.0
235            .get_or_launch_exit(
236                netdir,
237                ports,
238                isolation,
239                #[cfg(feature = "geoip")]
240                country_code,
241            )
242            .await
243    }
244
245    /// Return a circuit to a specific relay, suitable for using for direct
246    /// (one-hop) directory downloads.
247    ///
248    /// This could be used, for example, to download a descriptor for a bridge.
249    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
250    #[cfg(feature = "specific-relay")]
251    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
252        &self,
253        target: T,
254    ) -> Result<Arc<ClientCirc>> {
255        self.0.get_or_launch_dir_specific(target).await
256    }
257
258    /// Launch the periodic daemon tasks required by the manager to function properly.
259    ///
260    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
261    //
262    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
263    pub fn launch_background_tasks<D, S>(
264        self: &Arc<Self>,
265        runtime: &R,
266        dir_provider: &Arc<D>,
267        state_mgr: S,
268    ) -> Result<Vec<TaskHandle>>
269    where
270        D: NetDirProvider + 'static + ?Sized,
271        S: StateMgr + std::marker::Send + 'static,
272    {
273        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
274    }
275
276    /// Return true if `netdir` has enough information to be used for this
277    /// circuit manager.
278    ///
279    /// (This will check whether the netdir is missing any primary guard
280    /// microdescriptors)
281    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
282        self.0.netdir_is_sufficient(netdir)
283    }
284
285    /// If `circ_id` is the unique identifier for a circuit that we're
286    /// keeping track of, don't give it out for any future requests.
287    pub fn retire_circ(&self, circ_id: &UniqId) {
288        self.0.retire_circ(circ_id);
289    }
290
291    /// Record that a failure occurred on a circuit with a given guard, in a way
292    /// that makes us unwilling to use that guard for future circuits.
293    ///
294    pub fn note_external_failure(
295        &self,
296        target: &impl ChanTarget,
297        external_failure: ExternalActivity,
298    ) {
299        self.0.note_external_failure(target, external_failure);
300    }
301
302    /// Record that a success occurred on a circuit with a given guard, in a way
303    /// that makes us possibly willing to use that guard for future circuits.
304    pub fn note_external_success(
305        &self,
306        target: &impl ChanTarget,
307        external_activity: ExternalActivity,
308    ) {
309        self.0.note_external_success(target, external_activity);
310    }
311
312    /// Return a stream of events about our estimated clock skew; these events
313    /// are `None` when we don't have enough information to make an estimate,
314    /// and `Some(`[`SkewEstimate`]`)` otherwise.
315    ///
316    /// Note that this stream can be lossy: if the estimate changes more than
317    /// one before you read from the stream, you might only get the most recent
318    /// update.
319    pub fn skew_events(&self) -> ClockSkewEvents {
320        self.0.skew_events()
321    }
322
323    /// Try to change our configuration settings to `new_config`.
324    ///
325    /// The actual behavior here will depend on the value of `how`.
326    ///
327    /// Returns whether any of the circuit pools should be cleared.
328    pub fn reconfigure<CFG: CircMgrConfig>(
329        &self,
330        new_config: &CFG,
331        how: tor_config::Reconfigure,
332    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
333        self.0.reconfigure(new_config, how)
334    }
335
336    /// Return an estimate-based delay for how long a given
337    /// [`Action`](timeouts::Action) should be allowed to complete.
338    ///
339    /// Note that **you do not need to use this function** in order to get
340    /// reasonable timeouts for the circuit-building operations provided by the
341    /// `tor-circmgr` crate: those, unless specifically noted, always use these
342    /// timeouts to cancel circuit operations that have taken too long.
343    ///
344    /// Instead, you should only use this function when you need to estimate how
345    /// long some _other_ operation should take to complete.  For example, if
346    /// you are sending a request over a 3-hop circuit and waiting for a reply,
347    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
348    /// length: 3 })`.
349    ///
350    /// Note also that this function returns a _timeout_ that the operation
351    /// should be permitted to complete, not an estimated Duration that the
352    /// operation _will_ take to complete. Timeouts are chosen to ensure that
353    /// most operations will complete, but very slow ones will not.  So even if
354    /// we expect that a circuit will complete in (say) 3 seconds, we might
355    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
356    /// complete.
357    ///
358    /// Estimate-based timeouts may change over time, given observations on the
359    /// actual amount of time needed for circuits to complete building.  If not
360    /// enough information has been gathered, a reasonable default will be used.
361    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
362        self.0.estimate_timeout(timeout_action)
363    }
364
365    /// Return a reference to the associated CircuitBuilder that this CircMgr
366    /// will use to create its circuits.
367    #[cfg(feature = "experimental-api")]
368    pub fn builder(&self) -> &CircuitBuilder<R> {
369        CircMgrInner::builder(&self.0)
370    }
371}
372
373/// Internal object used to implement CircMgr, which allows for mocking.
374#[derive(Clone)]
375pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
376    /// The underlying circuit manager object that implements our behavior.
377    mgr: Arc<mgr::AbstractCircMgr<B, R>>,
378    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
379    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
380}
381
382impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
383    /// Construct a new circuit manager.
384    ///
385    /// # Usage note
386    ///
387    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
388    #[allow(clippy::unnecessary_wraps)]
389    pub(crate) fn new<SM, CFG: CircMgrConfig>(
390        config: &CFG,
391        storage: SM,
392        runtime: &R,
393        chanmgr: Arc<ChanMgr<R>>,
394        guardmgr: &tor_guardmgr::GuardMgr<R>,
395    ) -> Result<Self>
396    where
397        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
398    {
399        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
400        let vanguardmgr = {
401            // TODO(#1382): we need a way of checking if this arti instance
402            // is running an onion service or not.
403            //
404            // Perhaps this information should be provided by CircMgrConfig.
405            let has_onion_svc = false;
406            VanguardMgr::new(
407                config.vanguard_config(),
408                runtime.clone(),
409                storage.clone(),
410                has_onion_svc,
411            )?
412        };
413
414        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
415
416        let builder = build::CircuitBuilder::new(
417            runtime.clone(),
418            chanmgr,
419            config.path_rules().clone(),
420            storage_handle,
421            guardmgr.clone(),
422            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423            vanguardmgr,
424        );
425
426        Ok(Self::new_generic(config, runtime, guardmgr, builder))
427    }
428}
429
430impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
431    /// Generic implementation for [`CircMgrInner::new`]
432    pub(crate) fn new_generic<CFG: CircMgrConfig>(
433        config: &CFG,
434        runtime: &R,
435        guardmgr: &tor_guardmgr::GuardMgr<R>,
436        builder: B,
437    ) -> Self {
438        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
439            config.preemptive_circuits().clone(),
440        )));
441
442        guardmgr.set_filter(config.path_rules().build_guard_filter());
443
444        let mgr =
445            mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
446
447        CircMgrInner {
448            mgr: Arc::new(mgr),
449            predictor: preemptive,
450        }
451    }
452
453    /// Launch the periodic daemon tasks required by the manager to function properly.
454    ///
455    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
456    //
457    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
458    pub(crate) fn launch_background_tasks<D, S>(
459        self: &Arc<Self>,
460        runtime: &R,
461        dir_provider: &Arc<D>,
462        state_mgr: S,
463    ) -> Result<Vec<TaskHandle>>
464    where
465        D: NetDirProvider + 'static + ?Sized,
466        S: StateMgr + std::marker::Send + 'static,
467    {
468        let mut ret = vec![];
469
470        runtime
471            .spawn(Self::keep_circmgr_params_updated(
472                dir_provider.events(),
473                Arc::downgrade(self),
474                Arc::downgrade(dir_provider),
475            ))
476            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
477
478        let (sched, handle) = TaskSchedule::new(runtime.clone());
479        ret.push(handle);
480
481        runtime
482            .spawn(Self::update_persistent_state(
483                sched,
484                Arc::downgrade(self),
485                state_mgr,
486            ))
487            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
488
489        let (sched, handle) = TaskSchedule::new(runtime.clone());
490        ret.push(handle);
491
492        runtime
493            .spawn(Self::continually_launch_timeout_testing_circuits(
494                sched,
495                Arc::downgrade(self),
496                Arc::downgrade(dir_provider),
497            ))
498            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
499
500        let (sched, handle) = TaskSchedule::new(runtime.clone());
501        ret.push(handle);
502
503        runtime
504            .spawn(Self::continually_preemptively_build_circuits(
505                sched,
506                Arc::downgrade(self),
507                Arc::downgrade(dir_provider),
508            ))
509            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
510
511        self.mgr
512            .peek_builder()
513            .guardmgr()
514            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
515
516        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
517        {
518            let () = self
519                .mgr
520                .peek_builder()
521                .vanguardmgr()
522                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
523        }
524
525        Ok(ret)
526    }
527
528    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
529    /// launching it if necessary.
530    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
531        self.expire_circuits();
532        let usage = TargetCircUsage::Dir;
533        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
534    }
535
536    /// Return a circuit suitable for exiting to all of the provided
537    /// `ports`, launching it if necessary.
538    ///
539    /// If the list of ports is empty, then the chosen circuit will
540    /// still end at _some_ exit.
541    pub(crate) async fn get_or_launch_exit(
542        &self,
543        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
544        ports: &[TargetPort],
545        isolation: StreamIsolation,
546        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
547        //             additive. The function should be refactored to be builder-like.
548        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
549    ) -> Result<Arc<B::Circ>> {
550        self.expire_circuits();
551        let time = Instant::now();
552        {
553            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
554            if ports.is_empty() {
555                predictive.note_usage(None, time);
556            } else {
557                for port in ports.iter() {
558                    predictive.note_usage(Some(*port), time);
559                }
560            }
561        }
562        let require_stability = ports.iter().any(|p| {
563            self.mgr
564                .peek_builder()
565                .path_config()
566                .long_lived_ports
567                .contains(&p.port)
568        });
569        let ports = ports.iter().map(Clone::clone).collect();
570        #[cfg(not(feature = "geoip"))]
571        let country_code = None;
572        let usage = TargetCircUsage::Exit {
573            ports,
574            isolation,
575            country_code,
576            require_stability,
577        };
578        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
579    }
580
581    /// Return a circuit to a specific relay, suitable for using for direct
582    /// (one-hop) directory downloads.
583    ///
584    /// This could be used, for example, to download a descriptor for a bridge.
585    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
586    #[cfg(feature = "specific-relay")]
587    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
588        &self,
589        target: T,
590    ) -> Result<Arc<B::Circ>> {
591        self.expire_circuits();
592        let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
593        self.mgr
594            .get_or_launch(&usage, DirInfo::Nothing)
595            .await
596            .map(|(c, _)| c)
597    }
598
599    /// Try to change our configuration settings to `new_config`.
600    ///
601    /// The actual behavior here will depend on the value of `how`.
602    ///
603    /// Returns whether any of the circuit pools should be cleared.
604    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
605        &self,
606        new_config: &CFG,
607        how: tor_config::Reconfigure,
608    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
609        let old_path_rules = self.mgr.peek_builder().path_config();
610        let predictor = self.predictor.lock().expect("poisoned lock");
611        let preemptive_circuits = predictor.config();
612        if preemptive_circuits.initial_predicted_ports
613            != new_config.preemptive_circuits().initial_predicted_ports
614        {
615            // This change has no effect, since the list of ports was _initial_.
616            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
617        }
618
619        if how == tor_config::Reconfigure::CheckAllOrNothing {
620            return Ok(RetireCircuits::None);
621        }
622
623        let retire_because_of_guardmgr =
624            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
625
626        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
627        let retire_because_of_vanguardmgr = self
628            .mgr
629            .peek_builder()
630            .vanguardmgr()
631            .reconfigure(new_config.vanguard_config())?;
632
633        let new_reachable = &new_config.path_rules().reachable_addrs;
634        if new_reachable != &old_path_rules.reachable_addrs {
635            let filter = new_config.path_rules().build_guard_filter();
636            self.mgr.peek_builder().guardmgr().set_filter(filter);
637        }
638
639        let discard_all_circuits = !new_config
640            .path_rules()
641            .at_least_as_permissive_as(&old_path_rules)
642            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
643
644        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645        let discard_all_circuits = discard_all_circuits
646            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
647
648        self.mgr
649            .peek_builder()
650            .set_path_config(new_config.path_rules().clone());
651        self.mgr
652            .set_circuit_timing(new_config.circuit_timing().clone());
653        predictor.set_config(new_config.preemptive_circuits().clone());
654
655        if discard_all_circuits {
656            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
657            // retire those circuits that do not conform to the new path rules,
658            // or do not conform to the new guard configuration.
659            info!("Path configuration has become more restrictive: retiring existing circuits.");
660            self.retire_all_circuits();
661            return Ok(RetireCircuits::All);
662        }
663        Ok(RetireCircuits::None)
664    }
665
666    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
667    /// `circmgr` with the consensus parameters from `dirmgr`.
668    ///
669    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
670    /// dangling.
671    ///
672    /// This is a daemon task: it runs indefinitely in the background.
673    async fn keep_circmgr_params_updated<D>(
674        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
675        circmgr: Weak<Self>,
676        dirmgr: Weak<D>,
677    ) where
678        D: NetDirProvider + 'static + ?Sized,
679    {
680        use DirEvent::*;
681        while let Some(event) = events.next().await {
682            if matches!(event, NewConsensus) {
683                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
684                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
685                        cm.update_network_parameters(netdir.params());
686                    }
687                } else {
688                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
689                    break;
690                }
691            }
692        }
693    }
694
695    /// Reconfigure this circuit manager using the latest set of
696    /// network parameters.
697    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
698        self.mgr.update_network_parameters(p);
699        self.mgr.peek_builder().update_network_parameters(p);
700    }
701
702    /// Run indefinitely, launching circuits as needed to get a good
703    /// estimate for our circuit build timeouts.
704    ///
705    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
706    ///
707    /// This is a daemon task: it runs indefinitely in the background.
708    async fn continually_launch_timeout_testing_circuits<D>(
709        mut sched: TaskSchedule<R>,
710        circmgr: Weak<Self>,
711        dirmgr: Weak<D>,
712    ) where
713        D: NetDirProvider + 'static + ?Sized,
714    {
715        while sched.next().await.is_some() {
716            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
717                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
718                    if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
719                        warn_report!(e, "Problem launching a timeout testing circuit");
720                    }
721                    let delay = netdir
722                        .params()
723                        .cbt_testing_delay
724                        .try_into()
725                        .expect("Out-of-bounds value from BoundedInt32");
726
727                    drop((cm, dm));
728                    sched.fire_in(delay);
729                } else {
730                    // wait for the provider to announce some event, which will probably be
731                    // NewConsensus; this is therefore a decent yardstick for rechecking
732                    let _ = dm.events().next().await;
733                    sched.fire();
734                }
735            } else {
736                return;
737            }
738        }
739    }
740
741    /// If we need to launch a testing circuit to judge our circuit
742    /// build timeouts timeouts, do so.
743    ///
744    /// # Note
745    ///
746    /// This function is invoked periodically from
747    /// `continually_launch_timeout_testing_circuits`.
748    fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
749        if !self.mgr.peek_builder().learning_timeouts() {
750            return Ok(());
751        }
752        // We expire any too-old circuits here, so they don't get
753        // counted towards max_circs.
754        self.expire_circuits();
755        let max_circs: u64 = netdir
756            .params()
757            .cbt_max_open_circuits_for_testing
758            .try_into()
759            .expect("Out-of-bounds result from BoundedInt32");
760        if (self.mgr.n_circs() as u64) < max_circs {
761            // Actually launch the circuit!
762            let usage = TargetCircUsage::TimeoutTesting;
763            let dirinfo = netdir.into();
764            let mgr = Arc::clone(&self.mgr);
765            debug!("Launching a circuit to test build times.");
766            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
767            // We don't actually care when this circuit is done,
768            // so it's okay to drop the Receiver without awaiting it.
769            drop(receiver);
770        }
771
772        Ok(())
773    }
774
775    /// Run forever, periodically telling `circmgr` to update its persistent
776    /// state.
777    ///
778    /// Exit when we notice that `circmgr` has been dropped.
779    ///
780    /// This is a daemon task: it runs indefinitely in the background.
781    async fn update_persistent_state<S>(
782        mut sched: TaskSchedule<R>,
783        circmgr: Weak<Self>,
784        statemgr: S,
785    ) where
786        S: StateMgr + std::marker::Send,
787    {
788        while sched.next().await.is_some() {
789            if let Some(circmgr) = Weak::upgrade(&circmgr) {
790                use tor_persist::LockStatus::*;
791
792                match statemgr.try_lock() {
793                    Err(e) => {
794                        error_report!(e, "Problem with state lock file");
795                        break;
796                    }
797                    Ok(NewlyAcquired) => {
798                        info!("We now own the lock on our state files.");
799                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
800                            error_report!(e, "Unable to upgrade to owned state files");
801                            break;
802                        }
803                    }
804                    Ok(AlreadyHeld) => {
805                        if let Err(e) = circmgr.store_persistent_state() {
806                            error_report!(e, "Unable to flush circmgr state");
807                            break;
808                        }
809                    }
810                    Ok(NoLock) => {
811                        if let Err(e) = circmgr.reload_persistent_state() {
812                            error_report!(e, "Unable to reload circmgr state");
813                            break;
814                        }
815                    }
816                }
817            } else {
818                debug!("Circmgr has disappeared; task exiting.");
819                return;
820            }
821            // TODO(nickm): This delay is probably too small.
822            //
823            // Also, we probably don't even want a fixed delay here.  Instead,
824            // we should be updating more frequently when the data is volatile
825            // or has important info to save, and not at all when there are no
826            // changes.
827            sched.fire_in(Duration::from_secs(60));
828        }
829
830        debug!("State update task exiting (potentially due to handle drop).");
831    }
832
833    /// Switch from having an unowned persistent state to having an owned one.
834    ///
835    /// Requires that we hold the lock on the state files.
836    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
837        self.mgr.peek_builder().upgrade_to_owned_state()?;
838        Ok(())
839    }
840
841    /// Reload state from the state manager.
842    ///
843    /// We only call this method if we _don't_ have the lock on the state
844    /// files.  If we have the lock, we only want to save.
845    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
846        self.mgr.peek_builder().reload_state()?;
847        Ok(())
848    }
849
850    /// Run indefinitely, launching circuits where the preemptive circuit
851    /// predictor thinks it'd be a good idea to have them.
852    ///
853    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
854    ///
855    /// This is a daemon task: it runs indefinitely in the background.
856    ///
857    /// # Note
858    ///
859    /// This would be better handled entirely within `tor-circmgr`, like
860    /// other daemon tasks.
861    async fn continually_preemptively_build_circuits<D>(
862        mut sched: TaskSchedule<R>,
863        circmgr: Weak<Self>,
864        dirmgr: Weak<D>,
865    ) where
866        D: NetDirProvider + 'static + ?Sized,
867    {
868        let base_delay = Duration::from_secs(10);
869        let mut retry = RetryDelay::from_duration(base_delay);
870
871        while sched.next().await.is_some() {
872            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
873                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
874                    let result = cm
875                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
876                        .await;
877
878                    let delay = match result {
879                        Ok(()) => {
880                            retry.reset();
881                            base_delay
882                        }
883                        Err(_) => retry.next_delay(&mut rand::rng()),
884                    };
885
886                    sched.fire_in(delay);
887                } else {
888                    // wait for the provider to announce some event, which will probably be
889                    // NewConsensus; this is therefore a decent yardstick for rechecking
890                    let _ = dm.events().next().await;
891                    sched.fire();
892                }
893            } else {
894                return;
895            }
896        }
897    }
898
899    /// Launch circuits preemptively, using the preemptive circuit predictor's
900    /// predictions.
901    ///
902    /// # Note
903    ///
904    /// This function is invoked periodically from
905    /// `continually_preemptively_build_circuits()`.
906    async fn launch_circuits_preemptively(
907        &self,
908        netdir: DirInfo<'_>,
909    ) -> std::result::Result<(), err::PreemptiveCircError> {
910        trace!("Checking preemptive circuit predictions.");
911        let (circs, threshold) = {
912            let path_config = self.mgr.peek_builder().path_config();
913            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
914            let threshold = preemptive.config().disable_at_threshold;
915            (preemptive.predict(&path_config), threshold)
916        };
917
918        if self.mgr.n_circs() >= threshold {
919            return Ok(());
920        }
921        let mut n_created = 0_usize;
922        let mut n_errors = 0_usize;
923
924        let futures = circs
925            .iter()
926            .map(|usage| self.mgr.get_or_launch(usage, netdir));
927        let results = futures::future::join_all(futures).await;
928        for (i, result) in results.into_iter().enumerate() {
929            match result {
930                Ok((_, CircProvenance::NewlyCreated)) => {
931                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
932                    n_created += 1;
933                }
934                Ok((_, CircProvenance::Preexisting)) => {
935                    trace!("Circuit already existed created for {:?}", circs[i]);
936                }
937                Err(e) => {
938                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
939                    n_errors += 1;
940                }
941            }
942        }
943
944        if n_created > 0 || n_errors == 0 {
945            // Either we successfully made a circuit, or we didn't have any
946            // failures while looking for preexisting circuits.  Progress was
947            // made, so there's no need to back off.
948            Ok(())
949        } else {
950            // We didn't build any circuits and we hit at least one error:
951            // We'll call this unsuccessful.
952            Err(err::PreemptiveCircError)
953        }
954    }
955
956    /// Create and return a new (typically anonymous) onion circuit stem
957    /// of type `stem_kind`.
958    ///
959    /// If `circ_kind` is provided, we apply additional rules to make sure
960    /// that this will be usable as a stem for the given kind of onion service circuit.
961    /// Otherwise, we pick a stem that will probably be useful in general.
962    ///
963    /// This circuit is guaranteed not to have been used for any traffic
964    /// previously, and it will not be given out for any other requests in the
965    /// future unless explicitly re-registered with a circuit manager.
966    ///
967    /// If `planned_target` is provided, then the circuit will be built so that
968    /// it does not share any family members with the provided target.  (The
969    /// circuit _will not be_ extended to that target itself!)
970    ///
971    /// Used to implement onion service clients and services.
972    #[cfg(feature = "hs-common")]
973    pub(crate) async fn launch_hs_unmanaged<T>(
974        &self,
975        planned_target: Option<T>,
976        dir: &NetDir,
977        stem_kind: HsCircStemKind,
978        circ_kind: Option<HsCircKind>,
979    ) -> Result<Arc<B::Circ>>
980    where
981        T: IntoOwnedChanTarget,
982    {
983        let usage = TargetCircUsage::HsCircBase {
984            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
985            stem_kind,
986            circ_kind,
987        };
988        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
989        Ok(client_circ)
990    }
991
992    /// Return true if `netdir` has enough information to be used for this
993    /// circuit manager.
994    ///
995    /// (This will check whether the netdir is missing any primary guard
996    /// microdescriptors)
997    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
998        self.mgr
999            .peek_builder()
1000            .guardmgr()
1001            .netdir_is_sufficient(netdir)
1002    }
1003
1004    /// Internal implementation for [`CircMgr::estimate_timeout`].
1005    pub(crate) fn estimate_timeout(
1006        &self,
1007        timeout_action: &timeouts::Action,
1008    ) -> std::time::Duration {
1009        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1010        timeout
1011    }
1012
1013    /// Internal implementation for [`CircMgr::builder`].
1014    pub(crate) fn builder(&self) -> &B {
1015        self.mgr.peek_builder()
1016    }
1017
1018    /// Flush state to the state manager, if there is any unsaved state and
1019    /// we have the lock.
1020    ///
1021    /// Return true if we saved something; false if we didn't have the lock.
1022    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1023        self.mgr.peek_builder().save_state()
1024    }
1025
1026    /// Expire every circuit that has been dirty for too long.
1027    ///
1028    /// Expired circuits are not closed while they still have users,
1029    /// but they are no longer given out for new requests.
1030    fn expire_circuits(&self) {
1031        // TODO: I would prefer not to call this at every request, but
1032        // it should be fine for now.  (At some point we may no longer
1033        // need this, or might not need to call it so often, now that
1034        // our circuit expiration runs on scheduled timers via
1035        // spawn_expiration_task.)
1036        let now = self.mgr.peek_runtime().now();
1037        self.mgr.expire_circs(now);
1038    }
1039
1040    /// Mark every circuit that we have launched so far as unsuitable for
1041    /// any future requests.  This won't close existing circuits that have
1042    /// streams attached to them, but it will prevent any future streams from
1043    /// being attached.
1044    ///
1045    /// TODO: we may want to expose this eventually.  If we do, we should
1046    /// be very clear that you don't want to use it haphazardly.
1047    pub(crate) fn retire_all_circuits(&self) {
1048        self.mgr.retire_all_circuits();
1049    }
1050
1051    /// If `circ_id` is the unique identifier for a circuit that we're
1052    /// keeping track of, don't give it out for any future requests.
1053    pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
1054        let _ = self.mgr.take_circ(circ_id);
1055    }
1056
1057    /// Return a stream of events about our estimated clock skew; these events
1058    /// are `None` when we don't have enough information to make an estimate,
1059    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1060    ///
1061    /// Note that this stream can be lossy: if the estimate changes more than
1062    /// one before you read from the stream, you might only get the most recent
1063    /// update.
1064    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1065        self.mgr.peek_builder().guardmgr().skew_events()
1066    }
1067
1068    /// Record that a failure occurred on a circuit with a given guard, in a way
1069    /// that makes us unwilling to use that guard for future circuits.
1070    ///
1071    pub(crate) fn note_external_failure(
1072        &self,
1073        target: &impl ChanTarget,
1074        external_failure: ExternalActivity,
1075    ) {
1076        self.mgr
1077            .peek_builder()
1078            .guardmgr()
1079            .note_external_failure(target, external_failure);
1080    }
1081
1082    /// Record that a success occurred on a circuit with a given guard, in a way
1083    /// that makes us possibly willing to use that guard for future circuits.
1084    pub(crate) fn note_external_success(
1085        &self,
1086        target: &impl ChanTarget,
1087        external_activity: ExternalActivity,
1088    ) {
1089        self.mgr
1090            .peek_builder()
1091            .guardmgr()
1092            .note_external_success(target, external_activity);
1093    }
1094}
1095
1096impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1097    fn drop(&mut self) {
1098        match self.store_persistent_state() {
1099            Ok(true) => info!("Flushed persistent state at exit."),
1100            Ok(false) => debug!("Lock not held; no state to flush."),
1101            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1102        }
1103    }
1104}
1105
1106#[cfg(test)]
1107mod test {
1108    // @@ begin test lint list maintained by maint/add_warning @@
1109    #![allow(clippy::bool_assert_comparison)]
1110    #![allow(clippy::clone_on_copy)]
1111    #![allow(clippy::dbg_macro)]
1112    #![allow(clippy::mixed_attributes_style)]
1113    #![allow(clippy::print_stderr)]
1114    #![allow(clippy::print_stdout)]
1115    #![allow(clippy::single_char_pattern)]
1116    #![allow(clippy::unwrap_used)]
1117    #![allow(clippy::unchecked_duration_subtraction)]
1118    #![allow(clippy::useless_vec)]
1119    #![allow(clippy::needless_pass_by_value)]
1120    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1121    use mocks::FakeBuilder;
1122    use tor_guardmgr::GuardMgr;
1123    use tor_linkspec::OwnedChanTarget;
1124    use tor_netdir::testprovider::TestNetDirProvider;
1125    use tor_persist::TestingStateMgr;
1126
1127    use super::*;
1128
1129    #[test]
1130    fn get_params() {
1131        use tor_netdir::{MdReceiver, PartialNetDir};
1132        use tor_netdoc::doc::netstatus::NetParams;
1133        // If it's just fallbackdir, we get the default parameters.
1134        let fb = FallbackList::from([]);
1135        let di: DirInfo<'_> = (&fb).into();
1136
1137        let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1138        assert!(!p1.extend_by_ed25519_id);
1139
1140        // Now try with a directory and configured parameters.
1141        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1142        let mut params = NetParams::default();
1143        params.set("circwindow".into(), 100);
1144        params.set("ExtendByEd25519ID".into(), 1);
1145        let mut dir = PartialNetDir::new(consensus, Some(&params));
1146        for m in microdescs {
1147            dir.add_microdesc(m);
1148        }
1149        let netdir = dir.unwrap_if_sufficient().unwrap();
1150        let di: DirInfo<'_> = (&netdir).into();
1151        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1152        assert!(p2.extend_by_ed25519_id);
1153
1154        // Now try with a bogus circwindow value.
1155        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1156        let mut params = NetParams::default();
1157        params.set("circwindow".into(), 100_000);
1158        params.set("ExtendByEd25519ID".into(), 1);
1159        let mut dir = PartialNetDir::new(consensus, Some(&params));
1160        for m in microdescs {
1161            dir.add_microdesc(m);
1162        }
1163        let netdir = dir.unwrap_if_sufficient().unwrap();
1164        let di: DirInfo<'_> = (&netdir).into();
1165        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1166        assert!(p2.extend_by_ed25519_id);
1167    }
1168
1169    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1170        let config = crate::config::test_config::TestConfig::default();
1171        let statemgr = TestingStateMgr::new();
1172        let guardmgr =
1173            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1174        let builder = FakeBuilder::new(
1175            &runtime,
1176            statemgr.clone(),
1177            &tor_guardmgr::TestConfig::default(),
1178        );
1179        let circmgr = Arc::new(CircMgrInner::new_generic(
1180            &config, &runtime, &guardmgr, builder,
1181        ));
1182        let netdir = Arc::new(TestNetDirProvider::new());
1183        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1184            .expect("launch CircMgrInner background tasks");
1185        circmgr
1186    }
1187
1188    #[test]
1189    #[cfg(feature = "hs-common")]
1190    fn test_launch_hs_unmanaged() {
1191        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1192            let circmgr = make_circmgr(runtime.clone());
1193            let netdir = tor_netdir::testnet::construct_netdir()
1194                .unwrap_if_sufficient()
1195                .unwrap();
1196
1197            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1198            runtime.spawn_identified("launch_hs_unamanged", async move {
1199                ret_tx
1200                    .send(
1201                        circmgr
1202                            .launch_hs_unmanaged::<OwnedChanTarget>(
1203                                None,
1204                                &netdir,
1205                                HsCircStemKind::Naive,
1206                                None,
1207                            )
1208                            .await,
1209                    )
1210                    .unwrap();
1211            });
1212            runtime.advance_by(Duration::from_millis(60)).await;
1213            ret_rx.await.unwrap().unwrap();
1214        });
1215    }
1216}