1
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_duration_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
45

            
46
// TODO #1645 (either remove this, or decide to have it everywhere)
47
#![cfg_attr(not(all(feature = "full", feature = "experimental")), allow(unused))]
48

            
49
use build::CircuitBuilder;
50
use mgr::{AbstractCirc, AbstractCircBuilder};
51
use tor_basic_utils::retry::RetryDelay;
52
use tor_chanmgr::ChanMgr;
53
use tor_error::{error_report, warn_report};
54
use tor_guardmgr::RetireCircuits;
55
use tor_linkspec::ChanTarget;
56
use tor_netdir::{DirEvent, NetDir, NetDirProvider, Timeliness};
57
use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
58
use tor_rtcompat::Runtime;
59

            
60
#[cfg(any(feature = "specific-relay", feature = "hs-common"))]
61
use tor_linkspec::IntoOwnedChanTarget;
62

            
63
use futures::task::SpawnExt;
64
use futures::StreamExt;
65
use std::sync::{Arc, Mutex, Weak};
66
use std::time::{Duration, Instant};
67
use tracing::{debug, info, trace, warn};
68

            
69
#[cfg(feature = "testing")]
70
pub use config::test_config::TestConfig;
71

            
72
pub mod build;
73
mod config;
74
mod err;
75
#[cfg(feature = "hs-common")]
76
pub mod hspool;
77
mod impls;
78
pub mod isolation;
79
mod mgr;
80
#[cfg(test)]
81
mod mocks;
82
mod preemptive;
83
pub mod timeouts;
84
mod usage;
85

            
86
// Can't apply `visibility` to modules.
87
cfg_if::cfg_if! {
88
    if #[cfg(feature = "experimental-api")] {
89
        pub mod path;
90
    } else {
91
        pub(crate) mod path;
92
    }
93
}
94

            
95
pub use err::Error;
96
pub use isolation::IsolationToken;
97
use tor_guardmgr::fallback::FallbackList;
98
pub use tor_guardmgr::{ClockSkewEvents, GuardMgrConfig, SkewEstimate};
99
pub use usage::{TargetPort, TargetPorts};
100

            
101
pub use config::{
102
    CircMgrConfig, CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
103
    PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
104
};
105

            
106
use crate::isolation::StreamIsolation;
107
use crate::mgr::CircProvenance;
108
use crate::preemptive::PreemptiveCircuitPredictor;
109
use usage::TargetCircUsage;
110

            
111
use safelog::sensitive as sv;
112
#[cfg(feature = "geoip")]
113
use tor_geoip::CountryCode;
114
pub use tor_guardmgr::{ExternalActivity, FirstHopId};
115
use tor_persist::StateMgr;
116
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
117

            
118
#[cfg(feature = "hs-common")]
119
use crate::hspool::HsCircStemKind;
120
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
121
use tor_guardmgr::vanguards::VanguardMgr;
122

            
123
/// A Result type as returned from this crate.
124
pub type Result<T> = std::result::Result<T, Error>;
125

            
126
/// Type alias for dynamic StorageHandle that can handle our timeout state.
127
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
128

            
129
/// Key used to load timeout state information.
130
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
131

            
132
/// Represents what we know about the Tor network.
133
///
134
/// This can either be a complete directory, or a list of fallbacks.
135
///
136
/// Not every DirInfo can be used to build every kind of circuit:
137
/// if you try to build a path with an inadequate DirInfo, you'll get a
138
/// NeedConsensus error.
139
#[derive(Debug, Copy, Clone)]
140
#[non_exhaustive]
141
pub enum DirInfo<'a> {
142
    /// A list of fallbacks, for use when we don't know a network directory.
