1//! `BridgeDescMgr` - downloads and caches bridges' router descriptors
2
3use std::borrow::Cow;
4use std::cmp::Ordering;
5use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
6use std::fmt::{self, Debug, Display};
7use std::num::NonZeroU8;
8use std::panic::AssertUnwindSafe;
9use std::sync::{Arc, Mutex, MutexGuard, Weak};
10
11use async_trait::async_trait;
12use derive_more::{Deref, DerefMut};
13use educe::Educe;
14use futures::FutureExt;
15use futures::future;
16use futures::select;
17use futures::stream::{BoxStream, StreamExt};
18use futures::task::SpawnError;
19use tracing::{debug, info, trace};
20
21use safelog::sensitive;
22use tor_basic_utils::retry::RetryDelay;
23use tor_checkable::{SelfSigned, Timebound};
24use tor_circmgr::CircMgr;
25use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
26use tor_error::{ErrorKind, HasKind, error_report, internal};
27use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
28use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
29use tor_netdoc::doc::routerdesc::RouterDesc;
30use tor_rtcompat::{Runtime, SpawnExt as _};
31use web_time_compat::{Duration, Instant, SystemTime};
32
33use crate::event::FlagPublisher;
34use crate::storage::CachedBridgeDescriptor;
35use crate::{DirMgrStore, DynStore};
36
/// Unit tests for the bridge descriptor manager (compiled only under `cfg(test)`)
#[cfg(test)]
mod bdtest;
39
/// The key we use in all our data structures
///
/// This type saves typing and would make it easier to change the bridge descriptor manager
/// to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;
45
/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
//     https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}
85
/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}
101
/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
// TODO: there should be some way to override the defaults.  See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refresh < min_refresh, or we may panic.
    max_refetch: Duration,
}
138
139impl Default for BridgeDescDownloadConfig {
140    fn default() -> Self {
141        let secs = Duration::from_secs;
142        BridgeDescDownloadConfig {
143            parallelism: 4.try_into().expect("parallelism is zero"),
144            retry: secs(30),
145            prefetch: secs(1000),
146            min_refetch: secs(3600),
147            max_refetch: secs(3600 * 3), // matches C Tor behaviour
148        }
149    }
150}
151
/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed.
// Sealed via the supertrait: `mockable::MockableAPI` is not nameable outside this module.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}
162
/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
    use super::*;

    /// Defines the actual mockable APIs
    ///
    /// Not nameable (and therefore not implementable)
    /// outside the `bridgedesc` module.
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit manager
        type CircMgr: Send + Sync + 'static;

        /// Download this bridge's descriptor, and return it as a string
        ///
        /// Runs in a task.
        /// Called by `Manager::download_descriptor`, which handles parsing and validation.
        ///
        /// If `if_modified_since` is `Some`,
        /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
        /// If `if_modified_since` is `None`, returning `Ok(None)` is forbidden.
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
194#[async_trait]
195impl<R: Runtime> mockable::MockableAPI<R> for () {
196    type CircMgr = Arc<CircMgr<R>>;
197
198    /// Actual code for downloading a descriptor document
199    async fn download(
200        self,
201        runtime: &R,
202        circmgr: &Self::CircMgr,
203        bridge: &BridgeConfig,
204        _if_modified_since: Option<SystemTime>,
205    ) -> Result<Option<String>, Error> {
206        // TODO actually support _if_modified_since
207        let tunnel = circmgr.get_or_launch_dir_specific(bridge).await?;
208        let mut stream = tunnel
209            .begin_dir_stream()
210            .await
211            .map_err(Error::StreamFailed)?;
212        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
213        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
214            .await
215            .map_err(|dce| match dce {
216                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
217                _ => internal!(
218                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
219                    dce
220                )
221                .into(),
222            })?;
223        let output = response.into_output_string()?;
224        Ok(Some(output))
225    }
226}
227
/// The actual manager.
///
/// Shared (via `Arc`) between the public `BridgeDescMgr` handle and the
/// background tasks, which hold a `Weak` or cloned reference.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}
245
/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
///  * A public entrypoint, or task, obtains a [`StateGuard`].
///    It modifies the state to represent the callers' new requirements,
///    or things it has done, by updating the state,
///    preserving the invariants but disturbing the "liveness" (see below).
///
///  * [`StateGuard::drop`] calls [`State::process`].
///    This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
///  * `running`/`queued`: newly added, no outcome yet.
///  * `current` + `running`/`queued`: we are fetching (or going to)
///  * `current = OK` + `refetch_schedule`: fetched OK, will refetch before expiry
///  * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**:
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**:
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// Will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behaviors
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
352
353impl Debug for State {
354    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
355        /// Helper to format one bridge entry somewhere
356        fn fmt_bridge(
357            f: &mut fmt::Formatter,
358            b: &BridgeConfig,
359            info: &(dyn Display + '_),
360        ) -> fmt::Result {
361            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
362            writeln!(f, "    {:80.80} | {}", info, b)
363        }
364
365        /// Helper to format one of the schedules
366        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
367            f: &mut fmt::Formatter,
368            summary: &str,
369            name: &str,
370            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
371        ) -> fmt::Result {
372            writeln!(f, "  {}:", name)?;
373            for b in schedule {
374                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
375            }
376            Ok(())
377        }
378
379        // We are going to have to go multi-line because of the bridge lines,
380        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
381        // or a derive.
382        writeln!(f, "State {{")?;
383        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
384        writeln!(f, "  earliest_timeout: ???, ..,")?;
385        writeln!(f, "  current:")?;
386        for (b, v) in &*self.current {
387            fmt_bridge(
388                f,
389                b,
390                &match v {
391                    Err(e) => Cow::from(format!("C Err {}", e)),
392                    Ok(_) => "C Ok".into(),
393                },
394            )?;
395        }
396        writeln!(f, "  running:")?;
397        for b in self.running.keys() {
398            fmt_bridge(f, b, &"R")?;
399        }
400        writeln!(f, "  queued:")?;
401        for qe in &self.queued {
402            fmt_bridge(f, &qe.bridge, &"Q")?;
403        }
404        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
405        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
406        write!(f, "}}")?;
407
408        Ok(())
409    }
410}
411
/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    ///
    /// (Currently a stub; see [`JoinHandle`].)
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}
421
/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}
431
/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}
452
453impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
454    fn cmp(&self, other: &Self) -> Ordering {
455        self.when.cmp(&other.when).reverse()
456        // We don't care about the ordering of BridgeConfig or retry_delay.
457        // Different BridgeConfig with the same fetch time will be fetched in "some order".
458    }
459}
460
461impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
462    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
463        Some(self.cmp(other))
464    }
465}
466
467impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
468    fn eq(&self, other: &Self) -> bool {
469        self.cmp(other) == Ordering::Equal
470    }
471}
472
473impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
474
/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;

