tor_dirmgr/bridgedesc.rs

//! `BridgeDescMgr` - downloads and caches bridges' router descriptors

use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug, Display};
use std::num::NonZeroU8;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant, SystemTime};

use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use educe::Educe;
use futures::future;
use futures::select;
use futures::stream::{BoxStream, StreamExt};
use futures::task::{SpawnError, SpawnExt as _};
use futures::FutureExt;
use tracing::{debug, error, info, trace};

use safelog::sensitive;
use tor_basic_utils::retry::RetryDelay;
use tor_basic_utils::BinaryHeapExt as _;
use tor_checkable::{SelfSigned, Timebound};
use tor_circmgr::CircMgr;
use tor_error::{error_report, internal, ErrorKind, HasKind};
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
use tor_netdoc::doc::routerdesc::RouterDesc;
use tor_rtcompat::Runtime;

use crate::event::FlagPublisher;
use crate::storage::CachedBridgeDescriptor;
use crate::{DirMgrStore, DynStore};

#[cfg(test)]
mod bdtest;

/// The key we use in all our data structures
///
/// This type saves typing and would make it easier to change the bridge descriptor manager
/// to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;

/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
//     https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}
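
// Illustrative sketch (hypothetical, not part of this module's API): how a
// caller might derive `Dormancy` from some higher-level notion of whether
// the client currently wants the network.  `wants_network` is a made-up
// input; in arti-client this is derived from `DormantMode` and bootstrap
// state.
#[cfg(test)]
#[allow(dead_code)]
fn dormancy_sketch(wants_network: bool) -> Dormancy {
    if wants_network {
        Dormancy::Active
    } else {
        Dormancy::Dormant
    }
}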

/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}

/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
// TODO: there should be some way to override the defaults.  See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refetch < min_refetch, or we may panic.
    max_refetch: Duration,
}

impl Default for BridgeDescDownloadConfig {
    fn default() -> Self {
        let secs = Duration::from_secs;
        BridgeDescDownloadConfig {
            parallelism: 4.try_into().expect("parallelism is zero"),
            retry: secs(30),
            prefetch: secs(1000),
            min_refetch: secs(3600),
            max_refetch: secs(3600 * 3), // matches C Tor behaviour
        }
    }
}
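
// A test-only sketch of how the tuning values above interact: the refetch
// time starts `prefetch` before the descriptor's expiry, and is then
// clamped into [now + min_refetch, now + max_refetch], mirroring the logic
// in `process_document` below.  The 10-hour validity here is an assumed
// example value.
#[cfg(test)]
#[test]
fn refetch_clamp_sketch() {
    let config = BridgeDescDownloadConfig::default();
    let now = SystemTime::now();
    let expires = now + Duration::from_secs(10 * 3600); // assumed validity
    let refetch = expires - config.prefetch;
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
    // 10h minus 1000s is still beyond the 3h cap, so max_refetch wins.
    assert_eq!(refetch, now + config.max_refetch);
}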

/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}

/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
    use super::*;

    /// Defines the actual mockable APIs
    ///
    /// Not nameable (and therefore not implementable)
    /// outside the `bridgedesc` module.
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit manager
        type CircMgr: Send + Sync + 'static;

        /// Download this bridge's descriptor, and return it as a string
        ///
        /// Runs in a task.
        /// Called by `Manager::download_descriptor`, which handles parsing and validation.
        ///
        /// If `if_modified_since` is `Some`,
        /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
        /// If `if_modified_since` is `None`, returning `Ok(None)` is forbidden.
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;

    /// Actual code for downloading a descriptor document
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        // TODO actually support _if_modified_since
        let circuit = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = circuit
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        Ok(Some(output))
    }
}
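
// Illustrative sketch (hypothetical; the crate's real test mock lives in
// `bdtest`): a test-only `MockableAPI` implementation that skips circuits
// entirely and returns a canned descriptor string.
#[cfg(test)]
mod mock_sketch {
    use super::*;

    /// Hypothetical mock returning a fixed document, never dialing out.
    #[derive(Clone)]
    #[allow(dead_code)]
    struct CannedMock(&'static str);

    impl<R: Runtime> Mockable<R> for CannedMock {}

    #[async_trait]
    impl<R: Runtime> mockable::MockableAPI<R> for CannedMock {
        type CircMgr = ();

        async fn download(
            self,
            _runtime: &R,
            _circmgr: &Self::CircMgr,
            _bridge: &BridgeConfig,
            _if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error> {
            Ok(Some(self.0.to_owned()))
        }
    }
}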

/// The actual manager.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}

/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
///  * A public entrypoint, or task, obtains a [`StateGuard`].
///    It modifies the state to represent the callers' new requirements,
///    or things it has done, by updating the state,
///    preserving the invariants but disturbing the "liveness" (see below).
///
///  * [`StateGuard::drop`] calls [`State::process`].
///    This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
///  * `running`/`queued`: newly added, no outcome yet.
///  * `current` + `running`/`queued`: we are fetching (or going to)
///  * `current = Ok` + `refetch_schedule`: fetched OK, will refetch before expiry
///  * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**:
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**:
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// Will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behaviors
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}

impl Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Helper to format one bridge entry somewhere
        fn fmt_bridge(
            f: &mut fmt::Formatter,
            b: &BridgeConfig,
            info: &(dyn Display + '_),
        ) -> fmt::Result {
            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
            writeln!(f, "    {:80.80} | {}", info, b)
        }

        /// Helper to format one of the schedules
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
            f: &mut fmt::Formatter,
            summary: &str,
            name: &str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
        ) -> fmt::Result {
            writeln!(f, "  {}:", name)?;
            for b in schedule {
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
            }
            Ok(())
        }

        // We are going to have to go multi-line because of the bridge lines,
        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
        // or a derive.
        writeln!(f, "State {{")?;
        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
        writeln!(f, "  earliest_timeout: ???, ..,")?;
        writeln!(f, "  current:")?;
        for (b, v) in &*self.current {
            fmt_bridge(
                f,
                b,
                &match v {
                    Err(e) => Cow::from(format!("C Err {}", e)),
                    Ok(_) => "C Ok".into(),
                },
            )?;
        }
        writeln!(f, "  running:")?;
        for b in self.running.keys() {
            fmt_bridge(f, b, &"R")?;
        }
        writeln!(f, "  queued:")?;
        for qe in &self.queued {
            fmt_bridge(f, &qe.bridge, &"Q")?;
        }
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
        write!(f, "}}")?;

        Ok(())
    }
}

/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}

impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.when.cmp(&other.when).reverse()
        // We don't care about the ordering of BridgeConfig or retry_delay.
        // Different BridgeConfig with the same fetch time will be fetched in "some order".
    }
}

impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
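
// Test-only sketch of the min-heap trick used by `RefetchEntry`: reversing
// `cmp` on the timestamp makes std's max-oriented `BinaryHeap` pop the
// *earliest* entry first, which is what the schedules rely on.  A local
// stand-in type is used since constructing a real `BridgeConfig` here would
// be noise.
#[cfg(test)]
#[test]
fn reversed_ord_is_min_heap_sketch() {
    struct Ent(u32);
    impl Ord for Ent {
        fn cmp(&self, other: &Self) -> Ordering {
            self.0.cmp(&other.0).reverse()
        }
    }
    impl PartialOrd for Ent {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl PartialEq for Ent {
        fn eq(&self, other: &Self) -> bool {
            self.cmp(other) == Ordering::Equal
        }
    }
    impl Eq for Ent {}

    let mut heap: BinaryHeap<Ent> = [Ent(30), Ent(10), Ent(20)].into_iter().collect();
    assert_eq!(heap.pop().map(|e| e.0), Some(10)); // earliest, not largest
}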

/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;

impl JoinHandle {
    /// Would abort this async task, if we could do that.
    fn abort(&self) {}
}

impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config.  This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
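
// Illustrative usage sketch (test-only): wiring up a `BridgeDescMgr` from
// already-constructed components.  Where `runtime`, `store`, and `circmgr`
// come from is the caller's business and out of scope here.
#[cfg(test)]
#[allow(dead_code)]
fn construction_sketch<R: Runtime>(
    runtime: R,
    store: DirMgrStore<R>,
    circmgr: Arc<tor_circmgr::CircMgr<R>>,
) -> Result<BridgeDescMgr<R>, StartupError> {
    BridgeDescMgr::new(
        &BridgeDescDownloadConfig::default(),
        runtime,
        store,
        circmgr,
        Dormancy::Active,
    )
}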

/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// This is derived from the expiry time,
    /// and clamped according to limits in the configuration.
    refetch: SystemTime,
}

impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}

impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`.  If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain_ext(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not in `new_bridges`.
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // After this maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}
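
// Illustrative sketch (test-only) of driving the `BridgeDescProvider`
// interface from the caller's side: subscribe first, then set the bridges,
// then re-read `bridges()` whenever an event arrives.  The `provider` and
// `bridges` arguments are assumed to come from elsewhere.
#[cfg(test)]
#[allow(dead_code)]
async fn provider_usage_sketch(provider: &dyn BridgeDescProvider, bridges: &[BridgeConfig]) {
    let mut events = provider.events();
    provider.set_bridges(bridges);
    if let Some(_event) = events.next().await {
        // An event is only a hint that *something* changed; diff `bridges()`.
        for (bridge, result) in &*provider.bridges() {
            match result {
                Ok(_desc) => debug!(r#"have descriptor for "{}""#, bridge),
                Err(err) => debug!(r#"no descriptor for "{}": {}"#, bridge, err),
            }
        }
    }
}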

impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a lock on state, for functions that want to disrupt liveness properties
    ///
    /// When `StateGuard` is dropped, the liveness properties will be restored
    /// by making whatever progress is required.
    ///
    /// See [`State`].
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }

    /// Obtains the lock on state.
    ///
    /// Caller ought not to modify state
    /// so as to invalidate invariants or liveness properties.
    /// Callers which are part of the algorithms in this crate
    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
    fn lock_only(&self) -> MutexGuard<State> {
        self.state.lock().expect("bridge desc manager poisoned")
    }
}

/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`].  See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}

impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    fn drop(&mut self) {
        self.state.process(self.mgr);
    }
}
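
// A toy illustration (test-only) of the guard pattern `StateGuard` uses:
// mutate freely while holding the guard, and rely on `Drop` to run the
// fixup (standing in for `State::process`) that restores liveness.
#[cfg(test)]
#[test]
fn guard_pattern_sketch() {
    struct Guard<'a> {
        state: &'a mut Vec<u32>,
    }
    impl Drop for Guard<'_> {
        fn drop(&mut self) {
            // Stand-in for `State::process`: restore the "sorted" property.
            self.state.sort_unstable();
        }
    }

    let mut state = vec![3, 1, 2];
    {
        let g = Guard { state: &mut state };
        g.state.push(0); // disturb the property while holding the guard
    } // `Drop` runs here and restores it
    assert_eq!(state, vec![0, 1, 2, 3]);
}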

impl State {
    /// Ensure progress is made, by restoring all the liveness invariants
    ///
    /// This includes launching circuits as needed.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Restore liveness property *Running*
        self.consider_launching(mgr);

        let now_wall = mgr.runtime.wallclock();

        // Mitigate clock warping
        //
        // If the earliest `SystemTime` is more than `max_refetch` away,
        // the clock must have warped.  If that happens we clamp
        // them all to `max_refetch`.
        //
        // (This is not perfect but will mitigate the worst effects by ensuring
        // that we do *something* at least every `max_refetch`, in the worst case,
        // other than just getting completely stuck.)
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }

        // Restore liveness property *Timeout*
        // postage::watch will tell the timeout task about the new wake-up time.
        let new_earliest_timeout = [
            // First retry.  These are std Instant.
            self.retry_schedule.peek().map(|re| re.when),
            // First refetch.  These are SystemTime, so we must convert them.
            self.refetch_schedule.peek().map(|re| {
                // If duration_since gives Err, that means when is before now,
                // ie we should not be waiting: the wait duration should be 0.
                let wait = re.when.duration_since(now_wall).unwrap_or_default();

                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }

    /// Launch download attempts if we can
    ///
    /// Specifically: if we have things in `queued`, and `running` is shorter than
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
    ///
    /// Restores liveness invariant *Running*.
    ///
    /// Idempotent.  Forms part of `process`.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        let mut to_remove = vec![];

        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();

                    // The task which actually downloads a descriptor.
                    async move {
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                        // to make further progress and restore the liveness properties.
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed.
                    //
                    // We are going to forget about this bridge.
                    // And we're going to do that without notifying anyone.
                    // We *do* want to remove it from `current` because simply forgetting
                    // about a refetch could leave expired data there.
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
                    to_remove.push(bridge);
                }
            }
        }

        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }

    /// Modify `current` and notify subscribers
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    ///
    /// The implementation actually involves cloning `current`,
    /// so it is best to amortize calls to this function.
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
        let mut current = (*self.current).clone();
        let r = f(&mut current);
        self.set_current_and_notify(current);
        r
    }

    /// Set `current` to a value and notify
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
        self.current = new.into();
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
    }

    /// Obtain the currently-desired level of parallelism
    ///
    /// Helper function.  The return value depends on the mutable state and also the `config`.
    ///
    /// This is how we implement dormancy.
    fn effective_parallelism(&self) -> usize {
        match self.dormancy {
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
            Dormancy::Dormant => 0,
        }
    }
}
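
// Test-only sketch of the dormancy rule implemented by
// `effective_parallelism` above: the same `State` yields the configured
// parallelism when `Active` and zero when `Dormant`.
#[cfg(test)]
#[test]
fn effective_parallelism_sketch() {
    let (earliest_timeout, _timeout_rx) = postage::watch::channel();
    let mut state = State {
        config: Arc::new(BridgeDescDownloadConfig::default()),
        subscribers: Default::default(),
        current: Default::default(),
        running: Default::default(),
        queued: Default::default(),
        dormancy: Dormancy::Active,
        refetch_schedule: Default::default(),
        retry_schedule: Default::default(),
        earliest_timeout,
    };
    assert_eq!(state.effective_parallelism(), 4); // default parallelism
    state.dormancy = Dormancy::Dormant;
    assert_eq!(state.effective_parallelism(), 0); // dormant: launch nothing
}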

impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
    /// Record a download outcome.
    ///
    /// Final act of the descriptor download task.
    /// `got` is from [`download_descriptor`](Manager::download_descriptor).
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
            Some(ri) => ri,
            None => {
                debug!("bridge descriptor download completed for no-longer-configured bridge");
                return;
            }
        };

        let insert = match got {
            Ok(Downloaded { desc, refetch }) => {
                // Successful download.  Schedule the refetch, and we'll insert Ok.

                self.refetch_schedule.push(RefetchEntry {
                    when: refetch,
                    bridge: bridge.clone(),
                    retry_delay: (),
                });

                Ok(desc)
            }
            Err(err) => {
                // Failed.  Schedule the retry, and we'll insert Err.

                let mut retry_delay =
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));

                let retry = err.retry_time();
                // Convert the error's retry time to an absolute Instant,
                // using the (randomized) retry delay if it doesn't specify one.
                let now = self.mgr.runtime.now();
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
                // Retry no later than now + max_refetch.  That way if a bridge is
                // misconfigured we will see it be fixed eventually.
                let retry = {
                    let earliest = now;
                    let latest = || now + self.config.max_refetch;
                    match retry {
                        AbsRetryTime::Immediate => earliest,
                        AbsRetryTime::Never => latest(),
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
                    }
                };
                self.retry_schedule.push(RefetchEntry {
                    when: retry,
                    bridge: bridge.clone(),
                    retry_delay,
                });

                Err(Box::new(err) as _)
            }
        };

        self.modify_current(|current| current.insert(bridge, insert));
    }
}
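
// Test-only sketch of the retry clamp in `record_download_outcome` above:
// whatever retry time an error suggests, the scheduled instant lands within
// [now, now + max_refetch].
#[cfg(test)]
#[test]
fn retry_clamp_sketch() {
    let max_refetch = BridgeDescDownloadConfig::default().max_refetch;
    let now = Instant::now();
    let clamp = |retry: AbsRetryTime| match retry {
        AbsRetryTime::Immediate => now,
        AbsRetryTime::Never => now + max_refetch,
        AbsRetryTime::At(i) => i.clamp(now, now + max_refetch),
    };
    assert_eq!(clamp(AbsRetryTime::Never), now + max_refetch);
    let soon = now + Duration::from_secs(30);
    assert_eq!(clamp(AbsRetryTime::At(soon)), soon); // in range: unchanged
    let far = now + max_refetch * 2;
    assert_eq!(clamp(AbsRetryTime::At(far)), now + max_refetch); // capped
}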

impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Downloads a descriptor.
    ///
    /// The core of the descriptor download task
    /// launched by `State::consider_launching`.
    ///
    /// Uses `Mockable::download` to actually get the document.
    /// So most of this function is parsing and checking.
    ///
    /// The returned value is precisely the `got` input to
    /// [`record_download_outcome`](StateGuard::record_download_outcome).
    async fn download_descriptor(
        &self,
        mockable: M,
        bridge: &BridgeConfig,
        config: &BridgeDescDownloadConfig,
    ) -> Result<Downloaded, Error> {
        // convenience alias, capturing the usual parameters from our variables.
        let process_document = |text| process_document(&self.runtime, config, text);

        let store = || {
            self.store
                .lock()
                .map_err(|_| internal!("bridge descriptor store poisoned"))
        };

        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });

        let now = self.runtime.wallclock();
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
            if cached.fetched > now {
                // was fetched "in the future"
                None
            } else {
                // let's see if it's any use
                match process_document(&cached.document) {
                    Err(err) => {
                        // We had a doc in the cache but our attempt to use it failed.
                        // We wouldn't have written a bad cache entry.
                        // So one of the following must be true:
                        //  * We were buggy or are stricter now or something
                        //  * The document was valid but its validity time has expired
                        // In any case we can't reuse it.
                        // (This happens in normal operation, when a document expires.)
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
                        None
                    }
                    Ok(got) => {
                        // The cached document looks valid.
                        // But how long ago did we fetch it?
                        // We need to enforce max_refetch even for still-valid documents.
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Was fetched recently, too.  We can just reuse it.
                            return Ok(got);
                        }
                        Some(got)
                    }
                }
            }
        } else {
            None
        };

        // If cached_good is Some, we found a plausible cache entry; if we got here, it was
        // past its max_refetch.  So in that case we want to send a request with
        // if-modified-since.  If we get Not Modified, we can reuse it (and update the fetched time).
        let if_modified_since = cached_good
            .as_ref()
            .map(|got| got.desc.as_ref().published());

        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );

        let text = mockable
            .clone()
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
            .await?;

        let (document, got) = if let Some(text) = text {
            let got = process_document(&text)?;
            (text, got)
        } else if let Some(cached) = cached_good {
            (
                cache_entry
                    .expect("cached_good but not cache_entry")
                    .document,
                cached,
            )
        } else {
            return Err(internal!("download gave None but no if-modified-since").into());
        };

        // The IEFE catches cache store errors, which we log but don't do anything else with.
        (|| {
            let cached = CachedBridgeDescriptor {
                document,
                fetched: now, // this is from before we started the fetch, which is correct
            };

            // Calculate when the cache should forget about this.
            // We want to add a bit of slop for the purposes of mild clock skew handling,
            // etc., and the prefetch time is a good proxy for that.
            let until = got
                .refetch
                .checked_add(config.prefetch)
                .unwrap_or(got.refetch /*uh*/);

            store()?.store_bridgedesc(bridge, cached, until)?;
            Ok(())
        })()
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });

        Ok(got)
    }
}
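
// Test-only sketch of the cache-freshness rule in `download_descriptor`
// above: a cached document is reused outright only if it was fetched within
// `max_refetch`; anything older merely supplies the if-modified-since time.
#[cfg(test)]
#[test]
fn cache_freshness_sketch() {
    let config = BridgeDescDownloadConfig::default();
    let now = SystemTime::now();
    let fresh_enough =
        |fetched: SystemTime| now.duration_since(fetched).ok() <= Some(config.max_refetch);
    assert!(fresh_enough(now - Duration::from_secs(3600))); // 1h old: reuse
    assert!(!fresh_enough(now - Duration::from_secs(4 * 3600))); // 4h old: revalidate
}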

/// Processes and analyses a textual descriptor document into a `Downloaded`
///
/// Parses it, checks the signature, checks the document validity times,
/// and if that's all good, calculates when we will want to refetch it.
fn process_document<R: Runtime>(
    runtime: &R,
    config: &BridgeDescDownloadConfig,
    text: &str,
) -> Result<Downloaded, Error> {
    let desc = RouterDesc::parse(text)?;

    // We *could* just trust this because we have trustworthy provenance:
    // we know that the channel machinery authenticated the identity keys in `bridge`.
    // But let's do some cross-checking anyway.
    // `check_signature` checks the self-signature.
    let desc = desc.check_signature().map_err(Arc::new)?;

    let now = runtime.wallclock();
    desc.is_valid_at(&now)?;

    // Justification that use of "dangerously" is correct:
    // 1. We have checked this just above, so it is valid now.
    // 2. We are extracting the timeout and implementing our own refetch logic using expires.
    let (desc, (_, expires)) = desc.dangerously_into_parts();

    // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
    // The following situations can result in a nominally-expired descriptor being used:
    //
    // 1. We primarily enforce the timeout by looking at the expiry time,
    //    subtracting a configured constant, and scheduling the start of a refetch then.
    //    If the retry takes us longer than the prefetch constant,
    //    we'll still be providing the old descriptor to consumers in the meantime.
    //
    // 2. We apply a minimum time before we will refetch a descriptor.
    //    So if the validity time is unreasonably short, we'll use it beyond that time.
    //
    // 3. Clock warping could confuse this algorithm.  This is inevitable because we
    //    are relying on calendar times (SystemTime) in the descriptor, and because
    //    we don't have a mechanism for being told about clock warps rather than the
    //    passage of time.
    //
    // We think this is all OK given that a bridge descriptor is used for trying to
    // connect to the bridge itself.  In particular, we don't want to completely trust
    // bridges to control our retry logic.
    let refetch = match expires {
        Some(expires) => expires
            .checked_sub(config.prefetch)
            .ok_or(Error::ExtremeValidityTime)?,

        None => now
            .checked_add(config.max_refetch)
            .ok_or(Error::ExtremeValidityTime)?,
    };
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);

    let desc = BridgeDesc::new(Arc::new(desc));

    Ok(Downloaded { desc, refetch })
}

/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `update`.
///
/// `update` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function.  Idempotent.
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);

            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }

    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
            changed = update.next() => {
                // changed is Option<Option<Instant>>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely".
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
                    break
                }
            },

            // Wait until the specified earliest wakeup time
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
                    { future::pending().await }
                }
            }.fuse() => {
                // We have reached the pre-programmed time.  Check what needs doing.

                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state; // Do the DerefMut once so we can borrow fields

                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );

                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );

                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
}
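
// Minimal sketch (test-only) of the wakeup plumbing `timeout_task` relies
// on: a `postage::watch` channel where the sender overwrites the latest
// deadline in place, and the receiver, used as a stream, observes the most
// recent value.  Assumes the `futures` executor feature is available.
#[cfg(test)]
#[test]
fn watch_wakeup_sketch() {
    futures::executor::block_on(async {
        let (mut tx, mut rx) = postage::watch::channel::<Option<u32>>();
        *tx.borrow_mut() = Some(42);
        // A watch receiver yields the current value when polled.
        assert_eq!(rx.next().await, Some(Some(42)));
    });
}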

/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,

    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}

impl HasKind for StartupError {
    fn kind(&self) -> ErrorKind {
        use ErrorKind as EK;
        use StartupError as SE;
        match self {
            SE::MissingCircMgr => EK::Internal,
            SE::Spawn { cause, .. } => cause.kind(),
        }
    }
}

/// An error which occurred trying to obtain the descriptor for a particular bridge
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),

    /// Couldn't establish a directory stream to the bridge
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_proto::Error),

    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),

    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),

    /// Signature check failed
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),

    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),

    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,

    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),

    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}

impl HasKind for Error {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        let bridge_protocol_violation = EK::TorAccessFailed;
        match self {
            // We trust that tor_circmgr returns TorAccessFailed when it ought to.
            E::CircuitFailed(e) => e.kind(),
            E::StreamFailed(e) => e.kind(),
            E::RequestFailed(e) => e.kind(),
            E::ParseFailed(..) => bridge_protocol_violation,
            E::SignatureCheckFailed(..) => bridge_protocol_violation,
            E::ExtremeValidityTime => bridge_protocol_violation,
            E::BadValidityTime(..) => EK::ClockSkew,
            E::Bug(e) => e.kind(),
            #[cfg(test)]
            E::TestError(..) => EK::Internal,
        }
    }
}

impl HasRetryTime for Error {
    fn retry_time(&self) -> RetryTime {
        use Error as E;
        use RetryTime as R;
        match self {
            // Errors with their own retry times
            E::CircuitFailed(e) => e.retry_time(),

            // Remote misbehavior, maybe the network is being strange?
            E::StreamFailed(..) => R::AfterWaiting,
            E::RequestFailed(..) => R::AfterWaiting,

            // Remote misconfiguration, detected *after* we successfully made the channel
            // (so not a network problem).  We'll say "never" for RetryTime,
            // even though in fact we will retry within at most `max_refetch`.
            E::ParseFailed(..) => R::Never,
            E::SignatureCheckFailed(..) => R::Never,
            E::BadValidityTime(..) => R::Never,
            E::ExtremeValidityTime => R::Never,

            // Probably, things are broken here, rather than remotely.
            E::Bug(..) => R::Never,

            #[cfg(test)]
            E::TestError(_, retry) => *retry,
        }
    }
}
1409
1410impl BridgeDescError for Error {}

impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because `postage::watch::Sender::borrow` wants it.
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in one of the schedules (identified by `sch_name`)
            Schedule {
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }

        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one of our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }

        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };

        impl Tracked {
            /// Note that `bridge` is Tracked
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input (every tracked bridge was in input)*
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }

        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Checks invariants *Tracked* and *Schedule* wrt this schedule.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };

            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so this is a wasteful scan,
                // but, indirectly, this tests our implementation of `Ord` for `RefetchEntry`.
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }

                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
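                // Fold this schedule's earliest entry into `tracked.earliest`:
                // the `[_, _].into_iter().flatten().min()` idiom takes the
                // minimum over whichever of the two `Option`s are `Some`.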
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }

        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };
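        // Worked example (illustrative values only): a `wallclock` 30s ahead
        // of `now_wall` maps to `now_mono + 30s`; one 30s behind maps to
        // `now_mono - 30s` (panicking if that would underflow the `Instant`).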

        // *Tracked*
        //
        // We walk our data structures in turn

        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }

        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);

        // *Current*
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }

        // *Input (sense: every input bridge is tracked)*
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
        for (b, where_) in &tracked.tracked {
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(*found_in_current, "in schedule but not in current: {} {}", &b, sch_name);
                }
                _ => {}
            }
        }

        // *Limit*
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);

        // *Running*
        assert!(self.running.len() == parallelism || self.queued.is_empty());

        // *Timeout* (final)
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}
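
// A hypothetical usage sketch of `check_consistency` (the real callers live
// in the `bdtest` module; `state`, `runtime`, and `bridges` below are
// illustrative stand-ins, not names from those tests):
//
//     let bridges: Vec<BridgeKey> = configured_bridges.clone();
//     state.check_consistency(&runtime, Some(&bridges));
//
// Passing `None` for `input` makes the *Input* checks vacuous, but still
// verifies *Tracked*, *Current*, *Limit*, *Running*, and *Timeout*.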