143
    Fallbacks(&'a FallbackList),
144
    /// A complete network directory
145
    Directory(&'a NetDir),
146
    /// No information: we can only build one-hop paths: and that, only if the
147
    /// guard manager knows some guards or fallbacks.
148
    Nothing,
149
}
150

            
151
impl<'a> From<&'a FallbackList> for DirInfo<'a> {
152
478
    fn from(v: &'a FallbackList) -> DirInfo<'a> {
153
478
        DirInfo::Fallbacks(v)
154
478
    }
155
}
156
impl<'a> From<&'a NetDir> for DirInfo<'a> {
157
516
    fn from(v: &'a NetDir) -> DirInfo<'a> {
158
516
        DirInfo::Directory(v)
159
516
    }
160
}
161
impl<'a> DirInfo<'a> {
162
    /// Return a set of circuit parameters for this DirInfo.
163
6
    fn circ_params(&self, usage: &TargetCircUsage) -> Result<CircParameters> {
164
        use tor_netdir::params::NetParameters;
165
        // We use a common function for both cases here to be sure that
166
        // we look at the defaults from NetParameters code.
167
6
        let defaults = NetParameters::default();
168
6
        let net_params = match self {
169
4
            DirInfo::Directory(d) => d.params(),
170
2
            _ => &defaults,
171
        };
172
6
        match usage {
173
            #[cfg(feature = "hs-common")]
174
            TargetCircUsage::HsCircBase { .. } => {
175
                build::onion_circparams_from_netparams(net_params)
176
            }
177
6
            _ => build::exit_circparams_from_netparams(net_params),
178
        }
179
6
    }
180
}
181

            
182
/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
183
/// when they're suitable, and launching them if they don't already exist.
184
///
185
/// Right now, its notion of "suitable" is quite rudimentary: it just
186
/// believes in two kinds of circuits: Exit circuits, and directory
187
/// circuits.  Exit circuits are ones that were created to connect to
188
/// a set of ports; directory circuits were made to talk to directory caches.
189
///
190
/// This is a "handle"; clones of it share state.
191
pub struct CircMgr<R: Runtime>(Arc<CircMgrInner<build::CircuitBuilder<R>, R>>);
192

            
193
impl<R: Runtime> CircMgr<R> {
194
    /// Construct a new circuit manager.
195
    ///
196
    /// # Usage note
197
    ///
198
    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
199
16
    pub fn new<SM, CFG: CircMgrConfig>(
200
16
        config: &CFG,
201
16
        storage: SM,
202
16
        runtime: &R,
203
16
        chanmgr: Arc<ChanMgr<R>>,
204
16
        guardmgr: &tor_guardmgr::GuardMgr<R>,
205
16
    ) -> Result<Self>
206
16
    where
207
16
        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
208
16
    {
209
16
        Ok(Self(Arc::new(CircMgrInner::new(
210
16
            config, storage, runtime, chanmgr, guardmgr,
211
16
        )?)))
212
16
    }
213

            
214
    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
215
    /// launching it if necessary.
216
    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
217
        self.0.get_or_launch_dir(netdir).await
218
    }
219

            
220
    /// Return a circuit suitable for exiting to all of the provided
221
    /// `ports`, launching it if necessary.
222
    ///
223
    /// If the list of ports is empty, then the chosen circuit will
224
    /// still end at _some_ exit.
225
    pub async fn get_or_launch_exit(
226
        &self,
227
        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
228
        ports: &[TargetPort],
229
        isolation: StreamIsolation,
230
        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
231
        //             additive. The function should be refactored to be builder-like.
232
        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
233
    ) -> Result<Arc<ClientCirc>> {
234
        self.0
235
            .get_or_launch_exit(
236
                netdir,
237
                ports,
238
                isolation,
239
                #[cfg(feature = "geoip")]
240
                country_code,
241
            )
242
            .await
243
    }
244

            
245
    /// Return a circuit to a specific relay, suitable for using for direct
246
    /// (one-hop) directory downloads.
247
    ///
248
    /// This could be used, for example, to download a descriptor for a bridge.
249
    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
250
    #[cfg(feature = "specific-relay")]
251
    pub async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
252
        &self,
253
        target: T,
254
    ) -> Result<Arc<ClientCirc>> {
255
        self.0.get_or_launch_dir_specific(target).await
256
    }
257

            
258
    /// Launch the periodic daemon tasks required by the manager to function properly.
259
    ///
260
    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
261
    //
262
    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
263
8
    pub fn launch_background_tasks<D, S>(
264
8
        self: &Arc<Self>,
265
8
        runtime: &R,
266
8
        dir_provider: &Arc<D>,
267
8
        state_mgr: S,
268
8
    ) -> Result<Vec<TaskHandle>>
269
8
    where
270
8
        D: NetDirProvider + 'static + ?Sized,
271
8
        S: StateMgr + std::marker::Send + 'static,
272
8
    {
273
8
        CircMgrInner::launch_background_tasks(&self.0.clone(), runtime, dir_provider, state_mgr)
274
8
    }
275

            
276
    /// Return true if `netdir` has enough information to be used for this
277
    /// circuit manager.
278
    ///
279
    /// (This will check whether the netdir is missing any primary guard
280
    /// microdescriptors)
281
    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
282
        self.0.netdir_is_sufficient(netdir)
283
    }
284

            
285
    /// If `circ_id` is the unique identifier for a circuit that we're
286
    /// keeping track of, don't give it out for any future requests.
287
    pub fn retire_circ(&self, circ_id: &UniqId) {
288
        self.0.retire_circ(circ_id);
289
    }
290

            
291
    /// Record that a failure occurred on a circuit with a given guard, in a way
292
    /// that makes us unwilling to use that guard for future circuits.
293
    ///
294
    pub fn note_external_failure(
295
        &self,
296
        target: &impl ChanTarget,
297
        external_failure: ExternalActivity,
298
    ) {
299
        self.0.note_external_failure(target, external_failure);
300
    }
301

            
302
    /// Record that a success occurred on a circuit with a given guard, in a way
303
    /// that makes us possibly willing to use that guard for future circuits.
304
    pub fn note_external_success(
305
        &self,
306
        target: &impl ChanTarget,
307
        external_activity: ExternalActivity,
308
    ) {
309
        self.0.note_external_success(target, external_activity);
310
    }
311

            
312
    /// Return a stream of events about our estimated clock skew; these events
313
    /// are `None` when we don't have enough information to make an estimate,
314
    /// and `Some(`[`SkewEstimate`]`)` otherwise.
315
    ///
316
    /// Note that this stream can be lossy: if the estimate changes more than
317
    /// one before you read from the stream, you might only get the most recent
318
    /// update.
319
8
    pub fn skew_events(&self) -> ClockSkewEvents {
320
8
        self.0.skew_events()
321
8
    }
322

            
323
    /// Try to change our configuration settings to `new_config`.
324
    ///
325
    /// The actual behavior here will depend on the value of `how`.
326
    ///
327
    /// Returns whether any of the circuit pools should be cleared.