impl JoinHandle {
    /// Would abort this async task, if we could do that.
    ///
    /// (Currently a no-op; see the type-level comment.)
    fn abort(&self) {}
}
490
impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    ///
    /// # Errors
    ///
    /// Returns [`StartupError`] if the background timeout task could not be spawned.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config.  This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
511
/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// (This is derived from the expiry time,
    /// and clamped according to limits in the configuration.)
    refetch: SystemTime,
}
526
impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    ///
    /// Builds the shared [`Manager`], and spawns the background timeout task
    /// (which holds only a `Weak` reference, so it does not keep the manager alive).
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        // The sender half lives in `State`; the receiver is given to the timeout task.
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}
598
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    /// Return (a handle onto) our current set of descriptors and errors.
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    /// Subscribe to notifications of changes to the output of `bridges()`.
    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    /// Replace the set of bridges we are tracking, preserving work already done
    /// for bridges that remain, and forgetting bridges that were removed.
    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`.  If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not `new`.
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // After this maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}
708
709impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
710    /// Obtain a lock on state, for functions that want to disrupt liveness properties
711    ///
712    /// When `StateGuard` is dropped, the liveness properties will be restored
713    /// by making whatever progress is required.
714    ///
715    /// See [`State`].
716    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
717        StateGuard {
718            state: self.lock_only(),
719            mgr: self,
720        }
721    }
722
723    /// Obtains the lock on state.
724    ///
725    /// Caller ought not to modify state
726    /// so as to invalidate invariants or liveness properties.
727    /// Callers which are part of the algorithms in this crate
728    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
729    fn lock_only(&self) -> MutexGuard<State> {
730        self.state.lock().expect("bridge desc manager poisoned")
731    }
732}
733
/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`].  See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}
754
impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    /// Restore the liveness properties, by running [`State::process`],
    /// just before the state lock is released.
    fn drop(&mut self) {
        self.state.process(self.mgr);
    }
}
760
761impl State {
    /// Ensure progress is made, by restoring all the liveness invariants
    ///
    /// This includes launching circuits as needed.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Restore liveness property *Running*
        self.consider_launching(mgr);

        let now_wall = mgr.runtime.wallclock();

        // Mitigate clock warping
        //
        // If the earliest `SystemTime` is more than `max_refetch` away,
        // the clock must have warped.  If that happens we clamp
        // them all to `max_refetch`.
        //
        // (This is not perfect but will mitigate the worst effects by ensuring
        // that we do *something* at least every `max_refetch`, in the worst case,
        // other than just getting completely stuck.)
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            // Rebuild the heap with every entry's time clamped to `max_refetch_wall`.
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }

        // Restore liveness property *Timeout*
        // postage::watch will tell the timeout task about the new wake-up time.
        let new_earliest_timeout = [
            // First retry.  These are std Instant.
            self.retry_schedule.peek().map(|re| re.when),
            // First refetch.  These are SystemTime, so we must convert them.
            self.refetch_schedule.peek().map(|re| {
                // If duration_since gives Err, that means when is before now,
                // ie we should not be waiting: the wait duration should be 0.
                let wait = re.when.duration_since(now_wall).unwrap_or_default();

                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }
817
    /// Launch download attempts if we can
    ///
    /// Specifically: if we have things in `queued`, and `running` is shorter than
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
    ///
    /// Each launched task downloads one bridge's descriptor and then records the
    /// outcome under the state lock (via `lock_then_process`).
    ///
    /// Restores liveness invariant *Running*.
    ///
    /// Idempotent.  Forms part of `process`.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Bridges whose task failed to spawn; they are removed from `current`
        // in a single batch at the end (see comment in the Err arm below).
        let mut to_remove = vec![];

        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();

                    // The task which actually downloads a descriptor.
                    async move {
                        // Catch panics, so one broken download cannot poison the
                        // executor; a panic is converted into an internal error.
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                        // to make further progress and restore the liveness properties.
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed.
                    //
                    // We are going to forget about this bridge.
                    // And we're going to do that without notifying anyone.
                    // We *do* want to remove it from `current` because simply forgetting
                    // about a refetch could leave expired data there.
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
                    to_remove.push(bridge);
                }
            }
        }

        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }
