// tor_dirmgr/bridgedesc.rs
1//! `BridgeDescMgr` - downloads and caches bridges' router descriptors
2
3use std::borrow::Cow;
4use std::cmp::Ordering;
5use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
6use std::fmt::{self, Debug, Display};
7use std::num::NonZeroU8;
8use std::panic::AssertUnwindSafe;
9use std::sync::{Arc, Mutex, MutexGuard, Weak};
10
11use async_trait::async_trait;
12use derive_more::{Deref, DerefMut};
13use educe::Educe;
14use futures::FutureExt;
15use futures::future;
16use futures::select;
17use futures::stream::{BoxStream, StreamExt};
18use futures::task::SpawnError;
19use tracing::{debug, info, trace};
20
21use safelog::sensitive;
22use tor_basic_utils::retry::RetryDelay;
23use tor_checkable::{SelfSigned, Timebound};
24use tor_circmgr::CircMgr;
25use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
26use tor_error::{ErrorKind, HasKind, error_report, internal};
27use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
28use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
29use tor_netdoc::doc::routerdesc::RouterDesc;
30use tor_rtcompat::{Runtime, SpawnExt as _};
31use web_time_compat::{Duration, Instant, SystemTime};
32
33use crate::event::FlagPublisher;
34use crate::storage::CachedBridgeDescriptor;
35use crate::{DirMgrStore, DynStore};
36
37#[cfg(test)]
38mod bdtest;
39
/// The key we use in all our data structures
///
/// This type alias saves typing, and would make it easier to change the bridge descriptor
/// manager to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;
45
/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// NOTE: the variant order is significant: the derived `Ord` makes `Dormant < Active`.
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}
85
/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
/// All clones share the same underlying [`Manager`] (and hence the same state).
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}
101
/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl
/// (see that impl for the concrete default values).
// TODO: there should be some way to override the defaults. See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refresh < min_refresh, or we may panic.
    max_refetch: Duration,
}
138
139impl Default for BridgeDescDownloadConfig {
140 fn default() -> Self {
141 let secs = Duration::from_secs;
142 BridgeDescDownloadConfig {
143 parallelism: 4.try_into().expect("parallelism is zero"),
144 retry: secs(30),
145 prefetch: secs(1000),
146 min_refetch: secs(3600),
147 max_refetch: secs(3600 * 3), // matches C Tor behaviour
148 }
149 }
150}
151
/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed
/// (its supertrait lives in the private `mockable` module below).
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}
162
/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
    use super::*;

    /// Defines the actual mockable APIs
    ///
    /// Not nameable (and therefore not implementable)
    /// outside the `bridgedesc` module.
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit manager
        type CircMgr: Send + Sync + 'static;

        /// Download this bridge's descriptor, and return it as a string
        ///
        /// Runs in a task.
        /// Called by `Manager::download_descriptor`, which handles parsing and validation.
        ///
        /// If `if_modified_since` is `Some`,
        /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
        /// If `if_modified_since` is `None`, returning `Ok(None)` is forbidden.
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;

    /// Actual code for downloading a descriptor document
    ///
    /// Builds a circuit to the bridge, opens a directory stream on it,
    /// and requests the relay's own descriptor.
    ///
    /// `_if_modified_since` is currently ignored (see TODO below),
    /// so on success this always returns `Ok(Some(_))`.
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        // TODO actually support _if_modified_since
        let tunnel = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = tunnel
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                // Only `RequestFailed` is expected here; anything else is a bug.
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        Ok(Some(output))
    }
}
227
/// The actual manager.
///
/// Immutable except for the mutex-protected [`State`];
/// shared between the public handle(s) and the spawned tasks via `Arc`.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state (see [`State`] for invariants and liveness properties)
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}
245
/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
/// * A public entrypoint, or task, obtains a [`StateGuard`].
///   It modifies the state to represent the callers' new requirements,
///   or things it has done, by updating the state,
///   preserving the invariants but disturbing the "liveness" (see below).
///
/// * [`StateGuard::drop`] calls [`State::process`].
///   This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
/// * `running`/`queued`: newly added, no outcome yet.
/// * `current` + `running`/`queued`: we are fetching (or going to)
/// * `current = OK` + `refetch_schedule`: fetched OK, will refetch before expiry
/// * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// Will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behaviors
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
352
impl Debug for State {
    /// Bespoke multi-line dump of the whole `State`
    ///
    /// Each tracked bridge is printed once per data structure it appears in,
    /// with a prefix letter: `C` current, `R` running, `Q` queued,
    /// `FS` refetch_schedule, `TS` retry_schedule.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Helper to format one bridge entry somewhere
        fn fmt_bridge(
            f: &mut fmt::Formatter,
            b: &BridgeConfig,
            info: &(dyn Display + '_),
        ) -> fmt::Result {
            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
            writeln!(f, " {:80.80} | {}", info, b)
        }

        /// Helper to format one of the schedules
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
            f: &mut fmt::Formatter,
            summary: &str,
            name: &str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
        ) -> fmt::Result {
            writeln!(f, " {}:", name)?;
            for b in schedule {
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
            }
            Ok(())
        }