328
4
    pub fn reconfigure<CFG: CircMgrConfig>(
329
4
        &self,
330
4
        new_config: &CFG,
331
4
        how: tor_config::Reconfigure,
332
4
    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
333
4
        self.0.reconfigure(new_config, how)
334
4
    }
335

            
336
    /// Return an estimate-based delay for how long a given
337
    /// [`Action`](timeouts::Action) should be allowed to complete.
338
    ///
339
    /// Note that **you do not need to use this function** in order to get
340
    /// reasonable timeouts for the circuit-building operations provided by the
341
    /// `tor-circmgr` crate: those, unless specifically noted, always use these
342
    /// timeouts to cancel circuit operations that have taken too long.
343
    ///
344
    /// Instead, you should only use this function when you need to estimate how
345
    /// long some _other_ operation should take to complete.  For example, if
346
    /// you are sending a request over a 3-hop circuit and waiting for a reply,
347
    /// you might choose to wait for `estimate_timeout(Action::RoundTrip {
348
    /// length: 3 })`.
349
    ///
350
    /// Note also that this function returns a _timeout_ that the operation
351
    /// should be permitted to complete, not an estimated Duration that the
352
    /// operation _will_ take to complete. Timeouts are chosen to ensure that
353
    /// most operations will complete, but very slow ones will not.  So even if
354
    /// we expect that a circuit will complete in (say) 3 seconds, we might
355
    /// still allow a timeout of 4.5 seconds, to ensure that most circuits can
356
    /// complete.
357
    ///
358
    /// Estimate-based timeouts may change over time, given observations on the
359
    /// actual amount of time needed for circuits to complete building.  If not
360
    /// enough information has been gathered, a reasonable default will be used.
361
    pub fn estimate_timeout(&self, timeout_action: &timeouts::Action) -> std::time::Duration {
362
        self.0.estimate_timeout(timeout_action)
363
    }
364

            
365
    /// Return a reference to the associated CircuitBuilder that this CircMgr
366
    /// will use to create its circuits.
367
    #[cfg(feature = "experimental-api")]
368
    pub fn builder(&self) -> &CircuitBuilder<R> {
369
        CircMgrInner::builder(&self.0)
370
    }
371
}
372

            
373
/// Internal object used to implement CircMgr, which allows for mocking.
374
#[derive(Clone)]
375
pub(crate) struct CircMgrInner<B: AbstractCircBuilder<R> + 'static, R: Runtime> {
376
    /// The underlying circuit manager object that implements our behavior.
377
    mgr: Arc<mgr::AbstractCircMgr<B, R>>,
378
    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
379
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
380
}
381

            
382
impl<R: Runtime> CircMgrInner<CircuitBuilder<R>, R> {
383
    /// Construct a new circuit manager.
384
    ///
385
    /// # Usage note
386
    ///
387
    /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
388
    #[allow(clippy::unnecessary_wraps)]
389
28
    pub(crate) fn new<SM, CFG: CircMgrConfig>(
390
28
        config: &CFG,
391
28
        storage: SM,
392
28
        runtime: &R,
393
28
        chanmgr: Arc<ChanMgr<R>>,
394
28
        guardmgr: &tor_guardmgr::GuardMgr<R>,
395
28
    ) -> Result<Self>
396
28
    where
397
28
        SM: tor_persist::StateMgr + Clone + Send + Sync + 'static,
398
28
    {
399
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
400
28
        let vanguardmgr = {
401
            // TODO(#1382): we need a way of checking if this arti instance
402
            // is running an onion service or not.
403
            //
404
            // Perhaps this information should be provided by CircMgrConfig.
405
28
            let has_onion_svc = false;
406
28
            VanguardMgr::new(
407
28
                config.vanguard_config(),
408
28
                runtime.clone(),
409
28
                storage.clone(),
410
28
                has_onion_svc,
411
28
            )?
412
        };
413

            
414
28
        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
415
28

            
416
28
        let builder = build::CircuitBuilder::new(
417
28
            runtime.clone(),
418
28
            chanmgr,
419
28
            config.path_rules().clone(),
420
28
            storage_handle,
421
28
            guardmgr.clone(),
422
28
            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
423
28
            vanguardmgr,
424
28
        );
425
28

            
426
28
        Ok(Self::new_generic(config, runtime, guardmgr, builder))
427
28
    }
428
}
429

            
430
impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> CircMgrInner<B, R> {
431
    /// Generic implementation for [`CircMgrInner::new`]
432
32
    pub(crate) fn new_generic<CFG: CircMgrConfig>(
433
32
        config: &CFG,
434
32
        runtime: &R,
435
32
        guardmgr: &tor_guardmgr::GuardMgr<R>,
436
32
        builder: B,
437
32
    ) -> Self {
438
32
        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
439
32
            config.preemptive_circuits().clone(),
440
32
        )));
441
32

            
442
32
        guardmgr.set_filter(config.path_rules().build_guard_filter());
443
32

            
444
32
        let mgr =
445
32
            mgr::AbstractCircMgr::new(builder, runtime.clone(), config.circuit_timing().clone());
446
32

            
447
32
        CircMgrInner {
448
32
            mgr: Arc::new(mgr),
449
32
            predictor: preemptive,
450
32
        }
451
32
    }
452

            
453
    /// Launch the periodic daemon tasks required by the manager to function properly.
454
    ///
455
    /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
456
    //
457
    // NOTE(eta): The ?Sized on D is so we can pass a trait object in.
