tor_dirmgr/bridgedesc.rs
//! `BridgeDescMgr` - downloads and caches bridges' router descriptors

use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug, Display};
use std::num::NonZeroU8;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant, SystemTime};

use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use educe::Educe;
use futures::future;
use futures::select;
use futures::stream::{BoxStream, StreamExt};
use futures::task::{SpawnError, SpawnExt as _};
use futures::FutureExt;
use tracing::{debug, error, info, trace};

use safelog::sensitive;
use tor_basic_utils::retry::RetryDelay;
use tor_basic_utils::BinaryHeapExt as _;
use tor_checkable::{SelfSigned, Timebound};
use tor_circmgr::CircMgr;
use tor_error::{error_report, internal, ErrorKind, HasKind};
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
use tor_netdoc::doc::routerdesc::RouterDesc;
use tor_rtcompat::Runtime;

use crate::event::FlagPublisher;
use crate::storage::CachedBridgeDescriptor;
use crate::{DirMgrStore, DynStore};

#[cfg(test)]
mod bdtest;

/// The key we use in all our data structures
///
/// This type saves typing, and makes it easier to change the bridge descriptor manager
/// to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;

/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
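///
/// # Example
///
/// A minimal sketch of driving dormancy from a caller
/// (assumes an existing `mgr: BridgeDescMgr<R>` handle):
///
/// ```ignore
/// mgr.set_dormancy(Dormancy::Dormant); // stop launching new downloads
/// // ...later, when the client wakes up...
/// mgr.set_dormancy(Dormancy::Active); // resume fetches and refreshes
/// ```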
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}

/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}

/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
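///
/// # Example
///
/// A sketch of obtaining the defaults (the values shown are those set by
/// the `Default` impl below):
///
/// ```ignore
/// let config = BridgeDescDownloadConfig::default();
/// // parallelism = 4, retry = 30s, prefetch = 1000s,
/// // min_refetch = 1h, max_refetch = 3h
/// ```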
// TODO: there should be some way to override the defaults. See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refetch < min_refetch, or we may panic.
    max_refetch: Duration,
}
139
140impl Default for BridgeDescDownloadConfig {
141 fn default() -> Self {
142 let secs = Duration::from_secs;
143 BridgeDescDownloadConfig {
144 parallelism: 4.try_into().expect("parallelism is zero"),
145 retry: secs(30),
146 prefetch: secs(1000),
147 min_refetch: secs(3600),
148 max_refetch: secs(3600 * 3), // matches C Tor behaviour
149 }
150 }
151}
152
153/// Mockable internal methods for within the `BridgeDescMgr`
154///
155/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
156///
157/// This (`()`) is the default for the type parameter in
158/// [`BridgeDescMgr`],
159/// and it is the only publicly available implementation,
160/// since this trait is sealed.
161pub trait Mockable<R>: mockable::MockableAPI<R> {}
162impl<R: Runtime> Mockable<R> for () {}
163
164/// Private module which seals [`Mockable`]
165/// by containing [`MockableAPI`](mockable::MockableAPI)
166mod mockable {
167 use super::*;
168
169 /// Defines the actual mockable APIs
170 ///
171 /// Not nameable (and therefore not implementable)
172 /// outside the `bridgedesc` module,
173 #[async_trait]
174 pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
175 /// Circuit manager
176 type CircMgr: Send + Sync + 'static;
177
178 /// Download this bridge's descriptor, and return it as a string
179 ///
180 /// Runs in a task.
181 /// Called by `Manager::download_descriptor`, which handles parsing and validation.
182 ///
183 /// If `if_modified_since` is `Some`,
184 /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
185 /// If `if_modified_since` is `None`, returning `Ok(None,)` is forbidden.
186 async fn download(
187 self,
188 runtime: &R,
189 circmgr: &Self::CircMgr,
190 bridge: &BridgeConfig,
191 if_modified_since: Option<SystemTime>,
192 ) -> Result<Option<String>, Error>;
193 }
194}
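
// For tests, an implementation of `MockableAPI` might look roughly like this
// (a sketch only; `MockDownloader` and `FAKE_DESC` are hypothetical, and the
// real test mock lives in `bdtest`):
//
//     #[derive(Clone)]
//     struct MockDownloader;
//     #[async_trait]
//     impl<R: Runtime> mockable::MockableAPI<R> for MockDownloader {
//         type CircMgr = ();
//         async fn download(
//             self,
//             _runtime: &R,
//             _circmgr: &Self::CircMgr,
//             _bridge: &BridgeConfig,
//             _if_modified_since: Option<SystemTime>,
//         ) -> Result<Option<String>, Error> {
//             Ok(Some(FAKE_DESC.to_string()))
//         }
//     }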
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;

    /// Actual code for downloading a descriptor document
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        // TODO actually support _if_modified_since
        let circuit = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = circuit
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        Ok(Some(output))
    }
}

/// The actual manager.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}

/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
/// * A public entrypoint, or task, obtains a [`StateGuard`].
///   It modifies the state to represent the callers' new requirements,
///   or things it has done, by updating the state,
///   preserving the invariants but disturbing the "liveness" (see below).
///
/// * [`StateGuard::drop`] calls [`State::process`].
///   This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
/// * `running`/`queued`: newly added, no outcome yet.
/// * `current` + `running`/`queued`: we are fetching (or going to)
/// * `current = Ok` + `refetch_schedule`: fetched OK, will refetch before expiry
/// * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**:
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**:
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// They will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these properties
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behavior
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
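
// Lifecycle of a single bridge, tying the states above together (sketch):
//
//     set_bridges() -> queued -> running --+-- Ok  --> current=Ok  + refetch_schedule
//                        ^                 +-- Err --> current=Err + retry_schedule
//                        |                                   |
//                        +------ timeout_task requeues ------+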

impl Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Helper to format one bridge entry somewhere
        fn fmt_bridge(
            f: &mut fmt::Formatter,
            b: &BridgeConfig,
            info: &(dyn Display + '_),
        ) -> fmt::Result {
            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
            writeln!(f, " {:80.80} | {}", info, b)
        }

        /// Helper to format one of the schedules
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
            f: &mut fmt::Formatter,
            summary: &str,
            name: &str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
        ) -> fmt::Result {
            writeln!(f, " {}:", name)?;
            for b in schedule {
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
            }
            Ok(())
        }

        // We are going to have to go multi-line because of the bridge lines,
        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
        // or a derive.
        writeln!(f, "State {{")?;
        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
        writeln!(f, " earliest_timeout: ???, ..,")?;
        writeln!(f, " current:")?;
        for (b, v) in &*self.current {
            fmt_bridge(
                f,
                b,
                &match v {
                    Err(e) => Cow::from(format!("C Err {}", e)),
                    Ok(_) => "C Ok".into(),
                },
            )?;
        }
        writeln!(f, " running:")?;
        for b in self.running.keys() {
            fmt_bridge(f, b, &"R")?;
        }
        writeln!(f, " queued:")?;
        for qe in &self.queued {
            fmt_bridge(f, &qe.bridge, &"Q")?;
        }
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
        write!(f, "}}")?;

        Ok(())
    }
}

/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}

impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.when.cmp(&other.when).reverse()
        // We don't care about the ordering of BridgeConfig or retry_delay.
        // Different BridgeConfig with the same fetch time will be fetched in "some order".
    }
}
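
// NB: `BinaryHeap` is a max-heap, so reversing the comparison on `when`
// turns it into a min-heap: `peek()`/`pop()` yield the entry that is due
// soonest. Sketch (`entry_due_at` is a hypothetical helper):
//
//     heap.push(entry_due_at(now + secs(30)));
//     heap.push(entry_due_at(now + secs(10)));
//     assert_eq!(heap.peek().unwrap().when, now + secs(10)); // earliest first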

impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}

/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that, so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;

impl JoinHandle {
    /// Would abort this async task, if we could do that.
    fn abort(&self) {}
}

impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config. This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}

/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// This is derived from the expiry time,
    /// and clamped according to limits in the configuration.
    refetch: SystemTime,
}

impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}

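// A consumer might drive these APIs roughly like this (a sketch only; assumes
// a `mgr` handle and an async context):
//
//     let mut events = mgr.events();
//     while let Some(_ev) = events.next().await {
//         let descs = mgr.bridges(); // re-read the current list
//         // ...compare with previously seen descriptors, etc...
//     }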
impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`. If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain_ext(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not in `new_bridges`.
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // after this, maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#" added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}

impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a lock on state, for functions that want to disrupt liveness properties
    ///
    /// When `StateGuard` is dropped, the liveness properties will be restored
    /// by making whatever progress is required.
    ///
    /// See [`State`].
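    ///
    /// # Example
    ///
    /// A sketch of the intended pattern (the mutation shown is illustrative):
    ///
    /// ```ignore
    /// let mut state = mgr.lock_then_process();
    /// state.dormancy = Dormancy::Active; // disturb liveness as needed
    /// // `StateGuard` drops here, running `State::process` to restore it.
    /// ```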
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }

    /// Obtains the lock on state.
    ///
    /// Caller ought not to modify state
    /// so as to invalidate invariants or liveness properties.
    /// Callers which are part of the algorithms in this crate
    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
    fn lock_only(&self) -> MutexGuard<State> {
        self.state.lock().expect("bridge desc manager poisoned")
    }
}

/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`]. See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}

impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    fn drop(&mut self) {
        self.state.process(self.mgr);
    }
}

impl State {
    /// Ensure progress is made, by restoring all the liveness invariants
    ///
    /// This includes launching circuits as needed.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Restore liveness property *Running*
        self.consider_launching(mgr);

        let now_wall = mgr.runtime.wallclock();

        // Mitigate clock warping
        //
        // If the earliest `SystemTime` is more than `max_refetch` away,
        // the clock must have warped. If that happens we clamp
        // them all to `max_refetch`.
        //
        // (This is not perfect but will mitigate the worst effects by ensuring
        // that we do *something* at least every `max_refetch`, in the worst case,
        // other than just getting completely stuck.)
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }

        // Restore liveness property *Timeout*
        // postage::watch will tell the timeout task about the new wake-up time.
        let new_earliest_timeout = [
            // First retry. These are std Instant.
            self.retry_schedule.peek().map(|re| re.when),
            // First refetch. These are SystemTime, so we must convert them.
            self.refetch_schedule.peek().map(|re| {
                // If duration_since gives Err, that means when is before now,
                // ie we should not be waiting: the wait duration should be 0.
                let wait = re.when.duration_since(now_wall).unwrap_or_default();

                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }

    /// Launch download attempts if we can
    ///
    /// Specifically: if we have things in `queued`, and `running` is shorter than
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
    ///
    /// Restores liveness invariant *Running*.
    ///
    /// Idempotent. Forms part of `process`.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        let mut to_remove = vec![];

        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();

                    // The task which actually downloads a descriptor.
                    async move {
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs
                        // `process`, to make further progress and restore the liveness properties.
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed.
                    //
                    // We are going to forget about this bridge.
                    // And we're going to do that without notifying anyone.
                    // We *do* want to remove it from `current` because simply forgetting
                    // about a refetch could leave expired data there.
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
                    to_remove.push(bridge);
                }
            }
        }

        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }

    /// Modify `current` and notify subscribers
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    ///
    /// The implementation actually involves cloning `current`,
    /// so it is best to amortize calls to this function.
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
        let mut current = (*self.current).clone();
        let r = f(&mut current);
        self.set_current_and_notify(current);
        r
    }

    /// Set `current` to a value and notify
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
        self.current = new.into();
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
    }

    /// Obtain the currently-desired level of parallelism
    ///
    /// Helper function. The return value depends on the mutable state and also the `config`.
    ///
    /// This is how we implement dormancy.
    fn effective_parallelism(&self) -> usize {
        match self.dormancy {
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
            Dormancy::Dormant => 0,
        }
    }
}
929
930impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
931 /// Record a download outcome.
932 ///
933 /// Final act of the descriptor download task.
934 /// `got` is from [`download_descriptor`](Manager::download_descriptor).
935 fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
936 let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
937 Some(ri) => ri,
938 None => {
939 debug!("bridge descriptor download completed for no-longer-configured bridge");
940 return;
941 }
942 };
943
944 let insert = match got {
945 Ok(Downloaded { desc, refetch }) => {
946 // Successful download. Schedule the refetch, and we'll insert Ok.
947
948 self.refetch_schedule.push(RefetchEntry {
949 when: refetch,
950 bridge: bridge.clone(),
951 retry_delay: (),
952 });
953
954 Ok(desc)
955 }
956 Err(err) => {
957 // Failed. Schedule the retry, and we'll insert Err.
958
959 let mut retry_delay =
960 retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));
961
962 let retry = err.retry_time();
963 // We retry at least as early as
964 let now = self.mgr.runtime.now();
965 let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
966 // Retry at least as early as max_refetch. That way if a bridge is
967 // misconfigured we will see it be fixed eventually.
968 let retry = {
969 let earliest = now;
970 let latest = || now + self.config.max_refetch;
971 match retry {
972 AbsRetryTime::Immediate => earliest,
973 AbsRetryTime::Never => latest(),
974 AbsRetryTime::At(i) => i.clamp(earliest, latest()),
975 }
976 };
977 self.retry_schedule.push(RefetchEntry {
978 when: retry,
979 bridge: bridge.clone(),
980 retry_delay,
981 });
982
983 Err(Box::new(err) as _)
984 }
985 };
986
987 self.modify_current(|current| current.insert(bridge, insert));
988 }
989}
990
991impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
992 /// Downloads a descriptor.
993 ///
994 /// The core of the descriptor download task
995 /// launched by `State::consider_launching`.
996 ///
997 /// Uses Mockable::download to actually get the document.
998 /// So most of this function is parsing and checking.
999 ///
1000 /// The returned value is precisely the `got` input to
1001 /// [`record_download_outcome`](StateGuard::record_download_outcome).
1002 async fn download_descriptor(
1003 &self,
1004 mockable: M,
1005 bridge: &BridgeConfig,
1006 config: &BridgeDescDownloadConfig,
1007 ) -> Result<Downloaded, Error> {
1008 // convenience alias, capturing the usual parameters from our variables.
1009 let process_document = |text| process_document(&self.runtime, config, text);
1010
1011 let store = || {
1012 self.store
1013 .lock()
1014 .map_err(|_| internal!("bridge descriptor store poisoned"))
1015 };
1016
1017 let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
1018 .unwrap_or_else(|err| {
1019 error_report!(
1020 err,
1021 r#"bridge descriptor cache lookup failed, for "{}""#,
1022 sensitive(bridge),
1023 );
1024 None
1025 });
1026
1027 let now = self.runtime.wallclock();
1028 let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
1029 if cached.fetched > now {
1030 // was fetched "in the future"
1031 None
1032 } else {
1033 // let's see if it's any use
1034 match process_document(&cached.document) {
1035 Err(err) => {
1036 // We had a doc in the cache but our attempt to use it failed
1037 // We wouldn't have written a bad cache entry.
1038 // So one of the following must be true:
1039 // * We were buggy or are stricter now or something
1040 // * The document was valid but its validity time has expired
1041 // In any case we can't reuse it.
1042 // (This happens in normal operation, when a document expires.)
1043 trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
1044 None
1045 }
1046 Ok(got) => {
1047 // The cached document looks valid.
1048 // But how long ago did we fetch it?
1049 // We need to enforce max_refresh even for still-valid documents.
1050 if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
1051 // Was fetched recently, too. We can just reuse it.
1052 return Ok(got);
1053 }
1054 Some(got)
1055 }
1056 }
1057 }
1058 } else {
1059 None
1060 };
1061
1062 // If cached_good is Some, we found a plausible cache entry; if we got here, it was
1063 // past its max_refresh. So in that case we want to send a request with
1064 // if-modified-since. If we get Not Modified, we can reuse it (and update the fetched time).
1065 let if_modified_since = cached_good
1066 .as_ref()
1067 .map(|got| got.desc.as_ref().published());
1068
1069 debug!(
1070 r#"starting download for "{}"{}"#,
1071 bridge,
1072 match if_modified_since {
1073 Some(ims) => format!(
1074 " if-modified-since {}",
1075 humantime::format_rfc3339_seconds(ims),
1076 ),
1077 None => "".into(),
1078 }
1079 );
1080
1081 let text = mockable
1082 .clone()
1083 .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
1084 .await?;
1085
1086 let (document, got) = if let Some(text) = text {
1087 let got = process_document(&text)?;
1088 (text, got)
1089 } else if let Some(cached) = cached_good {
1090 (
1091 cache_entry
1092 .expect("cached_good but not cache_entry")
1093 .document,
1094 cached,
1095 )
1096 } else {
1097 return Err(internal!("download gave None but no if-modified-since").into());
1098 };
1099
1100 // IEFI catches cache store errors, which we log but don't do anything else with
1101 (|| {
1102 let cached = CachedBridgeDescriptor {
1103 document,
1104 fetched: now, // this is from before we started the fetch, which is correct
1105 };
1106
1107 // Calculate when the cache should forget about this.
1108 // We want to add a bit of slop for the purposes of mild clock skew handling,
1109 // etc., and the prefetch time is a good proxy for that.
1110 let until = got
1111 .refetch
1112 .checked_add(config.prefetch)
1113 .unwrap_or(got.refetch /*uh*/);
1114
1115 store()?.store_bridgedesc(bridge, cached, until)?;
1116 Ok(())
1117 })()
1118 .unwrap_or_else(|err: crate::Error| {
1119 error_report!(err, "failed to cache downloaded bridge descriptor",);
1120 });
1121
1122 Ok(got)
1123 }
1124}
1125
1126/// Processes and analyses a textual descriptor document into a `Downloaded`
1127///
1128/// Parses it, checks the signature, checks the document validity times,
1129/// and if that's all good, calculates when will want to refetch it.
1130fn process_document<R: Runtime>(
1131 runtime: &R,
1132 config: &BridgeDescDownloadConfig,
1133 text: &str,
1134) -> Result<Downloaded, Error> {
1135 let desc = RouterDesc::parse(text)?;
1136
1137 // We *could* just trust this because we have trustworthy provenance
1138 // we know that the channel machinery authenticated the identity keys in `bridge`.
1139 // But let's do some cross-checking anyway.
1140 // `check_signature` checks the self-signature.
1141 let desc = desc.check_signature().map_err(Arc::new)?;
1142
1143 let now = runtime.wallclock();
1144 desc.is_valid_at(&now)?;
1145
1146 // Justification that use of "dangerously" is correct:
1147 // 1. We have checked this just above, so it is valid now.
1148 // 2. We are extracting the timeout and implement our own refetch logic using expires.
1149 let (desc, (_, expires)) = desc.dangerously_into_parts();
1150
1151 // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
1152 // The following situations can result in a nominally-expired descriptor being used:
1153 //
1154 // 1. We primarily enforce the timeout by looking at the expiry time,
1155 // subtracting a configured constant, and scheduling the start of a refetch then.
1156 // If it takes us longer to do the retry, than the prefetch constant,
1157 // we'll still be providing the old descriptor to consumers in the meantime.
1158 //
1159 // 2. We apply a minimum time before we will refetch a descriptor.
1160 // So if the validity time is unreasonably short, we'll use it beyond that time.
1161 //
1162 // 3. Clock warping could confuse this algorithm. This is inevitable because we
1163 // are relying on calendar times (SystemTime) in the descriptor, and because
1164 // we don't have a mechanism for being told about clock warps rather than the
1165 // passage of time.
1166 //
1167 // We think this is all OK given that a bridge descriptor is used for trying to
1168 // connect to the bridge itself. In particular, we don't want to completely trust
1169 // bridges to control our retry logic.
1170 let refetch = match expires {
1171 Some(expires) => expires
1172 .checked_sub(config.prefetch)
1173 .ok_or(Error::ExtremeValidityTime)?,
1174
1175 None => now
1176 .checked_add(config.max_refetch)
1177 .ok_or(Error::ExtremeValidityTime)?,
1178 };
1179 let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
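
    // Worked example, using the `Default` config
    // (prefetch = 1000s, min_refetch = 1h, max_refetch = 3h):
    //
    //   expires = now + 2h   =>  refetch = now + 2h - 1000s (already within [1h, 3h])
    //   expires = now + 30m  =>  now + 800s, clamped up to now + 1h (min_refetch)
    //   expires = None       =>  refetch = now + 3h (max_refetch)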

    let desc = BridgeDesc::new(Arc::new(desc));

    Ok(Downloaded { desc, refetch })
}

/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `update`.
///
/// `update` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function. Idempotent.
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);

            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }

    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
            changed = update.next() => {
                // changed is Option<Option< >>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
                    break
                }
            },

            // Wait until the specified earliest wakeup time
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
                    { future::pending().await }
                }
            }.fuse() => {
                // We have reached the pre-programmed time. Check what needs doing.

                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state; // Do the DerefMut once so we can borrow fields

                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );

                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );

                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
}

/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,

    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}

impl HasKind for StartupError {
    fn kind(&self) -> ErrorKind {
        use ErrorKind as EK;
        use StartupError as SE;
        match self {
            SE::MissingCircMgr => EK::Internal,
            SE::Spawn { cause, .. } => cause.kind(),
        }
    }
}

/// An error which occurred trying to obtain the descriptor for a particular bridge
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),

    /// Couldn't establish a directory stream to the bridge
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_proto::Error),

    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),

    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),

    /// Signature check failed
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),

    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),

    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,

    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),

    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}

impl HasKind for Error {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        let bridge_protocol_violation = EK::TorAccessFailed;
        match self {
            // We trust that tor_circmgr returns TorAccessFailed when it ought to.
            E::CircuitFailed(e) => e.kind(),
            E::StreamFailed(e) => e.kind(),
            E::RequestFailed(e) => e.kind(),
            E::ParseFailed(..) => bridge_protocol_violation,
            E::SignatureCheckFailed(..) => bridge_protocol_violation,
            E::ExtremeValidityTime => bridge_protocol_violation,
            E::BadValidityTime(..) => EK::ClockSkew,
            E::Bug(e) => e.kind(),
            #[cfg(test)]
            E::TestError(..) => EK::Internal,
        }
    }
}

impl HasRetryTime for Error {
    fn retry_time(&self) -> RetryTime {
        use Error as E;
        use RetryTime as R;
        match self {
            // Errors with their own retry times
            E::CircuitFailed(e) => e.retry_time(),

            // Remote misbehavior, maybe the network is being strange?
            E::StreamFailed(..) => R::AfterWaiting,
            E::RequestFailed(..) => R::AfterWaiting,

            // Remote misconfiguration, detected *after* we successfully made the channel
            // (so not a network problem). We'll say "never" for RetryTime,
            // even though actually we will in fact retry in at most `max_refetch`.
            E::ParseFailed(..) => R::Never,
            E::SignatureCheckFailed(..) => R::Never,
            E::BadValidityTime(..) => R::Never,
            E::ExtremeValidityTime => R::Never,

            // Probably, things are broken here, rather than remotely.
            E::Bug(..) => R::Never,

            #[cfg(test)]
            E::TestError(_, retry) => *retry,
        }
    }
}

impl BridgeDescError for Error {}

impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because `postage::watch::Sender::borrow` wants it.
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in the schedule `sch_name`
            Schedule {
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }

        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one of our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }

        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };

        impl Tracked {
            /// Note that `bridge` is Tracked
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input* (every Tracked bridge was in the input)
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }

        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Checks invariants *Tracked* and *Schedules* wrt this schedule.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };

            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so this is a wasteful scan,
                // but, indirectly, this tests our implementation of `Ord` for `RefetchEntry`.
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }

                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }

        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };

        // *Tracked*
        //
        // We walk our data structures in turn

        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }

        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);

        // *Current*
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }

        // *Input* (sense: every input bridge is Tracked)
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
        for (b, where_) in &tracked.tracked {
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(found_in_current, "not found in current: {} {}", &b, sch_name);
                }
                _ => {}
            }
        }

        // *Limit*
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);

        // *Running*
        assert!(self.running.len() == parallelism || self.queued.is_empty());

        // *Timeout* (final)
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}