        // We are going to have to go multi-line because of the bridge lines,
        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
        // or a derive.
        writeln!(f, "State {{")?;
        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
        writeln!(f, " earliest_timeout: ???, ..,")?;
        writeln!(f, " current:")?;
        for (b, v) in &*self.current {
            fmt_bridge(
                f,
                b,
                &match v {
                    Err(e) => Cow::from(format!("C Err {}", e)),
                    Ok(_) => "C Ok".into(),
                },
            )?;
        }
        writeln!(f, " running:")?;
        for b in self.running.keys() {
            fmt_bridge(f, b, &"R")?;
        }
        writeln!(f, " queued:")?;
        for qe in &self.queued {
            fmt_bridge(f, &qe.bridge, &"Q")?;
        }
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
        write!(f, "}}")?;

        Ok(())
    }
}
411
/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}
452
453impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
454 fn cmp(&self, other: &Self) -> Ordering {
455 self.when.cmp(&other.when).reverse()
456 // We don't care about the ordering of BridgeConfig or retry_delay.
457 // Different BridgeConfig with the same fetch time will be fetched in "some order".
458 }
459}
460
impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Equality is defined via `cmp`, so it too considers only `when`
// (see the warning on `RefetchEntry`).
impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
474
/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;

impl JoinHandle {
    /// Would abort this async task, if we could do that.
    /// Currently a no-op (see the struct-level comment).
    fn abort(&self) {}
}
490
impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    ///
    /// # Errors
    ///
    /// Returns [`StartupError`] if the background timeout task could not be spawned.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config. This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        // `()` selects the non-mocked implementations of everything.
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
511
/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// (This is derived from the expiry time,
    /// and clamped according to limits in the configuration.)
    refetch: SystemTime,
}
526
impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    ///
    /// Assembles the shared [`Manager`] with an empty [`State`],
    /// and spawns the timeout task which drives scheduled refetches/retries.
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        // The timeout task watches the receiving end of this channel
        // for changes to the earliest scheduled wake-up time.
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        // The task holds only a `Weak`, so it does not keep the manager alive.
        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    ///
    /// (Dropping the guard from `lock_then_process` runs `process`,
    /// which applies the new dormancy to the download schedule.)
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}
598
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    /// Replace the set of bridges we are tracking, preserving existing state
    /// (downloads, schedules, cached outcomes) for bridges present in both sets.
    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`. If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not `new`
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // After this maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                // (Currently a no-op: see `JoinHandle::abort`.)
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}
708
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a lock on state, for functions that want to disrupt liveness properties
    ///
    /// When `StateGuard` is dropped, the liveness properties will be restored
    /// by making whatever progress is required.
    ///
    /// See [`State`].
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }

    /// Obtains the lock on state.
    ///
    /// Caller ought not to modify state
    /// so as to invalidate invariants or liveness properties.
    /// Callers which are part of the algorithms in this crate
    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned by a panic in another thread.
    fn lock_only(&self) -> MutexGuard<State> {
        self.state.lock().expect("bridge desc manager poisoned")
    }
}
733
/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`]. See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}
754
impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    fn drop(&mut self) {
        // Restore the liveness properties before releasing the lock (see `State`).
        self.state.process(self.mgr);
    }
}
760
impl State {
    /// Ensure progress is made, by restoring all the liveness invariants
    ///
    /// This includes launching circuits as needed.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Restore liveness property *Running*
        self.consider_launching(mgr);