458
12
    pub(crate) fn launch_background_tasks<D, S>(
459
12
        self: &Arc<Self>,
460
12
        runtime: &R,
461
12
        dir_provider: &Arc<D>,
462
12
        state_mgr: S,
463
12
    ) -> Result<Vec<TaskHandle>>
464
12
    where
465
12
        D: NetDirProvider + 'static + ?Sized,
466
12
        S: StateMgr + std::marker::Send + 'static,
467
12
    {
468
12
        let mut ret = vec![];
469
12

            
470
12
        runtime
471
12
            .spawn(Self::keep_circmgr_params_updated(
472
12
                dir_provider.events(),
473
12
                Arc::downgrade(self),
474
12
                Arc::downgrade(dir_provider),
475
12
            ))
476
12
            .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
477

            
478
12
        let (sched, handle) = TaskSchedule::new(runtime.clone());
479
12
        ret.push(handle);
480
12

            
481
12
        runtime
482
12
            .spawn(Self::update_persistent_state(
483
12
                sched,
484
12
                Arc::downgrade(self),
485
12
                state_mgr,
486
12
            ))
487
12
            .map_err(|e| Error::from_spawn("persistent state updater", e))?;
488

            
489
12
        let (sched, handle) = TaskSchedule::new(runtime.clone());
490
12
        ret.push(handle);
491
12

            
492
12
        runtime
493
12
            .spawn(Self::continually_launch_timeout_testing_circuits(
494
12
                sched,
495
12
                Arc::downgrade(self),
496
12
                Arc::downgrade(dir_provider),
497
12
            ))
498
12
            .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
499

            
500
12
        let (sched, handle) = TaskSchedule::new(runtime.clone());
501
12
        ret.push(handle);
502
12

            
503
12
        runtime
504
12
            .spawn(Self::continually_preemptively_build_circuits(
505
12
                sched,
506
12
                Arc::downgrade(self),
507
12
                Arc::downgrade(dir_provider),
508
12
            ))
509
12
            .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
510

            
511
12
        self.mgr
512
12
            .peek_builder()
513
12
            .guardmgr()
514
12
            .install_netdir_provider(&dir_provider.clone().upcast_arc())?;
515

            
516
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
517
        {
518
12
            let () = self
519
12
                .mgr
520
12
                .peek_builder()
521
12
                .vanguardmgr()
522
12
                .launch_background_tasks(&dir_provider.clone().upcast_arc())?;
523
        }
524

            
525
12
        Ok(ret)
526
12
    }
527

            
528
    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
529
    /// launching it if necessary.
530
    pub(crate) async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<B::Circ>> {
531
        self.expire_circuits();
532
        let usage = TargetCircUsage::Dir;
533
        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
534
    }
535

            
536
    /// Return a circuit suitable for exiting to all of the provided
537
    /// `ports`, launching it if necessary.
538
    ///
539
    /// If the list of ports is empty, then the chosen circuit will
540
    /// still end at _some_ exit.
541
    pub(crate) async fn get_or_launch_exit(
542
        &self,
543
        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
544
        ports: &[TargetPort],
545
        isolation: StreamIsolation,
546
        // TODO GEOIP: this cannot be stabilised like this, since Cargo features need to be
547
        //             additive. The function should be refactored to be builder-like.
548
        #[cfg(feature = "geoip")] country_code: Option<CountryCode>,
549
    ) -> Result<Arc<B::Circ>> {
550
        self.expire_circuits();
551
        let time = Instant::now();
552
        {
553
            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
554
            if ports.is_empty() {
555
                predictive.note_usage(None, time);
556
            } else {
557
                for port in ports.iter() {
558
                    predictive.note_usage(Some(*port), time);
559
                }
560
            }
561
        }
562
        let require_stability = ports.iter().any(|p| {
563
            self.mgr
564
                .peek_builder()
565
                .path_config()
566
                .long_lived_ports
567
                .contains(&p.port)
568
        });
569
        let ports = ports.iter().map(Clone::clone).collect();
570
        #[cfg(not(feature = "geoip"))]
571
        let country_code = None;
572
        let usage = TargetCircUsage::Exit {
573
            ports,
574
            isolation,
575
            country_code,
576
            require_stability,
577
        };
578
        self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c)
579
    }
580

            
581
    /// Return a circuit to a specific relay, suitable for using for direct
582
    /// (one-hop) directory downloads.
583
    ///
584
    /// This could be used, for example, to download a descriptor for a bridge.
585
    #[cfg_attr(docsrs, doc(cfg(feature = "specific-relay")))]
586
    #[cfg(feature = "specific-relay")]
587
    pub(crate) async fn get_or_launch_dir_specific<T: IntoOwnedChanTarget>(
588
        &self,
589
        target: T,
590
    ) -> Result<Arc<B::Circ>> {
591
        self.expire_circuits();
592
        let usage = TargetCircUsage::DirSpecificTarget(target.to_owned());
593
        self.mgr
594
            .get_or_launch(&usage, DirInfo::Nothing)
595
            .await
596
            .map(|(c, _)| c)
597
    }
598

            
599
    /// Try to change our configuration settings to `new_config`.
600
    ///
601
    /// The actual behavior here will depend on the value of `how`.
602
    ///
603
    /// Returns whether any of the circuit pools should be cleared.
