tor_circmgr/
mgr.rs

1//! Abstract code to manage a set of circuits.
2//!
3//! This module implements the real logic for deciding when and how to
4//! launch circuits, and for which circuits to hand out in response to
5//! which requests.
6//!
7//! For testing and abstraction purposes, this module _does not_
8//! actually know anything about circuits _per se_.  Instead,
9//! everything is handled using a set of traits that are internal to this
10//! crate:
11//!
12//!  * [`AbstractCirc`] is a view of a circuit.
13//!  * [`AbstractCircBuilder`] knows how to build an `AbstractCirc`.
14//!
15//! Using these traits, the [`AbstractCircMgr`] object manages a set of
16//! circuits, launching them as necessary, and keeping track of the
17//! restrictions on their use.
18
19// TODO:
20// - Testing
21//    - Error from prepare_action()
22//    - Error reported by restrict_mut?
23
24use crate::config::CircuitTiming;
25use crate::usage::{SupportedCircUsage, TargetCircUsage};
26use crate::{timeouts, DirInfo, Error, PathConfig, Result};
27
28use retry_error::RetryError;
29use tor_async_utils::mpsc_channel_no_memquota;
30use tor_basic_utils::retry::RetryDelay;
31use tor_config::MutCfg;
32use tor_error::{debug_report, info_report, internal, warn_report, AbsRetryTime, HasRetryTime};
33#[cfg(feature = "vanguards")]
34use tor_guardmgr::vanguards::VanguardMgr;
35use tor_linkspec::CircTarget;
36use tor_proto::circuit::{CircParameters, Path, UniqId};
37use tor_rtcompat::{Runtime, SleepProviderExt};
38
39use async_trait::async_trait;
40use futures::channel::mpsc;
41use futures::future::{FutureExt, Shared};
42use futures::stream::{FuturesUnordered, StreamExt};
43use futures::task::SpawnExt;
44use oneshot_fused_workaround as oneshot;
45use std::collections::HashMap;
46use std::fmt::Debug;
47use std::hash::Hash;
48use std::panic::AssertUnwindSafe;
49use std::sync::{self, Arc, Weak};
50use std::time::{Duration, Instant};
51use tracing::{debug, warn};
52use weak_table::PtrWeakHashSet;
53
54mod streams;
55
/// Description of how we got a circuit.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum CircProvenance {
    /// This circuit was newly launched, or was in progress and finished while
    /// we were waiting.
    NewlyCreated,
    /// This circuit already existed when we asked for it.
    Preexisting,
}
66
/// An error returned when a circuit's supported usage could not be
/// restricted to accommodate a requested usage.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RestrictionFailed {
    /// Tried to restrict a specification, but the circuit didn't support the
    /// requested usage.
    #[error("Specification did not support desired usage")]
    NotSupported,
}
75
/// Minimal abstract view of a circuit.
///
/// From this module's point of view, circuits are simply objects
/// with unique identities, and a possible closed-state.
///
/// The trait is declared with `#[async_trait]` because [`extend`](Self::extend)
/// is an `async fn`.
#[async_trait]
pub(crate) trait AbstractCirc: Debug {
    /// Type for a unique identifier for circuits.
    ///
    /// The bounds allow identifiers to be cloned, hashed, compared, and
    /// shared between threads.
    type Id: Clone + Debug + Hash + Eq + Send + Sync;
    /// Return the unique identifier for this circuit.
    ///
    /// # Requirements
    ///
    /// The values returned by this function are unique for distinct
    /// circuits.
    fn id(&self) -> Self::Id;

    /// Return true if this circuit is usable for some purpose.
    ///
    /// Reasons a circuit might be unusable include being closed.
    fn usable(&self) -> bool;

    /// Return a [`Path`] object describing all the hops in this circuit.
    ///
    /// Returns an error if the circuit is closed.
    ///
    /// Note that this `Path` is not automatically updated if the circuit is
    /// extended.
    fn path_ref(&self) -> tor_proto::Result<Arc<Path>>;

    /// Return the number of hops in this circuit.
    ///
    /// Returns an error if the circuit is closed.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the circuit. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
    fn n_hops(&self) -> tor_proto::Result<usize>;

    /// Return true if this circuit is closed and therefore unusable.
    fn is_closing(&self) -> bool;

    /// Return a process-unique identifier for this circuit.
    fn unique_id(&self) -> UniqId;

    /// Extend the circuit via the most appropriate handshake to a new `target` hop.
    ///
    /// `params` are the circuit parameters to use for the new hop.
    ///
    /// Returns an error if the extension did not succeed.
    async fn extend<T: CircTarget + Sync>(
        &self,
        target: &T,
        params: CircParameters,
    ) -> tor_proto::Result<()>;
}
128
/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
///
/// You should implement this trait using all default methods for all code that isn't test code.
pub(crate) trait MockablePlan {
    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
    /// so that it knows what to pass to `::release_advance()`.
    ///
    /// The default implementation ignores `_reason` and does nothing.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
137
/// An object that knows how to build circuits.
///
/// AbstractCircBuilder creates circuits in two phases.  First, a plan is
/// made for how to build the circuit.  This planning phase should be
/// relatively fast, and must not suspend or block.  Its purpose is to
/// get an early estimate of which operations the circuit will be able
/// to support when it's done.
///
/// Second, the circuit is actually built, using the plan as input.
#[async_trait]
pub(crate) trait AbstractCircBuilder<R: Runtime>: Send + Sync {
    /// The circuit type that this builder knows how to build.
    type Circ: AbstractCirc + Send + Sync;
    /// An opaque type describing how a given circuit will be built.
    /// It may represent some or all of a path -- or it may not.
    //
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    //
    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + Debug + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // crate::DirInfo, but that would need to be parameterized too,
    // and would make everything complicated.

    /// Form a plan for how to build a new circuit that supports `usage`.
    ///
    /// Return an opaque Plan object, and a new spec describing what
    /// the circuit will actually support when it's built.  (For
    /// example, if the input spec requests a circuit that connects to
    /// port 80, then "planning" the circuit might involve picking an
    /// exit that supports port 80, and the resulting spec might be
    /// the exit's complete list of supported ports.)
    ///
    /// # Requirements
    ///
    /// The resulting Spec must support `usage`.
    fn plan_circuit(
        &self,
        usage: &TargetCircUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Self::Plan, SupportedCircUsage)>;

    /// Construct a circuit according to a given plan.
    ///
    /// On success, return a spec describing what the circuit can be used for,
    /// and the circuit that was just constructed.
    ///
    /// This function should implement some kind of a timeout for
    /// circuits that are taking too long.
    ///
    /// # Requirements
    ///
    /// The spec that this function returns _must_ support the usage
    /// that was originally passed to `plan_circuit`.  It _must_ also
    /// contain the spec that was originally returned by
    /// `plan_circuit`.
    async fn build_circuit(
        &self,
        plan: Self::Plan,
    ) -> Result<(SupportedCircUsage, Arc<Self::Circ>)>;

    /// Return a "parallelism factor" with which circuits should be
    /// constructed for a given purpose.
    ///
    /// If this function returns N, then whenever we launch circuits
    /// for this purpose, we launch N in parallel.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    fn launch_parallelism(&self, usage: &TargetCircUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return a "parallelism factor" for which circuits should be
    /// used for a given purpose.
    ///
    /// If this function returns N, then whenever we select among
    /// open circuits for this purpose, we choose at random from the
    /// best N.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    // TODO: Possibly this doesn't belong in this trait.
    fn select_parallelism(&self, usage: &TargetCircUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return true if we are currently attempting to learn circuit
    /// timeouts by building testing circuits.
    fn learning_timeouts(&self) -> bool;

    /// Flush state to the state manager if we own the lock.
    ///
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
    fn save_state(&self) -> Result<bool>;

    /// Return this builder's [`PathConfig`](crate::PathConfig).
    fn path_config(&self) -> Arc<PathConfig>;

    /// Replace this builder's [`PathConfig`](crate::PathConfig).
    // TODO: This is dead_code because we only call this for the CircuitBuilder specialization of
    // CircMgr, not from the generic version, because this trait doesn't provide guardmgr, which is
    // needed by the [`CircMgr::reconfigure`] function that would be the only caller of this. We
    // should add `guardmgr` to this trait, make [`CircMgr::reconfigure`] generic, and remove this
    // dead_code marking.
    #[allow(dead_code)]
    fn set_path_config(&self, new_config: PathConfig);

    /// Return a reference to this builder's timeout estimator.
    fn estimator(&self) -> &timeouts::Estimator;

    /// Return a reference to this builder's `VanguardMgr`.
    ///
    /// Only available when the `vanguards` feature is enabled.
    #[cfg(feature = "vanguards")]
    fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;

    /// Replace our state with a new owning state, assuming we have
    /// storage permission.
    fn upgrade_to_owned_state(&self) -> Result<()>;

    /// Reload persistent state from disk, if we don't have storage permission.
    fn reload_state(&self) -> Result<()>;

    /// Return a reference to this builder's `GuardMgr`.
    fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;

    /// Reconfigure this builder using the latest set of network parameters.
    ///
    /// (NOTE: for now, this only affects circuit timeout estimation.)
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
}
273
/// Enumeration to track the expiration state of a circuit.
///
/// A circuit can either be unused (at which point it should expire if it is
/// _still unused_ by a certain time), or dirty (at which point it should
/// expire after a certain duration).
///
/// All circuits start out "unused" and become "dirty" when their spec
/// is first restricted -- that is, when they are first handed out to be
/// used for a request.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExpirationInfo {
    /// The circuit has never been used.
    Unused {
        /// A time when the circuit should expire.
        use_before: Instant,
    },
    /// The circuit has been used (or at least, restricted for use with a
    /// request) at least once.
    Dirty {
        /// The time at which this circuit's spec was first restricted.
        dirty_since: Instant,
    },
}

impl ExpirationInfo {
    /// Build the expiration state for a freshly created circuit: it is
    /// `Unused`, and should be discarded if still unused at `use_before`.
    fn new(use_before: Instant) -> Self {
        Self::Unused { use_before }
    }

