tor_circmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47// TODO #1645 (either remove this, or decide to have it everywhere)
48#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
49
50use build::TunnelBuilder;
51use mgr::{AbstractTunnel, AbstractTunnelBuilder};
52use tor_basic_utils::retry::RetryDelay;
53use tor_chanmgr::ChanMgr;
54use tor_dircommon::fallback::FallbackList;
55use tor_error::{error_report, warn_report};
56use tor_guardmgr::RetireCircuits;
57use tor_linkspec::ChanTarget;
58use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
59use tor_proto::circuit::UniqId;
60use tor_proto::client::circuit::CircParameters;
61use tor_rtcompat::Runtime;
62
63#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
64use tor_linkspec::IntoOwnedChanTarget;
65
66use futures::StreamExt;
67use futures::task::SpawnExt;
68use std::sync::{Arc, Mutex, Weak};
69use std::time::{Duration, Instant};
70use tracing::{debug, info, trace, warn};
71
72#[cfg(feature = "testing")]
73pub use config::test_config::TestConfig;
74
75pub mod build;
76mod config;
77mod err;
78#[cfg(feature = "hs-common")]
79pub mod hspool;
80mod impls;
81pub mod isolation;
82mod mgr;
83#[cfg(test)]
84mod mocks;
85mod preemptive;
86pub mod timeouts;
87mod tunnel;
88mod usage;
89
90// Can't apply `visibility` to modules.
91cfg_if::cfg_if! {
92    if #[cfg(feature = "experimental-api")] {
93        pub mod path;
94    } else {
95        pub(crate) mod path;
96    }
97}
98
99pub use err::Error;
100pub use isolation::IsolationToken;
101pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
102pub use tunnel::{
103    ClientDataTunnel, ClientDirTunnel, ClientOnionServiceDataTunnel, ClientOnionServiceDirTunnel,
104    ClientOnionServiceIntroTunnel, ServiceOnionServiceDataTunnel, ServiceOnionServiceDirTunnel,
105    ServiceOnionServiceIntroTunnel,
106};
107#[cfg(feature = "conflux")]
108#[cfg_attr(docsrs, doc(cfg(feature = "conflux")))]
109pub use tunnel::{
110    ClientMultiPathDataTunnel, ClientMultiPathOnionServiceDataTunnel,
111    ServiceMultiPathOnionServiceDataTunnel,
112};
113pub use usage::{TargetPort, TargetPorts};
114
115pub use config::{
116    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
117    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
118};
119
120use crate::isolation::StreamIsolation;
121use crate::mgr::TunnelProvenance;
122use crate::preemptive::PreemptiveCircuitPredictor;
123use usage::TargetTunnelUsage;
124
125use safelog::sensitive as sv;
126#[cfg(feature = "geoip")]
127use tor_geoip::CountryCode;
128pub use tor_guardmgr::{ExternalActivity, FirstHopId};
129use tor_persist::StateMgr;
130use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
131
132#[cfg(feature = "hs-common")]
133use crate::hspool::{HsCircKind, HsCircStemKind};
134#[cfg(all(feature = "vanguards", feature = "hs-common"))]
135use tor_guardmgr::vanguards::VanguardMgr;
136
137/// A Result type as returned from this crate.
138pub type Result<T> = std::result::Result<T, Error>;
139
140/// Type alias for dynamic StorageHandle that can handle our timeout state.
141type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
142
143/// Key used to load timeout state information.
144const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
145
146/// Represents what we know about the Tor network.
147///
148/// This can either be a complete directory, or a list of fallbacks.
149///
150/// Not every DirInfo can be used to build every kind of circuit:
151/// if you try to build a path with an inadequate DirInfo, you'll get a
152/// NeedConsensus error.
153#[derive(Debug, Copy, Clone)]
154#[non_exhaustive]
155pub enum DirInfo<'a> {
156    /// A list of fallbacks, for use when we don't know a network directory.
157    Fallbacks(&'a FallbackList),
158    /// A complete network directory
159    Directory(&'a NetDir),
160    /// No information: we can only build one-hop paths: and that, only if the
161    /// guard manager knows some guards or fallbacks.
162    Nothing,
163}
164
165impl<'a> From<&'a FallbackList> for DirInfo<'a> {
166    fn from(v: &'a FallbackList) -> DirInfo<'a> {
167        DirInfo::Fallbacks(v)
168    }
169}
170impl<'a> From<&'a NetDir> for DirInfo<'a> {
171    fn from(v: &'a NetDir) -> DirInfo<'a> {
172        DirInfo::Directory(v)
173    }
174}
175impl<'a> DirInfo<'a> {
176    /// Return a set of circuit parameters for this DirInfo.
177    fn circ_params(&self, usage: &TargetTunnelUsage) -> Result<CircParameters> {
178        use tor_netdir::params::NetParameters;
179        // We use a common function for both cases here to be sure that
180        // we look at the defaults from NetParameters code.
181        let defaults = NetParameters::default();
182        let net_params = match self {
183            DirInfo::Directory(d) => d.params(),
184            _ => &defaults,
185        };
186        match usage {
187            #[cfg(feature = "hs-common")]
188            TargetTunnelUsage::HsCircBase { .. } => {
189                build::onion_circparams_from_netparams(net_params)
190            }
191            _ => build::exit_circparams_from_netparams(net_params),
192        }
193    }
194}
195
196/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
197/// when they're suitable, and launching them if they don't already exist.
198///
199/// Right now, its notion of "suitable" is quite rudimentary: it just
200/// believes in two kinds of circuits: Exit circuits, and directory
201/// circuits.  Exit circuits are ones that were created to connect to
202/// a set of ports; directory circuits were made to talk to directory caches.
203///
204/// This is a "handle"; clones of it share state.
205pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::TunnelBuilder<R>, R>>);
206
207impl<R: Runtime> CircMgr<R> {
208    /// Construct a new circuit manager.
209    ///
210    /// # Usage note
211    ///
212    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
213    pub fn new<SM, CFG: CircMgrConfig>(
214        config: &CFG,
215        storage: SM,
216        runtime: &R,
217        chanmgr: Arc<ChanMgr<R>>,
218        guardmgr: &tor_guardmgr::GuardMgr<R>,
219    ) -> Result<Self>
220    where
221        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
222    {
223        Ok(Self(Arc::new(CircMgrInner::new(
224            config, storage, runtime, chanmgr, guardmgr,
225        )?)))
226    }
227
228    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
229    /// launching it if necessary.
230    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientDirTunnel> {
231        let tunnel = self.0.get_or_launch_dir(netdir).await?;
232        Ok(tunnel.into())
233    }
234
235    /// Return a circuit suitable for exiting to all of the provided
236    /// `ports`, launching it if necessary.
237    ///
238    /// If the list of ports is empty, then the chosen circuit will
239    /// still end at _some_ exit.
240    pub async fn get_or_launch_exit(
241        &self,
242        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
243        ports: &[TargetPort],
244        isolation: StreamIsolation,
245        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
246        //             additive. The function should be refactored to be builder-like.
247        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
248    ) -> Result<ClientDataTunnel> {
249        let tunnel = self
250            .0
251            .get_or_launch_exit(
252                netdir,
253                ports,
254                isolation,
255                #[cfg(feature = "geoip")]
256                country_code,
257            )
258            .await?;
259        Ok(tunnel.into())
260    }
261
262    /// Return a circuit to a specific relay, suitable for using for direct
263    /// (one-hop) directory downloads.
264    ///
265    /// This could be used, for example, to download a descriptor for a bridge.
266    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
267    #[cfg(feature = "specific-relay")]
268    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
269        &self,
270        target: T,
271    ) -> Result<ClientDirTunnel> {
272        let tunnel = self.0.get_or_launch_dir_specific(target).await?;
273        Ok(tunnel.into())
274    }
275
276    /// Launch the periodic daemon tasks required by the manager to function properly.
277    ///
278    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
279    //
280    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
281    pub fn launch_background_tasks<D, S>(
282        self: &Arc<Self>,
283        runtime: &R,
284        dir_provider: &Arc<D>,
285        state_mgr: S,
286    ) -> Result<Vec<TaskHandle>>
287    where
288        D: NetDirProvider + 'static + ?Sized,
289        S: StateMgr + std::marker::Send + 'static,
290    {
291        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
292    }
293
294    /// Return true if `netdir` has enough information to be used for this
295    /// circuit manager.
296    ///
297    /// (This will check whether the netdir is missing any primary guard
298    /// microdescriptors)
299    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
300        self.0.netdir_is_sufficient(netdir)
301    }
302
303    /// If `circ_id` is the unique identifier for a circuit that we're
304    /// keeping track of, don't give it out for any future requests.
305    pub fn retire_circ(&self, circ_id: &UniqId) {
306        self.0.retire_circ(circ_id);
307    }
308
309    /// Record that a failure occurred on a circuit with a given guard, in a way
310    /// that makes us unwilling to use that guard for future circuits.
311    ///
312    pub fn note_external_failure(
313        &self,
314        target: &impl ChanTarget,
315        external_failure: ExternalActivity,
316    ) {
317        self.0.note_external_failure(target, external_failure);
318    }
319
320    /// Record that a success occurred on a circuit with a given guard, in a way
321    /// that makes us possibly willing to use that guard for future circuits.
322    pub fn note_external_success(
323        &self,
324        target: &impl ChanTarget,
325        external_activity: ExternalActivity,
326    ) {
327        self.0.note_external_success(target, external_activity);
328    }
329
330    /// Return a stream of events about our estimated clock skew; these events
331    /// are `None` when we don't have enough information to make an estimate,
332    /// and `Some(`[`SkewEstimate`]`)` otherwise.
333    ///
334    /// Note that this stream can be lossy: if the estimate changes more than
335    /// one before you read from the stream, you might only get the most recent
336    /// update.
337    pub fn skew_events(&self) -> ClockSkewEvents {
338        self.0.skew_events()
339    }
340
341    /// Try to change our configuration settings to `new_config`.
342    ///
343    /// The actual behavior here will depend on the value of `how`.
344    ///
345    /// Returns whether any of the circuit pools should be cleared.
346    pub fn reconfigure<CFG: CircMgrConfig>(
347        &self,
348        new_config: &CFG,
349        how: tor_config::Reconfigure,
350    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
351        self.0.reconfigure(new_config, how)
352    }
353
354    /// Return an estimate-based delay for how long a given
355    /// [`Action`](timeouts::Action) should be allowed to complete.
356    ///
357    /// Note that **you do not need to use this function** in order to get
358    /// reasonable timeouts for the circuit-building operations provided by the
359    /// `tor-circmgr` crate: those, unless specifically noted, always use these
360    /// timeouts to cancel circuit operations that have taken too long.
361    ///
362    /// Instead, you should only use this function when you need to estimate how
363    /// long some _other_ operation should take to complete.  For example, if
364    /// you are sending a request over a 3-hop circuit and waiting for a reply,
365    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
366    /// length: 3 })`.
367    ///
368    /// Note also that this function returns a _timeout_ that the operation
369    /// should be permitted to complete, not an estimated Duration that the
370    /// operation _will_ take to complete. Timeouts are chosen to ensure that
371    /// most operations will complete, but very slow ones will not.  So even if
372    /// we expect that a circuit will complete in (say) 3 seconds, we might
373    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
374    /// complete.
375    ///
376    /// Estimate-based timeouts may change over time, given observations on the
377    /// actual amount of time needed for circuits to complete building.  If not
378    /// enough information has been gathered, a reasonable default will be used.
379    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
380        self.0.estimate_timeout(timeout_action)
381    }
382
383    /// Return a reference to the associated CircuitBuilder that this CircMgr
384    /// will use to create its circuits.
385    #[cfg(feature = "experimental-api")]
386    pub fn builder(&self) -> &TunnelBuilder<R> {
387        CircMgrInner::builder(&self.0)
388    }
389}
390
391/// Internal object used to implement CircMgr, which allows for mocking.
392#[derive(Clone)]
393pub(crate) struct CircMgrInner<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> {
394    /// The underlying circuit manager object that implements our behavior.
395    mgr: Arc<mgr::AbstractTunnelMgr<B, R>>,
396    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
397    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
398}
399
400impl<R: Runtime> CircMgrInner<TunnelBuilder<R>, R> {
401    /// Construct a new circuit manager.
402    ///
403    /// # Usage note
404    ///
405    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
406    #[allow(clippy::unnecessary_wraps)]
407    pub(crate) fn new<SM, CFG: CircMgrConfig>(
408        config: &CFG,
409        storage: SM,
410        runtime: &R,
411        chanmgr: Arc<ChanMgr<R>>,
412        guardmgr: &tor_guardmgr::GuardMgr<R>,
413    ) -> Result<Self>
414    where
415        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
416    {
417        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
418        let vanguardmgr = {
419            // TODO(#1382): we need a way of checking if this arti instance
420            // is running an onion service or not.
421            //
422            // Perhaps this information should be provided by CircMgrConfig.
423            let has_onion_svc = false;
424            VanguardMgr::new(
425                config.vanguard_config(),
426                runtime.clone(),
427                storage.clone(),
428                has_onion_svc,
429            )?
430        };
431
432        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
433
434        let builder = build::TunnelBuilder::new(
435            runtime.clone(),
436            chanmgr,
437            config.path_rules().clone(),
438            storage_handle,
439            guardmgr.clone(),
440            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
441            vanguardmgr,
442        );
443
444        Ok(Self::new_generic(config, runtime, guardmgr, builder))
445    }
446}
447
448impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
449    /// Generic implementation for [`CircMgrInner::new`]
450    pub(crate) fn new_generic<CFG: CircMgrConfig>(
451        config: &CFG,
452        runtime: &R,
453        guardmgr: &tor_guardmgr::GuardMgr<R>,
454        builder: B,
455    ) -> Self {
456        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
457            config.preemptive_circuits().clone(),
458        )));
459
460        guardmgr.set_filter(config.path_rules().build_guard_filter());
461
462        let mgr =
463            mgr::AbstractTunnelMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
464
465        CircMgrInner {
466            mgr: Arc::new(mgr),
467            predictor: preemptive,
468        }
469    }
470
471    /// Launch the periodic daemon tasks required by the manager to function properly.
472    ///
473    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
474    //
475    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
476    pub(crate) fn launch_background_tasks<D, S>(
477        self: &Arc<Self>,
478        runtime: &R,
479        dir_provider: &Arc<D>,
480        state_mgr: S,
481    ) -> Result<Vec<TaskHandle>>
482    where
483        D: NetDirProvider + 'static + ?Sized,
484        S: StateMgr + std::marker::Send + 'static,
485    {
486        let mut ret = vec![];
487
488        runtime
489            .spawn(Self::keep_circmgr_params_updated(
490                dir_provider.events(),
491                Arc::downgrade(self),
492                Arc::downgrade(dir_provider),
493            ))
494            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
495
496        let (sched, handle) = TaskSchedule::new(runtime.clone());
497        ret.push(handle);
498
499        runtime
500            .spawn(Self::update_persistent_state(
501                sched,
502                Arc::downgrade(self),
503                state_mgr,
504            ))
505            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
506
507        let (sched, handle) = TaskSchedule::new(runtime.clone());
508        ret.push(handle);
509
510        runtime
511            .spawn(Self::continually_launch_timeout_testing_circuits(
512                sched,
513                Arc::downgrade(self),
514                Arc::downgrade(dir_provider),
515            ))
516            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
517
518        let (sched, handle) = TaskSchedule::new(runtime.clone());
519        ret.push(handle);
520
521        runtime
522            .spawn(Self::continually_preemptively_build_circuits(
523                sched,
524                Arc::downgrade(self),
525                Arc::downgrade(dir_provider),
526            ))
527            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
528
529        self.mgr
530            .peek_builder()
531            .guardmgr()
532            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
533
534        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
535        {
536            let () = self
537                .mgr
538                .peek_builder()
539                .vanguardmgr()
540                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
541        }
542
543        Ok(ret)
544    }
545
546    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
547    /// launching it if necessary.
548    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Tunnel>> {
549        self.expire_circuits();
550        let usage = TargetTunnelUsage::Dir;
551        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
552    }
553
554    /// Return a circuit suitable for exiting to all of the provided
555    /// `ports`, launching it if necessary.
556    ///
557    /// If the list of ports is empty, then the chosen circuit will
558    /// still end at _some_ exit.
559    pub(crate) async fn get_or_launch_exit(
560        &self,
561        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
562        ports: &[TargetPort],
563        isolation: StreamIsolation,
564        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
565        //             additive. The function should be refactored to be builder-like.
566        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
567    ) -> Result<Arc<B::Tunnel>> {
568        self.expire_circuits();
569        let time = Instant::now();
570        {
571            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
572            if ports.is_empty() {
573                predictive.note_usage(None, time);
574            } else {
575                for port in ports.iter() {
576                    predictive.note_usage(Some(*port), time);
577                }
578            }
579        }
580        let require_stability = ports.iter().any(|p| {
581            self.mgr
582                .peek_builder()
583                .path_config()
584                .long_lived_ports
585                .contains(&p.port)
586        });
587        let ports = ports.iter().map(Clone::clone).collect();
588        #[cfg(not(feature = "geoip"))]
589        let country_code = None;
590        let usage = TargetTunnelUsage::Exit {
591            ports,
592            isolation,
593            country_code,
594            require_stability,
595        };
596        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
597    }
598
599    /// Return a circuit to a specific relay, suitable for using for direct
600    /// (one-hop) directory downloads.
601    ///
602    /// This could be used, for example, to download a descriptor for a bridge.
603    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
604    #[cfg(feature = "specific-relay")]
605    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
606        &self,
607        target: T,
608    ) -> Result<Arc<B::Tunnel>> {
609        self.expire_circuits();
610        let usage = TargetTunnelUsage::DirSpecificTarget(target.to_owned());
611        self.mgr
612            .get_or_launch(&usage, DirInfo::Nothing)
613            .await
614            .map(|(c, _)| c)
615    }
616
617    /// Try to change our configuration settings to `new_config`.
618    ///
619    /// The actual behavior here will depend on the value of `how`.
620    ///
621    /// Returns whether any of the circuit pools should be cleared.
622    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
623        &self,
624        new_config: &CFG,
625        how: tor_config::Reconfigure,
626    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
627        let old_path_rules = self.mgr.peek_builder().path_config();
628        let predictor = self.predictor.lock().expect("poisoned lock");
629        let preemptive_circuits = predictor.config();
630        if preemptive_circuits.initial_predicted_ports
631            != new_config.preemptive_circuits().initial_predicted_ports
632        {
633            // This change has no effect, since the list of ports was _initial_.
634            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
635        }
636
637        if how == tor_config::Reconfigure::CheckAllOrNothing {
638            return Ok(RetireCircuits::None);
639        }
640
641        let retire_because_of_guardmgr =
642            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
643
644        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645        let retire_because_of_vanguardmgr = self
646            .mgr
647            .peek_builder()
648            .vanguardmgr()
649            .reconfigure(new_config.vanguard_config())?;
650
651        let new_reachable = &new_config.path_rules().reachable_addrs;
652        if new_reachable != &old_path_rules.reachable_addrs {
653            let filter = new_config.path_rules().build_guard_filter();
654            self.mgr.peek_builder().guardmgr().set_filter(filter);
655        }
656
657        let discard_all_circuits = !new_config
658            .path_rules()
659            .at_least_as_permissive_as(&old_path_rules)
660            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
661
662        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
663        let discard_all_circuits = discard_all_circuits
664            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
665
666        self.mgr
667            .peek_builder()
668            .set_path_config(new_config.path_rules().clone());
669        self.mgr
670            .set_circuit_timing(new_config.circuit_timing().clone());
671        predictor.set_config(new_config.preemptive_circuits().clone());
672
673        if discard_all_circuits {
674            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
675            // retire those circuits that do not conform to the new path rules,
676            // or do not conform to the new guard configuration.
677            info!("Path configuration has become more restrictive: retiring existing circuits.");
678            self.retire_all_circuits();
679            return Ok(RetireCircuits::All);
680        }
681        Ok(RetireCircuits::None)
682    }
683
684    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
685    /// `circmgr` with the consensus parameters from `dirmgr`.
686    ///
687    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
688    /// dangling.
689    ///
690    /// This is a daemon task: it runs indefinitely in the background.
691    async fn keep_circmgr_params_updated<D>(
692        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
693        circmgr: Weak<Self>,
694        dirmgr: Weak<D>,
695    ) where
696        D: NetDirProvider + 'static + ?Sized,
697    {
698        use DirEvent::*;
699        while let Some(event) = events.next().await {
700            if matches!(event, NewConsensus) {
701                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
702                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
703                        cm.update_network_parameters(netdir.params());
704                    }
705                } else {
706                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
707                    break;
708                }
709            }
710        }
711    }
712
713    /// Reconfigure this circuit manager using the latest set of
714    /// network parameters.
715    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
716        self.mgr.update_network_parameters(p);
717        self.mgr.peek_builder().update_network_parameters(p);
718    }
719
720    /// Run indefinitely, launching circuits as needed to get a good
721    /// estimate for our circuit build timeouts.
722    ///
723    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
724    ///
725    /// This is a daemon task: it runs indefinitely in the background.
726    async fn continually_launch_timeout_testing_circuits<D>(
727        mut sched: TaskSchedule<R>,
728        circmgr: Weak<Self>,
729        dirmgr: Weak<D>,
730    ) where
731        D: NetDirProvider + 'static + ?Sized,
732    {
733        while sched.next().await.is_some() {
734            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
735                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
736                    if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
737                        warn_report!(e, "Problem launching a timeout testing circuit");
738                    }
739                    let delay = netdir
740                        .params()
741                        .cbt_testing_delay
742                        .try_into()
743                        .expect("Out-of-bounds value from BoundedInt32");
744
745                    drop((cm, dm));
746                    sched.fire_in(delay);
747                } else {
748                    // wait for the provider to announce some event, which will probably be
749                    // NewConsensus; this is therefore a decent yardstick for rechecking
750                    let _ = dm.events().next().await;
751                    sched.fire();
752                }
753            } else {
754                return;
755            }
756        }
757    }
758
759    /// If we need to launch a testing circuit to judge our circuit
760    /// build timeouts timeouts, do so.
761    ///
762    /// # Note
763    ///
764    /// This function is invoked periodically from
765    /// `continually_launch_timeout_testing_circuits`.
766    fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
767        if !self.mgr.peek_builder().learning_timeouts() {
768            return Ok(());
769        }
770        // We expire any too-old circuits here, so they don't get
771        // counted towards max_circs.
772        self.expire_circuits();
773        let max_circs: u64 = netdir
774            .params()
775            .cbt_max_open_circuits_for_testing
776            .try_into()
777            .expect("Out-of-bounds result from BoundedInt32");
778        if (self.mgr.n_tunnels() as u64) < max_circs {
779            // Actually launch the circuit!
780            let usage = TargetTunnelUsage::TimeoutTesting;
781            let dirinfo = netdir.into();
782            let mgr = Arc::clone(&self.mgr);
783            debug!("Launching a circuit to test build times.");
784            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
785            // We don't actually care when this circuit is done,
786            // so it's okay to drop the Receiver without awaiting it.
787            drop(receiver);
788        }
789
790        Ok(())
791    }
792
793    /// Run forever, periodically telling `circmgr` to update its persistent
794    /// state.
795    ///
796    /// Exit when we notice that `circmgr` has been dropped.
797    ///
798    /// This is a daemon task: it runs indefinitely in the background.
799    #[allow(clippy::cognitive_complexity)] // because of tracing
800    async fn update_persistent_state<S>(
801        mut sched: TaskSchedule<R>,
802        circmgr: Weak<Self>,
803        statemgr: S,
804    ) where
805        S: StateMgr + std::marker::Send,
806    {
807        while sched.next().await.is_some() {
808            if let Some(circmgr) = Weak::upgrade(&circmgr) {
809                use tor_persist::LockStatus::*;
810
811                match statemgr.try_lock() {
812                    Err(e) => {
813                        error_report!(e, "Problem with state lock file");
814                        break;
815                    }
816                    Ok(NewlyAcquired) => {
817                        info!("We now own the lock on our state files.");
818                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
819                            error_report!(e, "Unable to upgrade to owned state files");
820                            break;
821                        }
822                    }
823                    Ok(AlreadyHeld) => {
824                        if let Err(e) = circmgr.store_persistent_state() {
825                            error_report!(e, "Unable to flush circmgr state");
826                            break;
827                        }
828                    }
829                    Ok(NoLock) => {
830                        if let Err(e) = circmgr.reload_persistent_state() {
831                            error_report!(e, "Unable to reload circmgr state");
832                            break;
833                        }
834                    }
835                }
836            } else {
837                debug!("Circmgr has disappeared; task exiting.");
838                return;
839            }
840            // TODO(nickm): This delay is probably too small.
841            //
842            // Also, we probably don't even want a fixed delay here.  Instead,
843            // we should be updating more frequently when the data is volatile
844            // or has important info to save, and not at all when there are no
845            // changes.
846            sched.fire_in(Duration::from_secs(60));
847        }
848
849        debug!("State update task exiting (potentially due to handle drop).");
850    }
851
852    /// Switch from having an unowned persistent state to having an owned one.
853    ///
854    /// Requires that we hold the lock on the state files.
855    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
856        self.mgr.peek_builder().upgrade_to_owned_state()?;
857        Ok(())
858    }
859
860    /// Reload state from the state manager.
861    ///
862    /// We only call this method if we _don't_ have the lock on the state
863    /// files.  If we have the lock, we only want to save.
864    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
865        self.mgr.peek_builder().reload_state()?;
866        Ok(())
867    }
868
869    /// Run indefinitely, launching circuits where the preemptive circuit
870    /// predictor thinks it'd be a good idea to have them.
871    ///
872    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
873    ///
874    /// This is a daemon task: it runs indefinitely in the background.
875    ///
876    /// # Note
877    ///
878    /// This would be better handled entirely within `tor-circmgr`, like
879    /// other daemon tasks.
880    async fn continually_preemptively_build_circuits<D>(
881        mut sched: TaskSchedule<R>,
882        circmgr: Weak<Self>,
883        dirmgr: Weak<D>,
884    ) where
885        D: NetDirProvider + 'static + ?Sized,
886    {
887        let base_delay = Duration::from_secs(10);
888        let mut retry = RetryDelay::from_duration(base_delay);
889
890        while sched.next().await.is_some() {
891            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
892                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
893                    let result = cm
894                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
895                        .await;
896
897                    let delay = match result {
898                        Ok(()) => {
899                            retry.reset();
900                            base_delay
901                        }
902                        Err(_) => retry.next_delay(&mut rand::rng()),
903                    };
904
905                    sched.fire_in(delay);
906                } else {
907                    // wait for the provider to announce some event, which will probably be
908                    // NewConsensus; this is therefore a decent yardstick for rechecking
909                    let _ = dm.events().next().await;
910                    sched.fire();
911                }
912            } else {
913                return;
914            }
915        }
916    }
917
918    /// Launch circuits preemptively, using the preemptive circuit predictor's
919    /// predictions.
920    ///
921    /// # Note
922    ///
923    /// This function is invoked periodically from
924    /// `continually_preemptively_build_circuits()`.
925    #[allow(clippy::cognitive_complexity)]
926    async fn launch_circuits_preemptively(
927        &self,
928        netdir: DirInfo<'_>,
929    ) -> std::result::Result<(), err::PreemptiveCircError> {
930        trace!("Checking preemptive circuit predictions.");
931        let (circs, threshold) = {
932            let path_config = self.mgr.peek_builder().path_config();
933            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
934            let threshold = preemptive.config().disable_at_threshold;
935            (preemptive.predict(&path_config), threshold)
936        };
937
938        if self.mgr.n_tunnels() >= threshold {
939            return Ok(());
940        }
941        let mut n_created = 0_usize;
942        let mut n_errors = 0_usize;
943
944        let futures = circs
945            .iter()
946            .map(|usage| self.mgr.get_or_launch(usage, netdir));
947        let results = futures::future::join_all(futures).await;
948        for (i, result) in results.into_iter().enumerate() {
949            match result {
950                Ok((_, TunnelProvenance::NewlyCreated)) => {
951                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
952                    n_created += 1;
953                }
954                Ok((_, TunnelProvenance::Preexisting)) => {
955                    trace!("Circuit already existed created for {:?}", circs[i]);
956                }
957                Err(e) => {
958                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
959                    n_errors += 1;
960                }
961            }
962        }
963
964        if n_created > 0 || n_errors == 0 {
965            // Either we successfully made a circuit, or we didn't have any
966            // failures while looking for preexisting circuits.  Progress was
967            // made, so there's no need to back off.
968            Ok(())
969        } else {
970            // We didn't build any circuits and we hit at least one error:
971            // We'll call this unsuccessful.
972            Err(err::PreemptiveCircError)
973        }
974    }
975
976    /// Create and return a new (typically anonymous) onion circuit stem
977    /// of type `stem_kind`.
978    ///
979    /// If `circ_kind` is provided, we apply additional rules to make sure
980    /// that this will be usable as a stem for the given kind of onion service circuit.
981    /// Otherwise, we pick a stem that will probably be useful in general.
982    ///
983    /// This circuit is guaranteed not to have been used for any traffic
984    /// previously, and it will not be given out for any other requests in the
985    /// future unless explicitly re-registered with a circuit manager.
986    ///
987    /// If `planned_target` is provided, then the circuit will be built so that
988    /// it does not share any family members with the provided target.  (The
989    /// circuit _will not be_ extended to that target itself!)
990    ///
991    /// Used to implement onion service clients and services.
992    #[cfg(feature = "hs-common")]
993    pub(crate) async fn launch_hs_unmanaged<T>(
994        &self,
995        planned_target: Option<T>,
996        dir: &NetDir,
997        stem_kind: HsCircStemKind,
998        circ_kind: Option<HsCircKind>,
999    ) -> Result<B::Tunnel>
1000    where
1001        T: IntoOwnedChanTarget,
1002    {
1003        let usage = TargetTunnelUsage::HsCircBase {
1004            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
1005            stem_kind,
1006            circ_kind,
1007        };
1008        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
1009        Ok(client_circ)
1010    }
1011
1012    /// Return true if `netdir` has enough information to be used for this
1013    /// circuit manager.
1014    ///
1015    /// (This will check whether the netdir is missing any primary guard
1016    /// microdescriptors)
1017    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
1018        self.mgr
1019            .peek_builder()
1020            .guardmgr()
1021            .netdir_is_sufficient(netdir)
1022    }
1023
1024    /// Internal implementation for [`CircMgr::estimate_timeout`].
1025    pub(crate) fn estimate_timeout(
1026        &self,
1027        timeout_action: &timeouts::Action,
1028    ) -> std::time::Duration {
1029        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
1030        timeout
1031    }
1032
1033    /// Internal implementation for [`CircMgr::builder`].
1034    pub(crate) fn builder(&self) -> &B {
1035        self.mgr.peek_builder()
1036    }
1037
1038    /// Flush state to the state manager, if there is any unsaved state and
1039    /// we have the lock.
1040    ///
1041    /// Return true if we saved something; false if we didn't have the lock.
1042    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
1043        self.mgr.peek_builder().save_state()
1044    }
1045
1046    /// Expire every circuit that has been dirty for too long.
1047    ///
1048    /// Expired circuits are not closed while they still have users,
1049    /// but they are no longer given out for new requests.
1050    fn expire_circuits(&self) {
1051        // TODO: I would prefer not to call this at every request, but
1052        // it should be fine for now.  (At some point we may no longer
1053        // need this, or might not need to call it so often, now that
1054        // our circuit expiration runs on scheduled timers via
1055        // spawn_expiration_task.)
1056        let now = self.mgr.peek_runtime().now();
1057        self.mgr.expire_tunnels(now);
1058    }
1059
1060    /// Mark every circuit that we have launched so far as unsuitable for
1061    /// any future requests.  This won't close existing circuits that have
1062    /// streams attached to them, but it will prevent any future streams from
1063    /// being attached.
1064    ///
1065    /// TODO: we may want to expose this eventually.  If we do, we should
1066    /// be very clear that you don't want to use it haphazardly.
1067    pub(crate) fn retire_all_circuits(&self) {
1068        self.mgr.retire_all_tunnels();
1069    }
1070
1071    /// If `circ_id` is the unique identifier for a circuit that we're
1072    /// keeping track of, don't give it out for any future requests.
1073    pub(crate) fn retire_circ(&self, circ_id: &<B::Tunnel as AbstractTunnel>::Id) {
1074        let _ = self.mgr.take_tunnel(circ_id);
1075    }
1076
1077    /// Return a stream of events about our estimated clock skew; these events
1078    /// are `None` when we don't have enough information to make an estimate,
1079    /// and `Some(`[`SkewEstimate`]`)` otherwise.
1080    ///
1081    /// Note that this stream can be lossy: if the estimate changes more than
1082    /// one before you read from the stream, you might only get the most recent
1083    /// update.
1084    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
1085        self.mgr.peek_builder().guardmgr().skew_events()
1086    }
1087
1088    /// Record that a failure occurred on a circuit with a given guard, in a way
1089    /// that makes us unwilling to use that guard for future circuits.
1090    ///
1091    pub(crate) fn note_external_failure(
1092        &self,
1093        target: &impl ChanTarget,
1094        external_failure: ExternalActivity,
1095    ) {
1096        self.mgr
1097            .peek_builder()
1098            .guardmgr()
1099            .note_external_failure(target, external_failure);
1100    }
1101
1102    /// Record that a success occurred on a circuit with a given guard, in a way
1103    /// that makes us possibly willing to use that guard for future circuits.
1104    pub(crate) fn note_external_success(
1105        &self,
1106        target: &impl ChanTarget,
1107        external_activity: ExternalActivity,
1108    ) {
1109        self.mgr
1110            .peek_builder()
1111            .guardmgr()
1112            .note_external_success(target, external_activity);
1113    }
1114}
1115
1116impl<B: AbstractTunnelBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
1117    fn drop(&mut self) {
1118        match self.store_persistent_state() {
1119            Ok(true) => info!("Flushed persistent state at exit."),
1120            Ok(false) => debug!("Lock not held; no state to flush."),
1121            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
1122        }
1123    }
1124}
1125
1126#[cfg(test)]
1127mod test {
1128    // @@ begin test lint list maintained by maint/add_warning @@
1129    #![allow(clippy::bool_assert_comparison)]
1130    #![allow(clippy::clone_on_copy)]
1131    #![allow(clippy::dbg_macro)]
1132    #![allow(clippy::mixed_attributes_style)]
1133    #![allow(clippy::print_stderr)]
1134    #![allow(clippy::print_stdout)]
1135    #![allow(clippy::single_char_pattern)]
1136    #![allow(clippy::unwrap_used)]
1137    #![allow(clippy::unchecked_duration_subtraction)]
1138    #![allow(clippy::useless_vec)]
1139    #![allow(clippy::needless_pass_by_value)]
1140    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1141    use mocks::FakeBuilder;
1142    use tor_guardmgr::GuardMgr;
1143    use tor_linkspec::OwnedChanTarget;
1144    use tor_netdir::testprovider::TestNetDirProvider;
1145    use tor_persist::TestingStateMgr;
1146
1147    use super::*;
1148
1149    #[test]
1150    fn get_params() {
1151        use tor_netdir::{MdReceiver, PartialNetDir};
1152        use tor_netdoc::doc::netstatus::NetParams;
1153        // If it's just fallbackdir, we get the default parameters.
1154        let fb = FallbackList::from([]);
1155        let di: DirInfo<'_> = (&fb).into();
1156
1157        let p1 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1158        assert!(!p1.extend_by_ed25519_id);
1159
1160        // Now try with a directory and configured parameters.
1161        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1162        let mut params = NetParams::default();
1163        params.set("circwindow".into(), 100);
1164        params.set("ExtendByEd25519ID".into(), 1);
1165        let mut dir = PartialNetDir::new(consensus, Some(&params));
1166        for m in microdescs {
1167            dir.add_microdesc(m);
1168        }
1169        let netdir = dir.unwrap_if_sufficient().unwrap();
1170        let di: DirInfo<'_> = (&netdir).into();
1171        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1172        assert!(p2.extend_by_ed25519_id);
1173
1174        // Now try with a bogus circwindow value.
1175        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
1176        let mut params = NetParams::default();
1177        params.set("circwindow".into(), 100_000);
1178        params.set("ExtendByEd25519ID".into(), 1);
1179        let mut dir = PartialNetDir::new(consensus, Some(&params));
1180        for m in microdescs {
1181            dir.add_microdesc(m);
1182        }
1183        let netdir = dir.unwrap_if_sufficient().unwrap();
1184        let di: DirInfo<'_> = (&netdir).into();
1185        let p2 = di.circ_params(&TargetTunnelUsage::Dir).unwrap();
1186        assert!(p2.extend_by_ed25519_id);
1187    }
1188
1189    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
1190        let config = crate::config::test_config::TestConfig::default();
1191        let statemgr = TestingStateMgr::new();
1192        let guardmgr =
1193            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
1194        let builder = FakeBuilder::new(
1195            &runtime,
1196            statemgr.clone(),
1197            &tor_guardmgr::TestConfig::default(),
1198        );
1199        let circmgr = Arc::new(CircMgrInner::new_generic(
1200            &config, &runtime, &guardmgr, builder,
1201        ));
1202        let netdir = Arc::new(TestNetDirProvider::new());
1203        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
1204            .expect("launch CircMgrInner background tasks");
1205        circmgr
1206    }
1207
1208    #[test]
1209    #[cfg(feature = "hs-common")]
1210    fn test_launch_hs_unmanaged() {
1211        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
1212            let circmgr = make_circmgr(runtime.clone());
1213            let netdir = tor_netdir::testnet::construct_netdir()
1214                .unwrap_if_sufficient()
1215                .unwrap();
1216
1217            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
1218            runtime.spawn_identified("launch_hs_unamanged", async move {
1219                ret_tx
1220                    .send(
1221                        circmgr
1222                            .launch_hs_unmanaged::<OwnedChanTarget>(
1223                                None,
1224                                &netdir,
1225                                HsCircStemKind::Naive,
1226                                None,
1227                            )
1228                            .await,
1229                    )
1230                    .unwrap();
1231            });
1232            runtime.advance_by(Duration::from_millis(60)).await;
1233            ret_rx.await.unwrap().unwrap();
1234        });
1235    }
1236}