tor_circmgr/mgr.rs
1//! Abstract code to manage a set of circuits.
2//!
3//! This module implements the real logic for deciding when and how to
4//! launch circuits, and for which circuits to hand out in response to
5//! which requests.
6//!
7//! For testing and abstraction purposes, this module _does not_
8//! actually know anything about circuits _per se_. Instead,
9//! everything is handled using a set of traits that are internal to this
10//! crate:
11//!
12//! * [`AbstractCirc`] is a view of a circuit.
13//! * [`AbstractCircBuilder`] knows how to build an `AbstractCirc`.
14//!
15//! Using these traits, the [`AbstractCircMgr`] object manages a set of
16//! circuits, launching them as necessary, and keeping track of the
17//! restrictions on their use.
18
19// TODO:
20// - Testing
21// - Error from prepare_action()
22// - Error reported by restrict_mut?
23
24use crate::config::CircuitTiming;
25use crate::usage::{SupportedCircUsage, TargetCircUsage};
26use crate::{timeouts, DirInfo, Error, PathConfig, Result};
27
28use retry_error::RetryError;
29use tor_async_utils::mpsc_channel_no_memquota;
30use tor_basic_utils::retry::RetryDelay;
31use tor_config::MutCfg;
32use tor_error::{debug_report, info_report, internal, warn_report, AbsRetryTime, HasRetryTime};
33#[cfg(feature = "vanguards")]
34use tor_guardmgr::vanguards::VanguardMgr;
35use tor_linkspec::CircTarget;
36use tor_proto::circuit::{CircParameters, Path, UniqId};
37use tor_rtcompat::{Runtime, SleepProviderExt};
38
39use async_trait::async_trait;
40use futures::channel::mpsc;
41use futures::future::{FutureExt, Shared};
42use futures::stream::{FuturesUnordered, StreamExt};
43use futures::task::SpawnExt;
44use oneshot_fused_workaround as oneshot;
45use std::collections::HashMap;
46use std::fmt::Debug;
47use std::hash::Hash;
48use std::panic::AssertUnwindSafe;
49use std::sync::{self, Arc, Weak};
50use std::time::{Duration, Instant};
51use tracing::{debug, warn};
52use weak_table::PtrWeakHashSet;
53
54mod streams;
55
/// Description of how we got a circuit.
///
/// Returned alongside the circuit by `AbstractCircMgr::get_or_launch`, so the
/// caller can tell a circuit that already existed from one that had to be built.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum CircProvenance {
    /// This circuit was newly launched, or was in progress and finished while
    /// we were waiting.
    NewlyCreated,
    /// This circuit already existed when we asked for it.
    Preexisting,
}
66
/// An error describing why a circuit's specification could not be restricted
/// for a requested usage.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RestrictionFailed {
    /// Tried to restrict a specification, but the circuit didn't support the
    /// requested usage.
    #[error("Specification did not support desired usage")]
    NotSupported,
}
75
/// Minimal abstract view of a circuit.
///
/// From this module's point of view, circuits are simply objects
/// with unique identities, and a possible closed-state.
///
/// The circuit manager only ever talks to circuits through this trait, which
/// lets the management logic be exercised without real circuits.
#[async_trait]
pub(crate) trait AbstractCirc: Debug {
    /// Type for a unique identifier for circuits.
    ///
    /// It is used as the key of the manager's map of open circuits, hence the
    /// `Hash + Eq` bounds.
    type Id: Clone + Debug + Hash + Eq + Send + Sync;
    /// Return the unique identifier for this circuit.
    ///
    /// # Requirements
    ///
    /// The values returned by this function are unique for distinct
    /// circuits.
    fn id(&self) -> Self::Id;

    /// Return true if this circuit is usable for some purpose.
    ///
    /// Reasons a circuit might be unusable include being closed.
    ///
    /// An open circuit that is not usable is never reported as supporting
    /// any usage (see `OpenEntry::supports`).
    fn usable(&self) -> bool;

    /// Return a [`Path`] object describing all the hops in this circuit.
    ///
    /// # Errors
    ///
    /// Returns an error if the circuit is closed.
    ///
    /// Note that this `Path` is not automatically updated if the circuit is
    /// extended.
    fn path_ref(&self) -> tor_proto::Result<Arc<Path>>;

    /// Return the number of hops in this circuit.
    ///
    /// # Errors
    ///
    /// Returns an error if the circuit is closed.
    ///
    /// NOTE: This function will currently return only the number of hops
    /// _currently_ in the circuit. If there is an extend operation in progress,
    /// the currently pending hop may or may not be counted, depending on whether
    /// the extend operation finishes before this call is done.
    fn n_hops(&self) -> tor_proto::Result<usize>;

    /// Return true if this circuit is closed and therefore unusable.
    //
    // NOTE(review): the name says "closing" while the doc says "closed";
    // confirm with implementors which of the two states is actually reported.
    fn is_closing(&self) -> bool;

    /// Return a process-unique identifier for this circuit.
    fn unique_id(&self) -> UniqId;

    /// Extend the circuit via the most appropriate handshake to a new `target` hop.
    ///
    /// `params` are the circuit parameters to use for the new hop.
    async fn extend<T: CircTarget + Sync>(
        &self,
        target: &T,
        params: CircParameters,
    ) -> tor_proto::Result<()>;
}
128
/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
///
/// You should implement this trait using all default methods for all code that isn't test code.
pub(crate) trait MockablePlan {
    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
    /// so that it knows what to pass to `::release_advance()`.
    ///
    /// The default implementation discards `_reason` and does nothing.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
137
/// An object that knows how to build circuits.
///
/// AbstractCircBuilder creates circuits in two phases.  First, a plan is
/// made for how to build the circuit.  This planning phase should be
/// relatively fast, and must not suspend or block.  Its purpose is to
/// get an early estimate of which operations the circuit will be able
/// to support when it's done.
///
/// Second, the circuit is actually built, using the plan as input.
#[async_trait]
pub(crate) trait AbstractCircBuilder<R: Runtime>: Send + Sync {
    /// The circuit type that this builder knows how to build.
    type Circ: AbstractCirc + Send + Sync;
    /// An opaque type describing how a given circuit will be built.
    /// It may represent some or all of a path -- or it may not.
    //
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    //
    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + Debug + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // crate::DirInfo, but that would need to be parameterized too,
    // and would make everything complicated.

    /// Form a plan for how to build a new circuit that supports `usage`.
    ///
    /// Return an opaque Plan object, and a new spec describing what
    /// the circuit will actually support when it's built.  (For
    /// example, if the input spec requests a circuit that connects to
    /// port 80, then "planning" the circuit might involve picking an
    /// exit that supports port 80, and the resulting spec might be
    /// the exit's complete list of supported ports.)
    ///
    /// This is the fast, synchronous first phase: it must not suspend or block.
    ///
    /// # Requirements
    ///
    /// The resulting Spec must support `usage`.
    fn plan_circuit(
        &self,
        usage: &TargetCircUsage,
        dir: DirInfo<'_>,
    ) -> Result<(Self::Plan, SupportedCircUsage)>;

    /// Construct a circuit according to a given plan.
    ///
    /// On success, return a spec describing what the circuit can be used for,
    /// and the circuit that was just constructed.
    ///
    /// This function should implement some kind of a timeout for
    /// circuits that are taking too long.
    ///
    /// # Requirements
    ///
    /// The spec that this function returns _must_ support the usage
    /// that was originally passed to `plan_circuit`.  It _must_ also
    /// contain the spec that was originally returned by
    /// `plan_circuit`.
    async fn build_circuit(
        &self,
        plan: Self::Plan,
    ) -> Result<(SupportedCircUsage, Arc<Self::Circ>)>;

    /// Return a "parallelism factor" with which circuits should be
    /// constructed for a given purpose.
    ///
    /// If this function returns N, then whenever we launch circuits
    /// for this purpose, then we launch N in parallel.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    fn launch_parallelism(&self, usage: &TargetCircUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return a "parallelism factor" for which circuits should be
    /// used for a given purpose.
    ///
    /// If this function returns N, then whenever we select among
    /// open circuits for this purpose, we choose at random from the
    /// best N.
    ///
    /// The default implementation returns 1.  The value of 0 is
    /// treated as if it were 1.
    // TODO: Possibly this doesn't belong in this trait.
    fn select_parallelism(&self, usage: &TargetCircUsage) -> usize {
        let _ = usage; // default implementation ignores this.
        1
    }

    /// Return true if we are currently attempting to learn circuit
    /// timeouts by building testing circuits.
    fn learning_timeouts(&self) -> bool;

    /// Flush state to the state manager if we own the lock.
    ///
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
    fn save_state(&self) -> Result<bool>;

    /// Return this builder's [`PathConfig`](crate::PathConfig).
    fn path_config(&self) -> Arc<PathConfig>;

    /// Replace this builder's [`PathConfig`](crate::PathConfig).
    // TODO: This is dead_code because we only call this for the CircuitBuilder specialization of
    // CircMgr, not from the generic version, because this trait doesn't provide guardmgr, which is
    // needed by the [`CircMgr::reconfigure`] function that would be the only caller of this.  We
    // should add `guardmgr` to this trait, make [`CircMgr::reconfigure`] generic, and remove this
    // dead_code marking.
    //
    // NOTE(review): this trait now declares `guardmgr()` below, so the premise of
    // the TODO above may be stale -- confirm whether the `dead_code` allowance is
    // still needed.
    #[allow(dead_code)]
    fn set_path_config(&self, new_config: PathConfig);

    /// Return a reference to this builder's timeout estimator.
    fn estimator(&self) -> &timeouts::Estimator;

    /// Return a reference to this builder's `VanguardMgr`.
    ///
    /// Only available when the `vanguards` feature is enabled.
    #[cfg(feature = "vanguards")]
    fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>>;

    /// Replace our state with a new owning state, assuming we have
    /// storage permission.
    fn upgrade_to_owned_state(&self) -> Result<()>;

    /// Reload persistent state from disk, if we don't have storage permission.
    fn reload_state(&self) -> Result<()>;

    /// Return a reference to this builder's `GuardMgr`.
    fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R>;