    /// Record that the circuit was first used at `now`.
    ///
    /// Only an `Unused` state is changed; a state that is already `Dirty`
    /// keeps its original `dirty_since` timestamp.
    fn mark_dirty(&mut self, now: Instant) {
        if let Self::Unused { .. } = *self {
            *self = Self::Dirty { dirty_since: now };
        }
    }
}
311
/// An entry for an open circuit held by an `AbstractCircMgr`.
///
/// Each entry pairs a circuit with the usage it currently supports and with
/// the information needed to decide when it should expire.
#[derive(Debug, Clone)]
pub(crate) struct OpenEntry<C> {
    /// The supported usage for this circuit.
    ///
    /// This is narrowed by `restrict_mut` each time the circuit is handed
    /// out for a request.
    spec: SupportedCircUsage,
    /// The circuit under management.
    circ: Arc<C>,
    /// When does this circuit expire?
    ///
    /// (Note that expired circuits are removed from the manager,
    /// which does not actually close them until there are no more
    /// references to them.)
    expiration: ExpirationInfo,
}
326
327impl<C: AbstractCirc> OpenEntry<C> {
328    /// Make a new OpenEntry for a given circuit and spec.
329    fn new(spec: SupportedCircUsage, circ: Arc<C>, expiration: ExpirationInfo) -> Self {
330        OpenEntry {
331            spec,
332            circ,
333            expiration,
334        }
335    }
336
337    /// Return true if this circuit can be used for `usage`.
338    pub(crate) fn supports(&self, usage: &TargetCircUsage) -> bool {
339        self.circ.usable() && self.spec.supports(usage)
340    }
341
342    /// Change this circuit's permissible usage, based on its having
343    /// been used for `usage` at time `now`.
344    ///
345    /// Return an error if this circuit may not be used for `usage`.
346    fn restrict_mut(&mut self, usage: &TargetCircUsage, now: Instant) -> Result<()> {
347        self.spec.restrict_mut(usage)?;
348        self.expiration.mark_dirty(now);
349        Ok(())
350    }
351
352    /// Find the "best" entry from a slice of OpenEntry for supporting
353    /// a given `usage`.
354    ///
355    /// If `parallelism` is some N greater than 1, we pick randomly
356    /// from the best `N` circuits.
357    ///
358    /// # Requirements
359    ///
360    /// Requires that `ents` is nonempty, and that every element of `ents`
361    /// supports `spec`.
362    fn find_best<'a>(
363        // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
364        ents: &'a mut [&'a mut Self],
365        usage: &TargetCircUsage,
366        parallelism: usize,
367    ) -> &'a mut Self {
368        let _ = usage; // not yet used.
369        use rand::seq::IndexedMutRandom as _;
370        let parallelism = parallelism.clamp(1, ents.len());
371        // TODO: Actually look over the whole list to see which is better.
372        let slice = &mut ents[0..parallelism];
373        let mut rng = rand::rng();
374        slice.choose_mut(&mut rng).expect("Input list was empty")
375    }
376
377    /// Return true if this circuit has been marked as dirty before
378    /// `dirty_cutoff`, or if it is an unused circuit set to expire before
379    /// `unused_cutoff`.
380    fn should_expire(&self, unused_cutoff: Instant, dirty_cutoff: Instant) -> bool {
381        match self.expiration {
382            ExpirationInfo::Unused { use_before } => use_before <= unused_cutoff,
383            ExpirationInfo::Dirty { dirty_since } => dirty_since <= dirty_cutoff,
384        }
385    }
386}
387
/// A result type whose "Ok" value is the Id for a circuit from B.
///
/// That is: `Ok` carries the `AbstractCirc::Id` of the circuit type built by
/// the builder `B`, and `Err` carries this crate's error type.
type PendResult<B, R> = Result<<<B as AbstractCircBuilder<R>>::Circ as AbstractCirc>::Id>;
390
/// An in-progress circuit request tracked by an `AbstractCircMgr`.
///
/// (In addition to tracking circuits, `AbstractCircMgr` tracks
/// _requests_ for circuits.  The manager uses these entries if it
/// finds that some circuit created _after_ a request first launched
/// might meet the request's requirements.)
struct PendingRequest<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Usage for the operation requested by this request
    usage: TargetCircUsage,
    /// A channel to use for telling this request about circuits that it
    /// might like.
    ///
    /// Each message is a [`PendResult`]: a circuit ID on success, or an error.
    notify: mpsc::Sender<PendResult<B, R>>,
}
404
405impl<B: AbstractCircBuilder<R>, R: Runtime> PendingRequest<B, R> {
406    /// Return true if this request would be supported by `spec`.
407    fn supported_by(&self, spec: &SupportedCircUsage) -> bool {
408        spec.supports(&self.usage)
409    }
410}
411
/// An entry for an under-construction in-progress circuit tracked by
/// an `AbstractCircMgr`.
#[derive(Debug)]
struct PendingEntry<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Specification that this circuit will support, if every pending
    /// request that is waiting for it is attached to it.
    ///
    /// This spec becomes more and more restricted as more pending
    /// requests are waiting for this circuit.
    ///
    /// This spec is contained by circ_spec, and must support the usage
    /// of every pending request that's waiting for this circuit.
    ///
    /// It is kept behind a mutex so that it can be restricted through a
    /// shared reference to this entry.
    tentative_assignment: sync::Mutex<SupportedCircUsage>,
    /// A shared future for requests to use when waiting for
    /// notification of this circuit's success.
    ///
    /// The matching sender is returned by `PendingEntry::new`.
    receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
}
429
430impl<B: AbstractCircBuilder<R>, R: Runtime> PendingEntry<B, R> {
431    /// Make a new PendingEntry that starts out supporting a given
432    /// spec.  Return that PendingEntry, along with a Sender to use to
433    /// report the result of building this circuit.
434    fn new(circ_spec: &SupportedCircUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
435        let tentative_assignment = sync::Mutex::new(circ_spec.clone());
436        let (sender, receiver) = oneshot::channel();
437        let receiver = receiver.shared();
438        let entry = PendingEntry {
439            tentative_assignment,
440            receiver,
441        };
442        (entry, sender)
443    }
444
445    /// Return true if this circuit's current tentative assignment
446    /// supports `usage`.
447    fn supports(&self, usage: &TargetCircUsage) -> bool {
448        let assignment = self.tentative_assignment.lock().expect("poisoned lock");
449        assignment.supports(usage)
450    }
451
452    /// Try to change the tentative assignment of this circuit by
453    /// restricting it for use with `usage`.
454    ///
455    /// Return an error if the current tentative assignment didn't
456    /// support `usage` in the first place.
457    fn tentative_restrict_mut(&self, usage: &TargetCircUsage) -> Result<()> {
458        if let Ok(mut assignment) = self.tentative_assignment.lock() {
459            assignment.restrict_mut(usage)?;
460        }
461        Ok(())
462    }
463
464    /// Find the best PendingEntry values from a slice for use with
465    /// `usage`.
466    ///
467    /// # Requirements
468    ///
469    /// The `ents` slice must not be empty.  Every element of `ents`
470    /// must support the given spec.
471    fn find_best(ents: &[Arc<Self>], usage: &TargetCircUsage) -> Vec<Arc<Self>> {
472        // TODO: Actually look over the whole list to see which is better.
473        let _ = usage; // currently unused
474        vec![Arc::clone(&ents[0])]
475    }
476}
477
/// Wrapper type to represent the state between planning to build a
/// circuit and constructing it.
#[derive(Debug)]
struct CircBuildPlan<B: AbstractCircBuilder<R>, R: Runtime> {
    /// The Plan object returned by [`AbstractCircBuilder::plan_circuit`].
    plan: B::Plan,
    /// A sender to notify any pending requests when this circuit is done.
    sender: oneshot::Sender<PendResult<B, R>>,
    /// A strong reference to the PendingEntry for this circuit build attempt.
    pending: Arc<PendingEntry<B, R>>,
}
489
/// The inner state of an [`AbstractCircMgr`].
///
/// This tracks open circuits, circuits that are still being built, and
/// requests that are waiting for a circuit.
struct CircList<B: AbstractCircBuilder<R>, R: Runtime> {
    /// A map from circuit ID to [`OpenEntry`] values for all managed
    /// open circuits.
    ///
    /// A circuit is added here from [`AbstractCircMgr::do_launch`] when we find
    /// that it completes successfully, and has not been cancelled.
    /// When we decide that such a circuit should no longer be handed out for
    /// any new requests, we "retire" the circuit by removing it from this map.
    #[allow(clippy::type_complexity)]
    open_circs: HashMap<<B::Circ as AbstractCirc>::Id, OpenEntry<B::Circ>>,
    /// Weak-set of PendingEntry for circuits that are being built.
    ///
    /// Because this set only holds weak references, and the only strong
    /// reference to the PendingEntry is held by the task building the circuit,
    /// this set's members are lazily removed after the circuit is either built
    /// or fails to build.
    ///
    /// This set is used for two purposes:
    ///
    /// 1. When a circuit request finds that there is no open circuit for its
    ///    purposes, it checks here to see if there is a pending circuit that it
    ///    could wait for.
    /// 2. When a pending circuit finishes building, it checks here to make sure
    ///    that it has not been cancelled. (Removing an entry from this set marks
    ///    it as cancelled.)
    ///
    /// An entry is added here in [`AbstractCircMgr::prepare_action`] when we
    /// decide that a circuit needs to be launched.
    ///
    /// Later, in [`AbstractCircMgr::do_launch`], once the circuit has finished
    /// (or failed), we remove the entry (by pointer identity).
    /// If we cannot find the entry, we conclude that the request has been
    /// _cancelled_, and so we discard any circuit that was created.
    pending_circs: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
    /// Weak-set of PendingRequest for requests that are waiting for a
    /// circuit to be built.
    ///
    /// Because this set only holds weak references, and the only
    /// strong reference to the PendingRequest is held by the task
    /// waiting for the circuit to be built, this set's members are
    /// lazily removed after the request succeeds or fails.
    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
}
534
535impl<B: AbstractCircBuilder<R>, R: Runtime> CircList<B, R> {
536    /// Make a new empty `CircList`
537    fn new() -> Self {
538        CircList {
539            open_circs: HashMap::new(),
540            pending_circs: PtrWeakHashSet::new(),
541            pending_requests: PtrWeakHashSet::new(),
542        }
543    }
544
545    /// Add `e` to the list of open circuits.
546    fn add_open(&mut self, e: OpenEntry<B::Circ>) {
547        let id = e.circ.id();
548        self.open_circs.insert(id, e);
549    }
550
551    /// Find all the usable open circuits that support `usage`.
552    ///
553    /// Return None if there are no such circuits.
554    fn find_open(&mut self, usage: &TargetCircUsage) -> Option<Vec<&mut OpenEntry<B::Circ>>> {
555        let list = self.open_circs.values_mut();
556        let v = SupportedCircUsage::find_supported(list, usage);
557        if v.is_empty() {
558            None
559        } else {
560            Some(v)
561        }
562    }
563
564    /// Find an open circuit by ID.
565    ///
566    /// Return None if no such circuit exists in this list.
567    fn get_open_mut(
568        &mut self,
569        id: &<B::Circ as AbstractCirc>::Id,
570    ) -> Option<&mut OpenEntry<B::Circ>> {
571        self.open_circs.get_mut(id)
572    }
573
574    /// Extract an open circuit by ID, removing it from this list.
575    ///
576    /// Return None if no such circuit exists in this list.
577    fn take_open(&mut self, id: &<B::Circ as AbstractCirc>::Id) -> Option<OpenEntry<B::Circ>> {
578        self.open_circs.remove(id)
579    }
580
581    /// Remove circuits based on expiration times.
582    ///
583    /// We remove every unused circuit that is set to expire by
584    /// `unused_cutoff`, and every dirty circuit that has been dirty
585    /// since before `dirty_cutoff`.
586    fn expire_circs(&mut self, unused_cutoff: Instant, dirty_cutoff: Instant) {
587        self.open_circs
588            .retain(|_k, v| !v.should_expire(unused_cutoff, dirty_cutoff));
589    }
590
591    /// Remove the circuit with given `id`, if it is scheduled to
592    /// expire now, according to the provided expiration times.
593    fn expire_circ(
594        &mut self,
595        id: &<B::Circ as AbstractCirc>::Id,
596        unused_cutoff: Instant,
597        dirty_cutoff: Instant,
598    ) {
599        let should_expire = self
600            .open_circs
601            .get(id)
602            .map(|v| v.should_expire(unused_cutoff, dirty_cutoff))
603            .unwrap_or_else(|| false);
604        if should_expire {
605            self.open_circs.remove(id);
606        }
607    }
608
609    /// Add `pending` to the set of in-progress circuits.
610    fn add_pending_circ(&mut self, pending: Arc<PendingEntry<B, R>>) {
611        self.pending_circs.insert(pending);
612    }
613
614    /// Find all pending circuits that support `usage`.
615    ///
616    /// If no such circuits are currently being built, return None.
617    fn find_pending_circs(&self, usage: &TargetCircUsage) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
618        let result: Vec<_> = self
619            .pending_circs
620            .iter()
621            .filter(|p| p.supports(usage))
622            .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
623            .collect();
624
625        if result.is_empty() {
626            None
627        } else {
628            Some(result)
629        }
630    }
631
632    /// Return true if `circ` is still pending.
633    ///
634    /// A circuit will become non-pending when finishes (successfully or not), or when it's
635    /// removed from this list via `clear_all_circuits()`.
636    fn circ_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
637        self.pending_circs.contains(circ)
638    }
639
640    /// Construct and add a new entry to the set of request waiting
641    /// for a circuit.
642    ///
643    /// Return the request, and a new receiver stream that it should
644    /// use for notification of possible circuits to use.
645    fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
646        self.pending_requests.insert(Arc::clone(pending));
647    }
648
649    /// Return all pending requests that would be satisfied by a circuit
650    /// that supports `circ_spec`.
651    fn find_pending_requests(
652        &self,
653        circ_spec: &SupportedCircUsage,
654    ) -> Vec<Arc<PendingRequest<B, R>>> {
655        self.pending_requests
656            .iter()
657            .filter(|pend| pend.supported_by(circ_spec))
658            .collect()
659    }
660
661    /// Clear all pending circuits and open circuits.
662    ///
663    /// Calling `clear_all_circuits` ensures that any request that is answered _after
664    /// this method runs_ will receive a circuit that was launched _after this
665    /// method runs_.
666    fn clear_all_circuits(&mut self) {
667        // Note that removing entries from pending_circs will also cause the
668        // circuit tasks to realize that they are cancelled when they
669        // go to tell anybody about their results.
670        self.pending_circs.clear();
671        self.open_circs.clear();
672    }
673}
674
/// Timing information for circuits that have been built but never used.
///
/// Currently taken from the network parameters.
struct UnusedTimings {
    /// Minimum lifetime of a circuit created while learning
    /// circuit timeouts.
    ///
    /// Taken from `unused_client_circ_timeout_while_learning_cbt`.
    learning: Duration,
    /// Minimum lifetime of a circuit created while not learning
    /// circuit timeouts.
    ///
    /// Taken from `unused_client_circ_timeout`.
    not_learning: Duration,
}
686
687// This isn't really fallible, given the definitions of the underlying
688// types.
689#[allow(clippy::fallible_impl_from)]
690impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
691    fn from(v: &tor_netdir::params::NetParameters) -> Self {
692        // These try_into() calls can't fail, so unwrap() can't panic.
693        #[allow(clippy::unwrap_used)]
694        UnusedTimings {
695            learning: v
696                .unused_client_circ_timeout_while_learning_cbt
697                .try_into()
698                .unwrap(),
699            not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
700        }
701    }
702}
703
/// Abstract implementation for circuit management.
///
/// The algorithm provided here is fairly simple. In its simplest form:
///
/// When somebody asks for a circuit for a given operation: if we find
/// one open already, we return it.  If we find in-progress circuits
/// that would meet our needs, we wait for one to finish (or for all
/// to fail).  And otherwise, we launch one or more circuits to meet the
/// request's needs.
///
/// If this process fails, then we retry it, up to a timeout or a
/// numerical limit.
///
/// If a circuit not previously considered for a given request
/// finishes before the request is satisfied, and if the circuit would
/// satisfy the request, we try to give that circuit as an answer to
/// that request even if it was not one of the circuits that request
/// was waiting for.
pub(crate) struct AbstractCircMgr<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Builder used to construct circuits.
    builder: B,
    /// An asynchronous runtime to use for launching tasks and
    /// checking timeouts.
    runtime: R,
    /// A CircList to manage our list of circuits, requests, and
    /// pending circuits.
    circs: sync::Mutex<CircList<B, R>>,

    /// Configured information about when to expire circuits and requests.
    ///
    /// Can be replaced at runtime with `set_circuit_timing`.
    circuit_timing: MutCfg<CircuitTiming>,

    /// Minimum lifetime of an unused circuit.
    ///
    /// Derived from the network parameters; refreshed by
    /// `update_network_parameters`.
    unused_timing: sync::Mutex<UnusedTimings>,
}
740
/// An action to take in order to satisfy a request for a circuit.
enum Action<B: AbstractCircBuilder<R>, R: Runtime> {
    /// We found an open circuit: return immediately.
    Open(Arc<B::Circ>),
    /// We found one or more pending circuits: wait until one succeeds,
    /// or all fail.
    ///
    /// Holds the shared result futures of those pending circuits.
    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
    /// We should launch circuits: here are the instructions for how
    /// to do so.
    ///
    /// There is one [`CircBuildPlan`] per circuit to launch.
    Build(Vec<CircBuildPlan<B, R>>),
}
752
753impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> AbstractCircMgr<B, R> {
754    /// Construct a new AbstractCircMgr.
755    pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
756        let circs = sync::Mutex::new(CircList::new());
757        let dflt_params = tor_netdir::params::NetParameters::default();
758        let unused_timing = (&dflt_params).into();
759        AbstractCircMgr {
760            builder,
761            runtime,
762            circs,
763            circuit_timing: circuit_timing.into(),
764            unused_timing: sync::Mutex::new(unused_timing),
765        }
766    }
767
768    /// Reconfigure this manager using the latest set of network parameters.
769    pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
770        let mut u = self
771            .unused_timing
772            .lock()
773            .expect("Poisoned lock for unused_timing");
774        *u = p.into();
775    }
776
    /// Return this manager's current [`CircuitTiming`].
    ///
    /// The returned value is not updated by later calls to
    /// `set_circuit_timing`: call this method again to see a new configuration.
    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
        self.circuit_timing.get()
    }