        let now_wall = mgr.runtime.wallclock();

        // Mitigate clock warping
        //
        // If the earliest `SystemTime` is more than `max_refetch` away,
        // the clock must have warped. If that happens we clamp
        // them all to `max_refetch`.
        //
        // (This is not perfect but will mitigate the worst effects by ensuring
        // that we do *something* at least every `max_refetch`, in the worst case,
        // other than just getting completely stuck.)
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            // Rebuild the heap with every entry's `when` clamped.
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }

        // Restore liveness property *Timeout*
        // postage::watch will tell the timeout task about the new wake-up time.
        let new_earliest_timeout = [
            // First retry. These are std Instant.
            self.retry_schedule.peek().map(|re| re.when),
            // First refetch. These are SystemTime, so we must convert them.
            self.refetch_schedule.peek().map(|re| {
                // If duration_since gives Err, that means when is before now,
                // ie we should not be waiting: the wait duration should be 0.
                let wait = re.when.duration_since(now_wall).unwrap_or_default();

                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }

    /// Launch download attempts if we can
    ///
    /// Specifically: if we have things in `queued`, and `running` is shorter than
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
    ///
    /// Restores liveness invariant *Running*.
    ///
    /// Idempotent. Forms part of `process`.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Bridges whose spawn failed; removed from `current` in one batch below.
        let mut to_remove = vec![];

        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();

                    // The task which actually downloads a descriptor.
                    async move {
                        // Catch panics so a panicking download is recorded
                        // as an error rather than silently lost.
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                        // to make further progress and restore the liveness properties.
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed.
                    //
                    // We are going to forget about this bridge.
                    // And we're going to do that without notifying anyone.
                    // We *do* want to remove it from `current` because simply forgetting
                    // about a refetch could leave expired data there.
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
                    to_remove.push(bridge);
                }
            }
        }

        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }

    /// Modify `current` and notify subscribers
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    ///
    /// The implementation actually involves cloning `current`,
    /// so it is best to amortize calls to this function.
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
        let mut current = (*self.current).clone();
        let r = f(&mut current);
        self.set_current_and_notify(current);
        r
    }

    /// Set `current` to a value and notify
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
        self.current = new.into();
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
    }

    /// Obtain the currently-desired level of parallelism
    ///
    /// Helper function. The return value depends the mutable state and also the `config`.
    ///
    /// This is how we implement dormancy: when dormant, parallelism is zero,
    /// so no new downloads are launched.
    fn effective_parallelism(&self) -> usize {
        match self.dormancy {
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
            Dormancy::Dormant => 0,
        }
    }
}
928
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
    /// Record a download outcome.
    ///
    /// Final act of the descriptor download task.
    /// `got` is from [`download_descriptor`](Manager::download_descriptor).
    ///
    /// On success, schedules the refetch and stores `Ok` in `current`;
    /// on failure, schedules a retry and stores the error in `current`.
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
            Some(ri) => ri,
            None => {
                // `set_bridges` dropped this bridge while we were downloading it.
                debug!("bridge descriptor download completed for no-longer-configured bridge");
                return;
            }
        };

        let insert = match got {
            Ok(Downloaded { desc, refetch }) => {
                // Successful download. Schedule the refetch, and we'll insert Ok.

                self.refetch_schedule.push(RefetchEntry {
                    when: refetch,
                    bridge: bridge.clone(),
                    retry_delay: (),
                });

                Ok(desc)
            }
            Err(err) => {
                // Failed. Schedule the retry, and we'll insert Err.

                let mut retry_delay =
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));

                let retry = err.retry_time();
                // Convert the error's `RetryTime` hint to an absolute `Instant`,
                // using our persistent `RetryDelay` when the error gives no specific time.
                let now = self.mgr.runtime.now();
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
                // Clamp: retry no earlier than now, and no later than `max_refetch` from now.
                // That way, if a bridge is misconfigured, we will see it be fixed eventually.
                let retry = {
                    let earliest = now;
                    let latest = || now + self.config.max_refetch;
                    match retry {
                        AbsRetryTime::Immediate => earliest,
                        AbsRetryTime::Never => latest(),
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
                    }
                };
                self.retry_schedule.push(RefetchEntry {
                    when: retry,
                    bridge: bridge.clone(),
                    retry_delay,
                });

                Err(Box::new(err) as _)
            }
        };

        self.modify_current(|current| current.insert(bridge, insert));
    }
}
989
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Downloads a descriptor.
    ///
    /// The core of the descriptor download task
    /// launched by `State::consider_launching`.
    ///
    /// Uses Mockable::download to actually get the document.
    /// So most of this function is parsing and checking.
    ///
    /// Consults the on-disk cache first: a still-valid, recently-fetched
    /// cached descriptor is returned without touching the network; an
    /// older-but-valid one is used as the basis for an if-modified-since
    /// request.
    ///
    /// The returned value is precisely the `got` input to
    /// [`record_download_outcome`](StateGuard::record_download_outcome).
    async fn download_descriptor(
        &self,
        mockable: M,
        bridge: &BridgeConfig,
        config: &BridgeDescDownloadConfig,
    ) -> Result<Downloaded, Error> {
        // convenience alias, capturing the usual parameters from our variables.
        let process_document = |text| process_document(&self.runtime, config, text);

        // Closure yielding the store guard, mapping mutex poison to a Bug.
        let store = || {
            self.store
                .lock()
                .map_err(|_| internal!("bridge descriptor store poisoned"))
        };

        // Cache lookup failure is only logged; we proceed as if uncached.
        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });

        // NB: `now` is captured *before* any download; it is reused below as the
        // `fetched` time we record, which is the conservative (earlier) choice.
        let now = self.runtime.wallclock();
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
            if cached.fetched > now {
                // was fetched "in the future" - clock must have warped; don't trust it
                None
            } else {
                // let's see if it's any use
                match process_document(&cached.document) {
                    Err(err) => {
                        // We had a doc in the cache but our attempt to use it failed
                        // We wouldn't have written a bad cache entry.
                        // So one of the following must be true:
                        //  * We were buggy or are stricter now or something
                        //  * The document was valid but its validity time has expired
                        // In any case we can't reuse it.
                        // (This happens in normal operation, when a document expires.)
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
                        None
                    }
                    Ok(got) => {
                        // The cached document looks valid.
                        // But how long ago did we fetch it?
                        // We need to enforce max_refetch even for still-valid documents.
                        // (duration_since can only fail if fetched > now, excluded above;
                        // Option ordering makes None compare less than Some.)
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Was fetched recently, too. We can just reuse it.
                            return Ok(got);
                        }
                        Some(got)
                    }
                }
            }
        } else {
            None
        };

        // If cached_good is Some, we found a plausible cache entry; if we got here, it was
        // past its max_refetch. So in that case we want to send a request with
        // if-modified-since. If we get Not Modified, we can reuse it (and update the fetched time).
        let if_modified_since = cached_good
            .as_ref()
            .map(|got| got.desc.as_ref().published());

        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );

        // `None` from download means "not modified" (only possible when we
        // sent if-modified-since).
        let text = mockable
            .clone()
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
            .await?;

        let (document, got) = if let Some(text) = text {
            let got = process_document(&text)?;
            (text, got)
        } else if let Some(cached) = cached_good {
            // Not Modified: reuse the cached document and its parsed form.
            (
                cache_entry
                    .expect("cached_good but not cache_entry")
                    .document,
                cached,
            )
        } else {
            return Err(internal!("download gave None but no if-modified-since").into());
        };

        // IEFI catches cache store errors, which we log but don't do anything else with
        (|| {
            let cached = CachedBridgeDescriptor {
                document,
                fetched: now, // this is from before we started the fetch, which is correct
            };

            // Calculate when the cache should forget about this.
            // We want to add a bit of slop for the purposes of mild clock skew handling,
            // etc., and the prefetch time is a good proxy for that.
            let until = got
                .refetch
                .checked_add(config.prefetch)
                .unwrap_or(got.refetch /*uh*/);

            store()?.store_bridgedesc(bridge, cached, until)?;
            Ok(())
        })()
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });

        Ok(got)
    }
}
1124
/// Processes and analyses a textual descriptor document into a `Downloaded`
///
/// Parses it, checks the signature, checks the document validity times,
/// and if that's all good, calculates when we will want to refetch it.
fn process_document<R: Runtime>(
    runtime: &R,
    config: &BridgeDescDownloadConfig,
    text: &str,
) -> Result<Downloaded, Error> {
    let desc = RouterDesc::parse(text)?;

    // We *could* just trust this because we have trustworthy provenance
    // we know that the channel machinery authenticated the identity keys in `bridge`.
    // But let's do some cross-checking anyway.
    // `check_signature` checks the self-signature.
    // (Arc because our SignatureCheckFailed variant holds Arc<signature::Error>,
    // keeping `Error: Clone`.)
    let desc = desc.check_signature().map_err(Arc::new)?;

    let now = runtime.wallclock();
    desc.is_valid_at(&now)?;

    // Justification that use of "dangerously" is correct:
    // 1. We have checked this just above, so it is valid now.
    // 2. We are extracting the timeout and implement our own refetch logic using expires.
    let (desc, (_, expires)) = desc.dangerously_into_parts();

    // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
    // The following situations can result in a nominally-expired descriptor being used:
    //
    // 1. We primarily enforce the timeout by looking at the expiry time,
    //    subtracting a configured constant, and scheduling the start of a refetch then.
    //    If it takes us longer to do the retry, than the prefetch constant,
    //    we'll still be providing the old descriptor to consumers in the meantime.
    //
    // 2. We apply a minimum time before we will refetch a descriptor.
    //    So if the validity time is unreasonably short, we'll use it beyond that time.
    //
    // 3. Clock warping could confuse this algorithm. This is inevitable because we
    //    are relying on calendar times (SystemTime) in the descriptor, and because
    //    we don't have a mechanism for being told about clock warps rather than the
    //    passage of time.
    //
    // We think this is all OK given that a bridge descriptor is used for trying to
    // connect to the bridge itself. In particular, we don't want to completely trust
    // bridges to control our retry logic.
    let refetch = match expires {
        // Aim to refetch `prefetch` before the document expires ...
        Some(expires) => expires
            .checked_sub(config.prefetch)
            .ok_or(Error::ExtremeValidityTime)?,

        // ... or, with no expiry, as late as we ever allow.
        None => now
            .checked_add(config.max_refetch)
            .ok_or(Error::ExtremeValidityTime)?,
    };
    // Clamp into [now+min_refetch, now+max_refetch] (cases 1 and 2 above).
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);

    let desc = BridgeDesc::new(Arc::new(desc));

    Ok(Downloaded { desc, refetch })
}
1184
/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `updates`.
///
/// `updates` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function. Idempotent.
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        // The schedules' `Ord` is arranged so that `peek` yields the soonest
        // entry (cf. the assertion in `check_consistency`), so we can stop at
        // the first entry still in the future.
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);

            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }

    // Start with an immediate wakeup so we process any pre-existing schedule.
    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
            changed = update.next() => {
                // changed is Option<Option< >>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
                    break
                }
            },

            // Wait until the specified earliest wakeup time
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    // No wakeup scheduled: park until `update` fires instead.
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
                    { future::pending().await }
                }
            }.fuse() => {
                // We have reached the pre-programmed time. Check what needs doing.

                // If the Manager is gone, the whole subsystem is shutting down.
                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state; // Do the DerefMut once so we can borrow fields

                // Refetch schedule is keyed by wallclock time (SystemTime);
                // its entries carry no backoff state, hence `|()| None`.
                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );

                // Retry schedule is keyed by monotonic time (Instant);
                // each entry keeps its RetryDelay backoff state.
                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );

                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
}
1280
/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,

    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        //
        // Wrapped in `Arc` so that this enum can derive `Clone`.
        #[source]
        cause: Arc<SpawnError>,
    },
}
1304
1305impl HasKind for StartupError {
1306 fn kind(&self) -> ErrorKind {
1307 use ErrorKind as EK;
1308 use StartupError as SE;
1309 match self {
1310 SE::MissingCircMgr => EK::Internal,
1311 SE::Spawn { cause, .. } => cause.kind(),
1312 }
1313 }
1314}
1315
/// An error which occurred trying to obtain the descriptor for a particular bridge
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),

    /// Couldn't establish a directory stream to the bridge
    //
    // `#[source]` rather than `#[from]`: `CircuitFailed` already provides
    // the `From<tor_circmgr::Error>` impl.
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_circmgr::Error),

    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),

    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),

    /// Signature check failed
    //
    // In `Arc` so that this enum can derive `Clone`.
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),

    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),

    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,

    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),

    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}