604
4
    pub(crate) fn reconfigure<CFG: CircMgrConfig>(
605
4
        &self,
606
4
        new_config: &CFG,
607
4
        how: tor_config::Reconfigure,
608
4
    ) -> std::result::Result<RetireCircuits, tor_config::ReconfigureError> {
609
4
        let old_path_rules = self.mgr.peek_builder().path_config();
610
4
        let predictor = self.predictor.lock().expect("poisoned lock");
611
4
        let preemptive_circuits = predictor.config();
612
4
        if preemptive_circuits.initial_predicted_ports
613
4
            != new_config.preemptive_circuits().initial_predicted_ports
614
        {
615
            // This change has no effect, since the list of ports was _initial_.
616
            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
617
4
        }
618

            
619
4
        if how == tor_config::Reconfigure::CheckAllOrNothing {
620
2
            return Ok(RetireCircuits::None);
621
2
        }
622

            
623
2
        let retire_because_of_guardmgr =
624
2
            self.mgr.peek_builder().guardmgr().reconfigure(new_config)?;
625

            
626
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
627
2
        let retire_because_of_vanguardmgr = self
628
2
            .mgr
629
2
            .peek_builder()
630
2
            .vanguardmgr()
631
2
            .reconfigure(new_config.vanguard_config())?;
632

            
633
2
        let new_reachable = &new_config.path_rules().reachable_addrs;
634
2
        if new_reachable != &old_path_rules.reachable_addrs {
635
            let filter = new_config.path_rules().build_guard_filter();
636
            self.mgr.peek_builder().guardmgr().set_filter(filter);
637
2
        }
638

            
639
2
        let discard_all_circuits = !new_config
640
2
            .path_rules()
641
2
            .at_least_as_permissive_as(&old_path_rules)
642
2
            || retire_because_of_guardmgr != tor_guardmgr::RetireCircuits::None;
643

            
644
        #[cfg(all(feature = "vanguards", feature = "hs-common"))]
645
2
        let discard_all_circuits = discard_all_circuits
646
2
            || retire_because_of_vanguardmgr != tor_guardmgr::RetireCircuits::None;
647

            
648
2
        self.mgr
649
2
            .peek_builder()
650
2
            .set_path_config(new_config.path_rules().clone());
651
2
        self.mgr
652
2
            .set_circuit_timing(new_config.circuit_timing().clone());
653
2
        predictor.set_config(new_config.preemptive_circuits().clone());
654
2

            
655
2
        if discard_all_circuits {
656
            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
657
            // retire those circuits that do not conform to the new path rules,
658
            // or do not conform to the new guard configuration.
659
            info!("Path configuration has become more restrictive: retiring existing circuits.");
660
            self.retire_all_circuits();
661
            return Ok(RetireCircuits::All);
662
2
        }
663
2
        Ok(RetireCircuits::None)
664
4
    }
665

            
666
    /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
667
    /// `circmgr` with the consensus parameters from `dirmgr`.
668
    ///
669
    /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
670
    /// dangling.
671
    ///
672
    /// This is a daemon task: it runs indefinitely in the background.
673
12
    async fn keep_circmgr_params_updated<D>(
674
12
        mut events: impl futures::Stream<Item = DirEvent> + Unpin,
675
12
        circmgr: Weak<Self>,
676
12
        dirmgr: Weak<D>,
677
12
    ) where
678
12
        D: NetDirProvider + 'static + ?Sized,
679
12
    {
680
        use DirEvent::*;
681
12
        while let Some(event) = events.next().await {
682
            if matches!(event, NewConsensus) {
683
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
684
                    if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
685
                        cm.update_network_parameters(netdir.params());
686
                    }
687
                } else {
688
                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
689
                    break;
690
                }
691
            }
692
        }
693
4
    }
694

            
695
    /// Reconfigure this circuit manager using the latest set of
696
    /// network parameters.
697
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
698
        self.mgr.update_network_parameters(p);
699
        self.mgr.peek_builder().update_network_parameters(p);
700
    }
701

            
702
    /// Run indefinitely, launching circuits as needed to get a good
703
    /// estimate for our circuit build timeouts.
704
    ///
705
    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
706
    ///
707
    /// This is a daemon task: it runs indefinitely in the background.
708
12
    async fn continually_launch_timeout_testing_circuits<D>(
709
12
        mut sched: TaskSchedule<R>,
710
12
        circmgr: Weak<Self>,
711
12
        dirmgr: Weak<D>,
712
12
    ) where
713
12
        D: NetDirProvider + 'static + ?Sized,
714
12
    {
715
12
        while sched.next().await.is_some() {
716
8
            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
717
8
                if let Ok(netdir) = dm.netdir(Timeliness::Unchecked) {
718
                    if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
719
                        warn_report!(e, "Problem launching a timeout testing circuit");
720
                    }
721
                    let delay = netdir
722
                        .params()
723
                        .cbt_testing_delay
724
                        .try_into()
725
                        .expect("Out-of-bounds value from BoundedInt32");
726

            
727
                    drop((cm, dm));
728
                    sched.fire_in(delay);
729
                } else {
730
                    // wait for the provider to announce some event, which will probably be
731
                    // NewConsensus; this is therefore a decent yardstick for rechecking
732
8
                    let _ = dm.events().next().await;
733
                    sched.fire();
734
                }
735
            } else {
736
                return;
737
            }
738
        }
739
4
    }
740

            
741
    /// If we need to launch a testing circuit to judge our circuit
742
    /// build timeouts timeouts, do so.