892
893    /// Modify `current` and notify subscribers
894    ///
895    /// Helper function which modifies only `current`, not any of the rest of the state.
896    /// it is the caller's responsibility to ensure that the invariants are upheld.
897    ///
898    /// The implementation actually involves cloning `current`,
899    /// so it is best to amortize calls to this function.
900    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
901        let mut current = (*self.current).clone();
902        let r = f(&mut current);
903        self.set_current_and_notify(current);
904        r
905    }
906
907    /// Set `current` to a value and notify
908    ///
909    /// Helper function which modifies only `current`, not any of the rest of the state.
910    /// it is the caller's responsibility to ensure that the invariants are upheld.
911    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
912        self.current = new.into();
913        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
914    }
915
916    /// Obtain the currently-desired level of parallelism
917    ///
918    /// Helper function.  The return value depends the mutable state and also the `config`.
919    ///
920    /// This is how we implement dormancy.
921    fn effective_parallelism(&self) -> usize {
922        match self.dormancy {
923            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
924            Dormancy::Dormant => 0,
925        }
926    }
927}
928
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
    /// Record a download outcome.
    ///
    /// Final act of the descriptor download task.
    /// `got` is from [`download_descriptor`](Manager::download_descriptor).
    ///
    /// On success, schedules the refetch and stores `Ok(desc)` in `current`;
    /// on failure, schedules a retry and stores the boxed error in `current`.
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
            Some(ri) => ri,
            None => {
                // The bridge was removed from the configuration while the
                // download was in flight; just discard the result.
                debug!("bridge descriptor download completed for no-longer-configured bridge");
                return;
            }
        };

        let insert = match got {
            Ok(Downloaded { desc, refetch }) => {
                // Successful download.  Schedule the refetch, and we'll insert Ok.

                self.refetch_schedule.push(RefetchEntry {
                    when: refetch,
                    bridge: bridge.clone(),
                    retry_delay: (),
                });

                Ok(desc)
            }
            Err(err) => {
                // Failed.  Schedule the retry, and we'll insert Err.

                // Reuse the backoff state carried through `running`, if any,
                // so that repeated failures back off progressively.
                let mut retry_delay =
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));

                // Convert the error's (possibly relative) retry time into an
                // absolute instant, using the randomized backoff for "after waiting".
                let retry = err.retry_time();
                let now = self.mgr.runtime.now();
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
                // Clamp the retry time to at most `max_refetch` from now (even for
                // `Never`).  That way if a bridge is misconfigured we will see it
                // be fixed eventually.
                let retry = {
                    let earliest = now;
                    let latest = || now + self.config.max_refetch;
                    match retry {
                        AbsRetryTime::Immediate => earliest,
                        AbsRetryTime::Never => latest(),
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
                    }
                };
                self.retry_schedule.push(RefetchEntry {
                    when: retry,
                    bridge: bridge.clone(),
                    retry_delay,
                });

                Err(Box::new(err) as _)
            }
        };

        self.modify_current(|current| current.insert(bridge, insert));
    }
}
989
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Downloads a descriptor.
    ///
    /// The core of the descriptor download task
    /// launched by `State::consider_launching`.
    ///
    /// Uses Mockable::download to actually get the document.
    /// So most of this function is parsing and checking.
    ///
    /// Consults and updates the on-disk descriptor cache:
    /// a fresh-enough cached copy short-circuits the network entirely,
    /// and a valid-but-stale one is used for an if-modified-since request.
    ///
    /// The returned value is precisely the `got` input to
    /// [`record_download_outcome`](StateGuard::record_download_outcome).
    async fn download_descriptor(
        &self,
        mockable: M,
        bridge: &BridgeConfig,
        config: &BridgeDescDownloadConfig,
    ) -> Result<Downloaded, Error> {
        // convenience alias, capturing the usual parameters from our variables.
        let process_document = |text| process_document(&self.runtime, config, text);

        // Lock the store on demand; poisoning is reported as an internal error.
        let store = || {
            self.store
                .lock()
                .map_err(|_| internal!("bridge descriptor store poisoned"))
        };

        // Cache lookup failures are logged and treated as a cache miss.
        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });

        let now = self.runtime.wallclock();
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
            if cached.fetched > now {
                // was fetched "in the future"
                None
            } else {
                // let's see if it's any use
                match process_document(&cached.document) {
                    Err(err) => {
                        // We had a doc in the cache but our attempt to use it failed
                        // We wouldn't have written a bad cache entry.
                        // So one of the following must be true:
                        //  * We were buggy or are stricter now or something
                        //  * The document was valid but its validity time has expired
                        // In any case we can't reuse it.
                        // (This happens in normal operation, when a document expires.)
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
                        None
                    }
                    Ok(got) => {
                        // The cached document looks valid.
                        // But how long ago did we fetch it?
                        // We need to enforce max_refetch even for still-valid documents.
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Was fetched recently, too.  We can just reuse it.
                            return Ok(got);
                        }
                        Some(got)
                    }
                }
            }
        } else {
            None
        };

        // If cached_good is Some, we found a plausible cache entry; if we got here, it was
        // past its max_refetch.  So in that case we want to send a request with
        // if-modified-since.  If we get Not Modified, we can reuse it (and update the fetched time).
        let if_modified_since = cached_good
            .as_ref()
            .map(|got| got.desc.as_ref().published());

        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );

        let text = mockable
            .clone()
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
            .await?;

        // `None` from `download` means "Not Modified": fall back to the cached copy.
        let (document, got) = if let Some(text) = text {
            let got = process_document(&text)?;
            (text, got)
        } else if let Some(cached) = cached_good {
            (
                cache_entry
                    .expect("cached_good but not cache_entry")
                    .document,
                cached,
            )
        } else {
            return Err(internal!("download gave None but no if-modified-since").into());
        };

        // IEFI catches cache store errors, which we log but don't do anything else with
        (|| {
            let cached = CachedBridgeDescriptor {
                document,
                fetched: now, // this is from before we started the fetch, which is correct
            };

            // Calculate when the cache should forget about this.
            // We want to add a bit of slop for the purposes of mild clock skew handling,
            // etc., and the prefetch time is a good proxy for that.
            let until = got
                .refetch
                .checked_add(config.prefetch)
                .unwrap_or(got.refetch /*uh*/);

            store()?.store_bridgedesc(bridge, cached, until)?;
            Ok(())
        })()
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });

        Ok(got)
    }
}
1124
1125/// Processes and analyses a textual descriptor document into a `Downloaded`
1126///
1127/// Parses it, checks the signature, checks the document validity times,
1128/// and if that's all good, calculates when will want to refetch it.
1129fn process_document<R: Runtime>(
1130    runtime: &R,
1131    config: &BridgeDescDownloadConfig,
1132    text: &str,
1133) -> Result<Downloaded, Error> {
1134    let desc = RouterDesc::parse(text)?;
1135
1136    // We *could* just trust this because we have trustworthy provenance
1137    // we know that the channel machinery authenticated the identity keys in `bridge`.
1138    // But let's do some cross-checking anyway.
1139    // `check_signature` checks the self-signature.
1140    let desc = desc.check_signature().map_err(Arc::new)?;
1141
1142    let now = runtime.wallclock();
1143    desc.is_valid_at(&now)?;
1144
1145    // Justification that use of "dangerously" is correct:
1146    // 1. We have checked this just above, so it is valid now.
1147    // 2. We are extracting the timeout and implement our own refetch logic using expires.
1148    let (desc, (_, expires)) = desc.dangerously_into_parts();
1149
1150    // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
1151    // The following situations can result in a nominally-expired descriptor being used:
1152    //
1153    // 1. We primarily enforce the timeout by looking at the expiry time,
1154    //    subtracting a configured constant, and scheduling the start of a refetch then.
1155    //    If it takes us longer to do the retry, than the prefetch constant,
1156    //    we'll still be providing the old descriptor to consumers in the meantime.
1157    //
1158    // 2. We apply a minimum time before we will refetch a descriptor.
1159    //    So if the validity time is unreasonably short, we'll use it beyond that time.
1160    //
1161    // 3. Clock warping could confuse this algorithm.  This is inevitable because we
1162    //    are relying on calendar times (SystemTime) in the descriptor, and because
1163    //    we don't have a mechanism for being told about clock warps rather than the
1164    //    passage of time.
1165    //
1166    // We think this is all OK given that a bridge descriptor is used for trying to
1167    // connect to the bridge itself.  In particular, we don't want to completely trust
1168    // bridges to control our retry logic.
1169    let refetch = match expires {
1170        Some(expires) => expires
1171            .checked_sub(config.prefetch)
1172            .ok_or(Error::ExtremeValidityTime)?,
1173
1174        None => now
1175            .checked_add(config.max_refetch)
1176            .ok_or(Error::ExtremeValidityTime)?,
1177    };
1178    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
1179
1180    let desc = BridgeDesc::new(Arc::new(desc));
1181
1182    Ok(Downloaded { desc, refetch })
1183}
1184
/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `updates`.
///
/// `updates` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
///
/// Runs until the sending end of `update` is dropped, or the `Manager`
/// (held only weakly here) goes away.
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function.  Idempotent.
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        // The heap is ordered soonest-first, so pop until the head is in the future.
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);

            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }

    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
            changed = update.next() => {
                // changed is Option<Option< >>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
                    break
                }
            },

            // Wait until the specified earliest wakeup time
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    // No wakeup scheduled: sleep forever (until `update` fires).
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
                    { future::pending().await }
                }
            }.fuse() => {
                // We have reached the pre-programmed time.  Check what needs doing.

                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state; // Do the DerefMut once so we can borrow fields

                // Refetch entries are keyed by wallclock (SystemTime)...
                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );

                // ...retry entries by the monotonic clock (Instant).
                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );

                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
}
1280
/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,

    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}