    /// Reconfigure this builder using the latest set of network parameters.
    ///
    /// (NOTE: for now, this only affects circuit timeout estimation.)
    fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters);
}
273
/// Enumeration to track the expiration state of a circuit.
///
/// A circuit can either be unused (at which point it should expire if it is
/// _still unused_ by a certain time), or dirty (at which point it should
/// expire after a certain duration).
///
/// All circuits start out "unused" and become "dirty" when their spec
/// is first restricted -- that is, when they are first handed out to be
/// used for a request.
///
/// The transition is one-way: once dirty, a circuit never becomes unused again.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExpirationInfo {
    /// The circuit has never been used.
    Unused {
        /// A time when the circuit should expire.
        use_before: Instant,
    },
    /// The circuit has been used (or at least, restricted for use with a
    /// request) at least once.
    Dirty {
        /// The time at which this circuit's spec was first restricted.
        dirty_since: Instant,
    },
}
297
298impl ExpirationInfo {
299 /// Return an ExpirationInfo for a newly created circuit.
300 fn new(use_before: Instant) -> Self {
301 ExpirationInfo::Unused { use_before }
302 }
303
304 /// Mark this ExpirationInfo as dirty, if it is not already dirty.
305 fn mark_dirty(&mut self, now: Instant) {
306 if matches!(self, ExpirationInfo::Unused { .. }) {
307 *self = ExpirationInfo::Dirty { dirty_since: now };
308 }
309 }
310}
311
/// An entry for an open circuit held by an `AbstractCircMgr`.
///
/// Bundles the circuit itself with the bookkeeping the manager needs: what the
/// circuit may still be used for, and when it should be retired.
#[derive(Debug, Clone)]
pub(crate) struct OpenEntry<C> {
    /// The supported usage for this circuit.
    ///
    /// This is narrowed every time the circuit is handed out for a request
    /// (see `OpenEntry::restrict_mut`).
    spec: SupportedCircUsage,
    /// The circuit under management.
    circ: Arc<C>,
    /// When does this circuit expire?
    ///
    /// (Note that expired circuits are removed from the manager,
    /// which does not actually close them until there are no more
    /// references to them.)
    expiration: ExpirationInfo,
}
326
327impl<C: AbstractCirc> OpenEntry<C> {
328 /// Make a new OpenEntry for a given circuit and spec.
329 fn new(spec: SupportedCircUsage, circ: Arc<C>, expiration: ExpirationInfo) -> Self {
330 OpenEntry {
331 spec,
332 circ,
333 expiration,
334 }
335 }
336
337 /// Return true if this circuit can be used for `usage`.
338 pub(crate) fn supports(&self, usage: &TargetCircUsage) -> bool {
339 self.circ.usable() && self.spec.supports(usage)
340 }
341
342 /// Change this circuit's permissible usage, based on its having
343 /// been used for `usage` at time `now`.
344 ///
345 /// Return an error if this circuit may not be used for `usage`.
346 fn restrict_mut(&mut self, usage: &TargetCircUsage, now: Instant) -> Result<()> {
347 self.spec.restrict_mut(usage)?;
348 self.expiration.mark_dirty(now);
349 Ok(())
350 }
351
352 /// Find the "best" entry from a slice of OpenEntry for supporting
353 /// a given `usage`.
354 ///
355 /// If `parallelism` is some N greater than 1, we pick randomly
356 /// from the best `N` circuits.
357 ///
358 /// # Requirements
359 ///
360 /// Requires that `ents` is nonempty, and that every element of `ents`
361 /// supports `spec`.
362 fn find_best<'a>(
363 // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
364 ents: &'a mut [&'a mut Self],
365 usage: &TargetCircUsage,
366 parallelism: usize,
367 ) -> &'a mut Self {
368 let _ = usage; // not yet used.
369 use rand::seq::IndexedMutRandom as _;
370 let parallelism = parallelism.clamp(1, ents.len());
371 // TODO: Actually look over the whole list to see which is better.
372 let slice = &mut ents[0..parallelism];
373 let mut rng = rand::rng();
374 slice.choose_mut(&mut rng).expect("Input list was empty")
375 }
376
377 /// Return true if this circuit has been marked as dirty before
378 /// `dirty_cutoff`, or if it is an unused circuit set to expire before
379 /// `unused_cutoff`.
380 fn should_expire(&self, unused_cutoff: Instant, dirty_cutoff: Instant) -> bool {
381 match self.expiration {
382 ExpirationInfo::Unused { use_before } => use_before <= unused_cutoff,
383 ExpirationInfo::Dirty { dirty_since } => dirty_since <= dirty_cutoff,
384 }
385 }
386}
387
/// A result type whose "Ok" value is the Id for a circuit from B.
///
/// This is what gets sent to tasks waiting on a pending circuit: they receive
/// the identifier of the circuit rather than the circuit itself.
type PendResult<B, R> = Result<<<B as AbstractCircBuilder<R>>::Circ as AbstractCirc>::Id>;
390
/// An in-progress circuit request tracked by an `AbstractCircMgr`.
///
/// (In addition to tracking circuits, `AbstractCircMgr` tracks
/// _requests_ for circuits.  The manager uses these entries if it
/// finds that some circuit created _after_ a request first launched
/// might meet the request's requirements.)
struct PendingRequest<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Usage for the operation requested by this request.
    usage: TargetCircUsage,
    /// A channel to use for telling this request about circuits that it
    /// might like.
    ///
    /// Each message is a [`PendResult`]: either the id of a candidate circuit,
    /// or an error.
    notify: mpsc::Sender<PendResult<B, R>>,
}
404
405impl<B: AbstractCircBuilder<R>, R: Runtime> PendingRequest<B, R> {
406 /// Return true if this request would be supported by `spec`.
407 fn supported_by(&self, spec: &SupportedCircUsage) -> bool {
408 spec.supports(&self.usage)
409 }
410}
411
/// An entry for an under-construction in-progress circuit tracked by
/// an `AbstractCircMgr`.
#[derive(Debug)]
struct PendingEntry<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Specification that this circuit will support, if every pending
    /// request that is waiting for it is attached to it.
    ///
    /// This spec becomes more and more restricted as more pending
    /// requests are waiting for this circuit.
    ///
    /// It starts out as a copy of the `circ_spec` given to
    /// `PendingEntry::new` and is only ever restricted from there, so it
    /// stays contained by that spec.  It must support the usage of every
    /// pending request that's waiting for this circuit.
    tentative_assignment: sync::Mutex<SupportedCircUsage>,
    /// A shared future for requests to use when waiting for
    /// notification of this circuit's success.
    ///
    /// It is `Shared` so that any number of waiting requests can each hold
    /// their own clone of it.
    receiver: Shared<oneshot::Receiver<PendResult<B, R>>>,
}
429
430impl<B: AbstractCircBuilder<R>, R: Runtime> PendingEntry<B, R> {
431 /// Make a new PendingEntry that starts out supporting a given
432 /// spec. Return that PendingEntry, along with a Sender to use to
433 /// report the result of building this circuit.
434 fn new(circ_spec: &SupportedCircUsage) -> (Self, oneshot::Sender<PendResult<B, R>>) {
435 let tentative_assignment = sync::Mutex::new(circ_spec.clone());
436 let (sender, receiver) = oneshot::channel();
437 let receiver = receiver.shared();
438 let entry = PendingEntry {
439 tentative_assignment,
440 receiver,
441 };
442 (entry, sender)
443 }
444
445 /// Return true if this circuit's current tentative assignment
446 /// supports `usage`.
447 fn supports(&self, usage: &TargetCircUsage) -> bool {
448 let assignment = self.tentative_assignment.lock().expect("poisoned lock");
449 assignment.supports(usage)
450 }
451
452 /// Try to change the tentative assignment of this circuit by
453 /// restricting it for use with `usage`.
454 ///
455 /// Return an error if the current tentative assignment didn't
456 /// support `usage` in the first place.
457 fn tentative_restrict_mut(&self, usage: &TargetCircUsage) -> Result<()> {
458 if let Ok(mut assignment) = self.tentative_assignment.lock() {
459 assignment.restrict_mut(usage)?;
460 }
461 Ok(())
462 }
463
464 /// Find the best PendingEntry values from a slice for use with
465 /// `usage`.
466 ///
467 /// # Requirements
468 ///
469 /// The `ents` slice must not be empty. Every element of `ents`
470 /// must support the given spec.
471 fn find_best(ents: &[Arc<Self>], usage: &TargetCircUsage) -> Vec<Arc<Self>> {
472 // TODO: Actually look over the whole list to see which is better.
473 let _ = usage; // currently unused
474 vec![Arc::clone(&ents[0])]
475 }
476}
477
/// Wrapper type to represent the state between planning to build a
/// circuit and constructing it.
///
/// It carries everything the building task needs: the builder's plan, the
/// channel on which to announce the outcome, and the pending-circuit entry.
#[derive(Debug)]
struct CircBuildPlan<B: AbstractCircBuilder<R>, R: Runtime> {
    /// The Plan object returned by [`AbstractCircBuilder::plan_circuit`].
    plan: B::Plan,
    /// A sender to notify any pending requests when this circuit is done.
    sender: oneshot::Sender<PendResult<B, R>>,
    /// A strong reference to the PendingEntry for this circuit build attempt.
    ///
    /// The manager's own set of pending circuits only holds weak references,
    /// so this is what keeps the entry alive while the build is in progress.
    pending: Arc<PendingEntry<B, R>>,
}
489
/// The inner state of an [`AbstractCircMgr`].
///
/// Tracks three things: circuits that are open, circuits that are being
/// built, and requests that are waiting for a circuit.
struct CircList<B: AbstractCircBuilder<R>, R: Runtime> {
    /// A map from circuit ID to [`OpenEntry`] values for all managed
    /// open circuits.
    ///
    /// A circuit is added here from [`AbstractCircMgr::do_launch`] when we find
    /// that it completes successfully, and has not been cancelled.
    /// When we decide that such a circuit should no longer be handed out for
    /// any new requests, we "retire" the circuit by removing it from this map.
    #[allow(clippy::type_complexity)]
    open_circs: HashMap<<B::Circ as AbstractCirc>::Id, OpenEntry<B::Circ>>,
    /// Weak-set of PendingEntry for circuits that are being built.
    ///
    /// Because this set only holds weak references, and the only strong
    /// reference to the PendingEntry is held by the task building the circuit,
    /// this set's members are lazily removed after the circuit is either built
    /// or fails to build.
    ///
    /// This set is used for two purposes:
    ///
    /// 1. When a circuit request finds that there is no open circuit for its
    ///    purposes, it checks here to see if there is a pending circuit that it
    ///    could wait for.
    /// 2. When a pending circuit finishes building, it checks here to make sure
    ///    that it has not been cancelled. (Removing an entry from this set marks
    ///    it as cancelled.)
    ///
    /// An entry is added here in [`AbstractCircMgr::prepare_action`] when we
    /// decide that a circuit needs to be launched.
    ///
    /// Later, in [`AbstractCircMgr::do_launch`], once the circuit has finished
    /// (or failed), we remove the entry (by pointer identity).
    /// If we cannot find the entry, we conclude that the request has been
    /// _cancelled_, and so we discard any circuit that was created.
    pending_circs: PtrWeakHashSet<Weak<PendingEntry<B, R>>>,
    /// Weak-set of PendingRequest for requests that are waiting for a
    /// circuit to be built.
    ///
    /// Because this set only holds weak references, and the only
    /// strong reference to the PendingRequest is held by the task
    /// waiting for the circuit to be built, this set's members are
    /// lazily removed after the request succeeds or fails.
    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B, R>>>,
}
534
535impl<B: AbstractCircBuilder<R>, R: Runtime> CircList<B, R> {
536 /// Make a new empty `CircList`
537 fn new() -> Self {
538 CircList {
539 open_circs: HashMap::new(),
540 pending_circs: PtrWeakHashSet::new(),
541 pending_requests: PtrWeakHashSet::new(),
542 }
543 }
544
545 /// Add `e` to the list of open circuits.
546 fn add_open(&mut self, e: OpenEntry<B::Circ>) {
547 let id = e.circ.id();
548 self.open_circs.insert(id, e);
549 }
550
551 /// Find all the usable open circuits that support `usage`.
552 ///
553 /// Return None if there are no such circuits.
554 fn find_open(&mut self, usage: &TargetCircUsage) -> Option<Vec<&mut OpenEntry<B::Circ>>> {
555 let list = self.open_circs.values_mut();
556 let v = SupportedCircUsage::find_supported(list, usage);
557 if v.is_empty() {
558 None
559 } else {
560 Some(v)
561 }
562 }
563
564 /// Find an open circuit by ID.
565 ///
566 /// Return None if no such circuit exists in this list.
567 fn get_open_mut(
568 &mut self,
569 id: &<B::Circ as AbstractCirc>::Id,
570 ) -> Option<&mut OpenEntry<B::Circ>> {
571 self.open_circs.get_mut(id)
572 }
573
574 /// Extract an open circuit by ID, removing it from this list.
575 ///
576 /// Return None if no such circuit exists in this list.
577 fn take_open(&mut self, id: &<B::Circ as AbstractCirc>::Id) -> Option<OpenEntry<B::Circ>> {
578 self.open_circs.remove(id)
579 }
580
581 /// Remove circuits based on expiration times.
582 ///
583 /// We remove every unused circuit that is set to expire by
584 /// `unused_cutoff`, and every dirty circuit that has been dirty
585 /// since before `dirty_cutoff`.
586 fn expire_circs(&mut self, unused_cutoff: Instant, dirty_cutoff: Instant) {
587 self.open_circs
588 .retain(|_k, v| !v.should_expire(unused_cutoff, dirty_cutoff));
589 }
590
591 /// Remove the circuit with given `id`, if it is scheduled to
592 /// expire now, according to the provided expiration times.
593 fn expire_circ(
594 &mut self,
595 id: &<B::Circ as AbstractCirc>::Id,
596 unused_cutoff: Instant,
597 dirty_cutoff: Instant,
598 ) {
599 let should_expire = self
600 .open_circs
601 .get(id)
602 .map(|v| v.should_expire(unused_cutoff, dirty_cutoff))
603 .unwrap_or_else(|| false);
604 if should_expire {
605 self.open_circs.remove(id);
606 }
607 }
608
609 /// Add `pending` to the set of in-progress circuits.
610 fn add_pending_circ(&mut self, pending: Arc<PendingEntry<B, R>>) {
611 self.pending_circs.insert(pending);
612 }
613
614 /// Find all pending circuits that support `usage`.
615 ///
616 /// If no such circuits are currently being built, return None.
617 fn find_pending_circs(&self, usage: &TargetCircUsage) -> Option<Vec<Arc<PendingEntry<B, R>>>> {
618 let result: Vec<_> = self
619 .pending_circs
620 .iter()
621 .filter(|p| p.supports(usage))
622 .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
623 .collect();
624
625 if result.is_empty() {
626 None
627 } else {
628 Some(result)
629 }
630 }
631
632 /// Return true if `circ` is still pending.
633 ///
634 /// A circuit will become non-pending when finishes (successfully or not), or when it's
635 /// removed from this list via `clear_all_circuits()`.
636 fn circ_is_pending(&self, circ: &Arc<PendingEntry<B, R>>) -> bool {
637 self.pending_circs.contains(circ)
638 }
639
640 /// Construct and add a new entry to the set of request waiting
641 /// for a circuit.
642 ///
643 /// Return the request, and a new receiver stream that it should
644 /// use for notification of possible circuits to use.
645 fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B, R>>) {
646 self.pending_requests.insert(Arc::clone(pending));
647 }
648
649 /// Return all pending requests that would be satisfied by a circuit
650 /// that supports `circ_spec`.
651 fn find_pending_requests(
652 &self,
653 circ_spec: &SupportedCircUsage,
654 ) -> Vec<Arc<PendingRequest<B, R>>> {
655 self.pending_requests
656 .iter()
657 .filter(|pend| pend.supported_by(circ_spec))
658 .collect()
659 }
660
661 /// Clear all pending circuits and open circuits.
662 ///
663 /// Calling `clear_all_circuits` ensures that any request that is answered _after
664 /// this method runs_ will receive a circuit that was launched _after this
665 /// method runs_.
666 fn clear_all_circuits(&mut self) {
667 // Note that removing entries from pending_circs will also cause the
668 // circuit tasks to realize that they are cancelled when they
669 // go to tell anybody about their results.
670 self.pending_circs.clear();
671 self.open_circs.clear();
672 }
673}
674
/// Timing information for circuits that have been built but never used.
///
/// Currently taken from the network parameters.
struct UnusedTimings {
    /// Minimum lifetime of a circuit created while learning
    /// circuit timeouts.
    ///
    /// Taken from the `unused_client_circ_timeout_while_learning_cbt`
    /// network parameter.
    learning: Duration,
    /// Minimum lifetime of a circuit created while not learning
    /// circuit timeouts.
    ///
    /// Taken from the `unused_client_circ_timeout` network parameter.
    not_learning: Duration,
}
686
687// This isn't really fallible, given the definitions of the underlying
688// types.
689#[allow(clippy::fallible_impl_from)]
690impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
691 fn from(v: &tor_netdir::params::NetParameters) -> Self {
692 // These try_into() calls can't fail, so unwrap() can't panic.
693 #[allow(clippy::unwrap_used)]
694 UnusedTimings {
695 learning: v
696 .unused_client_circ_timeout_while_learning_cbt
697 .try_into()
698 .unwrap(),
699 not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
700 }
701 }
702}
703
/// Abstract implementation for circuit management.
///
/// The algorithm provided here is fairly simple. In its simplest form:
///
/// When somebody asks for a circuit for a given operation: if we find
/// one open already, we return it.  If we find in-progress circuits
/// that would meet our needs, we wait for one to finish (or for all
/// to fail).  And otherwise, we launch one or more circuits to meet the
/// request's needs.
///
/// If this process fails, then we retry it, up to a timeout or a
/// numerical limit.
///
/// If a circuit not previously considered for a given request
/// finishes before the request is satisfied, and if the circuit would
/// satisfy the request, we try to give that circuit as an answer to
/// that request even if it was not one of the circuits that request
/// was waiting for.
pub(crate) struct AbstractCircMgr<B: AbstractCircBuilder<R>, R: Runtime> {
    /// Builder used to construct circuits.
    builder: B,
    /// An asynchronous runtime to use for launching tasks and
    /// checking timeouts.
    runtime: R,
    /// A CircList to manage our list of circuits, requests, and
    /// pending circuits.
    circs: sync::Mutex<CircList<B, R>>,