743
    ///
744
    /// # Note
745
    ///
746
    /// This function is invoked periodically from
747
    /// `continually_launch_timeout_testing_circuits`.
748
    fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
749
        if !self.mgr.peek_builder().learning_timeouts() {
750
            return Ok(());
751
        }
752
        // We expire any too-old circuits here, so they don't get
753
        // counted towards max_circs.
754
        self.expire_circuits();
755
        let max_circs: u64 = netdir
756
            .params()
757
            .cbt_max_open_circuits_for_testing
758
            .try_into()
759
            .expect("Out-of-bounds result from BoundedInt32");
760
        if (self.mgr.n_circs() as u64) < max_circs {
761
            // Actually launch the circuit!
762
            let usage = TargetCircUsage::TimeoutTesting;
763
            let dirinfo = netdir.into();
764
            let mgr = Arc::clone(&self.mgr);
765
            debug!("Launching a circuit to test build times.");
766
            let receiver = mgr.launch_by_usage(&usage, dirinfo)?;
767
            // We don't actually care when this circuit is done,
768
            // so it's okay to drop the Receiver without awaiting it.
769
            drop(receiver);
770
        }
771

            
772
        Ok(())
773
    }
774

            
775
    /// Run forever, periodically telling `circmgr` to update its persistent
776
    /// state.
777
    ///
778
    /// Exit when we notice that `circmgr` has been dropped.
779
    ///
780
    /// This is a daemon task: it runs indefinitely in the background.
781
12
    async fn update_persistent_state<S>(
782
12
        mut sched: TaskSchedule<R>,
783
12
        circmgr: Weak<Self>,
784
12
        statemgr: S,
785
12
    ) where
786
12
        S: StateMgr + std::marker::Send,
787
12
    {
788
24
        while sched.next().await.is_some() {
789
12
            if let Some(circmgr) = Weak::upgrade(&circmgr) {
790
                use tor_persist::LockStatus::*;
791

            
792
12
                match statemgr.try_lock() {
793
                    Err(e) => {
794
                        error_report!(e, "Problem with state lock file");
795
                        break;
796
                    }
797
                    Ok(NewlyAcquired) => {
798
                        info!("We now own the lock on our state files.");
799
                        if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
800
                            error_report!(e, "Unable to upgrade to owned state files");
801
                            break;
802
                        }
803
                    }
804
                    Ok(AlreadyHeld) => {
805
12
                        if let Err(e) = circmgr.store_persistent_state() {
806
                            error_report!(e, "Unable to flush circmgr state");
807
                            break;
808
12
                        }
809
                    }
810
                    Ok(NoLock) => {
811
                        if let Err(e) = circmgr.reload_persistent_state() {
812
                            error_report!(e, "Unable to reload circmgr state");
813
                            break;
814
                        }
815
                    }
816
                }
817
            } else {
818
                debug!("Circmgr has disappeared; task exiting.");
819
                return;
820
            }
821
            // TODO(nickm): This delay is probably too small.
822
            //
823
            // Also, we probably don't even want a fixed delay here.  Instead,
824
            // we should be updating more frequently when the data is volatile
825
            // or has important info to save, and not at all when there are no
826
            // changes.
827
12
            sched.fire_in(Duration::from_secs(60));
828
        }
829

            
830
12
        debug!("State update task exiting (potentially due to handle drop).");
831
12
    }
832

            
833
    /// Switch from having an unowned persistent state to having an owned one.
834
    ///
835
    /// Requires that we hold the lock on the state files.
836
    pub(crate) fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
837
        self.mgr.peek_builder().upgrade_to_owned_state()?;
838
        Ok(())
839
    }
840

            
841
    /// Reload state from the state manager.
842
    ///
843
    /// We only call this method if we _don't_ have the lock on the state
844
    /// files.  If we have the lock, we only want to save.
845
    pub(crate) fn reload_persistent_state(&self) -> Result<()> {
846
        self.mgr.peek_builder().reload_state()?;
847
        Ok(())
848
    }
849

            
850
    /// Run indefinitely, launching circuits where the preemptive circuit
851
    /// predictor thinks it'd be a good idea to have them.
852
    ///
853
    /// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
854
    ///
855
    /// This is a daemon task: it runs indefinitely in the background.
856
    ///
857
    /// # Note
858
    ///
859
    /// This would be better handled entirely within `tor-circmgr`, like
860
    /// other daemon tasks.
861
12
    async fn continually_preemptively_build_circuits<D>(
862
12
        mut sched: TaskSchedule<R>,
863
12
        circmgr: Weak<Self>,
864
12
        dirmgr: Weak<D>,
865
12
    ) where
866
12
        D: NetDirProvider + 'static + ?Sized,
867
12
    {
868
12
        let base_delay = Duration::from_secs(10);
869
12
        let mut retry = RetryDelay::from_duration(base_delay);
870

            
871
12
        while sched.next().await.is_some() {
872
8
            if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
873
8
                if let Ok(netdir) = dm.netdir(Timeliness::Timely) {
874
                    let result = cm
875
                        .launch_circuits_preemptively(DirInfo::Directory(&netdir))
876
                        .await;
877

            
878
                    let delay = match result {
879
                        Ok(()) => {
880
                            retry.reset();
881
                            base_delay
882
                        }
883
                        Err(_) => retry.next_delay(&mut rand::rng()),
884
                    };
885

            
886
                    sched.fire_in(delay);
887
                } else {
888
                    // wait for the provider to announce some event, which will probably be
889
                    // NewConsensus; this is therefore a decent yardstick for rechecking
890
8
                    let _ = dm.events().next().await;
891
                    sched.fire();
892
                }
893
            } else {
894
                return;
895
            }
896
        }