1304
1305impl HasKind for StartupError {
1306    fn kind(&self) -> ErrorKind {
1307        use ErrorKind as EK;
1308        use StartupError as SE;
1309        match self {
1310            SE::MissingCircMgr => EK::Internal,
1311            SE::Spawn { cause, .. } => cause.kind(),
1312        }
1313    }
1314}
1315
/// An error which occurred trying to obtain the descriptor for a particular bridge
///
/// The `HasKind` and `HasRetryTime` impls (below) determine how each
/// variant is classified and when a failed fetch is retried.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),

    /// Couldn't establish a directory stream to the bridge
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_circmgr::Error),

    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),

    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),

    /// Signature check failed
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),

    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),

    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,

    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),

    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}
1358
1359impl HasKind for Error {
1360    fn kind(&self) -> ErrorKind {
1361        use Error as E;
1362        use ErrorKind as EK;
1363        let bridge_protocol_violation = EK::TorAccessFailed;
1364        match self {
1365            // We trust that tor_circmgr returns TorAccessFailed when it ought to.
1366            E::CircuitFailed(e) => e.kind(),
1367            E::StreamFailed(e) => e.kind(),
1368            E::RequestFailed(e) => e.kind(),
1369            E::ParseFailed(..) => bridge_protocol_violation,
1370            E::SignatureCheckFailed(..) => bridge_protocol_violation,
1371            E::ExtremeValidityTime => bridge_protocol_violation,
1372            E::BadValidityTime(..) => EK::ClockSkew,
1373            E::Bug(e) => e.kind(),
1374            #[cfg(test)]
1375            E::TestError(..) => EK::Internal,
1376        }
1377    }
1378}
1379
1380impl HasRetryTime for Error {
1381    fn retry_time(&self) -> RetryTime {
1382        use Error as E;
1383        use RetryTime as R;
1384        match self {
1385            // Errors with their own retry times
1386            E::CircuitFailed(e) => e.retry_time(),
1387
1388            // Remote misbehavior, maybe the network is being strange?
1389            E::StreamFailed(..) => R::AfterWaiting,
1390            E::RequestFailed(..) => R::AfterWaiting,
1391
1392            // Remote misconfiguration, detected *after* we successfully made the channel
1393            // (so not a network problem).  We'll say "never" for RetryTime,
1394            // even though actually we will in fact retry in at most `max_refetch`.
1395            E::ParseFailed(..) => R::Never,
1396            E::SignatureCheckFailed(..) => R::Never,
1397            E::BadValidityTime(..) => R::Never,
1398            E::ExtremeValidityTime => R::Never,
1399
1400            // Probably, things are broken here, rather than remotely.
1401            E::Bug(..) => R::Never,
1402
1403            #[cfg(test)]
1404            E::TestError(_, retry) => *retry,
1405        }
1406    }
1407}
1408
// Marker impl: allows `Error` to be the error type exposed through the
// `BridgeDescProvider` interface.
impl BridgeDescError for Error {}
1410
impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because `postage::watch::Sender::borrow` wants it.
    ///
    /// Panics (via `panic!`/`assert!`) if any of the documented invariants
    /// (*Tracked*, *Input*, *Current*, *Limit*, *Running*, *Timeout*) is violated.
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in the schedule `sch`
            Schedule {
                /// Which schedule (for panic messages)
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }

        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one of our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }

        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };

        impl Tracked {
            /// Note that `bridge` is Tracked
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input* (every tracked bridge was in input)
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }

        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Check invariant *Tracked* and *Schedule* wrt this schedule.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };

            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so this ought to be a wasteful scan,
                // but, indirectly, this tests our implementation of `Ord` for `RefetchEntry`.
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }

                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }

        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
        // Map a wallclock time onto the monotonic clock (approximately),
        // so that refetch (SystemTime) and retry (Instant) entries can be compared.
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };

        // *Tracked*
        //
        // We walk our data structures in turn

        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }

        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);

        // *Current*
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }

        // *Input (sense: every input bridge is tracked)*
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
        for (b, where_) in &tracked.tracked {
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
                }
                _ => {}
            }
        }

        // *Limit*
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);

        // *Running*
        assert!(self.running.len() == parallelism || self.queued.is_empty());

        // *Timeout* (final)
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}