    /// Configured information about when to expire circuits and requests.
    ///
    /// Read with `circuit_timing()` and replaced wholesale with
    /// `set_circuit_timing()`.
    circuit_timing: MutCfg<CircuitTiming>,

    /// Minimum lifetime of an unused circuit.
    ///
    /// Derived from the network parameters: it starts out computed from the
    /// default parameters, and is replaced by `update_network_parameters()`.
    unused_timing: sync::Mutex<UnusedTimings>,
}
740
/// An action to take in order to satisfy a request for a circuit.
///
/// The three variants mirror the three cases of the manager's algorithm:
/// use an open circuit, wait for a pending one, or build a new one.
enum Action<B: AbstractCircBuilder<R>, R: Runtime> {
    /// We found an open circuit: return immediately.
    Open(Arc<B::Circ>),
    /// We found one or more pending circuits: wait until one succeeds,
    /// or all fail.
    ///
    /// Holds one clone of the shared receiver for each pending circuit.
    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B, R>>>>),
    /// We should launch circuits: here are the instructions for how
    /// to do so.
    Build(Vec<CircBuildPlan<B, R>>),
}
752
753impl<B: AbstractCircBuilder<R> + 'static, R: Runtime> AbstractCircMgr<B, R> {
754 /// Construct a new AbstractCircMgr.
755 pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
756 let circs = sync::Mutex::new(CircList::new());
757 let dflt_params = tor_netdir::params::NetParameters::default();
758 let unused_timing = (&dflt_params).into();
759 AbstractCircMgr {
760 builder,
761 runtime,
762 circs,
763 circuit_timing: circuit_timing.into(),
764 unused_timing: sync::Mutex::new(unused_timing),
765 }
766 }
767
768 /// Reconfigure this manager using the latest set of network parameters.
769 pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
770 let mut u = self
771 .unused_timing
772 .lock()
773 .expect("Poisoned lock for unused_timing");
774 *u = p.into();
775 }
776
    /// Return this manager's current [`CircuitTiming`].
    ///
    /// The returned `Arc` is whatever the `circuit_timing` field yields from
    /// `get()` at the time of the call.
    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
        self.circuit_timing.get()
    }