897
4
    }
898

            
899
    /// Launch circuits preemptively, using the preemptive circuit predictor's
900
    /// predictions.
901
    ///
902
    /// # Note
903
    ///
904
    /// This function is invoked periodically from
905
    /// `continually_preemptively_build_circuits()`.
906
    async fn launch_circuits_preemptively(
907
        &self,
908
        netdir: DirInfo<'_>,
909
    ) -> std::result::Result<(), err::PreemptiveCircError> {
910
        trace!("Checking preemptive circuit predictions.");
911
        let (circs, threshold) = {
912
            let path_config = self.mgr.peek_builder().path_config();
913
            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
914
            let threshold = preemptive.config().disable_at_threshold;
915
            (preemptive.predict(&path_config), threshold)
916
        };
917

            
918
        if self.mgr.n_circs() >= threshold {
919
            return Ok(());
920
        }
921
        let mut n_created = 0_usize;
922
        let mut n_errors = 0_usize;
923

            
924
        let futures = circs
925
            .iter()
926
            .map(|usage| self.mgr.get_or_launch(usage, netdir));
927
        let results = futures::future::join_all(futures).await;
928
        for (i, result) in results.into_iter().enumerate() {
929
            match result {
930
                Ok((_, CircProvenance::NewlyCreated)) => {
931
                    debug!("Preeemptive circuit was created for {:?}", circs[i]);
932
                    n_created += 1;
933
                }
934
                Ok((_, CircProvenance::Preexisting)) => {
935
                    trace!("Circuit already existed created for {:?}", circs[i]);
936
                }
937
                Err(e) => {
938
                    warn_report!(e, "Failed to build preemptive circuit {:?}", sv(&circs[i]));
939
                    n_errors += 1;
940
                }
941
            }
942
        }
943

            
944
        if n_created > 0 || n_errors == 0 {
945
            // Either we successfully made a circuit, or we didn't have any
946
            // failures while looking for preexisting circuits.  Progress was
947
            // made, so there's no need to back off.
948
            Ok(())
949
        } else {
950
            // We didn't build any circuits and we hit at least one error:
951
            // We'll call this unsuccessful.
952
            Err(err::PreemptiveCircError)
953
        }
954
    }
955

            
956
    /// Create and return a new (typically anonymous) circuit for use as an
957
    /// onion service circuit of type `kind`.
958
    ///
959
    /// This circuit is guaranteed not to have been used for any traffic
960
    /// previously, and it will not be given out for any other requests in the
961
    /// future unless explicitly re-registered with a circuit manager.
962
    ///
963
    /// If `planned_target` is provided, then the circuit will be built so that
964
    /// it does not share any family members with the provided target.  (The
965
    /// circuit _will not be_ extended to that target itself!)
966
    ///
967
    /// Used to implement onion service clients and services.
968
    #[cfg(feature = "hs-common")]
969
4
    pub(crate) async fn launch_hs_unmanaged<T>(
970
4
        &self,
971
4
        planned_target: Option<T>,
972
4
        dir: &NetDir,
973
4
        kind: HsCircStemKind,
974
4
    ) -> Result<Arc<B::Circ>>
975
4
    where
976
4
        T: IntoOwnedChanTarget,
977
4
    {
978
4
        let usage = TargetCircUsage::HsCircBase {
979
4
            compatible_with_target: planned_target.map(IntoOwnedChanTarget::to_owned),
980
4
            kind,
981
4
        };
982
4
        let (_, client_circ) = self.mgr.launch_unmanaged(&usage, dir.into()).await?;
983
4
        Ok(client_circ)
984
4
    }
985

            
986
    /// Return true if `netdir` has enough information to be used for this
987
    /// circuit manager.
988
    ///
989
    /// (This will check whether the netdir is missing any primary guard
990
    /// microdescriptors)
991
    pub(crate) fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
992
        self.mgr
993
            .peek_builder()
994
            .guardmgr()
995
            .netdir_is_sufficient(netdir)
996
    }
997

            
998
    /// Internal implementation for [`CircMgr::estimate_timeout`].
999
    pub(crate) fn estimate_timeout(
        &self,
        timeout_action: &timeouts::Action,
    ) -> std::time::Duration {
        let (timeout, _abandon) = self.mgr.peek_builder().estimator().timeouts(timeout_action);
        timeout
    }
    /// Internal implementation for [`CircMgr::builder`].
    pub(crate) fn builder(&self) -> &B {
        self.mgr.peek_builder()
    }
    /// Flush state to the state manager, if there is any unsaved state and
    /// we have the lock.
    ///
    /// Return true if we saved something; false if we didn't have the lock.
