tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47// TODO #1645 (either remove this, or decide to have it everywhere)
48#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::CircuitBuilder;
51use mgr::{AbstractCirc, AbstractCircBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_error::{error_report, warn_report};
55use tor_guardmgr::RetireCircuits;
56use tor_linkspec::ChanTarget;
57use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
58use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
59use tor_rtcompat::Runtime;
60
61#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
62use tor_linkspec::IntoOwnedChanTarget;
63
64use futures::task::SpawnExt;
65use futures::StreamExt;
66use std::sync::{Arc, Mutex, Weak};
67use std::time::{Duration, Instant};
68use tracing::{debug, info, trace, warn};
69
70#[cfg(feature = "testing")]
71pub use config::test_config::TestConfig;
72
73pub mod build;
74mod config;
75mod err;
76#[cfg(feature = "hs-common")]
77pub mod hspool;
78mod impls;
79pub mod isolation;
80mod mgr;
81#[cfg(test)]
82mod mocks;
83mod preemptive;
84pub mod timeouts;
85mod usage;
86
87// Can't apply `visibility` to modules.
88cfg_if::cfg_if! {
89    if #[cfg(feature = "experimental-api")] {
90        pub mod path;
91    } else {
92        pub(crate) mod path;
93    }
94}
95
96pub use err::Error;
97pub use isolation::IsolationToken;
98use tor_guardmgr::fallback::FallbackList;
99pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
100pub use usage::{TargetPort, TargetPorts};
101
102pub use config::{
103    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
104    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
105};
106
107use crate::isolation::StreamIsolation;
108use crate::mgr::CircProvenance;
109use crate::preemptive::PreemptiveCircuitPredictor;
110use usage::TargetCircUsage;
111
112use safelog::sensitive as sv;
113#[cfg(feature = "geoip")]
114use tor_geoip::CountryCode;
115pub use tor_guardmgr::{ExternalActivity, FirstHopId};
116use tor_persist::StateMgr;
117use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
118
119#[cfg(feature = "hs-common")]
120use crate::hspool::{HsCircKind, HsCircStemKind};
121#[cfg(all(feature = "vanguards", feature = "hs-common"))]
122use tor_guardmgr::vanguards::VanguardMgr;
123
124/// A Result type as returned from this crate.
125pub type Result<T> = std::result::Result<T, Error>;
126
127/// Type alias for dynamic StorageHandle that can handle our timeout state.
128type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
129
130/// Key used to load timeout state information.
131const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
132
133/// Represents what we know about the Tor network.
134///
135/// This can either be a complete directory, or a list of fallbacks.
136///
137/// Not every DirInfo can be used to build every kind of circuit:
138/// if you try to build a path with an inadequate DirInfo, you'll get a
139/// NeedConsensus error.
140#[derive(Debug, Copy, Clone)]
141#[non_exhaustive]
142pub enum DirInfo<'a> {
143    /// A list of fallbacks, for use when we don't know a network directory.
144    Fallbacks(&'a FallbackList),
145    /// A complete network directory
146    Directory(&'a NetDir),
147    /// No information: we can only build one-hop paths: and that, only if the
148    /// guard manager knows some guards or fallbacks.
149    Nothing,
150}
151
152impl<'a> From<&'a FallbackList> for DirInfo<'a> {
153    fn from(v: &'a FallbackList) -> DirInfo<'a> {
154        DirInfo::Fallbacks(v)
155    }
156}
157impl<'a> From<&'a NetDir> for DirInfo<'a> {
158    fn from(v: &'a NetDir) -> DirInfo<'a> {
159        DirInfo::Directory(v)
160    }
161}
162impl<'a> DirInfo<'a> {
163    /// Return a set of circuit parameters for this DirInfo.
164    fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
165        use tor_netdir::params::NetParameters;
166        // We use a common function for both cases here to be sure that
167        // we look at the defaults from NetParameters code.
168        let defaults = NetParameters::default();
169        let net_params = match self {
170            DirInfo::Directory(d) => d.params(),
171            _ => &defaults,
172        };
173        match usage {
174            #[cfg(feature = "hs-common")]
175            TargetCircUsage::HsCircBase { .. } => {
176                build::onion_circparams_from_netparams(net_params)
177            }
178            _ => build::exit_circparams_from_netparams(net_params),
179        }
180    }
181}
182
183/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
184/// when they're suitable, and launching them if they don't already exist.
185///
186/// Right now, its notion of "suitable" is quite rudimentary: it just
187/// believes in two kinds of circuits: Exit circuits, and directory
188/// circuits.  Exit circuits are ones that were created to connect to
189/// a set of ports; directory circuits were made to talk to directory caches.
190///
191/// This is a "handle"; clones of it share state.
192pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
193
194impl<R: Runtime> CircMgr<R> {
195    /// Construct a new circuit manager.
196    ///
197    /// # Usage note
198    ///
199    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
200    pub fn new<SM, CFG: CircMgrConfig>(
201        config: &CFG,
202        storage: SM,
203        runtime: &R,
204        chanmgr: Arc<ChanMgr<R>>,
205        guardmgr: &tor_guardmgr::GuardMgr<R>,
206    ) -> Result<Self>
207    where
208        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
209    {
210        Ok(Self(Arc::new(CircMgrInner::new(
211            config, storage, runtime, chanmgr, guardmgr,
212        )?)))
213    }
214
215    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
216    /// launching it if necessary.
217    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
218        self.0.get_or_launch_dir(netdir).await
219    }
220
221    /// Return a circuit suitable for exiting to all of the provided
222    /// `ports`, launching it if necessary.
223    ///
224    /// If the list of ports is empty, then the chosen circuit will
225    /// still end at _some_ exit.
226    pub async fn get_or_launch_exit(
227        &self,
228        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
229        ports: &[TargetPort],
230        isolation: StreamIsolation,
231        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
232        //             additive. The function should be refactored to be builder-like.
233        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
234    ) -> Result<Arc<ClientCirc>> {
235        self.0
236            .get_or_launch_exit(
237                netdir,
238                ports,
239                isolation,
240                #[cfg(feature = "geoip")]
241                country_code,
242            )
243            .await
244    }
245
246    /// Return a circuit to a specific relay, suitable for using for direct
247    /// (one-hop) directory downloads.
248    ///
249    /// This could be used, for example, to download a descriptor for a bridge.
250    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
251    #[cfg(feature = "specific-relay")]
252    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
253        &self,
254        target: T,
255    ) -> Result<Arc<ClientCirc>> {
256        self.0.get_or_launch_dir_specific(target).await
257    }
258
259    /// Launch the periodic daemon tasks required by the manager to function properly.
260    ///
261    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
262    //
263    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
264    pub fn launch_background_tasks<D, S>(
265        self: &Arc<Self>,
266        runtime: &R,
267        dir_provider: &Arc<D>,
268        state_mgr: S,
269    ) -> Result<Vec<TaskHandle>>
270    where
271        D: NetDirProvider + 'static + ?Sized,
272        S: StateMgr + std::marker::Send + 'static,
273    {
274        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
275    }
276
277    /// Return true if `netdir` has enough information to be used for this
278    /// circuit manager.
279    ///
280    /// (This will check whether the netdir is missing any primary guard
281    /// microdescriptors)
282    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
283        self.0.netdir_is_sufficient(netdir)
284    }
285
286    /// If `circ_id` is the unique identifier for a circuit that we're
287    /// keeping track of, don't give it out for any future requests.
288    pub fn retire_circ(&self, circ_id: &UniqId) {
289        self.0.retire_circ(circ_id);
290    }
291
292    /// Record that a failure occurred on a circuit with a given guard, in a way
293    /// that makes us unwilling to use that guard for future circuits.
294    ///
295    pub fn note_external_failure(
296        &self,
297        target: &impl ChanTarget,
298        external_failure: ExternalActivity,
299    ) {
300        self.0.note_external_failure(target, external_failure);
301    }
302
303    /// Record that a success occurred on a circuit with a given guard, in a way
304    /// that makes us possibly willing to use that guard for future circuits.
305    pub fn note_external_success(
306        &self,
307        target: &impl ChanTarget,
308        external_activity: ExternalActivity,
309    ) {
310        self.0.note_external_success(target, external_activity);
311    }
312
313    /// Return a stream of events about our estimated clock skew; these events
314    /// are `None` when we don't have enough information to make an estimate,
315    /// and `Some(`[`SkewEstimate`]`)` otherwise.
316    ///
317    /// Note that this stream can be lossy: if the estimate changes more than
318    /// one before you read from the stream, you might only get the most recent
319    /// update.
320    pub fn skew_events(&self) -> ClockSkewEvents {
321        self.0.skew_events()
322    }
323
324    /// Try to change our configuration settings to `new_config`.
325    ///
326    /// The actual behavior here will depend on the value of `how`.
327    ///
328    /// Returns whether any of the circuit pools should be cleared.
329    pub fn reconfigure<CFG: CircMgrConfig>(
330        &self,
331        new_config: &CFG,
332        how: tor_config::Reconfigure,
333    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
334        self.0.reconfigure(new_config, how)
335    }
336
337    /// Return an estimate-based delay for how long a given
338    /// [`Action`](timeouts::Action) should be allowed to complete.
339    ///
340    /// Note that **you do not need to use this function** in order to get
341    /// reasonable timeouts for the circuit-building operations provided by the
342    /// `tor-circmgr` crate: those, unless specifically noted, always use these
343    /// timeouts to cancel circuit operations that have taken too long.
344    ///
345    /// Instead, you should only use this function when you need to estimate how
346    /// long some _other_ operation should take to complete.  For example, if
347    /// you are sending a request over a 3-hop circuit and waiting for a reply,
348    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
349    /// length: 3 })`.
350    ///
351    /// Note also that this function returns a _timeout_ that the operation
352    /// should be permitted to complete, not an estimated Duration that the
353    /// operation _will_ take to complete. Timeouts are chosen to ensure that
354    /// most operations will complete, but very slow ones will not.  So even if
355    /// we expect that a circuit will complete in (say) 3 seconds, we might
356    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
357    /// complete.
358    ///
359    /// Estimate-based timeouts may change over time, given observations on the
360    /// actual amount of time needed for circuits to complete building.  If not
361    /// enough information has been gathered, a reasonable default will be used.
362    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
363        self.0.estimate_timeout(timeout_action)
364    }
365
366    /// Return a reference to the associated CircuitBuilder that this CircMgr
367    /// will use to create its circuits.
368    #[cfg(feature = "experimental-api")]
369    pub fn builder(&self) -> &CircuitBuilder<R> {
370        CircMgrInner::builder(&self.0)
371    }
372}
373
374/// Internal object used to implement CircMgr, which allows for mocking.
375#[derive(Clone)]
376pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
377    /// The underlying circuit manager object that implements our behavior.
378    mgr: Arc<mgr::AbstractCircMgr<B, R>>,
379    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
380    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
381}
382
383impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
384    /// Construct a new circuit manager.
385    ///
386    /// # Usage note
387    ///
388    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
389    #[allow(clippy::unnecessary_wraps)]
390    pub(crate) fn new<SM, CFG: CircMgrConfig>(
391        config: &CFG,
392        storage: SM,
393        runtime: &R,
394        chanmgr: Arc<ChanMgr<R>>,
395        guardmgr: &tor_guardmgr::GuardMgr<R>,
396    ) -> Result<Self>
397    where
398        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
399    {
400        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
401        let vanguardmgr = {
402            // TODO(#1382): we need a way of checking if this arti instance
403            // is running an onion service or not.
404            //
405            // Perhaps this information should be provided by CircMgrConfig.
406            let has_onion_svc = false;
407            VanguardMgr::new(
408                config.vanguard_config(),
409                runtime.clone(),
410                storage.clone(),
411                has_onion_svc,
412            )?
413        };
414
415        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
416
417        let builder = build::CircuitBuilder::new(
418            runtime.clone(),
419            chanmgr,
420            config.path_rules().clone(),
421            storage_handle,
422            guardmgr.clone(),
423            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
424            vanguardmgr,
425        );
426
427        Ok(Self::new_generic(config, runtime, guardmgr, builder))
428    }
429}
430
431impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
432    /// Generic implementation for [`CircMgrInner::new`]
433    pub(crate) fn new_generic<CFG: CircMgrConfig>(
434        config: &CFG,
435        runtime: &R,
436        guardmgr: &tor_guardmgr::GuardMgr<R>,
437        builder: B,
438    ) -> Self {
439        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
440            config.preemptive_circuits().clone(),
441        )));
442
443        guardmgr.set_filter(config.path_rules().build_guard_filter());
444
445        let mgr =
446            mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
447
448        CircMgrInner {
449            mgr: Arc::new(mgr),
450            predictor: preemptive,
451        }
452    }
453
454    /// Launch the periodic daemon tasks required by the manager to function properly.
455    ///
456    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
457    //
458    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
459    pub(crate) fn launch_background_tasks<D, S>(
460        self: &Arc<Self>,
461        runtime: &R,
462        dir_provider: &Arc<D>,
463        state_mgr: S,
464    ) -> Result<Vec<TaskHandle>>
465    where
466        D: NetDirProvider + 'static + ?Sized,
467        S: StateMgr + std::marker::Send + 'static,
468    {
469        let mut ret = vec![];
470
471        runtime
472            .spawn(Self::keep_circmgr_params_updated(
473                dir_provider.events(),
474                Arc::downgrade(self),
475                Arc::downgrade(dir_provider),
476            ))
477            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
478
479        let (sched, handle) = TaskSchedule::new(runtime.clone());
480        ret.push(handle);
481
482        runtime
483            .spawn(Self::update_persistent_state(
484                sched,
485                Arc::downgrade(self),
486                state_mgr,
487            ))
488            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
489
490        let (sched, handle) = TaskSchedule::new(runtime.clone());
491        ret.push(handle);
492
493        runtime
494            .spawn(Self::continually_launch_timeout_testing_circuits(
495                sched,
496                Arc::downgrade(self),
497                Arc::downgrade(dir_provider),
498            ))
499            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
500
501        let (sched, handle) = TaskSchedule::new(runtime.clone());
502        ret.push(handle);
503
504        runtime
505            .spawn(Self::continually_preemptively_build_circuits(
506                sched,
507                Arc::downgrade(self),
508                Arc::downgrade(dir_provider),
509            ))
510            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
511
512        self.mgr
513            .peek_builder()
514            .guardmgr()
515            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
516
517        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
518        {
519            let () = self
520                .mgr
521                .peek_builder()
522                .vanguardmgr()
523                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
524        }
525
526        Ok(ret)
527    }
528
529    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
530    /// launching it if necessary.
531    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
532        self.expire_circuits();
533        let usage = TargetCircUsage::Dir;
534        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
535    }
536
537    /// Return a circuit suitable for exiting to all of the provided
538    /// `ports`, launching it if necessary.
539    ///
540    /// If the list of ports is empty, then the chosen circuit will
541    /// still end at _some_ exit.
542    pub(crate) async fn get_or_launch_exit(
543        &self,
544        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
545        ports: &[TargetPort],
546        isolation: StreamIsolation,
547        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
548        //             additive. The function should be refactored to be builder-like.
549        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
550    ) -> Result<Arc<B::Circ>> {
551        self.expire_circuits();
552        let time = Instant::now();
553        {
554            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
555            if ports.is_empty() {
556                predictive.note_usage(None, time);
557            } else {
558                for port in ports.iter() {
559                    predictive.note_usage(Some(*port), time);
560                }
561            }
562        }
563        let require_stability = ports.iter().any(|p| {
564            self.mgr
565                .peek_builder()
566                .path_config()
567                .long_lived_ports
568                .contains(&p.port)
569        });
570        let ports = ports.iter().map(Clone::clone).collect();
571        #[cfg(not(feature = "geoip"))]
572        let country_code = None;
573        let usage = TargetCircUsage::Exit {
574            ports,
575            isolation,
576            country_code,
577            require_stability,
578        };
579        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
580    }
581
582    /// Return a circuit to a specific relay, suitable for using for direct
583    /// (one-hop) directory downloads.
584    ///
585    /// This could be used, for example, to download a descriptor for a bridge.
586    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
587    #[cfg(feature = "specific-relay")]
588    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
589        &self,
590        target: T,
591    ) -> Result<Arc<B::Circ>> {
592        self.expire_circuits();
593        let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
594        self.mgr
595            .get_or_launch(&usage, DirInfo::Nothing)
596            .await
597            .map(|(c, _)| c)
598    }
599
600    /// Try to change our configuration settings to `new_config`.
601    ///
602    /// The actual behavior here will depend on the value of `how`.
603    ///
604    /// Returns whether any of the circuit pools should be cleared.
605    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
606        &self,
607        new_config: &CFG,
608        how: tor_config::Reconfigure,
609    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
610        let old_path_rules = self.mgr.peek_builder().path_config();
611        let predictor = self.predictor.lock().expect("poisoned lock");
612        let preemptive_circuits = predictor.config();
613        if preemptive_circuits.initial_predicted_ports
614            != new_config.preemptive_circuits().initial_predicted_ports
615        {
616            // This change has no effect, since the list of ports was _initial_.
617            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
618        }
619
620        if how == tor_config::Reconfigure::CheckAllOrNothing {
621            return Ok(RetireCircuits::None);
622        }
623
624        let retire_because_of_guardmgr =
625            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
626
627        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
628        let retire_because_of_vanguardmgr = self
629            .mgr
630            .peek_builder()
631            .vanguardmgr()
632            .reconfigure(new_config.vanguard_config())?;
633
634        let new_reachable = &new_config.path_rules().reachable_addrs;
635        if new_reachable != &old_path_rules.reachable_addrs {
636            let filter = new_config.path_rules().build_guard_filter();
637            self.mgr.peek_builder().guardmgr().set_filter(filter);
638        }
639
640        let discard_all_circuits = !new_config
641            .path_rules()
642            .at_least_as_permissive_as(&old_path_rules)
643            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
644
645        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
646        let discard_all_circuits = discard_all_circuits
647            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
648
649        self.mgr
650            .peek_builder()
651            .set_path_config(new_config.path_rules().clone());
652        self.mgr
653            .set_circuit_timing(new_config.circuit_timing().clone());
654        predictor.set_config(new_config.preemptive_circuits().clone());
655
656        if discard_all_circuits {
657            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
658            // retire those circuits that do not conform to the new path rules,
659            // or do not conform to the new guard configuration.
660            info!("Path configuration has become more restrictive: retiring existing circuits.");
661            self.retire_all_circuits();
662            return Ok(RetireCircuits::All);
663        }
664        Ok(RetireCircuits::None)
665    }
666
667    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
668    /// `circmgr` with the consensus parameters from `dirmgr`.
669    ///
670    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
671    /// dangling.
672    ///
673    /// This is a daemon task: it runs indefinitely in the background.
674    async fn keep_circmgr_params_updated<D>(
675        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
676        circmgr: Weak<Self>,
677        dirmgr: Weak<D>,
678    ) where
679        D: NetDirProvider + 'static + ?Sized,
680    {
681        use DirEvent::*;
682        while let Some(event) = events.next().await {
683            if matches!(event, NewConsensus) {
684                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
685                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
686                        cm.update_network_parameters(netdir.params());
687                    }
688                } else {
689                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
690                    break;
691                }
692            }
693        }
694    }
695
696    /// Reconfigure this circuit manager using the latest set of
697    /// network parameters.
698    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
699        self.mgr.update_network_parameters(p);
700        self.mgr.peek_builder().update_network_parameters(p);
701    }
702
703    /// Run indefinitely, launching circuits as needed to get a good
704    /// estimate for our circuit build timeouts.
705    ///
706    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
707    ///
708    /// This is a daemon task: it runs indefinitely in the background.
709    async fn continually_launch_timeout_testing_circuits<D>(
710        mut sched: TaskSchedule<R>,
711        circmgr: Weak<Self>,
712        dirmgr: Weak<D>,
713    ) where
714        D: NetDirProvider + 'static + ?Sized,
715    {
716        while sched.next().await.is_some() {
717            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
718                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
719                    if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
720                        warn_report!(e, "Problem launching a timeout testing circuit");
721                    }
722                    let delay = netdir
723                        .params()
724                        .cbt_testing_delay
725                        .try_into()
726                        .expect("Out-of-bounds value from BoundedInt32");
727
728                    drop((cm, dm));
729                    sched.fire_in(delay);
730                } else {
731                    // wait for the provider to announce some event, which will probably be
732                    // NewConsensus; this is therefore a decent yardstick for rechecking
733                    let _ = dm.events().next().await;
734                    sched.fire();
735                }
736            } else {
737                return;
738            }
739        }
740    }
741
742    /// If we need to launch a testing circuit to judge our circuit
743    /// build timeouts timeouts, do so.
744    ///
745    /// # Note
746    ///
747    /// This function is invoked periodically from
748    /// `continually_launch_timeout_testing_circuits`.
749    fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
750        if !self.mgr.peek_builder().learning_timeouts() {
751            return Ok(());
752        }
753        // We expire any too-old circuits here, so they don't get
754        // counted towards max_circs.
755        self.expire_circuits();
756        let max_circs: u64 = netdir
757            .params()
758            .cbt_max_open_circuits_for_testing
759            .try_into()
760            .expect("Out-of-bounds result from BoundedInt32");
761        if (self.mgr.n_circs() as u64) < max_circs {
762            // Actually launch the circuit!
763            let usage = TargetCircUsage::TimeoutTesting;
764            let dirinfo = netdir.into();
765            let mgr = Arc::clone(&self.mgr);
766            debug!("Launching a circuit to test build times.");
767            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
768            // We don't actually care when this circuit is done,
769            // so it's okay to drop the Receiver without awaiting it.
770            drop(receiver);
771        }
772
773        Ok(())
774    }
775
776    /// Run forever, periodically telling `circmgr` to update its persistent
777    /// state.
778    ///
779    /// Exit when we notice that `circmgr` has been dropped.
780    ///
781    /// This is a daemon task: it runs indefinitely in the background.
782    #[allow(clippy::cognitive_complexity)] // because of tracing
783    async fn update_persistent_state<S>(
784        mut sched: TaskSchedule<R>,
785        circmgr: Weak<Self>,
786        statemgr: S,
787    ) where
788        S: StateMgr + std::marker::Send,
789    {
790        while sched.next().await.is_some() {
791            if let Some(circmgr) = Weak::upgrade(&circmgr) {
792                use tor_persist::LockStatus::*;
793
794                match statemgr.try_lock() {
795                    Err(e) => {
796                        error_report!(e, "Problem with state lock file");
797                        break;
798                    }
799                    Ok(NewlyAcquired) => {
800                        info!("We now own the lock on our state files.");
801                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
802                            error_report!(e, "Unable to upgrade to owned state files");
803                            break;
804                        }
805                    }
806                    Ok(AlreadyHeld) => {
807                        if let Err(e) = circmgr.store_persistent_state() {
808                            error_report!(e, "Unable to flush circmgr state");
809                            break;
810                        }
811                    }
812                    Ok(NoLock) => {
813                        if let Err(e) = circmgr.reload_persistent_state() {
814                            error_report!(e, "Unable to reload circmgr state");
815                            break;
816                        }
817                    }
818                }
819            } else {
820                debug!("Circmgr has disappeared; task exiting.");
821                return;
822            }
823            // TODO(nickm): This delay is probably too small.
824            //
825            // Also, we probably don't even want a fixed delay here.  Instead,
826            // we should be updating more frequently when the data is volatile
827            // or has important info to save, and not at all when there are no
828            // changes.
829            sched.fire_in(Duration::from_secs(60));
830        }
831
832        debug!("State update task exiting (potentially due to handle drop).");
833    }
834
835    /// Switch from having an unowned persistent state to having an owned one.
836    ///
837    /// Requires that we hold the lock on the state files.
838    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
839        self.mgr.peek_builder().upgrade_to_owned_state()?;
840        Ok(())
841    }
842
843    /// Reload state from the state manager.
844    ///
845    /// We only call this method if we _don't_ have the lock on the state
846    /// files.  If we have the lock, we only want to save.
847    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
848        self.mgr.peek_builder().reload_state()?;
849        Ok(())
850    }
851
852    /// Run indefinitely, launching circuits where the preemptive circuit
853    /// predictor thinks it'd be a good idea to have them.
854    ///
855    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
856    ///
857    /// This is a daemon task: it runs indefinitely in the background.
858    ///
859    /// # Note
860    ///
861    /// This would be better handled entirely within `tor-circmgr`, like
862    /// other daemon tasks.
863    async fn continually_preemptively_build_circuits<D>(
864        mut sched: TaskSchedule<R>,
865        circmgr: Weak<Self>,
866        dirmgr: Weak<D>,
867    ) where
868        D: NetDirProvider + 'static + ?Sized,
869    {
870        let base_delay = Duration::from_secs(10);
871        let mut retry = RetryDelay::from_duration(base_delay);
872
873        while sched.next().await.is_some() {
874            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
875                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
876                    let result = cm
877                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
878                        .await;
879
880                    let delay = match result {
881                        Ok(()) => {
882                            retry.reset();
883                            base_delay
884                        }
885                        Err(_) => retry.next_delay(&mut rand::rng()),
886                    };
887
888                    sched.fire_in(delay);
889                } else {
890                    // wait for the provider to announce some event, which will probably be
891                    // NewConsensus; this is therefore a decent yardstick for rechecking
892                    let _ = dm.events().next().await;
893                    sched.fire();
894                }
895            } else {
896                return;
897            }
898        }
899    }
900
901    /// Launch circuits preemptively, using the preemptive circuit predictor's
902    /// predictions.
903    ///
904    /// # Note
905    ///
906    /// This function is invoked periodically from
907    /// `continually_preemptively_build_circuits()`.
908    #[allow(clippy::cognitive_complexity)]
909    async fn launch_circuits_preemptively(
910        &self,
911        netdir: DirInfo<'_>,
912    ) -> std::result::Result<(), err::PreemptiveCircError> {
913        trace!("Checking preemptive circuit predictions.");
914        let (circs, threshold) = {
915            let path_config = self.mgr.peek_builder().path_config();
916            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
917            let threshold = preemptive.config().disable_at_threshold;
918            (preemptive.predict(&path_config), threshold)
919        };
920
921        if self.mgr.n_circs() >= threshold {
922            return Ok(());
923        }
924        let mut n_created = 0_usize;
925        let mut n_errors = 0_usize;
926
927        let futures = circs
928            .iter()
929            .map(|usage| self.mgr.get_or_launch(usage, netdir));
930        let results = futures::future::join_all(futures).await;
931        for (i, result) in results.into_iter().enumerate() {
932            match result {
933                Ok((_, CircProvenance::NewlyCreated)) => {
934                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
935                    n_created += 1;
936                }
937                Ok((_, CircProvenance::Preexisting)) => {
938                    trace!("Circuit already existed created for {:?}", circs[i]);
939                }
940                Err(e) => {
941                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
942                    n_errors += 1;
943                }
944            }
945        }
946
947        if n_created > 0 || n_errors == 0 {
948            // Either we successfully made a circuit, or we didn't have any
949            // failures while looking for preexisting circuits.  Progress was
950            // made, so there's no need to back off.
951            Ok(())
952        } else {
953            // We didn't build any circuits and we hit at least one error:
954            // We'll call this unsuccessful.
955            Err(err::PreemptiveCircError)
956        }
957    }
958
959    /// Create and return a new (typically anonymous) onion circuit stem
960    /// of type `stem_kind`.
961    ///
962    /// If `circ_kind` is provided, we apply additional rules to make sure
963    /// that this will be usable as a stem for the given kind of onion service circuit.
964    /// Otherwise, we pick a stem that will probably be useful in general.
965    ///
966    /// This circuit is guaranteed not to have been used for any traffic
967    /// previously, and it will not be given out for any other requests in the
968    /// future unless explicitly re-registered with a circuit manager.
969    ///
970    /// If `planned_target` is provided, then the circuit will be built so that
971    /// it does not share any family members with the provided target.  (The
972    /// circuit _will not be_ extended to that target itself!)
973    ///
974    /// Used to implement onion service clients and services.
975    #[cfg(feature = "hs-common")]
976    pub(crate) async fn launch_hs_unmanaged<T>(
977        &self,
978        planned_target: Option<T>,
979        dir: &NetDir,
980        stem_kind: HsCircStemKind,
981        circ_kind: Option<HsCircKind>,
982    ) -> Result<Arc<B::Circ>>
983    where
984        T: IntoOwnedChanTarget,
985    {
986        let usage = TargetCircUsage::HsCircBase {
987            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
988            stem_kind,
989            circ_kind,
990        };
991        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
992        Ok(client_circ)
993    }
994
995    /// Return true if `netdir` has enough information to be used for this
996    /// circuit manager.
997    ///
998    /// (This will check whether the netdir is missing any primary guard
999    /// microdescriptors)
1000    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1001        self.mgr
1002            .peek_builder()
1003            .guardmgr()
1004            .netdir_is_sufficient(netdir)
1005    }
1006
1007    /// Internal implementation for [`CircMgr::estimate_timeout`].
1008    pub(crate) fn estimate_timeout(
1009        &self,
1010        timeout_action: &timeouts::Action,
1011    ) -> std::time::Duration {
1012        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1013        timeout
1014    }
1015
1016    /// Internal implementation for [`CircMgr::builder`].
1017    pub(crate) fn builder(&self) -> &B {
1018        self.mgr.peek_builder()
1019    }
1020
1021    /// Flush state to the state manager, if there is any unsaved state and
1022    /// we have the lock.
1023    ///
1024    /// Return true if we saved something; false if we didn't have the lock.
1025    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1026        self.mgr.peek_builder().save_state()
1027    }
1028
1029    /// Expire every circuit that has been dirty for too long.
1030    ///
1031    /// Expired circuits are not closed while they still have users,
1032    /// but they are no longer given out for new requests.
1033    fn expire_circuits(&self) {
1034        // TODO: I would prefer not to call this at every request, but
1035        // it should be fine for now.  (At some point we may no longer
1036        // need this, or might not need to call it so often, now that
1037        // our circuit expiration runs on scheduled timers via
1038        // spawn_expiration_task.)
1039        let now = self.mgr.peek_runtime().now();
1040        self.mgr.expire_circs(now);
1041    }
1042
1043    /// Mark every circuit that we have launched so far as unsuitable for
1044    /// any future requests.  This won't close existing circuits that have
1045    /// streams attached to them, but it will prevent any future streams from
1046    /// being attached.
1047    ///
1048    /// TODO: we may want to expose this eventually.  If we do, we should
1049    /// be very clear that you don't want to use it haphazardly.
1050    pub(crate) fn retire_all_circuits(&self) {
1051        self.mgr.retire_all_circuits();
1052    }
1053
1054    /// If `circ_id` is the unique identifier for a circuit that we're
1055    /// keeping track of, don't give it out for any future requests.
1056    pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
1057        let _ = self.mgr.take_circ(circ_id);
1058    }
1059
1060    /// Return a stream of events about our estimated clock skew; these events
1061    /// are `None` when we don't have enough information to make an estimate,
1062    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1063    ///
1064    /// Note that this stream can be lossy: if the estimate changes more than
1065    /// one before you read from the stream, you might only get the most recent
1066    /// update.
1067    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1068        self.mgr.peek_builder().guardmgr().skew_events()
1069    }
1070
1071    /// Record that a failure occurred on a circuit with a given guard, in a way
1072    /// that makes us unwilling to use that guard for future circuits.
1073    ///
1074    pub(crate) fn note_external_failure(
1075        &self,
1076        target: &impl ChanTarget,
1077        external_failure: ExternalActivity,
1078    ) {
1079        self.mgr
1080            .peek_builder()
1081            .guardmgr()
1082            .note_external_failure(target, external_failure);
1083    }
1084
1085    /// Record that a success occurred on a circuit with a given guard, in a way
1086    /// that makes us possibly willing to use that guard for future circuits.
1087    pub(crate) fn note_external_success(
1088        &self,
1089        target: &impl ChanTarget,
1090        external_activity: ExternalActivity,
1091    ) {
1092        self.mgr
1093            .peek_builder()
1094            .guardmgr()
1095            .note_external_success(target, external_activity);
1096    }
1097}
1098
1099impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1100    fn drop(&mut self) {
1101        match self.store_persistent_state() {
1102            Ok(true) => info!("Flushed persistent state at exit."),
1103            Ok(false) => debug!("Lock not held; no state to flush."),
1104            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1105        }
1106    }
1107}
1108
1109#[cfg(test)]
1110mod test {
1111    // @@ begin test lint list maintained by maint/add_warning @@
1112    #![allow(clippy::bool_assert_comparison)]
1113    #![allow(clippy::clone_on_copy)]
1114    #![allow(clippy::dbg_macro)]
1115    #![allow(clippy::mixed_attributes_style)]
1116    #![allow(clippy::print_stderr)]
1117    #![allow(clippy::print_stdout)]
1118    #![allow(clippy::single_char_pattern)]
1119    #![allow(clippy::unwrap_used)]
1120    #![allow(clippy::unchecked_duration_subtraction)]
1121    #![allow(clippy::useless_vec)]
1122    #![allow(clippy::needless_pass_by_value)]
1123    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1124    use mocks::FakeBuilder;
1125    use tor_guardmgr::GuardMgr;
1126    use tor_linkspec::OwnedChanTarget;
1127    use tor_netdir::testprovider::TestNetDirProvider;
1128    use tor_persist::TestingStateMgr;
1129
1130    use super::*;
1131
1132    #[test]
1133    fn get_params() {
1134        use tor_netdir::{MdReceiver, PartialNetDir};
1135        use tor_netdoc::doc::netstatus::NetParams;
1136        // If it's just fallbackdir, we get the default parameters.
1137        let fb = FallbackList::from([]);
1138        let di: DirInfo<'_> = (&fb).into();
1139
1140        let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1141        assert!(!p1.extend_by_ed25519_id);
1142
1143        // Now try with a directory and configured parameters.
1144        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1145        let mut params = NetParams::default();
1146        params.set("circwindow".into(), 100);
1147        params.set("ExtendByEd25519ID".into(), 1);
1148        let mut dir = PartialNetDir::new(consensus, Some(&params));
1149        for m in microdescs {
1150            dir.add_microdesc(m);
1151        }
1152        let netdir = dir.unwrap_if_sufficient().unwrap();
1153        let di: DirInfo<'_> = (&netdir).into();
1154        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1155        assert!(p2.extend_by_ed25519_id);
1156
1157        // Now try with a bogus circwindow value.
1158        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1159        let mut params = NetParams::default();
1160        params.set("circwindow".into(), 100_000);
1161        params.set("ExtendByEd25519ID".into(), 1);
1162        let mut dir = PartialNetDir::new(consensus, Some(&params));
1163        for m in microdescs {
1164            dir.add_microdesc(m);
1165        }
1166        let netdir = dir.unwrap_if_sufficient().unwrap();
1167        let di: DirInfo<'_> = (&netdir).into();
1168        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
1169        assert!(p2.extend_by_ed25519_id);
1170    }
1171
1172    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1173        let config = crate::config::test_config::TestConfig::default();
1174        let statemgr = TestingStateMgr::new();
1175        let guardmgr =
1176            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1177        let builder = FakeBuilder::new(
1178            &runtime,
1179            statemgr.clone(),
1180            &tor_guardmgr::TestConfig::default(),
1181        );
1182        let circmgr = Arc::new(CircMgrInner::new_generic(
1183            &config, &runtime, &guardmgr, builder,
1184        ));
1185        let netdir = Arc::new(TestNetDirProvider::new());
1186        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1187            .expect("launch CircMgrInner background tasks");
1188        circmgr
1189    }
1190
1191    #[test]
1192    #[cfg(feature = "hs-common")]
1193    fn test_launch_hs_unmanaged() {
1194        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1195            let circmgr = make_circmgr(runtime.clone());
1196            let netdir = tor_netdir::testnet::construct_netdir()
1197                .unwrap_if_sufficient()
1198                .unwrap();
1199
1200            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1201            runtime.spawn_identified("launch_hs_unamanged", async move {
1202                ret_tx
1203                    .send(
1204                        circmgr
1205                            .launch_hs_unmanaged::<OwnedChanTarget>(
1206                                None,
1207                                &netdir,
1208                                HsCircStemKind::Naive,
1209                                None,
1210                            )
1211                            .await,
1212                    )
1213                    .unwrap();
1214            });
1215            runtime.advance_by(Duration::from_millis(60)).await;
1216            ret_rx.await.unwrap().unwrap();
1217        });
1218    }
1219}