781
    /// Replace this manager's [`CircuitTiming`] with `new_config`.
    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
        self.circuit_timing.replace(new_config);
    }
786 /// Return a circuit suitable for use with a given `usage`,
787 /// creating that circuit if necessary, and restricting it
788 /// under the assumption that it will be used for that spec.
789 ///
790 /// This is the primary entry point for AbstractCircMgr.
791 pub(crate) async fn get_or_launch(
792 self: &Arc<Self>,
793 usage: &TargetCircUsage,
794 dir: DirInfo<'_>,
795 ) -> Result<(Arc<B::Circ>, CircProvenance)> {
796 /// Largest number of "resets" that we will accept in this attempt.
797 ///
798 /// A "reset" is an internally generated error that does not represent a
799 /// real problem; only a "whoops, got to try again" kind of a situation.
800 /// For example, if we reconfigure in the middle of an attempt and need
801 /// to re-launch the circuit, that counts as a "reset", since there was
802 /// nothing actually _wrong_ with the circuit we were building.
803 ///
804 /// We accept more resets than we do real failures. However,
805 /// we don't accept an unlimited number: we don't want to inadvertently
806 /// permit infinite loops here. If we ever bump against this limit, we
807 /// should not automatically increase it: we should instead figure out
808 /// why it is happening and try to make it not happen.
809 const MAX_RESETS: usize = 8;
810
811 let circuit_timing = self.circuit_timing();
812 let timeout_at = self.runtime.now() + circuit_timing.request_timeout;
813 let max_tries = circuit_timing.request_max_retries;
814 // We compute the maximum number of failures by dividing the maximum
815 // number of circuits to attempt by the number that will be launched in
816 // parallel for each iteration.
817 let max_failures = usize::div_ceil(
818 max_tries as usize,
819 std::cmp::max(1, self.builder.launch_parallelism(usage)),
820 );
821
822 let mut retry_schedule = RetryDelay::from_msec(100);
823 let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a circuit");
824
825 let mut n_failures = 0;
826 let mut n_resets = 0;
827
828 for attempt_num in 1.. {
829 // How much time is remaining?
830 let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
831 None => {
832 retry_err.push(Error::RequestTimeout);
833 break;
834 }
835 Some(t) => t,
836 };
837
838 let error = match self.prepare_action(usage, dir, true) {
839 Ok(action) => {
840 // We successfully found an action: Take that action.
841 let outcome = self
842 .runtime
843 .timeout(remaining, Arc::clone(self).take_action(action, usage))
844 .await;
845
846 match outcome {
847 Ok(Ok(circ)) => return Ok(circ),
848 Ok(Err(e)) => {
849 debug!("Circuit attempt {} failed.", attempt_num);
850 Error::RequestFailed(e)
851 }
852 Err(_) => {
853 // We ran out of "remaining" time; there is nothing
854 // more to be done.
855 warn!("All circuit attempts failed due to timeout");
856 retry_err.push(Error::RequestTimeout);
857 break;
858 }
859 }
860 }
861 Err(e) => {
862 // We couldn't pick the action!
863 debug_report!(
864 &e,
865 "Couldn't pick action for circuit attempt {}",
866 attempt_num,
867 );
868 e
869 }
870 };
871
872 // There's been an error. See how long we wait before we retry.
873 let now = self.runtime.now();
874 let retry_time =
875 error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::rng()));
876
877 let (count, count_limit) = if error.is_internal_reset() {
878 (&mut n_resets, MAX_RESETS)
879 } else {
880 (&mut n_failures, max_failures)
881 };
882 // Record the error, flattening it if needed.
883 match error {
884 Error::RequestFailed(e) => retry_err.extend(e),
885 e => retry_err.push(e),
886 }
887
888 *count += 1;
889 // If we have reached our limit of this kind of problem, we're done.
890 if *count >= count_limit {
891 warn!("Reached circuit build retry limit, exiting...");
892 break;
893 }
894
895 // Wait, or not, as appropriate.
896 match retry_time {
897 AbsRetryTime::Immediate => {}
898 AbsRetryTime::Never => break,
899 AbsRetryTime::At(t) => {
900 let remaining = timeout_at.saturating_duration_since(now);
901 let delay = t.saturating_duration_since(now);
902 self.runtime.sleep(std::cmp::min(delay, remaining)).await;
903 }
904 }
905 }
906
907 warn!("Request failed");
908 Err(Error::RequestFailed(retry_err))
909 }
910
911 /// Make sure a circuit exists, without actually asking for it.
912 ///
913 /// Make sure that there is a circuit (built or in-progress) that could be
914 /// used for `usage`, and launch one or more circuits in a background task
915 /// if there is not.
916 // TODO: This should probably take some kind of parallelism parameter.
917 #[allow(dead_code)]
918 pub(crate) async fn ensure_circuit(
919 self: &Arc<Self>,
920 usage: &TargetCircUsage,
921 dir: DirInfo<'_>,
922 ) -> Result<()> {
923 let action = self.prepare_action(usage, dir, false)?;
924 if let Action::Build(plans) = action {
925 for plan in plans {
926 let self_clone = Arc::clone(self);
927 let _ignore_receiver = self_clone.spawn_launch(usage, plan);
928 }
929 }
930
931 Ok(())
932 }
933
934 /// Choose which action we should take in order to provide a circuit
935 /// for a given `usage`.
936 ///
937 /// If `restrict_circ` is true, we restrict the spec of any
938 /// circ we decide to use to mark that it _is_ being used for
939 /// `usage`.
940 fn prepare_action(
941 &self,
942 usage: &TargetCircUsage,
943 dir: DirInfo<'_>,
944 restrict_circ: bool,
945 ) -> Result<Action<B, R>> {
946 let mut list = self.circs.lock().expect("poisoned lock");
947
948 if let Some(mut open) = list.find_open(usage) {
949 // We have open circuits that meet the spec: return the best one.
950 let parallelism = self.builder.select_parallelism(usage);
951 let best = OpenEntry::find_best(&mut open, usage, parallelism);
952 if restrict_circ {
953 let now = self.runtime.now();
954 best.restrict_mut(usage, now)?;
955 }
956 // TODO: If we have fewer circuits here than our select
957 // parallelism, perhaps we should launch more?
958
959 return Ok(Action::Open(best.circ.clone()));
960 }
961
962 if let Some(pending) = list.find_pending_circs(usage) {
963 // There are pending circuits that could meet the spec.
964 // Restrict them under the assumption that they could all
965 // be used for this, and then wait until one is ready (or
966 // all have failed)
967 let best = PendingEntry::find_best(&pending, usage);
968 if restrict_circ {
969 for item in &best {
970 // TODO: Do we want to tentatively restrict _all_ of these?
971 // not clear to me.
972 item.tentative_restrict_mut(usage)?;
973 }
974 }
975 let stream = best.iter().map(|item| item.receiver.clone()).collect();
976 // TODO: if we have fewer circuits here than our launch
977 // parallelism, we might want to launch more.
978
979 return Ok(Action::Wait(stream));
980 }
981
982 // Okay, we need to launch circuits here.
983 let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
984 let mut plans = Vec::new();
985 let mut last_err = None;
986 for _ in 0..parallelism {
987 match self.plan_by_usage(dir, usage) {
988 Ok((pending, plan)) => {
989 list.add_pending_circ(pending);
990 plans.push(plan);
991 }
992 Err(e) => {
993 debug!("Unable to make a plan for {:?}: {}", usage, e);
994 last_err = Some(e);
995 }
996 }
997 }
998 if !plans.is_empty() {
999 Ok(Action::Build(plans))
1000 } else if let Some(last_err) = last_err {
1001 Err(last_err)
1002 } else {
1003 // we didn't even try to plan anything!
1004 Err(internal!("no plans were built, but no errors were found").into())
1005 }
1006 }
1007
1008 /// Execute an action returned by pick-action, and return the
1009 /// resulting circuit or error.
1010 async fn take_action(
1011 self: Arc<Self>,
1012 act: Action<B, R>,
1013 usage: &TargetCircUsage,
1014 ) -> std::result::Result<(Arc<B::Circ>, CircProvenance), RetryError<Box<Error>>> {
1015 /// Store the error `err` into `retry_err`, as appropriate.
1016 fn record_error(
1017 retry_err: &mut RetryError<Box<Error>>,
1018 source: streams::Source,
1019 building: bool,
1020 mut err: Error,
1021 ) {
1022 if source == streams::Source::Right {
1023 // We don't care about this error, since it is from neither a circuit we launched
1024 // nor one that we're waiting on.
1025 return;
1026 }
1027 if !building {
1028 // We aren't building our own circuits, so our errors are
1029 // secondary reports of other circuits' failures.
1030 err = Error::PendingFailed(Box::new(err));
1031 }
1032 retry_err.push(err);
1033 }
1034 /// Return a string describing what it means, within the context of this
1035 /// function, to have gotten an answer from `source`.
1036 fn describe_source(building: bool, source: streams::Source) -> &'static str {
1037 match (building, source) {
1038 (_, streams::Source::Right) => "optimistic advice",
1039 (true, streams::Source::Left) => "circuit we're building",
1040 (false, streams::Source::Left) => "pending circuit",
1041 }
1042 }
1043
1044 // Get or make a stream of futures to wait on.
1045 let (building, wait_on_stream) = match act {
1046 Action::Open(c) => {
1047 // There's already a perfectly good open circuit; we can return
1048 // it now.
1049 return Ok((c, CircProvenance::Preexisting));
1050 }
1051 Action::Wait(f) => {
1052 // There is one or more pending circuit that we're waiting for.
1053 // If any succeeds, we try to use it. If they all fail, we
1054 // fail.
1055 (false, f)
1056 }
1057 Action::Build(plans) => {
1058 // We're going to launch one or more circuits in parallel. We
1059 // report success if any succeeds, and failure of they all fail.
1060 let futures = FuturesUnordered::new();
1061 for plan in plans {
1062 let self_clone = Arc::clone(&self);
1063 // (This is where we actually launch circuits.)
1064 futures.push(self_clone.spawn_launch(usage, plan));
1065 }
1066 (true, futures)
1067 }
1068 };
1069
1070 // Insert ourself into the list of pending requests, and make a
1071 // stream for us to listen on for notification from pending circuits
1072 // other than those we are pending on.
1073 let (pending_request, additional_stream) = {
1074 // We don't want this queue to participate in memory quota tracking.
1075 // There isn't any circuit yet, so there wouldn't be anything to account it to.
1076 // If this queue has the oldest data, probably the whole system is badly broken.
1077 // Tearing down the whole circuit manager won't help.
1078 let (send, recv) = mpsc_channel_no_memquota(8);
1079 let pending = Arc::new(PendingRequest {
1080 usage: usage.clone(),
1081 notify: send,
1082 });
1083
1084 let mut list = self.circs.lock().expect("poisoned lock");
1085 list.add_pending_request(&pending);
1086
1087 (pending, recv)
1088 };
1089
1090 // We use our "select_biased" stream combiner here to ensure that:
1091 // 1) Circuits from wait_on_stream (the ones we're pending on) are
1092 // preferred.
1093 // 2) We exit this function when those circuits are exhausted.
1094 // 3) We still get notified about other circuits that might meet our
1095 // interests.
1096 //
1097 // The events from Left stream are the oes that we explicitly asked for,
1098 // so we'll treat errors there as real problems. The events from the
1099 // Right stream are ones that we got opportunistically told about; it's
1100 // not a big deal if those fail.
1101 let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
1102
1103 let mut retry_error = RetryError::in_attempt_to("wait for circuits");
1104
1105 while let Some((src, id)) = incoming.next().await {
1106 match id {
1107 Ok(Ok(ref id)) => {
1108 // Great, we have a circuit. See if we can use it!
1109 let mut list = self.circs.lock().expect("poisoned lock");
1110 if let Some(ent) = list.get_open_mut(id) {
1111 let now = self.runtime.now();
1112 match ent.restrict_mut(usage, now) {
1113 Ok(()) => {
1114 // Great, this will work. We drop the
1115 // pending request now explicitly to remove
1116 // it from the list.
1117 drop(pending_request);
1118 if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
1119 // Since this circuit hasn't been used yet, schedule expiration task after `max_dirtiness` from now.
1120 spawn_expiration_task(
1121 &self.runtime,
1122 Arc::downgrade(&self),
1123 ent.circ.id(),
1124 now + self.circuit_timing().max_dirtiness,
1125 );
1126 }
1127 return Ok((ent.circ.clone(), CircProvenance::NewlyCreated));
1128 }
1129 Err(e) => {
1130 // In this case, a `UsageMismatched` error just means that we lost the race
1131 // to restrict this circuit.
1132 let e = match e {
1133 Error::UsageMismatched(e) => Error::LostUsabilityRace(e),
1134 x => x,
1135 };
1136 if src == streams::Source::Left {
1137 info_report!(
1138 &e,
1139 "{} suggested we use {:?}, but restrictions failed",
1140 describe_source(building, src),
1141 id,
1142 );
1143 } else {
1144 debug_report!(
1145 &e,
1146 "{} suggested we use {:?}, but restrictions failed",
1147 describe_source(building, src),
1148 id,
1149 );
1150 }
1151 record_error(&mut retry_error, src, building, e);
1152 continue;
1153 }
1154 }
1155 }
1156 }
1157 Ok(Err(ref e)) => {
1158 debug!("{} sent error {:?}", describe_source(building, src), e);
1159 record_error(&mut retry_error, src, building, e.clone());
1160 }
1161 Err(oneshot::Canceled) => {
1162 debug!(
1163 "{} went away (Canceled), quitting take_action right away",
1164 describe_source(building, src)
1165 );
1166 record_error(&mut retry_error, src, building, Error::PendingCanceled);
1167 return Err(retry_error);
1168 }
1169 }
1170
1171 debug!(
1172 "While waiting on circuit: {:?} from {}",
1173 id,
1174 describe_source(building, src)
1175 );
1176 }
1177
1178 // Nothing worked. We drop the pending request now explicitly
1179 // to remove it from the list. (We could just let it get dropped
1180 // implicitly, but that's a bit confusing.)
1181 drop(pending_request);
1182
1183 Err(retry_error)
1184 }
1185
1186 /// Given a directory and usage, compute the necessary objects to
1187 /// build a circuit: A [`PendingEntry`] to keep track of the in-process
1188 /// circuit, and a [`CircBuildPlan`] that we'll give to the thread
1189 /// that will build the circuit.
1190 ///
1191 /// The caller should probably add the resulting `PendingEntry` to
1192 /// `self.circs`.
1193 ///
1194 /// This is an internal function that we call when we're pretty sure
1195 /// we want to build a circuit.
1196 #[allow(clippy::type_complexity)]
1197 fn plan_by_usage(
1198 &self,
1199 dir: DirInfo<'_>,
1200 usage: &TargetCircUsage,
1201 ) -> Result<(Arc<PendingEntry<B, R>>, CircBuildPlan<B, R>)> {
1202 let (plan, bspec) = self.builder.plan_circuit(usage, dir)?;
1203 let (pending, sender) = PendingEntry::new(&bspec);
1204 let pending = Arc::new(pending);
1205
1206 let plan = CircBuildPlan {
1207 plan,
1208 sender,
1209 pending: Arc::clone(&pending),
1210 };
1211
1212 Ok((pending, plan))
1213 }
1214
1215 /// Launch a managed circuit for a target usage, without checking
1216 /// whether one already exists or is pending.
1217 ///
1218 /// Return a listener that will be informed when the circuit is done.
1219 pub(crate) fn launch_by_usage(
1220 self: &Arc<Self>,
1221 usage: &TargetCircUsage,
1222 dir: DirInfo<'_>,
1223 ) -> Result<Shared<oneshot::Receiver<PendResult<B, R>>>> {
1224 let (pending, plan) = self.plan_by_usage(dir, usage)?;
1225
1226 self.circs
1227 .lock()
1228 .expect("Poisoned lock for circuit list")
1229 .add_pending_circ(pending);
1230
1231 Ok(Arc::clone(self).spawn_launch(usage, plan))
1232 }
1233
1234 /// Spawn a background task to launch a circuit, and report its status.
1235 ///
1236 /// The `usage` argument is the usage from the original request that made
1237 /// us build this circuit.
1238 fn spawn_launch(
1239 self: Arc<Self>,
1240 usage: &TargetCircUsage,
1241 plan: CircBuildPlan<B, R>,
1242 ) -> Shared<oneshot::Receiver<PendResult<B, R>>> {
1243 let _ = usage; // Currently unused.
1244 let CircBuildPlan {
1245 mut plan,
1246 sender,
1247 pending,
1248 } = plan;
1249 let request_loyalty = self.circuit_timing().request_loyalty;
1250
1251 let wait_on_future = pending.receiver.clone();
1252 let runtime = self.runtime.clone();
1253 let runtime_copy = self.runtime.clone();
1254
1255 let tid = rand::random::<u64>();
1256 // We release this block when the circuit builder task terminates.
1257 let reason = format!("circuit builder task {}", tid);
1258 runtime.block_advance(reason.clone());
1259 // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
1260 // correctly.
1261 plan.add_blocked_advance_reason(reason);
1262
1263 runtime
1264 .spawn(async move {
1265 let self_clone = Arc::clone(&self);
1266 let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
1267 let (new_spec, reply) = match future.await {
1268 Ok(x) => x, // Success or regular failure
1269 Err(e) => {
1270 // Okay, this is a panic. We have to tell the calling
1271 // thread about it, then exit this circuit builder task.
1272 let _ = sender.send(Err(internal!("circuit build task panicked").into()));
1273 std::panic::panic_any(e);
1274 }
1275 };
1276
1277 // Tell anybody who was listening about it that this
1278 // circuit is now usable or failed.
1279 //
1280 // (We ignore any errors from `send`: That just means that nobody
1281 // was waiting for this circuit.)
1282 let _ = sender.send(reply.clone());
1283
1284 if let Some(new_spec) = new_spec {
1285 // Wait briefly before we notify opportunistically. This
1286 // delay will give the circuits that were originally
1287 // specifically intended for a request a little more time
1288 // to finish, before we offer it this circuit instead.
1289 let sl = runtime_copy.sleep(request_loyalty);
1290 runtime_copy.allow_one_advance(request_loyalty);
1291 sl.await;
1292
1293 let pending = {
1294 let list = self.circs.lock().expect("poisoned lock");
1295 list.find_pending_requests(&new_spec)
1296 };
1297 for pending_request in pending {
1298 let _ = pending_request.notify.clone().try_send(reply.clone());
1299 }
1300 }
1301 runtime_copy.release_advance(format!("circuit builder task {}", tid));
1302 })
1303 .expect("Couldn't spawn circuit-building task");
1304
1305 wait_on_future
1306 }
1307
1308 /// Run in the background to launch a circuit. Return a 2-tuple of the new
1309 /// circuit spec and the outcome that should be sent to the initiator.
1310 async fn do_launch(
1311 self: Arc<Self>,
1312 plan: <B as AbstractCircBuilder<R>>::Plan,
1313 pending: Arc<PendingEntry<B, R>>,
1314 ) -> (Option<SupportedCircUsage>, PendResult<B, R>) {
1315 let outcome = self.builder.build_circuit(plan).await;
1316
1317 match outcome {
1318 Err(e) => (None, Err(e)),
1319 Ok((new_spec, circ)) => {
1320 let id = circ.id();
1321
1322 let use_duration = self.pick_use_duration();
1323 let exp_inst = self.runtime.now() + use_duration;
1324 let runtime_copy = self.runtime.clone();
1325 spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), circ.id(), exp_inst);
1326 // I used to call restrict_mut here, but now I'm not so
1327 // sure. Doing restrict_mut makes sure that this
1328 // circuit will be suitable for the request that asked
1329 // for us in the first place, but that should be
1330 // ensured anyway by our tracking its tentative
1331 // assignment.
1332 //
1333 // new_spec.restrict_mut(&usage_copy).unwrap();
1334 let use_before = ExpirationInfo::new(exp_inst);
1335 let open_ent = OpenEntry::new(new_spec.clone(), circ, use_before);
1336 {
1337 let mut list = self.circs.lock().expect("poisoned lock");
1338 // Finally, before we return this circuit, we need to make
1339 // sure that this pending circuit is still pending. (If it
1340 // is not pending, then it was cancelled through a call to
1341 // `retire_all_circuits`, and the configuration that we used
1342 // to launch it is now sufficiently outdated that we should
1343 // no longer give this circuit to a client.)
1344 if list.circ_is_pending(&pending) {
1345 list.add_open(open_ent);
1346 // We drop our reference to 'pending' here:
1347 // this should make all the weak references to
1348 // the `PendingEntry` become dangling.
1349 drop(pending);
1350 (Some(new_spec), Ok(id))
1351 } else {
1352 // This circuit is no longer pending! It must have been cancelled, probably
1353 // by a call to retire_all_circuits()
1354 drop(pending); // ibid
1355 (None, Err(Error::CircCanceled))
1356 }
1357 }
1358 }
1359 }
1360 }
1361
/// Plan and build a new circuit for `usage`, bypassing our managed pool of
/// circuits.
///
/// This method always builds a new circuit, and never returns one that
/// this CircMgr gives out for anything else: the pending entry made while
/// planning is not registered with the manager.
///
/// The new circuit participates in the guard and timeout apparatus as
/// appropriate.  No retry is attempted if the circuit fails.
#[cfg(feature = "hs-common")]
pub(crate) async fn launch_unmanaged(
    &self,
    usage: &TargetCircUsage,
    dir: DirInfo<'_>,
) -> Result<(SupportedCircUsage, Arc<B::Circ>)> {
    // Only the build plan is needed; the pending entry is discarded.
    let build_plan = self.plan_by_usage(dir, usage)?.1;
    self.builder.build_circuit(build_plan.plan).await
}
1379
1380 /// Remove the circuit with a given `id` from this manager.
1381 ///
1382 /// After this function is called, that circuit will no longer be handed
1383 /// out to any future requests.
1384 ///
1385 /// Return None if we have no circuit with the given ID.
1386 pub(crate) fn take_circ(&self, id: &<B::Circ as AbstractCirc>::Id) -> Option<Arc<B::Circ>> {
1387 let mut list = self.circs.lock().expect("poisoned lock");
1388 list.take_open(id).map(|e| e.circ)
1389 }
1390
1391 /// Remove all open and pending circuits and from this manager, to ensure
1392 /// they can't be given out for any more requests.
1393 ///
1394 /// Calling `retire_all_circuits` ensures that any circuit request that gets
1395 /// an answer _after this method runs_ will receive a circuit that was
1396 /// launched _after this method runs_.
1397 ///
1398 /// We call this method this when our configuration changes in such a way
1399 /// that we want to make sure that any new (or pending) requests will
1400 /// receive circuits that are built using the new configuration.
1401 //
1402 // For more information, see documentation on [`CircuitList::open_circs`],
1403 // [`CircuitList::pending_circs`], and comments in `do_launch`.
1404 pub(crate) fn retire_all_circuits(&self) {
1405 let mut list = self.circs.lock().expect("poisoned lock");
1406 list.clear_all_circuits();
1407 }
1408
1409 /// Expire circuits according to the rules in `config` and the
1410 /// current time `now`.
1411 ///
1412 /// Expired circuits will not be automatically closed, but they will
1413 /// no longer be given out for new circuits.
1414 pub(crate) fn expire_circs(&self, now: Instant) {
1415 let mut list = self.circs.lock().expect("poisoned lock");
1416 if let Some(dirty_cutoff) = now.checked_sub(self.circuit_timing().max_dirtiness) {
1417 list.expire_circs(now, dirty_cutoff);
1418 }
1419 }
1420
1421 /// Consider expiring the circuit with given circuit `id`,
1422 /// according to the rules in `config` and the current time `now`.
1423 pub(crate) fn expire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id, now: Instant) {
1424 let mut list = self.circs.lock().expect("poisoned lock");
1425 if let Some(dirty_cutoff) = now.checked_sub(self.circuit_timing().max_dirtiness) {
1426 list.expire_circ(circ_id, now, dirty_cutoff);
1427 }
1428 }
1429
1430 /// Return the number of open circuits held by this circuit manager.
1431 pub(crate) fn n_circs(&self) -> usize {
1432 let list = self.circs.lock().expect("poisoned lock");
1433 list.open_circs.len()
1434 }
1435
/// Count the pending circuits currently tracked by this circuit manager.
#[cfg(test)]
pub(crate) fn n_pending_circs(&self) -> usize {
    self.circs
        .lock()
        .expect("poisoned lock")
        .pending_circs
        .len()
}
1442
1443 /// Get a reference to this manager's runtime.
1444 pub(crate) fn peek_runtime(&self) -> &R {
1445 &self.runtime
1446 }
1447
1448 /// Get a reference to this manager's builder.
1449 pub(crate) fn peek_builder(&self) -> &B {
1450 &self.builder
1451 }
1452
1453 /// Pick a duration by when a new circuit should expire from now
1454 /// if it has not yet been used
1455 fn pick_use_duration(&self) -> Duration {
1456 let timings = self
1457 .unused_timing
1458 .lock()
1459 .expect("Poisoned lock for unused_timing");
1460
1461 if self.builder.learning_timeouts() {
1462 timings.learning
1463 } else {
1464 // TODO: In Tor, this calculation also depends on
1465 // stuff related to predicted ports and channel
1466 // padding.
1467 use tor_basic_utils::RngExt as _;
1468 let mut rng = rand::rng();
1469 rng.gen_range_checked(timings.not_learning..=timings.not_learning * 2)
1470 .expect("T .. 2x T turned out to be an empty duration range?!")
1471 }
1472 }
1473}
1474
1475/// Spawn an expiration task that expires a circuit at given instant.
1476///
1477/// If given instant is earlier than now, expire the circuit immediately.
1478/// Otherwise, spawn a timer expiration task on given runtime.
1479///
1480/// When the timeout occurs, if the circuit manager is still present,
1481/// the task will ask the manager to expire the circuit, if the circuit
1482/// is ready to expire.
1483fn spawn_expiration_task<B, R>(
1484 runtime: &R,
1485 circmgr: Weak<AbstractCircMgr<B, R>>,
1486 circ_id: <<B as AbstractCircBuilder<R>>::Circ as AbstractCirc>::Id,
1487 exp_inst: Instant,
1488) where
1489 R: Runtime,
1490 B: 'static + AbstractCircBuilder<R>,
1491{
1492 let now = runtime.now();
1493 let rt_copy = runtime.clone();
1494 let duration = exp_inst.saturating_duration_since(now);
1495
1496 if duration == Duration::ZERO {
1497 // Circuit should already expire. Expire it now.
1498 let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1499 cm
1500 } else {
1501 // Circuits manager has already been dropped, so are the references it held.
1502 return;
1503 };
1504 cm.expire_circ(&circ_id, now);
1505 } else {
1506 // Spawn a timer expiration task with given expiration instant.
1507 if let Err(e) = runtime.spawn(async move {
1508 rt_copy.sleep(duration).await;
1509 let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
1510 cm
1511 } else {
1512 return;
1513 };
1514 cm.expire_circ(&circ_id, exp_inst);
1515 }) {
1516 warn_report!(e, "Unable to launch expiration task");
1517 }
1518 }
1519}
1520
1521#[cfg(test)]
1522mod test {
1523 // @@ begin test lint list maintained by maint/add_warning @@
1524 #![allow(clippy::bool_assert_comparison)]
1525 #![allow(clippy::clone_on_copy)]
1526 #![allow(clippy::dbg_macro)]
1527 #![allow(clippy::mixed_attributes_style)]
1528 #![allow(clippy::print_stderr)]
1529 #![allow(clippy::print_stdout)]
1530 #![allow(clippy::single_char_pattern)]
1531 #![allow(clippy::unwrap_used)]
1532 #![allow(clippy::unchecked_duration_subtraction)]
1533 #![allow(clippy::useless_vec)]
1534 #![allow(clippy::needless_pass_by_value)]
1535 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1536 use super::*;
1537 use crate::isolation::test::{assert_isoleq, IsolationTokenEq};
1538 use crate::mocks::{FakeBuilder, FakeCirc, FakeId, FakeOp};
1539 use crate::usage::{ExitPolicy, SupportedCircUsage};
1540 use crate::{Error, IsolationToken, StreamIsolation, TargetCircUsage, TargetPort, TargetPorts};
1541 use once_cell::sync::Lazy;
1542 use tor_guardmgr::fallback::FallbackList;
1543 use tor_guardmgr::TestConfig;
1544 use tor_llcrypto::pk::ed25519::Ed25519Identity;
1545 use tor_netdir::testnet;
1546 use tor_persist::TestingStateMgr;
1547 use tor_rtcompat::SleepProvider;
1548 use tor_rtmock::MockRuntime;
1549
1550 #[allow(deprecated)] // TODO #1885
1551 use tor_rtmock::MockSleepRuntime;
1552
1553 static FALLBACKS_EMPTY: Lazy<FallbackList> = Lazy::new(|| [].into());
1554
1555 fn di() -> DirInfo<'static> {
1556 (&*FALLBACKS_EMPTY).into()
1557 }
1558
1559 fn target_to_spec(target: &TargetCircUsage) -> SupportedCircUsage {
1560 match target {
1561 TargetCircUsage::Exit {
1562 ports,
1563 isolation,
1564 country_code,
1565 require_stability,
1566 } => SupportedCircUsage::Exit {
1567 policy: ExitPolicy::from_target_ports(&TargetPorts::from(&ports[..])),
1568 isolation: Some(isolation.clone()),
1569 country_code: country_code.clone(),
1570 all_relays_stable: *require_stability,
1571 },
1572 _ => unimplemented!(),
1573 }
1574 }
1575
1576 impl<U: PartialEq> IsolationTokenEq for OpenEntry<U> {
1577 fn isol_eq(&self, other: &Self) -> bool {
1578 self.spec.isol_eq(&other.spec)
1579 && self.circ == other.circ
1580 && self.expiration == other.expiration
1581 }
1582 }
1583
1584 impl<U: PartialEq> IsolationTokenEq for &mut OpenEntry<U> {
1585 fn isol_eq(&self, other: &Self) -> bool {
1586 self.spec.isol_eq(&other.spec)
1587 && self.circ == other.circ
1588 && self.expiration == other.expiration
1589 }
1590 }
1591
1592 fn make_builder<R: Runtime>(runtime: &R) -> FakeBuilder<R> {
1593 let state_mgr = TestingStateMgr::new();
1594 let guard_config = TestConfig::default();
1595 FakeBuilder::new(runtime, state_mgr, &guard_config)
1596 }
1597
#[test]
fn basic_tests() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);

        let mgr = Arc::new(AbstractCircMgr::new(
            make_builder(&rt),
            rt.clone(),
            CircuitTiming::default(),
        ));

        let webports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // A fresh manager has no circuits and an empty builder script.
        assert_eq!(mgr.n_circs(), 0);
        assert!(mgr.peek_builder().script.lock().unwrap().is_empty());

        // Ask for a circuit, and check that we get one.
        let (c1, _) = rt
            .wait_for(mgr.get_or_launch(&webports, di()))
            .await
            .unwrap();
        assert_eq!(mgr.n_circs(), 1);

        // A request that the existing circuit can serve gets that circuit.
        let port80 = TargetCircUsage::new_from_ipv4_ports(&[80]);
        let (c2, _) = mgr.get_or_launch(&port80, di()).await.unwrap();
        assert!(FakeCirc::eq(&c1, &c2));
        assert_eq!(mgr.n_circs(), 1);

        // Now make two requests "at once", to exercise the pending-circuit
        // code.
        let dnsport = TargetCircUsage::new_from_ipv4_ports(&[53]);
        let dnsport_restrict = TargetCircUsage::Exit {
            ports: vec![TargetPort::ipv4(53)],
            isolation: StreamIsolation::builder().build().unwrap(),
            country_code: None,
            require_stability: false,
        };

        let (r3, r4) = rt
            .wait_for(futures::future::join(
                mgr.get_or_launch(&dnsport, di()),
                mgr.get_or_launch(&dnsport_restrict, di()),
            ))
            .await;

        let (c3, _) = r3.unwrap();
        let (c4, _) = r4.unwrap();
        assert!(!FakeCirc::eq(&c1, &c3));
        assert!(FakeCirc::eq(&c3, &c4));
        assert_eq!(c3.id(), c4.id());
        assert_eq!(mgr.n_circs(), 2);

        // Take c3 out of the manager.  c4 is the same circuit, so trying
        // to take it afterwards yields None.
        let c3_taken = mgr.take_circ(&c3.id()).unwrap();
        let now_its_gone = mgr.take_circ(&c4.id());
        assert!(FakeCirc::eq(&c3_taken, &c3));
        assert!(now_its_gone.is_none());
        assert_eq!(mgr.n_circs(), 1);

        // With that circuit gone, another dnsport request must produce a
        // different circuit.
        let (c5, _) = rt
            .wait_for(mgr.get_or_launch(&dnsport, di()))
            .await
            .unwrap();
        assert!(!FakeCirc::eq(&c3, &c5));
        assert!(!FakeCirc::eq(&c4, &c5));
        assert_eq!(mgr.n_circs(), 2);

        // Finally, try launch_by_usage.
        let pending_before = mgr.n_pending_circs();
        assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
        assert_eq!(mgr.n_pending_circs(), pending_before + 1);
        // TODO: Actually make sure that launch_by_usage launched
        // the right thing.
    });
}
1680
#[test]
fn request_timeout() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);

        let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // The builder is scripted to fail once and then time out
        // completely, so the request as a whole must fail.
        let builder = make_builder(&rt);
        builder.set(&ports, vec![FakeOp::Fail, FakeOp::Timeout]);

        let mgr = Arc::new(AbstractCircMgr::new(
            builder,
            rt.clone(),
            CircuitTiming::default(),
        ));
        let outcome = mgr
            .peek_runtime()
            .wait_for(mgr.get_or_launch(&ports, di()))
            .await;

        assert!(matches!(outcome, Err(Error::RequestFailed(_))));
    });
}
1707
#[test]
fn request_timeout2() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);

        // A more complicated case: arrange things so that we wait for a
        // little over our predicted time, because of our
        // wait-for-next-action logic.
        let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
        let script = vec![
            FakeOp::Delay(Duration::from_millis(60_000 - 25)),
            FakeOp::NoPlan,
        ];
        let builder = make_builder(&rt);
        builder.set(&ports, script);

        let mgr = Arc::new(AbstractCircMgr::new(
            builder,
            rt.clone(),
            CircuitTiming::default(),
        ));
        let outcome = mgr
            .peek_runtime()
            .wait_for(mgr.get_or_launch(&ports, di()))
            .await;

        assert!(matches!(outcome, Err(Error::RequestFailed(_))));
    });
}
1740
#[test]
fn request_unplannable() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);

        let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // The builder is scripted to fail at the planning stage, a lot.
        let builder = make_builder(&rt);
        builder.set(&ports, vec![FakeOp::NoPlan; 2000]);

        let mgr = Arc::new(AbstractCircMgr::new(
            builder,
            rt.clone(),
            CircuitTiming::default(),
        ));
        let outcome = rt.wait_for(mgr.get_or_launch(&ports, di())).await;

        assert!(matches!(outcome, Err(Error::RequestFailed(_))));
    });
}
1763
#[test]
fn request_fails_too_much() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // The builder is scripted to fail 1000 times, which is above the
        // retry limit.
        let builder = make_builder(&rt);
        builder.set(&ports, vec![FakeOp::Fail; 1000]);

        let mgr = Arc::new(AbstractCircMgr::new(
            builder,
            rt.clone(),
            CircuitTiming::default(),
        ));
        let outcome = rt.wait_for(mgr.get_or_launch(&ports, di())).await;

        assert!(matches!(outcome, Err(Error::RequestFailed(_))));
    });
}
1785
/// A request still succeeds when the first scripted build yields a circuit
/// with a spec that does not match what was asked for.
#[test]
fn request_wrong_spec() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let usage = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // The first build is scripted to hand back a circuit whose spec is
        // for port 22 rather than for the requested ports.  (A real circuit
        // builder should never do that, but the manager has code for it.)
        let mismatched = TargetCircUsage::new_from_ipv4_ports(&[22]);
        let script = vec![FakeOp::WrongSpec(target_to_spec(&mismatched))];
        let builder = make_builder(&rt);
        builder.set(&usage, script);

        let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), CircuitTiming::default()));
        let outcome = rt.wait_for(mgr.get_or_launch(&usage, di())).await;

        assert!(outcome.is_ok());
    });
}
1814
/// Two concurrent requests for the same usage both succeed after two
/// scripted failures, and both receive the same circuit.
#[test]
fn request_retried() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let usage = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        // Two scripted failures; the attempt after those is expected to
        // succeed, so the overall result is a success.
        let script = vec![FakeOp::Fail, FakeOp::Fail];
        let builder = make_builder(&rt);
        builder.set(&usage, script);

        let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), CircuitTiming::default()));

        // No timeout behaviour is exercised here.
        rt.block_advance("test doesn't require advancing");

        let both = futures::future::join(
            mgr.get_or_launch(&usage, di()),
            mgr.get_or_launch(&usage, di()),
        );
        let (first, second) = rt.wait_for(both).await;

        let (circ_a, _) = first.unwrap();
        let (circ_b, _) = second.unwrap();

        assert!(FakeCirc::eq(&circ_a, &circ_b));
    });
}
1849
/// Checks circuit sharing between isolated and un-isolated requests.
///
/// Four requests (two with distinct owner tokens, two identical un-isolated
/// ones) are launched with every ordering of four small start delays.  For
/// each ordering the assertions require that the two token-isolated
/// requests share a circuit with nobody, and that the two un-isolated
/// requests share a circuit with each other.
#[test]
fn isolated() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let builder = make_builder(&rt);
        let mgr = Arc::new(AbstractCircMgr::new(
            builder,
            rt.clone(),
            CircuitTiming::default(),
        ));

        // Give iso1 and iso2 distinct owner tokens so that they can't share
        // a circuit; no_iso1 and no_iso2 are identical un-isolated usages.
        let iso1 = TargetCircUsage::Exit {
            ports: vec![TargetPort::ipv4(443)],
            isolation: StreamIsolation::builder()
                .owner_token(IsolationToken::new())
                .build()
                .unwrap(),
            country_code: None,
            require_stability: false,
        };
        let iso2 = TargetCircUsage::Exit {
            ports: vec![TargetPort::ipv4(443)],
            isolation: StreamIsolation::builder()
                .owner_token(IsolationToken::new())
                .build()
                .unwrap(),
            country_code: None,
            require_stability: false,
        };
        let no_iso1 = TargetCircUsage::new_from_ipv4_ports(&[443]);
        let no_iso2 = no_iso1.clone();

        // We're going to try launching these circuits in 24 different
        // orders, to make sure that the outcome is correct each time.
        use itertools::Itertools;
        let timeouts: Vec<_> = [0_u64, 2, 4, 6]
            .iter()
            .map(|d| Duration::from_millis(*d))
            .collect();

        for delays in timeouts.iter().permutations(4) {
            // Each permutation has exactly four entries; every request
            // gets its own one, so that all 24 launch orders are covered.
            let d1 = delays[0];
            let d2 = delays[1];
            let d3 = delays[2];
            let d4 = delays[3];
            let (c_iso1, c_iso2, c_no_iso1, c_no_iso2) = rt
                .wait_for(futures::future::join4(
                    async {
                        rt.sleep(*d1).await;
                        mgr.get_or_launch(&iso1, di()).await
                    },
                    async {
                        rt.sleep(*d2).await;
                        mgr.get_or_launch(&iso2, di()).await
                    },
                    async {
                        rt.sleep(*d3).await;
                        mgr.get_or_launch(&no_iso1, di()).await
                    },
                    async {
                        rt.sleep(*d4).await;
                        mgr.get_or_launch(&no_iso2, di()).await
                    },
                ))
                .await;

            let c_iso1 = c_iso1.unwrap().0;
            let c_iso2 = c_iso2.unwrap().0;
            let c_no_iso1 = c_no_iso1.unwrap().0;
            let c_no_iso2 = c_no_iso2.unwrap().0;

            // Isolated circuits are shared with nobody...
            assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
            assert!(!FakeCirc::eq(&c_iso1, &c_no_iso1));
            assert!(!FakeCirc::eq(&c_iso1, &c_no_iso2));
            assert!(!FakeCirc::eq(&c_iso2, &c_no_iso1));
            assert!(!FakeCirc::eq(&c_iso2, &c_no_iso2));
            // ...while the two un-isolated requests share one circuit.
            assert!(FakeCirc::eq(&c_no_iso1, &c_no_iso2));
        }
    });
}
1933
/// A request whose own circuit launch is scripted to time out is still
/// satisfied, by a circuit launched for a later and wider request.
#[test]
fn opportunistic() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);

        // The launch for the first request is scripted to time out
        // completely.  A second request is made shortly after that launch;
        // it should succeed, and the first request should be notified
        // about its circuit.
        let narrow = TargetCircUsage::new_from_ipv4_ports(&[80]);
        let wide = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);

        let builder = make_builder(&rt);
        builder.set(&narrow, vec![FakeOp::Timeout]);

        let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), CircuitTiming::default()));

        // `wide` asks for more ports than `narrow`, so the second request
        // has to launch a circuit of its own.
        let delayed_second = async {
            rt.sleep(Duration::from_millis(100)).await;
            mgr.get_or_launch(&wide, di()).await
        };
        let (first, second) = rt
            .wait_for(futures::future::join(
                mgr.get_or_launch(&narrow, di()),
                delayed_second,
            ))
            .await;

        match (first, second) {
            (Ok((circ_a, _)), Ok((circ_b, _))) => {
                assert!(FakeCirc::eq(&circ_a, &circ_b));
            }
            _ => panic!(),
        };
    });
}
1975
/// A circuit requested ahead of time with `ensure_circuit()` is shared by
/// two later, narrower requests.
#[test]
fn prebuild() {
    MockRuntime::test_with_various(|rt| async move {
        // Here `ensure_circuit()` is used to make sure a circuit gets
        // built, and two further requests are then expected to use it.
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let builder = make_builder(&rt);
        let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), CircuitTiming::default()));

        let both_ports = TargetCircUsage::new_from_ipv4_ports(&[80, 443]);
        let only_80 = TargetCircUsage::new_from_ipv4_ports(&[80]);
        let only_443 = TargetCircUsage::new_from_ipv4_ports(&[443]);

        let request_80 = async {
            rt.sleep(Duration::from_millis(10)).await;
            mgr.get_or_launch(&only_80, di()).await
        };
        let request_443 = async {
            rt.sleep(Duration::from_millis(50)).await;
            mgr.get_or_launch(&only_443, di()).await
        };
        let (ensured, got_80, got_443) = rt
            .wait_for(futures::future::join3(
                mgr.ensure_circuit(&both_ports, di()),
                request_80,
                request_443,
            ))
            .await;

        assert!(ensured.is_ok());

        let (circ_80, _) = got_80.unwrap();
        let (circ_443, _) = got_443.unwrap();

        // Launched on their own, these two requests would not have ended
        // up on the same circuit.
        assert!(FakeCirc::eq(&circ_80, &circ_443));
    });
}
2019
/// With `max_dirtiness` set to 15 seconds, `expire_circs()` discards a
/// circuit that was handed out long ago but keeps one that was first handed
/// out just now.
#[test]
fn expiration() {
    MockRuntime::test_with_various(|rt| async move {
        use crate::config::CircuitTimingBuilder;
        // Make two circuits, one dirty and one clean, and check that only
        // one of them expires.
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let builder = make_builder(&rt);

        let mut timing_builder = CircuitTimingBuilder::default();
        let circuit_timing = timing_builder
            .max_dirtiness(Duration::from_secs(15))
            .build()
            .unwrap();

        let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), circuit_timing));

        let imap = TargetCircUsage::new_from_ipv4_ports(&[993]);
        let pop = TargetCircUsage::new_from_ipv4_ports(&[995]);

        // The imap circuit is only ensured; the pop circuit is handed out.
        let initial = futures::future::join(
            mgr.ensure_circuit(&imap, di()),
            mgr.get_or_launch(&pop, di()),
        );
        let (ensured, pop_first) = rt.wait_for(initial).await;

        assert!(ensured.is_ok());
        let (pop1, _) = pop_first.unwrap();

        rt.advance(Duration::from_secs(30)).await;
        rt.advance(Duration::from_secs(15)).await;
        let (imap1, _) = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap();

        // Expiring now should drop the pop circuit, because it came from
        // get_or_launch() [which marks the circuit as being used].  The
        // imap circuit should survive, because it was not dirty until 15
        // seconds after the cutoff.
        let now = rt.now();

        mgr.expire_circs(now);

        let second_round = futures::future::join(
            mgr.get_or_launch(&pop, di()),
            mgr.get_or_launch(&imap, di()),
        );
        let (pop_again, imap_again) = rt.wait_for(second_round).await;

        let (pop2, _) = pop_again.unwrap();
        let (imap2, _) = imap_again.unwrap();

        assert!(!FakeCirc::eq(&pop2, &pop1));
        assert!(FakeCirc::eq(&imap2, &imap1));
    });
}
2076
2077 /// Returns three exit policies; one that permits nothing, one that permits ports 80
2078 /// and 443 only, and one that permits all ports.
2079 fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
2080 // FIXME(eta): the below is copypasta; would be nice to have a better way of
2081 // constructing ExitPolicy objects for testing maybe
2082 let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
2083
2084 // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
2085 // exits. Odd-numbered ones allow only ports 80 and 443;
2086 // even-numbered ones allow all ports.
2087 let id_noexit: Ed25519Identity = [0x05; 32].into();
2088 let id_webexit: Ed25519Identity = [0x11; 32].into();
2089 let id_fullexit: Ed25519Identity = [0x20; 32].into();
2090
2091 let not_exit = network.by_id(&id_noexit).unwrap();
2092 let web_exit = network.by_id(&id_webexit).unwrap();
2093 let full_exit = network.by_id(&id_fullexit).unwrap();
2094
2095 let ep_none = ExitPolicy::from_relay(¬_exit);
2096 let ep_web = ExitPolicy::from_relay(&web_exit);
2097 let ep_full = ExitPolicy::from_relay(&full_exit);
2098 (ep_none, ep_web, ep_full)
2099 }
2100
/// Exercises `SupportedCircUsage::find_supported` against open entries with
/// three different exit policies, for a plain exit usage and for preemptive
/// usages.
#[test]
fn test_find_supported() {
    let (ep_none, ep_web, ep_full) = get_exit_policies();
    let fake_circ = Arc::new(FakeCirc { id: FakeId::next() });
    let expiration = ExpirationInfo::Unused {
        use_before: Instant::now() + Duration::from_secs(60 * 60),
    };

    // All entries share the same circuit and expiration; they differ only
    // in the exit policy of their supported usage.
    let new_entry = |policy: ExitPolicy| {
        OpenEntry::new(
            SupportedCircUsage::Exit {
                policy,
                isolation: None,
                country_code: None,
                all_relays_stable: true,
            },
            Arc::clone(&fake_circ),
            expiration.clone(),
        )
    };

    let mut entry_none = new_entry(ep_none);
    let mut entry_none_c = entry_none.clone();
    let mut entry_web = new_entry(ep_web);
    let mut entry_web_c = entry_web.clone();
    let mut entry_full = new_entry(ep_full);
    let mut entry_full_c = entry_full.clone();

    let usage_web = TargetCircUsage::new_from_ipv4_ports(&[80]);
    let empty: Vec<&mut OpenEntry<FakeCirc>> = vec![];

    // An entry whose policy permits nothing can't serve a port-80 request.
    assert_isoleq!(
        SupportedCircUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
        empty
    );

    // HACK(eta): The `_c` clones exist because `find_supported` works with
    //            `&mut` references, and an entry can't be mutably borrowed
    //            for the input and for the expected value at once.

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none, &mut entry_web].into_iter(),
            &usage_web,
        ),
        vec![&mut entry_web_c]
    );

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
            &usage_web,
        ),
        vec![&mut entry_web_c, &mut entry_full_c]
    );

    // Preemptive circuit usage:

    let usage_preemptive_web = TargetCircUsage::Preemptive {
        port: Some(TargetPort::ipv4(80)),
        circs: 2,
        require_stability: false,
    };
    let usage_preemptive_dns = TargetCircUsage::Preemptive {
        port: None,
        circs: 2,
        require_stability: false,
    };

    // Nothing should come back unless at least 2 circuits qualify.

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none].into_iter(),
            &usage_preemptive_web
        ),
        empty
    );

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none].into_iter(),
            &usage_preemptive_dns
        ),
        empty
    );

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none, &mut entry_web].into_iter(),
            &usage_preemptive_web
        ),
        empty
    );

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none, &mut entry_web].into_iter(),
            &usage_preemptive_dns
        ),
        vec![&mut entry_none_c, &mut entry_web_c]
    );

    assert_isoleq!(
        SupportedCircUsage::find_supported(
            vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
            &usage_preemptive_web
        ),
        vec![&mut entry_web_c, &mut entry_full_c]
    );
}
2226
/// A preemptive usage asking for N circuits is not satisfied by a
/// `CircList` until N suitable open circuits have been added to it.
#[test]
fn test_circlist_preemptive_target_circs() {
    MockRuntime::test_with_various(|rt| async move {
        #[allow(deprecated)] // TODO #1885
        let rt = MockSleepRuntime::new(rt);
        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        let dirinfo = DirInfo::Directory(&netdir);

        let builder = make_builder(&rt);

        for &wanted in &[2, 8] {
            let mut circlist = CircList::<FakeBuilder<MockRuntime>, MockRuntime>::new();

            let preemptive_target = TargetCircUsage::Preemptive {
                port: Some(TargetPort::ipv4(80)),
                circs: wanted,
                require_stability: false,
            };

            for _ in 0..wanted {
                // Still short of `wanted` circuits: nothing may be found.
                assert!(circlist.find_open(&preemptive_target).is_none());

                // Build one more port-80 circuit and add it as an open entry.
                let usage = TargetCircUsage::new_from_ipv4_ports(&[80]);
                let (plan, _) = builder.plan_circuit(&usage, dirinfo).unwrap();
                let (spec, circ) = rt.wait_for(builder.build_circuit(plan)).await.unwrap();
                let expiration = ExpirationInfo::new(rt.now() + Duration::from_secs(60));
                circlist.add_open(OpenEntry::new(spec, circ, expiration));
            }

            // With `wanted` circuits open, the preemptive usage is satisfied.
            assert!(circlist.find_open(&preemptive_target).is_some());
        }
    });
}
2264}