28
    pub(crate) fn store_persistent_state(&self) -> Result<bool> {
28
        self.mgr.peek_builder().save_state()
28
    }
    /// Expire every circuit that has been dirty for too long.
    ///
    /// Expired circuits are not closed while they still have users,
    /// but they are no longer given out for new requests.
    fn expire_circuits(&self) {
        // TODO: I would prefer not to call this at every request, but
        // it should be fine for now.  (At some point we may no longer
        // need this, or might not need to call it so often, now that
        // our circuit expiration runs on scheduled timers via
        // spawn_expiration_task.)
        let now = self.mgr.peek_runtime().now();
        self.mgr.expire_circs(now);
    }
    /// Mark every circuit that we have launched so far as unsuitable for
    /// any future requests.  This won't close existing circuits that have
    /// streams attached to them, but it will prevent any future streams from
    /// being attached.
    ///
    /// TODO: we may want to expose this eventually.  If we do, we should
    /// be very clear that you don't want to use it haphazardly.
    pub(crate) fn retire_all_circuits(&self) {
        self.mgr.retire_all_circuits();
    }
    /// If `circ_id` is the unique identifier for a circuit that we're
    /// keeping track of, don't give it out for any future requests.
    pub(crate) fn retire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id) {
        let _ = self.mgr.take_circ(circ_id);
    }
    /// Return a stream of events about our estimated clock skew; these events
    /// are `None` when we don't have enough information to make an estimate,
    /// and `Some(`[`SkewEstimate`]`)` otherwise.
    ///
    /// Note that this stream can be lossy: if the estimate changes more than
    /// one before you read from the stream, you might only get the most recent
    /// update.
8
    pub(crate) fn skew_events(&self) -> ClockSkewEvents {
8
        self.mgr.peek_builder().guardmgr().skew_events()
8
    }
    /// Record that a failure occurred on a circuit with a given guard, in a way
    /// that makes us unwilling to use that guard for future circuits.
    ///
    pub(crate) fn note_external_failure(
        &self,
        target: &impl ChanTarget,
        external_failure: ExternalActivity,
    ) {
        self.mgr
            .peek_builder()
            .guardmgr()
            .note_external_failure(target, external_failure);
    }
    /// Record that a success occurred on a circuit with a given guard, in a way
    /// that makes us possibly willing to use that guard for future circuits.
    pub(crate) fn note_external_success(
        &self,
        target: &impl ChanTarget,
        external_activity: ExternalActivity,
    ) {
        self.mgr
            .peek_builder()
            .guardmgr()
            .note_external_success(target, external_activity);
    }
}
impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> Drop for CircMgrInner<B, R> {
16
    fn drop(&mut self) {
16
        match self.store_persistent_state() {
4
            Ok(true) => info!("Flushed persistent state at exit."),
12
            Ok(false) => debug!("Lock not held; no state to flush."),
            Err(e) => error_report!(e, "Unable to flush state on circuit manager drop"),
        }
16
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use mocks::FakeBuilder;
    use tor_guardmgr::GuardMgr;
    use tor_linkspec::OwnedChanTarget;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_persist::TestingStateMgr;
    use super::*;
    #[test]
    fn get_params() {
        use tor_netdir::{MdReceiver, PartialNetDir};
        use tor_netdoc::doc::netstatus::NetParams;
        // If it's just fallbackdir, we get the default parameters.
        let fb = FallbackList::from([]);
        let di: DirInfo<'_> = (&fb).into();
        let p1 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(!p1.extend_by_ed25519_id);
        // Now try with a directory and configured parameters.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);
        // Now try with a bogus circwindow value.
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
        let mut params = NetParams::default();
        params.set("circwindow".into(), 100_000);
        params.set("ExtendByEd25519ID".into(), 1);
        let mut dir = PartialNetDir::new(consensus, Some(&params));
        for m in microdescs {
            dir.add_microdesc(m);
        }
        let netdir = dir.unwrap_if_sufficient().unwrap();
        let di: DirInfo<'_> = (&netdir).into();
        let p2 = di.circ_params(&TargetCircUsage::Dir).unwrap();
        assert!(p2.extend_by_ed25519_id);
    }
    fn make_circmgr<R: Runtime>(runtime: R) -> Arc<CircMgrInner<FakeBuilder<R>, R>> {
        let config = crate::config::test_config::TestConfig::default();
        let statemgr = TestingStateMgr::new();
        let guardmgr =
            GuardMgr::new(runtime.clone(), statemgr.clone(), &config).expect("Create GuardMgr");
        let builder = FakeBuilder::new(
            &runtime,
            statemgr.clone(),
            &tor_guardmgr::TestConfig::default(),
        );
        let circmgr = Arc::new(CircMgrInner::new_generic(
            &config, &runtime, &guardmgr, builder,
        ));
        let netdir = Arc::new(TestNetDirProvider::new());
        CircMgrInner::launch_background_tasks(&circmgr, &runtime, &netdir, statemgr)
            .expect("launch CircMgrInner background tasks");
        circmgr
    }
    #[test]
    #[cfg(feature = "hs-common")]
    fn test_launch_hs_unmanaged() {
        tor_rtmock::MockRuntime::test_with_various(|runtime| async move {
            let circmgr = make_circmgr(runtime.clone());
            let netdir = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap();
            let (ret_tx, ret_rx) = tor_async_utils::oneshot::channel();
            runtime.spawn_identified("launch_hs_unamanged", async move {
                ret_tx
                    .send(
                        circmgr
                            .launch_hs_unmanaged::<OwnedChanTarget>(
                                None,
                                &netdir,
                                HsCircStemKind::Naive,
                            )
                            .await,
                    )
                    .unwrap();
            });
            runtime.advance_by(Duration::from_millis(60)).await;
            ret_rx.await.unwrap().unwrap();
        });
    }
}