1358
1359impl HasKind for Error {
1360 fn kind(&self) -> ErrorKind {
1361 use Error as E;
1362 use ErrorKind as EK;
1363 let bridge_protocol_violation = EK::TorAccessFailed;
1364 match self {
1365 // We trust that tor_circmgr returns TorAccessFailed when it ought to.
1366 E::CircuitFailed(e) => e.kind(),
1367 E::StreamFailed(e) => e.kind(),
1368 E::RequestFailed(e) => e.kind(),
1369 E::ParseFailed(..) => bridge_protocol_violation,
1370 E::SignatureCheckFailed(..) => bridge_protocol_violation,
1371 E::ExtremeValidityTime => bridge_protocol_violation,
1372 E::BadValidityTime(..) => EK::ClockSkew,
1373 E::Bug(e) => e.kind(),
1374 #[cfg(test)]
1375 E::TestError(..) => EK::Internal,
1376 }
1377 }
1378}
1379
1380impl HasRetryTime for Error {
1381 fn retry_time(&self) -> RetryTime {
1382 use Error as E;
1383 use RetryTime as R;
1384 match self {
1385 // Errors with their own retry times
1386 E::CircuitFailed(e) => e.retry_time(),
1387
1388 // Remote misbehavior, maybe the network is being strange?
1389 E::StreamFailed(..) => R::AfterWaiting,
1390 E::RequestFailed(..) => R::AfterWaiting,
1391
1392 // Remote misconfiguration, detected *after* we successfully made the channel
1393 // (so not a network problem). We'll say "never" for RetryTime,
1394 // even though actually we will in fact retry in at most `max_refetch`.
1395 E::ParseFailed(..) => R::Never,
1396 E::SignatureCheckFailed(..) => R::Never,
1397 E::BadValidityTime(..) => R::Never,
1398 E::ExtremeValidityTime => R::Never,
1399
1400 // Probably, things are broken here, rather than remotely.
1401 E::Bug(..) => R::Never,
1402
1403 #[cfg(test)]
1404 E::TestError(_, retry) => *retry,
1405 }
1406 }
1407}
1408
// Marker impl: lets this `Error` be the error type stored in `BridgeDescList`
// entries, via the `BridgeDescError` trait from `tor_guardmgr::bridge`.
impl BridgeDescError for Error {}
1410
impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Panics (or fails an assertion) if any of the documented invariants
    /// (*Tracked*, *Input*, *Current*, *Limit*, *Running*, *Timeout*) is violated.
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because `postage::watch::Sender::borrow` wants it.
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in the schedule `sch`
            Schedule {
                /// Which schedule ("refetch" or "retry"), for error messages
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }

        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one of our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }

        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };

        impl Tracked {
            /// Note that `b` is Tracked, having been found in `where_`
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input (every tracked bridge was in input)*
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }

        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Check invariant *Tracked* and *Schedule* wrt this schedule.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };

            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so this ought to be a wasteful scan,
                // but, indirectly, this tests our implementation of `Ord` for `RefetchEntry`.
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }

                // `peek` must agree with a full scan for the minimum `when`,
                // i.e. the heap really is soonest-first.
                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }

        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
        // Convert a SystemTime into an Instant by offsetting from "now",
        // so wallclock-keyed and Instant-keyed schedules can be compared.
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };

        // *Tracked*
        //
        // We walk our data structures in turn

        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }

        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);

        // *Current*
        //
        // Every bridge in `current` must be tracked; if it is in a schedule,
        // mark it so we can check the converse below.
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }

        // *Input (sense: every input bridge is tracked)*
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
        for (b, where_) in &tracked.tracked {
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
                }
                _ => {}
            }
        }

        // *Limit*
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);

        // *Running*
        assert!(self.running.len() == parallelism || self.queued.is_empty());

        // *Timeout* (final)
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}
1596}