781
782    /// Return this manager's [`CircuitTiming`].
783    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
784        self.circuit_timing.replace(new_config);
785    }
786    /// Return a circuit suitable for use with a given `usage`,
787    /// creating that circuit if necessary, and restricting it
788    /// under the assumption that it will be used for that spec.
789    ///
790    /// This is the primary entry point for AbstractCircMgr.
791    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor?
792    pub(crate) async fn get_or_launch(
793        self: &Arc<Self>,
794        usage: &TargetCircUsage,
795        dir: DirInfo<'_>,
796    ) -> Result<(Arc<B::Circ>, CircProvenance)> {
797        /// Largest number of "resets" that we will accept in this attempt.
798        ///
799        /// A "reset" is an internally generated error that does not represent a
800        /// real problem; only a "whoops, got to try again" kind of a situation.
801        /// For example, if we reconfigure in the middle of an attempt and need
802        /// to re-launch the circuit, that counts as a "reset", since there was
803        /// nothing actually _wrong_ with the circuit we were building.
804        ///
805        /// We accept more resets than we do real failures. However,
806        /// we don't accept an unlimited number: we don't want to inadvertently
807        /// permit infinite loops here. If we ever bump against this limit, we
808        /// should not automatically increase it: we should instead figure out
809        /// why it is happening and try to make it not happen.
810        const MAX_RESETS: usize = 8;
811
812        let circuit_timing = self.circuit_timing();
813        let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
814        let max_tries = circuit_timing.request_max_retries;
815        // We compute the maximum number of failures by dividing the maximum
816        // number of circuits to attempt by the number that will be launched in
817        // parallel for each iteration.
818        let max_failures = usize::div_ceil(
819            max_tries as usize,
820            std::cmp::max(1, self.builder.launch_parallelism(usage)),
821        );
822
823        let mut retry_schedule = RetryDelay::from_msec(100);
824        let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a circuit");
825
826        let mut n_failures = 0;
827        let mut n_resets = 0;
828
829        for attempt_num in 1.. {
830            // How much time is remaining?
831            let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
832                None => {
833                    retry_err.push(Error::RequestTimeout);
834                    break;
835                }
836                Some(t) => t,
837            };
838
839            let error = match self.prepare_action(usage, dir, true) {
840                Ok(action) => {
841                    // We successfully found an action: Take that action.
842                    let outcome = self
843                        .runtime
844                        .timeout(remaining, Arc::clone(self).take_action(action, usage))
845                        .await;
846
847                    match outcome {
848                        Ok(Ok(circ)) => return Ok(circ),
849                        Ok(Err(e)) => {
850                            debug!("Circuit attempt {} failed.", attempt_num);
851                            Error::RequestFailed(e)
852                        }
853                        Err(_) => {
854                            // We ran out of "remaining" time; there is nothing
855                            // more to be done.
856                            warn!("All circuit attempts failed due to timeout");
857                            retry_err.push(Error::RequestTimeout);
858                            break;
859                        }
860                    }
861                }
862                Err(e) => {
863                    // We couldn't pick the action!
864                    debug_report!(
865                        &e,
866                        "Couldn't pick action for circuit attempt {}",
867                        attempt_num,
868                    );
869                    e
870                }
871            };
872
873            // There's been an error.  See how long we wait before we retry.
874            let now = self.runtime.now();
875            let retry_time =
876                error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));
877
878            let (count, count_limit) = if error.is_internal_reset() {
879                (&mut n_resets, MAX_RESETS)
880            } else {
881                (&mut n_failures, max_failures)
882            };
883            // Record the error, flattening it if needed.
884            match error {
885                Error::RequestFailed(e) => retry_err.extend(e),
886                e => retry_err.push(e),
887            }
888
889            *count += 1;
890            // If we have reached our limit of this kind of problem, we're done.
891            if *count >= count_limit {
892                warn!("Reached circuit build retry limit, exiting...");
893                break;
894            }
895
896            // Wait, or not, as appropriate.
897            match retry_time {
898                AbsRetryTime::Immediate => {}
899                AbsRetryTime::Never => break,
900                AbsRetryTime::At(t) => {
901                    let remaining = timeout_at.saturating_duration_since(now);
902                    let delay = t.saturating_duration_since(now);
903                    self.runtime.sleep(std::cmp::min(delay, remaining)).await;
904                }
905            }
906        }
907
908        warn!("Request failed");
909        Err(Error::RequestFailed(retry_err))
910    }
911
912    /// Make sure a circuit exists, without actually asking for it.
913    ///
914    /// Make sure that there is a circuit (built or in-progress) that could be
915    /// used for `usage`, and launch one or more circuits in a background task
916    /// if there is not.
917    // TODO: This should probably take some kind of parallelism parameter.
918    #[allow(dead_code)]
919    pub(crate) async fn ensure_circuit(
920        self: &Arc<Self>,
921        usage: &TargetCircUsage,
922        dir: DirInfo<'_>,
923    ) -> Result<()> {
924        let action = self.prepare_action(usage, dir, false)?;
925        if let Action::Build(plans) = action {
926            for plan in plans {
927                let self_clone = Arc::clone(self);
928                let _ignore_receiver = self_clone.spawn_launch(usage, plan);
929            }
930        }
931
932        Ok(())
933    }
934
935    /// Choose which action we should take in order to provide a circuit
936    /// for a given `usage`.
937    ///
938    /// If `restrict_circ` is true, we restrict the spec of any
939    /// circ we decide to use to mark that it _is_ being used for
940    /// `usage`.
941    fn prepare_action(
942        &self,
943        usage: &TargetCircUsage,
944        dir: DirInfo<'_>,
945        restrict_circ: bool,
946    ) -> Result<Action<B, R>> {
947        let mut list = self.circs.lock().expect("poisoned lock");
948
949        if let Some(mut open) = list.find_open(usage) {
950            // We have open circuits that meet the spec: return the best one.
951            let parallelism = self.builder.select_parallelism(usage);
952            let best = OpenEntry::find_best(&mut open, usage, parallelism);
953            if restrict_circ {
954                let now = self.runtime.now();
955                best.restrict_mut(usage, now)?;
956            }
957            // TODO: If we have fewer circuits here than our select
958            // parallelism, perhaps we should launch more?
959
960            return Ok(Action::Open(best.circ.clone()));
961        }
962
963        if let Some(pending) = list.find_pending_circs(usage) {
964            // There are pending circuits that could meet the spec.
965            // Restrict them under the assumption that they could all
966            // be used for this, and then wait until one is ready (or
967            // all have failed)
968            let best = PendingEntry::find_best(&pending, usage);
969            if restrict_circ {
970                for item in &best {
971                    // TODO: Do we want to tentatively restrict _all_ of these?
972                    // not clear to me.
973                    item.tentative_restrict_mut(usage)?;
974                }
975            }
976            let stream = best.iter().map(|item| item.receiver.clone()).collect();
977            // TODO: if we have fewer circuits here than our launch
978            // parallelism, we might want to launch more.
979
980            return Ok(Action::Wait(stream));
981        }
982
983        // Okay, we need to launch circuits here.
984        let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
985        let mut plans = Vec::new();
986        let mut last_err = None;
987        for _ in 0..parallelism {
988            match self.plan_by_usage(dir, usage) {
989                Ok((pending, plan)) => {
990                    list.add_pending_circ(pending);
991                    plans.push(plan);
992                }
993                Err(e) => {
994                    debug!("Unable to make a plan for {:?}: {}", usage, e);
995                    last_err = Some(e);
996                }
997            }
998        }
999        if !plans.is_empty() {
1000            Ok(Action::Build(plans))
1001        } else if let Some(last_err) = last_err {
1002            Err(last_err)
1003        } else {
1004            // we didn't even try to plan anything!
1005            Err(internal!("no plans were built, but no errors were found").into())
1006        }
1007    }
1008
1009    /// Execute an action returned by pick-action, and return the
1010    /// resulting circuit or error.
1011    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
1012    async fn take_action(
1013        self: Arc<Self>,
1014        act: Action<B, R>,
1015        usage: &TargetCircUsage,
1016    ) -> std::result::Result<(Arc<B::Circ>, CircProvenance), RetryError<Box<Error>>> {
1017        /// Store the error `err` into `retry_err`, as appropriate.
1018        fn record_error(
1019            retry_err: &mut RetryError<Box<Error>>,
1020            source: streams::Source,
1021            building: bool,
1022            mut err: Error,
1023        ) {
1024            if source == streams::Source::Right {
1025                // We don't care about this error, since it is from neither a circuit we launched
1026                // nor one that we're waiting on.
1027                return;
1028            }
1029            if !building {
1030                // We aren't building our own circuits, so our errors are
1031                // secondary reports of other circuits' failures.
1032                err = Error::PendingFailed(Box::new(err));
1033            }
1034            retry_err.push(err);
1035        }
1036        /// Return a string describing what it means, within the context of this
1037        /// function, to have gotten an answer from `source`.
1038        fn describe_source(building: bool, source: streams::Source) -> &'static str {
1039            match (building, source) {
1040                (_, streams::Source::Right) => "optimistic advice",
1041                (true, streams::Source::Left) => "circuit we're building",
1042                (false, streams::Source::Left) => "pending circuit",
1043            }
1044        }
1045
1046        // Get or make a stream of futures to wait on.
1047        let (building, wait_on_stream) = match act {
1048            Action::Open(c) => {
1049                // There's already a perfectly good open circuit; we can return
1050                // it now.
1051                return Ok((c, CircProvenance::Preexisting));
1052            }
1053            Action::Wait(f) => {
1054                // There is one or more pending circuit that we're waiting for.
1055                // If any succeeds, we try to use it.  If they all fail, we
1056                // fail.
1057                (false, f)
1058            }
1059            Action::Build(plans) => {
1060                // We're going to launch one or more circuits in parallel.  We
1061                // report success if any succeeds, and failure of they all fail.
1062                let futures = FuturesUnordered::new();
1063                for plan in plans {
1064                    let self_clone = Arc::clone(&self);
1065                    // (This is where we actually launch circuits.)
1066                    futures.push(self_clone.spawn_launch(usage, plan));
1067                }
1068                (true, futures)
1069            }
1070        };
1071
1072        // Insert ourself into the list of pending requests, and make a
1073        // stream for us to listen on for notification from pending circuits
1074        // other than those we are pending on.
1075        let (pending_request, additional_stream) = {
1076            // We don't want this queue to participate in memory quota tracking.
1077            // There isn't any circuit yet, so there wouldn't be anything to account it to.
1078            // If this queue has the oldest data, probably the whole system is badly broken.
1079            // Tearing down the whole circuit manager won't help.
1080            let (send, recv) = mpsc_channel_no_memquota(8);
1081            let pending = Arc::new(PendingRequest {
1082                usage: usage.clone(),
1083                notify: send,
1084            });
1085
1086            let mut list = self.circs.lock().expect("poisoned lock");
1087            list.add_pending_request(&pending);
1088
1089            (pending, recv)
1090        };
1091
1092        // We use our "select_biased" stream combiner here to ensure that:
1093        //   1) Circuits from wait_on_stream (the ones we're pending on) are
1094        //      preferred.
1095        //   2) We exit this function when those circuits are exhausted.
1096        //   3) We still get notified about other circuits that might meet our
1097        //      interests.
1098        //
1099        // The events from Left stream are the oes that we explicitly asked for,
1100        // so we'll treat errors there as real problems.  The events from the
1101        // Right stream are ones that we got opportunistically told about; it's
1102        // not a big deal if those fail.
1103        let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
1104
1105        let mut retry_error = RetryError::in_attempt_to("wait for circuits");
1106
1107        while let Some((src, id)) = incoming.next().await {
1108            match id {
1109                Ok(Ok(ref id)) => {
1110                    // Great, we have a circuit. See if we can use it!
1111                    let mut list = self.circs.lock().expect("poisoned lock");
1112                    if let Some(ent) = list.get_open_mut(id) {
1113                        let now = self.runtime.now();
1114                        match ent.restrict_mut(usage, now) {
1115                            Ok(()) => {
1116                                // Great, this will work.  We drop the
1117                                // pending request now explicitly to remove
1118                                // it from the list.
1119                                drop(pending_request);
1120                                if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
1121                                    // Since this circuit hasn't been used yet, schedule expiration task after `max_dirtiness` from now.
1122                                    spawn_expiration_task(
1123                                        &self.runtime,
1124                                        Arc::downgrade(&self),
1125                                        ent.circ.id(),
1126                                        now + self.circuit_timing().max_dirtiness,
1127                                    );
1128                                }
1129                                return Ok((ent.circ.clone(), CircProvenance::NewlyCreated));
1130                            }
1131                            Err(e) => {
1132                                // In this case, a `UsageMismatched` error just means that we lost the race
1133                                // to restrict this circuit.
1134                                let e = match e {
1135                                    Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
1136                                    x => x,
1137                                };
1138                                if src == streams::Source::Left {
1139                                    info_report!(
1140                                        &e,
1141                                        "{} suggested we use {:?}, but restrictions failed",
1142                                        describe_source(building, src),
1143                                        id,
1144                                    );
1145                                } else {
1146                                    debug_report!(
1147                                        &e,
1148                                        "{} suggested we use {:?}, but restrictions failed",
1149                                        describe_source(building, src),
1150                                        id,
1151                                    );
1152                                }
1153                                record_error(&mut retry_error, src, building, e);
1154                                continue;
1155                            }
1156                        }
1157                    }
1158                }
1159                Ok(Err(ref e)) => {
1160                    debug!("{} sent error {:?}", describe_source(building, src), e);
1161                    record_error(&mut retry_error, src, building, e.clone());
1162                }
1163                Err(oneshot::Canceled) => {
1164                    debug!(
1165                        "{} went away (Canceled), quitting take_action right away",
1166                        describe_source(building, src)
1167                    );
1168                    record_error(&mut retry_error, src, building, Error::PendingCanceled);
1169                    return Err(retry_error);
1170                }
1171            }
1172
1173            debug!(
1174                "While waiting on circuit: {:?} from {}",
1175                id,
1176                describe_source(building, src)
1177            );
1178        }
1179
1180        // Nothing worked.  We drop the pending request now explicitly
1181        // to remove it from the list.  (We could just let it get dropped
1182        // implicitly, but that's a bit confusing.)
1183        drop(pending_request);
1184
1185        Err(retry_error)
1186    }
1187
1188    /// Given a directory and usage, compute the necessary objects to
1189    /// build a circuit: A [`PendingEntry`] to keep track of the in-process
1190    /// circuit, and a [`CircBuildPlan`] that we'll give to the thread
1191    /// that will build the circuit.
1192    ///
1193    /// The caller should probably add the resulting `PendingEntry` to
1194    /// `self.circs`.
1195    ///
1196    /// This is an internal function that we call when we're pretty sure
1197    /// we want to build a circuit.
1198    #[allow(clippy::type_complexity)]
1199    fn plan_by_usage(
1200        &self,
1201        dir: DirInfo<'_>,
1202        usage: &TargetCircUsage,
1203    ) -> Result<(Arc<PendingEntry<B, R>>, CircBuildPlan<B, R>)> {
1204        let (plan, bspec) = self.builder.plan_circuit(usage, dir)?;
1205        let (pending, sender) = PendingEntry::new(&bspec);
1206        let pending = Arc::new(pending);
1207
1208        let plan = CircBuildPlan {
1209            plan,
1210            sender,
1211            pending: Arc::clone(&pending),
1212        };
1213
1214        Ok((pending, plan))
1215    }
1216
1217    /// Launch a managed circuit for a target usage, without checking
1218    /// whether one already exists or is pending.
1219    ///
1220    /// Return a listener that will be informed when the circuit is done.
1221    pub(crate) fn launch_by_usage(
1222        self: &Arc<Self>,
1223        usage: &TargetCircUsage,
1224        dir: DirInfo<'_>,
1225    ) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
1226        let (pending, plan) = self.plan_by_usage(dir, usage)?;
1227
1228        self.circs
1229            .lock()
1230            .expect("Poisoned lock for circuit list")
1231            .add_pending_circ(pending);
1232
1233        Ok(Arc::clone(self).spawn_launch(usage, plan))
1234    }
1235
1236    /// Spawn a background task to launch a circuit, and report its status.
1237    ///
1238    /// The `usage` argument is the usage from the original request that made
1239    /// us build this circuit.
1240    fn spawn_launch(
1241        self: Arc<Self>,
1242        usage: &TargetCircUsage,
1243        plan: CircBuildPlan<B, R>,
1244    ) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
1245        let _ = usage; // Currently unused.
1246        let CircBuildPlan {
1247            mut plan,
1248            sender,
1249            pending,
1250        } = plan;
1251        let request_loyalty = self.circuit_timing().request_loyalty;
1252
1253        let wait_on_future = pending.receiver.clone();
1254        let runtime = self.runtime.clone();
1255        let runtime_copy = self.runtime.clone();
1256
1257        let tid = rand::random::<u64>();
1258        // We release this block when the circuit builder task terminates.
1259        let reason = format!("circuit builder task {}", tid);
1260        runtime.block_advance(reason.clone());
1261        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
1262        // correctly.
1263        plan.add_blocked_advance_reason(reason);
1264
1265        runtime
1266            .spawn(async move {
1267                let self_clone = Arc::clone(&self);
1268                let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
1269                let (new_spec, reply) = match future.await {
1270                    Ok(x) => x, // Success or regular failure
1271                    Err(e) => {
1272                        // Okay, this is a panic.  We have to tell the calling
1273                        // thread about it, then exit this circuit builder task.
1274                        let _ = sender.send(Err(internal!("circuit build task panicked").into()));
1275                        std::panic::panic_any(e);
1276                    }
1277                };
1278
1279                // Tell anybody who was listening about it that this
1280                // circuit is now usable or failed.
1281                //
1282                // (We ignore any errors from `send`: That just means that nobody
1283                // was waiting for this circuit.)
1284                let _ = sender.send(reply.clone());
1285
1286                if let Some(new_spec) = new_spec {
1287                    // Wait briefly before we notify opportunistically.  This
1288                    // delay will give the circuits that were originally
1289                    // specifically intended for a request a little more time
1290                    // to finish, before we offer it this circuit instead.
1291                    let sl = runtime_copy.sleep(request_loyalty);
1292                    runtime_copy.allow_one_advance(request_loyalty);
1293                    sl.await;
1294
1295                    let pending = {
1296                        let list = self.circs.lock().expect("poisoned lock");
1297                        list.find_pending_requests(&new_spec)
1298                    };
1299                    for pending_request in pending {
1300                        let _ = pending_request.notify.clone().try_send(reply.clone());
1301                    }
1302                }
1303                runtime_copy.release_advance(format!("circuit builder task {}", tid));
1304            })
1305            .expect("Couldn't spawn circuit-building task");
1306
1307        wait_on_future
1308    }
1309
1310    /// Run in the background to launch a circuit. Return a 2-tuple of the new
1311    /// circuit spec and the outcome that should be sent to the initiator.
1312    async fn do_launch(
1313        self: Arc<Self>,
1314        plan: <B as AbstractCircBuilder<R>>::Plan,
1315        pending: Arc<PendingEntry<B, R>>,
1316    ) -> (Option<SupportedCircUsage>, PendResult<B, R>) {
1317        let outcome = self.builder.build_circuit(plan).await;
1318
1319        match outcome {
1320            Err(e) => (None, Err(e)),
1321            Ok((new_spec, circ)) => {
1322                let id = circ.id();
1323
1324                let use_duration = self.pick_use_duration();
1325                let exp_inst = self.runtime.now() + use_duration;
1326                let runtime_copy = self.runtime.clone();
1327                spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), circ.id(), exp_inst);
1328                // I used to call restrict_mut here, but now I'm not so
1329                // sure. Doing restrict_mut makes sure that this
1330                // circuit will be suitable for the request that asked
1331                // for us in the first place, but that should be
1332                // ensured anyway by our tracking its tentative
1333                // assignment.
1334                //
1335                // new_spec.restrict_mut(&usage_copy).unwrap();
1336                let use_before = ExpirationInfo::new(exp_inst);
1337                let open_ent = OpenEntry::new(new_spec.clone(), circ, use_before);
1338                {
1339                    let mut list = self.circs.lock().expect("poisoned lock");
1340                    // Finally, before we return this circuit, we need to make
1341                    // sure that this pending circuit is still pending.  (If it
1342                    // is not pending, then it was cancelled through a call to
1343                    // `retire_all_circuits`, and the configuration that we used
1344                    // to launch it is now sufficiently outdated that we should
1345                    // no longer give this circuit to a client.)
1346                    if list.circ_is_pending(&pending) {
1347                        list.add_open(open_ent);
1348                        // We drop our reference to 'pending' here:
1349                        // this should make all the weak references to
1350                        // the `PendingEntry` become dangling.
1351                        drop(pending);
1352                        (Some(new_spec), Ok(id))
1353                    } else {
1354                        // This circuit is no longer pending! It must have been cancelled, probably
1355                        // by a call to retire_all_circuits()
1356                        drop(pending); // ibid
1357                        (None, Err(Error::CircCanceled))
1358                    }
1359                }
1360            }
1361        }
1362    }
1363
1364    /// Plan and launch a new circuit to a given target, bypassing our managed
1365    /// pool of circuits.
1366    ///
1367    /// This method will always return a new circuit, and never return a circuit
1368    /// that this CircMgr gives out for anything else.
1369    ///
1370    /// The new circuit will participate in the guard and timeout apparatus as
1371    /// appropriate, no retry attempt will be made if the circuit fails.
1372    #[cfg(feature = "hs-common")]
1373    pub(crate) async fn launch_unmanaged(
1374        &self,
1375        usage: &TargetCircUsage,
1376        dir: DirInfo<'_>,
1377    ) -> Result<(SupportedCircUsage, Arc<B::Circ>)> {
1378        let (_, plan) = self.plan_by_usage(dir, usage)?;
1379        self.builder.build_circuit(plan.plan).await
1380    }
1381
1382    /// Remove the circuit with a given `id` from this manager.
1383    ///
1384    /// After this function is called, that circuit will no longer be handed
1385    /// out to any future requests.
1386    ///
1387    /// Return None if we have no circuit with the given ID.
1388    pub(crate) fn take_circ(&self, id: &<B::Circ as AbstractCirc>::Id) -> Option<Arc<B::Circ>> {
1389        let mut list = self.circs.lock().expect("poisoned lock");
1390        list.take_open(id).map(|e| e.circ)
1391    }
1392
1393    /// Remove all open and pending circuits and from this manager, to ensure
1394    /// they can't be given out for any more requests.
1395    ///
1396    /// Calling `retire_all_circuits` ensures that any circuit request that gets
1397    /// an  answer _after this method runs_ will receive a circuit that was
1398    /// launched _after this method runs_.
1399    ///
1400    /// We call this method this when our configuration changes in such a way
1401    /// that we want to make sure that any new (or pending) requests will
1402    /// receive circuits that are built using the new configuration.
1403    //
1404    // For more information, see documentation on [`CircuitList::open_circs`],
1405    // [`CircuitList::pending_circs`], and comments in `do_launch`.
1406    pub(crate) fn retire_all_circuits(&self) {
1407        let mut list = self.circs.lock().expect("poisoned lock");
1408        list.clear_all_circuits();
1409    }
1410
1411    /// Expire circuits according to the rules in `config` and the
1412    /// current time `now`.
1413    ///
1414    /// Expired circuits will not be automatically closed, but they will
1415    /// no longer be given out for new circuits.
1416    pub(crate) fn expire_circs(&self, now: Instant) {
1417        let mut list = self.circs.lock().expect("poisoned lock");
1418        if let Some(dirty_cutoff) = now.checked_sub(self.circuit_timing().max_dirtiness) {
1419            list.expire_circs(now, dirty_cutoff);
1420        }
1421    }
1422
1423    /// Consider expiring the circuit with given circuit `id`,
1424    /// according to the rules in `config` and the current time `now`.
1425    pub(crate) fn expire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id, now: Instant) {
1426        let mut list = self.circs.lock().expect("poisoned lock");
1427        if let Some(dirty_cutoff) = now.checked_sub(self.circuit_timing().max_dirtiness) {
1428            list.expire_circ(circ_id, now, dirty_cutoff);
1429        }
1430    }
1431
1432    /// Return the number of open circuits held by this circuit manager.
1433    pub(crate) fn n_circs(&self) -> usize {
1434        let list = self.circs.lock().expect("poisoned lock");
1435        list.open_circs.len()
1436    }
1437
1438    /// Return the number of pending circuits tracked by this circuit manager.
1439    #[cfg(test)]
1440    pub(crate) fn n_pending_circs(&self) -> usize {
1441        let list = self.circs.lock().expect("poisoned lock");
1442        list.pending_circs.len()
1443    }
1444
1445    /// Get a reference to this manager's runtime.
1446    pub(crate) fn peek_runtime(&self) -> &R {
1447        &self.runtime
1448    }
1449
1450    /// Get a reference to this manager's builder.
1451    pub(crate) fn peek_builder(&self) -> &B {
1452        &self.builder
1453    }
1454
1455    /// Pick a duration by when a new circuit should expire from now
1456    /// if it has not yet been used
1457    fn pick_use_duration(&self) -> Duration {
1458        let timings = self
1459            .unused_timing
1460            .lock()
1461            .expect("Poisoned lock for unused_timing");
1462
1463        if self.builder.learning_timeouts() {
1464            timings.learning
1465        } else {
1466            // TODO: In Tor, this calculation also depends on
1467            // stuff related to predicted ports and channel
1468            // padding.
1469            use tor_basic_utils::RngExt as _;
1470            let mut rng = rand::rng();
1471            rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
1472                .expect("T .. 2x T turned out to be an empty duration range?!")
1473        }
1474    }
1475}
1476
1477/// Spawn an expiration task that expires a circuit at given instant.
1478///
1479/// If given instant is earlier than now, expire the circuit immediately.
1480/// Otherwise, spawn a timer expiration task on given runtime.
1481///
1482/// When the timeout occurs, if the circuit manager is still present,
1483/// the task will ask the manager to expire the circuit, if the circuit
1484/// is ready to expire.
1485fn spawn_expiration_task<B, R>(
1486    runtime: &R,
1487    circmgr: Weak<AbstractCircMgr<B, R>>,
1488    circ_id: <<B as AbstractCircBuilder<R>>::Circ as AbstractCirc>::Id,
1489    exp_inst: Instant,
1490) where
1491    R: Runtime,
1492    B: 'static + AbstractCircBuilder<R>,
1493{
1494    let now = runtime.now();
1495    let rt_copy = runtime.clone();
1496    let duration = exp_inst.saturating_duration_since(now);
1497
1498    if duration == Duration::ZERO {
1499        // Circuit should already expire. Expire it now.
1500        let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1501            cm
1502        } else {
1503            // Circuits manager has already been dropped, so are the references it held.
1504            return;
1505        };
1506        cm.expire_circ(&circ_id, now);
1507    } else {
1508        // Spawn a timer expiration task with given expiration instant.
1509        if let Err(e) = runtime.spawn(async move {
1510            rt_copy.sleep(duration).await;
1511            let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1512                cm
1513            } else {
1514                return;
1515            };
1516            cm.expire_circ(&circ_id, exp_inst);
1517        }) {
1518            warn_report!(e, "Unable to launch expiration task");
1519        }
1520    }
1521}
1522
1523#[cfg(test)]
1524mod test {
1525    // @@ begin test lint list maintained by maint/add_warning @@
1526    #![allow(clippy::bool_assert_comparison)]
1527    #![allow(clippy::clone_on_copy)]
1528    #![allow(clippy::dbg_macro)]
1529    #![allow(clippy::mixed_attributes_style)]
1530    #![allow(clippy::print_stderr)]
1531    #![allow(clippy::print_stdout)]
1532    #![allow(clippy::single_char_pattern)]
1533    #![allow(clippy::unwrap_used)]
1534    #![allow(clippy::unchecked_duration_subtraction)]
1535    #![allow(clippy::useless_vec)]
1536    #![allow(clippy::needless_pass_by_value)]
1537    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1538    use super::*;
1539    use crate::isolation::test::{assert_isoleq, IsolationTokenEq};
1540    use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
1541    use crate::usage::{ExitPolicy, SupportedCircUsage};
1542    use crate::{Error, IsolationToken, StreamIsolation, TargetCircUsage, TargetPort, TargetPorts};
1543    use once_cell::sync::Lazy;
1544    use tor_guardmgr::fallback::FallbackList;
1545    use tor_guardmgr::TestConfig;
1546    use tor_llcrypto::pk::ed25519::Ed25519Identity;
1547    use tor_netdir::testnet;
1548    use tor_persist::TestingStateMgr;
1549    use tor_rtcompat::SleepProvider;
1550    use tor_rtmock::MockRuntime;
1551
1552    #[allow(deprecated)] // TODO #1885
1553    use tor_rtmock::MockSleepRuntime;
1554
1555    static FALLBACKS_EMPTY: Lazy<FallbackList> = Lazy::new(|| [].into());
1556
1557    fn di() -> DirInfo<'static> {
1558        (&*FALLBACKS_EMPTY).into()
1559    }
1560
1561    fn target_to_spec(target: &TargetCircUsage) -> SupportedCircUsage {
1562        match target {
1563            TargetCircUsage::Exit {
1564                ports,
1565                isolation,
1566                country_code,
1567                require_stability,
1568            } => SupportedCircUsage::Exit {
1569                policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
1570                isolation: Some(isolation.clone()),
1571                country_code: country_code.clone(),
1572                all_relays_stable: *require_stability,
1573            },
1574            _ => unimplemented!(),
1575        }
1576    }
1577
1578    impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
1579        fn isol_eq(&self, other: &Self) -> bool {
1580            self.spec.isol_eq(&other.spec)
1581                && self.circ == other.circ
1582                && self.expiration == other.expiration
1583        }
1584    }
1585
1586    impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
1587        fn isol_eq(&self, other: &Self) -> bool {
1588            self.spec.isol_eq(&other.spec)
1589                && self.circ == other.circ
1590                && self.expiration == other.expiration
1591        }
1592    }
1593
1594    fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
1595        let state_mgr = TestingStateMgr::new();
1596        let guard_config = TestConfig::default();
1597        FakeBuilder::new(runtime, state_mgr, &guard_config)
1598    }
1599
1600    #[test]
1601    fn basic_tests() {
1602        MockRuntime::test_with_various(|rt| async move {
1603            #[allow(deprecated)] // TODO #1885
1604            let rt = MockSleepRuntime::new(rt);
1605
1606            let builder = make_builder(&rt);
1607
1608            let mgr = Arc::new(AbstractCircMgr::new(
1609                builder,
1610                rt.clone(),
1611                CircuitTiming::default(),
1612            ));
1613
1614            let webports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1615
1616            // Check initialization.
1617            assert_eq!(mgr.n_circs(), 0);
1618            assert!(mgr.peek_builder().script.lock().unwrap().is_empty());
1619
1620            // Launch a circuit; make sure we get it.
1621            let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
1622            let c1 = c1.unwrap().0;
1623            assert_eq!(mgr.n_circs(), 1);
1624
1625            // Make sure we get the one we already made if we ask for it.
1626            let port80 = TargetCircUsage::new_from_ipv4_ports(&[80]);
1627            let c2 = mgr.get_or_launch(&port80, di()).await;
1628
1629            let c2 = c2.unwrap().0;
1630            assert!(FakeCirc::eq(&c1, &c2));
1631            assert_eq!(mgr.n_circs(), 1);
1632
1633            // Now try launching two circuits "at once" to make sure that our
1634            // pending-circuit code works.
1635
1636            let dnsport = TargetCircUsage::new_from_ipv4_ports(&[53]);
1637            let dnsport_restrict = TargetCircUsage::Exit {
1638                ports: vec![TargetPort::ipv4(53)],
1639                isolation: StreamIsolation::builder().build().unwrap(),
1640                country_code: None,
1641                require_stability: false,
1642            };
1643
1644            let (c3, c4) = rt
1645                .wait_for(futures::future::join(
1646                    mgr.get_or_launch(&dnsport, di()),
1647                    mgr.get_or_launch(&dnsport_restrict, di()),
1648                ))
1649                .await;
1650
1651            let c3 = c3.unwrap().0;
1652            let c4 = c4.unwrap().0;
1653            assert!(!FakeCirc::eq(&c1, &c3));
1654            assert!(FakeCirc::eq(&c3, &c4));
1655            assert_eq!(c3.id(), c4.id());
1656            assert_eq!(mgr.n_circs(), 2);
1657
1658            // Now we're going to remove c3 from consideration.  It's the
1659            // same as c4, so removing c4 will give us None.
1660            let c3_taken = mgr.take_circ(&c3.id()).unwrap();
1661            let now_its_gone = mgr.take_circ(&c4.id());
1662            assert!(FakeCirc::eq(&c3_taken, &c3));
1663            assert!(now_its_gone.is_none());
1664            assert_eq!(mgr.n_circs(), 1);
1665
1666            // Having removed them, let's launch another dnsport and make
1667            // sure we get a different circuit.
1668            let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
1669            let c5 = c5.unwrap().0;
1670            assert!(!FakeCirc::eq(&c3, &c5));
1671            assert!(!FakeCirc::eq(&c4, &c5));
1672            assert_eq!(mgr.n_circs(), 2);
1673
1674            // Now try launch_by_usage.
1675            let prev = mgr.n_pending_circs();
1676            assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
1677            assert_eq!(mgr.n_pending_circs(), prev + 1);
1678            // TODO: Actually make sure that launch_by_usage launched
1679            // the right thing.
1680        });
1681    }
1682
1683    #[test]
1684    fn request_timeout() {
1685        MockRuntime::test_with_various(|rt| async move {
1686            #[allow(deprecated)] // TODO #1885
1687            let rt = MockSleepRuntime::new(rt);
1688
1689            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1690
1691            // This will fail once, and then completely time out.  The
1692            // result will be a failure.
1693            let builder = make_builder(&rt);
1694            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);
1695
1696            let mgr = Arc::new(AbstractCircMgr::new(
1697                builder,
1698                rt.clone(),
1699                CircuitTiming::default(),
1700            ));
1701            let c1 = mgr
1702                .peek_runtime()
1703                .wait_for(mgr.get_or_launch(&ports, di()))
1704                .await;
1705
1706            assert!(matches!(c1, Err(Error::RequestFailed(_))));
1707        });
1708    }
1709
1710    #[test]
1711    fn request_timeout2() {
1712        MockRuntime::test_with_various(|rt| async move {
1713            #[allow(deprecated)] // TODO #1885
1714            let rt = MockSleepRuntime::new(rt);
1715
1716            // Now try a more complicated case: we'll try to get things so
1717            // that we wait for a little over our predicted time because
1718            // of our wait-for-next-action logic.
1719            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1720            let builder = make_builder(&rt);
1721            builder.set(
1722                &ports,
1723                vec![
1724                    FakeOp::Delay(Duration::from_millis(60_000 - 25)),
1725                    FakeOp::NoPlan,
1726                ],
1727            );
1728
1729            let mgr = Arc::new(AbstractCircMgr::new(
1730                builder,
1731                rt.clone(),
1732                CircuitTiming::default(),
1733            ));
1734            let c1 = mgr
1735                .peek_runtime()
1736                .wait_for(mgr.get_or_launch(&ports, di()))
1737                .await;
1738
1739            assert!(matches!(c1, Err(Error::RequestFailed(_))));
1740        });
1741    }
1742
1743    #[test]
1744    fn request_unplannable() {
1745        MockRuntime::test_with_various(|rt| async move {
1746            #[allow(deprecated)] // TODO #1885
1747            let rt = MockSleepRuntime::new(rt);
1748
1749            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1750
1751            // This will fail a the planning stages, a lot.
1752            let builder = make_builder(&rt);
1753            builder.set(&ports, vec![FakeOp::NoPlan; 2000]);
1754
1755            let mgr = Arc::new(AbstractCircMgr::new(
1756                builder,
1757                rt.clone(),
1758                CircuitTiming::default(),
1759            ));
1760            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
1761
1762            assert!(matches!(c1, Err(Error::RequestFailed(_))));
1763        });
1764    }
1765
1766    #[test]
1767    fn request_fails_too_much() {
1768        MockRuntime::test_with_various(|rt| async move {
1769            #[allow(deprecated)] // TODO #1885
1770            let rt = MockSleepRuntime::new(rt);
1771            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1772
1773            // This will fail 1000 times, which is above the retry limit.
1774            let builder = make_builder(&rt);
1775            builder.set(&ports, vec![FakeOp::Fail; 1000]);
1776
1777            let mgr = Arc::new(AbstractCircMgr::new(
1778                builder,
1779                rt.clone(),
1780                CircuitTiming::default(),
1781            ));
1782            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
1783
1784            assert!(matches!(c1, Err(Error::RequestFailed(_))));
1785        });
1786    }
1787
1788    #[test]
1789    fn request_wrong_spec() {
1790        MockRuntime::test_with_various(|rt| async move {
1791            #[allow(deprecated)] // TODO #1885
1792            let rt = MockSleepRuntime::new(rt);
1793            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1794
1795            // The first time this is called, it will build a circuit
1796            // with the wrong spec.  (A circuit builder should never
1797            // actually _do_ that, but it's something we code for.)
1798            let builder = make_builder(&rt);
1799            builder.set(
1800                &ports,
1801                vec![FakeOp::WrongSpec(target_to_spec(
1802                    &TargetCircUsage::new_from_ipv4_ports(&[22]),
1803                ))],
1804            );
1805
1806            let mgr = Arc::new(AbstractCircMgr::new(
1807                builder,
1808                rt.clone(),
1809                CircuitTiming::default(),
1810            ));
1811            let c1 = rt.wait_for(mgr.get_or_launch(&ports, di())).await;
1812
1813            assert!(c1.is_ok());
1814        });
1815    }
1816
1817    #[test]
1818    fn request_retried() {
1819        MockRuntime::test_with_various(|rt| async move {
1820            #[allow(deprecated)] // TODO #1885
1821            let rt = MockSleepRuntime::new(rt);
1822            let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1823
1824            // This will fail twice, and then succeed. The result will be
1825            // a success.
1826            let builder = make_builder(&rt);
1827            builder.set(&ports, vec![FakeOp::Fail, FakeOp::Fail]);
1828
1829            let mgr = Arc::new(AbstractCircMgr::new(
1830                builder,
1831                rt.clone(),
1832                CircuitTiming::default(),
1833            ));
1834
1835            // This test doesn't exercise any timeout behaviour.
1836            rt.block_advance("test doesn't require advancing");
1837
1838            let (c1, c2) = rt
1839                .wait_for(futures::future::join(
1840                    mgr.get_or_launch(&ports, di()),
1841                    mgr.get_or_launch(&ports, di()),
1842                ))
1843                .await;
1844
1845            let c1 = c1.unwrap().0;
1846            let c2 = c2.unwrap().0;
1847
1848            assert!(FakeCirc::eq(&c1, &c2));
1849        });
1850    }
1851
1852    #[test]
1853    fn isolated() {
1854        MockRuntime::test_with_various(|rt| async move {
1855            #[allow(deprecated)] // TODO #1885
1856            let rt = MockSleepRuntime::new(rt);
1857            let builder = make_builder(&rt);
1858            let mgr = Arc::new(AbstractCircMgr::new(
1859                builder,
1860                rt.clone(),
1861                CircuitTiming::default(),
1862            ));
1863
1864            // Set our isolation so that iso1 and iso2 can't share a circuit,
1865            // but no_iso can share a circuit with either.
1866            let iso1 = TargetCircUsage::Exit {
1867                ports: vec![TargetPort::ipv4(443)],
1868                isolation: StreamIsolation::builder()
1869                    .owner_token(IsolationToken::new())
1870                    .build()
1871                    .unwrap(),
1872                country_code: None,
1873                require_stability: false,
1874            };
1875            let iso2 = TargetCircUsage::Exit {
1876                ports: vec![TargetPort::ipv4(443)],
1877                isolation: StreamIsolation::builder()
1878                    .owner_token(IsolationToken::new())
1879                    .build()
1880                    .unwrap(),
1881                country_code: None,
1882                require_stability: false,
1883            };
1884            let no_iso1 = TargetCircUsage::new_from_ipv4_ports(&[443]);
1885            let no_iso2 = no_iso1.clone();
1886
1887            // We're going to try launching these circuits in 24 different
1888            // orders, to make sure that the outcome is correct each time.
1889            use itertools::Itertools;
1890            let timeouts: Vec<_> = [0_u64, 2, 4, 6]
1891                .iter()
1892                .map(|d| Duration::from_millis(*d))
1893                .collect();
1894
1895            for delays in timeouts.iter().permutations(4) {
1896                let d1 = delays[0];
1897                let d2 = delays[1];
1898                let d3 = delays[2];
1899                let d4 = delays[2];
1900                let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
1901                    .wait_for(futures::future::join4(
1902                        async {
1903                            rt.sleep(*d1).await;
1904                            mgr.get_or_launch(&iso1, di()).await
1905                        },
1906                        async {
1907                            rt.sleep(*d2).await;
1908                            mgr.get_or_launch(&iso2, di()).await
1909                        },
1910                        async {
1911                            rt.sleep(*d3).await;
1912                            mgr.get_or_launch(&no_iso1, di()).await
1913                        },
1914                        async {
1915                            rt.sleep(*d4).await;
1916                            mgr.get_or_launch(&no_iso2, di()).await
1917                        },
1918                    ))
1919                    .await;
1920
1921                let c_iso1 = c_iso1.unwrap().0;
1922                let c_iso2 = c_iso2.unwrap().0;
1923                let c_no_iso1 = c_no_iso1.unwrap().0;
1924                let c_no_iso2 = c_no_iso2.unwrap().0;
1925
1926                assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
1927                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
1928                assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
1929                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
1930                assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
1931                assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
1932            }
1933        });
1934    }
1935
1936    #[test]
1937    fn opportunistic() {
1938        MockRuntime::test_with_various(|rt| async move {
1939            #[allow(deprecated)] // TODO #1885
1940            let rt = MockSleepRuntime::new(rt);
1941
1942            // The first request will time out completely, but we're
1943            // making a second request after we launch it.  That
1944            // request should succeed, and notify the first request.
1945
1946            let ports1 = TargetCircUsage::new_from_ipv4_ports(&[80]);
1947            let ports2 = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1948
1949            let builder = make_builder(&rt);
1950            builder.set(&ports1, vec![FakeOp::Timeout]);
1951
1952            let mgr = Arc::new(AbstractCircMgr::new(
1953                builder,
1954                rt.clone(),
1955                CircuitTiming::default(),
1956            ));
1957            // Note that ports2 will be wider than ports1, so the second
1958            // request will have to launch a new circuit.
1959
1960            let (c1, c2) = rt
1961                .wait_for(futures::future::join(
1962                    mgr.get_or_launch(&ports1, di()),
1963                    async {
1964                        rt.sleep(Duration::from_millis(100)).await;
1965                        mgr.get_or_launch(&ports2, di()).await
1966                    },
1967                ))
1968                .await;
1969
1970            if let (Ok((c1, _)), Ok((c2, _))) = (c1, c2) {
1971                assert!(FakeCirc::eq(&c1, &c2));
1972            } else {
1973                panic!();
1974            };
1975        });
1976    }
1977
1978    #[test]
1979    fn prebuild() {
1980        MockRuntime::test_with_various(|rt| async move {
1981            // This time we're going to use ensure_circuit() to make
1982            // sure that a circuit gets built, and then launch two
1983            // other circuits that will use it.
1984            #[allow(deprecated)] // TODO #1885
1985            let rt = MockSleepRuntime::new(rt);
1986            let builder = make_builder(&rt);
1987            let mgr = Arc::new(AbstractCircMgr::new(
1988                builder,
1989                rt.clone(),
1990                CircuitTiming::default(),
1991            ));
1992
1993            let ports1 = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
1994            let ports2 = TargetCircUsage::new_from_ipv4_ports(&[80]);
1995            let ports3 = TargetCircUsage::new_from_ipv4_ports(&[443]);
1996
1997            let (ok, c1, c2) = rt
1998                .wait_for(futures::future::join3(
1999                    mgr.ensure_circuit(&ports1, di()),
2000                    async {
2001                        rt.sleep(Duration::from_millis(10)).await;
2002                        mgr.get_or_launch(&ports2, di()).await
2003                    },
2004                    async {
2005                        rt.sleep(Duration::from_millis(50)).await;
2006                        mgr.get_or_launch(&ports3, di()).await
2007                    },
2008                ))
2009                .await;
2010
2011            assert!(ok.is_ok());
2012
2013            let c1 = c1.unwrap().0;
2014            let c2 = c2.unwrap().0;
2015
2016            // If we had launched these separately, they wouldn't share
2017            // a circuit.
2018            assert!(FakeCirc::eq(&c1, &c2));
2019        });
2020    }
2021
2022    #[test]
2023    fn expiration() {
2024        MockRuntime::test_with_various(|rt| async move {
2025            use crate::config::CircuitTimingBuilder;
2026            // Now let's make some circuits -- one dirty, one clean, and
2027            // make sure that one expires and one doesn't.
2028            #[allow(deprecated)] // TODO #1885
2029            let rt = MockSleepRuntime::new(rt);
2030            let builder = make_builder(&rt);
2031
2032            let circuit_timing = CircuitTimingBuilder::default()
2033                .max_dirtiness(Duration::from_secs(15))
2034                .build()
2035                .unwrap();
2036
2037            let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), circuit_timing));
2038
2039            let imap = TargetCircUsage::new_from_ipv4_ports(&[993]);
2040            let pop = TargetCircUsage::new_from_ipv4_ports(&[995]);
2041
2042            let (ok, pop1) = rt
2043                .wait_for(futures::future::join(
2044                    mgr.ensure_circuit(&imap, di()),
2045                    mgr.get_or_launch(&pop, di()),
2046                ))
2047                .await;
2048
2049            assert!(ok.is_ok());
2050            let pop1 = pop1.unwrap().0;
2051
2052            rt.advance(Duration::from_secs(30)).await;
2053            rt.advance(Duration::from_secs(15)).await;
2054            let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap().0;
2055
2056            // This should expire the pop circuit, since it came from
2057            // get_or_launch() [which marks the circuit as being
2058            // used].  It should not expire the imap circuit, since
2059            // it was not dirty until 15 seconds after the cutoff.
2060            let now = rt.now();
2061
2062            mgr.expire_circs(now);
2063
2064            let (pop2, imap2) = rt
2065                .wait_for(futures::future::join(
2066                    mgr.get_or_launch(&pop, di()),
2067                    mgr.get_or_launch(&imap, di()),
2068                ))
2069                .await;
2070
2071            let pop2 = pop2.unwrap().0;
2072            let imap2 = imap2.unwrap().0;
2073
2074            assert!(!FakeCirc::eq(&pop2, &pop1));
2075            assert!(FakeCirc::eq(&imap2, &imap1));
2076        });
2077    }
2078
2079    /// Returns three exit policies; one that permits nothing, one that permits ports 80
2080    /// and 443 only, and one that permits all ports.
2081    fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
2082        // FIXME(eta): the below is copypasta; would be nice to have a better way of
2083        //             constructing ExitPolicy objects for testing maybe
2084        let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2085
2086        // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
2087        // exits.  Odd-numbered ones allow only ports 80 and 443;
2088        // even-numbered ones allow all ports.
2089        let id_noexit: Ed25519Identity = [0x05; 32].into();
2090        let id_webexit: Ed25519Identity = [0x11; 32].into();
2091        let id_fullexit: Ed25519Identity = [0x20; 32].into();
2092
2093        let not_exit = network.by_id(&id_noexit).unwrap();
2094        let web_exit = network.by_id(&id_webexit).unwrap();
2095        let full_exit = network.by_id(&id_fullexit).unwrap();
2096
2097        let ep_none = ExitPolicy::from_relay(&not_exit);
2098        let ep_web = ExitPolicy::from_relay(&web_exit);
2099        let ep_full = ExitPolicy::from_relay(&full_exit);
2100        (ep_none, ep_web, ep_full)
2101    }
2102
2103    #[test]
2104    fn test_find_supported() {
2105        let (ep_none, ep_web, ep_full) = get_exit_policies();
2106        let fake_circ = Arc::new(FakeCirc { id: FakeId::next() });
2107        let expiration = ExpirationInfo::Unused {
2108            use_before: Instant::now() + Duration::from_secs(60 * 60),
2109        };
2110
2111        let mut entry_none = OpenEntry::new(
2112            SupportedCircUsage::Exit {
2113                policy: ep_none,
2114                isolation: None,
2115                country_code: None,
2116                all_relays_stable: true,
2117            },
2118            fake_circ.clone(),
2119            expiration.clone(),
2120        );
2121        let mut entry_none_c = entry_none.clone();
2122        let mut entry_web = OpenEntry::new(
2123            SupportedCircUsage::Exit {
2124                policy: ep_web,
2125                isolation: None,
2126                country_code: None,
2127                all_relays_stable: true,
2128            },
2129            fake_circ.clone(),
2130            expiration.clone(),
2131        );
2132        let mut entry_web_c = entry_web.clone();
2133        let mut entry_full = OpenEntry::new(
2134            SupportedCircUsage::Exit {
2135                policy: ep_full,
2136                isolation: None,
2137                country_code: None,
2138                all_relays_stable: true,
2139            },
2140            fake_circ,
2141            expiration,
2142        );
2143        let mut entry_full_c = entry_full.clone();
2144
2145        let usage_web = TargetCircUsage::new_from_ipv4_ports(&[80]);
2146        let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];
2147
2148        assert_isoleq!(
2149            SupportedCircUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
2150            empty
2151        );
2152
2153        // HACK(eta): We have to faff around with clones and such because
2154        //            `abstract_spec_find_supported` has a silly signature that involves `&mut`
2155        //            refs, which we can't have more than one of.
2156
2157        assert_isoleq!(
2158            SupportedCircUsage::find_supported(
2159                vec![&mut entry_none, &mut entry_web].into_iter(),
2160                &usage_web,
2161            ),
2162            vec![&mut entry_web_c]
2163        );
2164
2165        assert_isoleq!(
2166            SupportedCircUsage::find_supported(
2167                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
2168                &usage_web,
2169            ),
2170            vec![&mut entry_web_c, &mut entry_full_c]
2171        );
2172
2173        // Test preemptive circuit usage:
2174
2175        let usage_preemptive_web = TargetCircUsage::Preemptive {
2176            port: Some(TargetPort::ipv4(80)),
2177            circs: 2,
2178            require_stability: false,
2179        };
2180        let usage_preemptive_dns = TargetCircUsage::Preemptive {
2181            port: None,
2182            circs: 2,
2183            require_stability: false,
2184        };
2185
2186        // shouldn't return anything unless there are >=2 circuits
2187
2188        assert_isoleq!(
2189            SupportedCircUsage::find_supported(
2190                vec![&mut entry_none].into_iter(),
2191                &usage_preemptive_web
2192            ),
2193            empty
2194        );
2195
2196        assert_isoleq!(
2197            SupportedCircUsage::find_supported(
2198                vec![&mut entry_none].into_iter(),
2199                &usage_preemptive_dns
2200            ),
2201            empty
2202        );
2203
2204        assert_isoleq!(
2205            SupportedCircUsage::find_supported(
2206                vec![&mut entry_none, &mut entry_web].into_iter(),
2207                &usage_preemptive_web
2208            ),
2209            empty
2210        );
2211
2212        assert_isoleq!(
2213            SupportedCircUsage::find_supported(
2214                vec![&mut entry_none, &mut entry_web].into_iter(),
2215                &usage_preemptive_dns
2216            ),
2217            vec![&mut entry_none_c, &mut entry_web_c]
2218        );
2219
2220        assert_isoleq!(
2221            SupportedCircUsage::find_supported(
2222                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
2223                &usage_preemptive_web
2224            ),
2225            vec![&mut entry_web_c, &mut entry_full_c]
2226        );
2227    }
2228
2229    #[test]
2230    fn test_circlist_preemptive_target_circs() {
2231        MockRuntime::test_with_various(|rt| async move {
2232            #[allow(deprecated)] // TODO #1885
2233            let rt = MockSleepRuntime::new(rt);
2234            let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2235            let dirinfo = DirInfo::Directory(&netdir);
2236
2237            let builder = make_builder(&rt);
2238
2239            for circs in [2, 8].iter() {
2240                let mut circlist = CircList::<FakeBuilder<MockRuntime>, MockRuntime>::new();
2241
2242                let preemptive_target = TargetCircUsage::Preemptive {
2243                    port: Some(TargetPort::ipv4(80)),
2244                    circs: *circs,
2245                    require_stability: false,
2246                };
2247
2248                for _ in 0..*circs {
2249                    assert!(circlist.find_open(&preemptive_target).is_none());
2250
2251                    let usage = TargetCircUsage::new_from_ipv4_ports(&[80]);
2252                    let (plan, _) = builder.plan_circuit(&usage, dirinfo).unwrap();
2253                    let (spec, circ) = rt.wait_for(builder.build_circuit(plan)).await.unwrap();
2254                    let entry = OpenEntry::new(
2255                        spec,
2256                        circ,
2257                        ExpirationInfo::new(rt.now() + Duration::from_secs(60)),
2258                    );
2259                    circlist.add_open(entry);
2260                }
2261
2262                assert!(circlist.find_open(&preemptive_target).is_some());
2263            }
2264        });
2265    }
2266}