//! `BridgeDescMgr` - downloads and caches bridges' router descriptors

use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug, Display};
use std::num::NonZeroU8;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant, SystemTime};

use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use educe::Educe;
use futures::future;
use futures::select;
use futures::stream::{BoxStream, StreamExt};
use futures::task::{SpawnError, SpawnExt as _};
use futures::FutureExt;
use tracing::{debug, error, info, trace};

use safelog::sensitive;
use tor_basic_utils::retry::RetryDelay;
use tor_basic_utils::BinaryHeapExt as _;
use tor_checkable::{SelfSigned, Timebound};
use tor_circmgr::CircMgr;
use tor_error::{error_report, internal, ErrorKind, HasKind};
use tor_error::{AbsRetryTime, HasRetryTime, RetryTime};
use tor_guardmgr::bridge::{BridgeConfig, BridgeDesc};
use tor_guardmgr::bridge::{BridgeDescError, BridgeDescEvent, BridgeDescList, BridgeDescProvider};
use tor_netdoc::doc::routerdesc::RouterDesc;
use tor_rtcompat::Runtime;

use crate::event::FlagPublisher;
use crate::storage::CachedBridgeDescriptor;
use crate::{DirMgrStore, DynStore};

#[cfg(test)]
mod bdtest;

/// The key we use in all our data structures
///
/// This type saves typing, and would make it easier to change the bridge descriptor manager
/// to take and handle another way of identifying the bridges it is working with.
type BridgeKey = BridgeConfig;
/// Active vs dormant state, as far as the bridge descriptor manager is concerned
///
/// This is usually derived in higher layers from `arti_client::DormantMode`,
/// whether `TorClient::bootstrap()` has been called, etc.
#[non_exhaustive]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
// TODO: These proliferating `Dormancy` enums should be centralized and unified with `TaskHandle`
//     https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/845#note_2853190
pub enum Dormancy {
    /// Dormant (inactive)
    ///
    /// Bridge descriptor downloads, or refreshes, will not be started.
    ///
    /// In-progress downloads will be stopped if possible,
    /// but they may continue until they complete (or fail).
    // TODO async task cancellation: actually cancel these in this case
    ///
    /// So a dormant BridgeDescMgr may still continue to
    /// change the return value from [`bridges()`](BridgeDescProvider::bridges)
    /// and continue to report [`BridgeDescEvent`]s.
    ///
    /// When the BridgeDescMgr is dormant,
    /// `bridges()` may return stale descriptors
    /// (that is, descriptors which ought to have been refetched and may no longer be valid),
    /// or stale errors
    /// (that is, errors which occurred some time ago,
    /// and which would normally have been retried by now).
    Dormant,

    /// Active
    ///
    /// Bridge descriptors will be downloaded as requested.
    ///
    /// When a bridge descriptor manager has been `Dormant`,
    /// it may continue to provide stale data (as described)
    /// for a while after it is made `Active`,
    /// until the required refreshes and retries have taken place (or failed).
    Active,
}
/// **Downloader and cache for bridges' router descriptors**
///
/// This is a handle which is cheap to clone and has internal mutability.
#[derive(Clone)]
pub struct BridgeDescMgr<R: Runtime, M = ()>
where
    M: Mockable<R>,
{
    /// The actual manager
    ///
    /// We have the `Arc` in here, rather than in our callers, because this
    /// makes the API nicer for them, and also because some of our tasks
    /// want a handle they can use to relock and modify the state.
    mgr: Arc<Manager<R, M>>,
}
/// Configuration for the `BridgeDescMgr`
///
/// Currently, the only way to make this is via its `Default` impl.
// TODO: there should be some way to override the defaults.  See #629 for considerations.
#[derive(Debug, Clone)]
pub struct BridgeDescDownloadConfig {
    /// How many bridge descriptor downloads to attempt in parallel?
    parallelism: NonZeroU8,

    /// Default/initial time to retry a failure to download a descriptor
    ///
    /// (This has the semantics of an initial delay for [`RetryDelay`],
    /// and is used unless there is more specific retry information for the particular failure.)
    retry: Duration,

    /// When a downloaded descriptor is going to expire, how soon in advance to refetch it?
    prefetch: Duration,

    /// Minimum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This limits the download activity which can be caused by an errant bridge.
    ///
    /// If the descriptor's validity information is shorter than this, we will use
    /// it after it has expired (rather than treating the bridge as broken).
    min_refetch: Duration,

    /// Maximum interval between successive refetches of the descriptor for the same bridge
    ///
    /// This sets an upper bound on how old a descriptor we are willing to use.
    /// When this time expires, a refetch attempt will be started even if the
    /// descriptor is not going to expire soon.
    //
    // TODO: When this is configurable, we need to make sure we reject
    // configurations with max_refetch < min_refetch, or we may panic.
    max_refetch: Duration,
}
impl Default for BridgeDescDownloadConfig {
    fn default() -> Self {
        let secs = Duration::from_secs;
        BridgeDescDownloadConfig {
            parallelism: 4.try_into().expect("parallelism is zero"),
            retry: secs(30),
            prefetch: secs(1000),
            min_refetch: secs(3600),
            max_refetch: secs(3600 * 3), // matches C Tor behaviour
        }
    }
}
/// Mockable internal methods for within the `BridgeDescMgr`
///
/// Implemented for `()`, meaning "do not use mocks: use the real versions of everything".
///
/// This (`()`) is the default for the type parameter in
/// [`BridgeDescMgr`],
/// and it is the only publicly available implementation,
/// since this trait is sealed.
pub trait Mockable<R>: mockable::MockableAPI<R> {}
impl<R: Runtime> Mockable<R> for () {}

/// Private module which seals [`Mockable`]
/// by containing [`MockableAPI`](mockable::MockableAPI)
mod mockable {
    use super::*;

    /// Defines the actual mockable APIs
    ///
    /// Not nameable (and therefore not implementable)
    /// outside the `bridgedesc` module.
    #[async_trait]
    pub trait MockableAPI<R>: Clone + Send + Sync + 'static {
        /// Circuit manager
        type CircMgr: Send + Sync + 'static;

        /// Download this bridge's descriptor, and return it as a string
        ///
        /// Runs in a task.
        /// Called by `Manager::download_descriptor`, which handles parsing and validation.
        ///
        /// If `if_modified_since` is `Some`,
        /// should tolerate an HTTP 304 Not Modified and return `None` in that case.
        /// If `if_modified_since` is `None`, returning `Ok(None)` is forbidden.
        async fn download(
            self,
            runtime: &R,
            circmgr: &Self::CircMgr,
            bridge: &BridgeConfig,
            if_modified_since: Option<SystemTime>,
        ) -> Result<Option<String>, Error>;
    }
}
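// A test double would implement `MockableAPI` for its own type.  A minimal
// sketch, assuming a hypothetical `FixedDesc` type (not part of this module;
// the real test mocks live in `bdtest`):
//
//     #[derive(Clone)]
//     struct FixedDesc(String);
//
//     #[async_trait]
//     impl<R: Runtime> mockable::MockableAPI<R> for FixedDesc {
//         type CircMgr = ();
//         async fn download(
//             self,
//             _runtime: &R,
//             _circmgr: &Self::CircMgr,
//             _bridge: &BridgeConfig,
//             _if_modified_since: Option<SystemTime>,
//         ) -> Result<Option<String>, Error> {
//             // Always "succeed" with a canned document.
//             Ok(Some(self.0))
//         }
//     }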
#[async_trait]
impl<R: Runtime> mockable::MockableAPI<R> for () {
    type CircMgr = Arc<CircMgr<R>>;

    /// Actual code for downloading a descriptor document
    async fn download(
        self,
        runtime: &R,
        circmgr: &Self::CircMgr,
        bridge: &BridgeConfig,
        _if_modified_since: Option<SystemTime>,
    ) -> Result<Option<String>, Error> {
        // TODO actually support _if_modified_since
        let circuit = circmgr.get_or_launch_dir_specific(bridge).await?;
        let mut stream = circuit
            .begin_dir_stream()
            .await
            .map_err(Error::StreamFailed)?;
        let request = tor_dirclient::request::RoutersOwnDescRequest::new();
        let response = tor_dirclient::send_request(runtime, &request, &mut stream, None)
            .await
            .map_err(|dce| match dce {
                tor_dirclient::Error::RequestFailed(re) => Error::RequestFailed(re),
                _ => internal!(
                    "tor_dirclient::send_request gave non-RequestFailed {:?}",
                    dce
                )
                .into(),
            })?;
        let output = response.into_output_string()?;
        Ok(Some(output))
    }
}
/// The actual manager.
struct Manager<R: Runtime, M: Mockable<R>> {
    /// The mutable state
    state: Mutex<State>,

    /// Runtime, used for tasks and sleeping
    runtime: R,

    /// Circuit manager, used for creating circuits
    circmgr: M::CircMgr,

    /// Persistent state store
    store: Arc<Mutex<DynStore>>,

    /// Mock for testing, usually `()`
    mockable: M,
}

/// State: our downloaded descriptors (cache), and records of what we're doing
///
/// Various functions (both tasks and public entrypoints),
/// which generally start with a `Manager`,
/// lock the mutex and modify this.
///
/// Generally, the flow is:
///
///  * A public entrypoint, or task, obtains a [`StateGuard`].
///    It modifies the state to represent the callers' new requirements,
///    or things it has done, by updating the state,
///    preserving the invariants but disturbing the "liveness" (see below).
///
///  * [`StateGuard::drop`] calls [`State::process`].
///    This restores the liveness properties.
///
/// ### Possible states of a bridge:
///
/// A bridge can be in one of the following states,
/// represented by its presence in these particular data structures inside `State`:
///
///  * `running`/`queued`: newly added, no outcome yet.
///  * `current` + `running`/`queued`: we are fetching (or going to)
///  * `current = OK` + `refetch_schedule`: fetched OK, will refetch before expiry
///  * `current = Err` + `retry_schedule`: failed, will retry at some point
///
/// ### Invariants:
///
/// Can be disrupted in the middle of a principal function,
/// but should be restored on return.
///
/// * **Tracked**:
///   Each bridge appears at most once in
///   `running`, `queued`, `refetch_schedule` and `retry_schedule`.
///   We call such a bridge Tracked.
///
/// * **Current**:
///   Every bridge in `current` is Tracked.
///   (But not every Tracked bridge is necessarily in `current`, yet.)
///
/// * **Schedules**:
///   Every bridge in `refetch_schedule` or `retry_schedule` is also in `current`.
///
/// * **Input**:
///   Exactly each bridge that was passed to
///   the last call to [`set_bridges()`](BridgeDescMgr::set_bridges) is Tracked.
///   (If we encountered spawn failures, we treat this as trying to shut down,
///   so we cease attempts to get bridges, and discard the relevant state, violating this.)
///
/// * **Limit**:
///   `running` is capped at the effective parallelism: zero if we are dormant,
///   the configured parallelism otherwise.
///
/// ### Liveness properties:
///
/// These can be disrupted by any function which holds a [`StateGuard`].
/// They will be restored by [`process()`](State::process),
/// which is called when `StateGuard` is dropped.
///
/// Functions that take a `StateGuard` may disturb these invariants
/// and rely on someone else to restore them.
///
/// * **Running**:
///   If `queued` is nonempty, `running` is full.
///
/// * **Timeout**:
///   `earliest_timeout` is the earliest timeout in
///   either `retry_schedule` or `refetch_schedule`.
///   (Disturbances of this property which occur due to system time warps
///   are not necessarily detected and remedied in a timely way,
///   but will be remedied no later than after `max_refetch`.)
struct State {
    /// Our configuration
    config: Arc<BridgeDescDownloadConfig>,

    /// People who will be told when `current` changes.
    subscribers: FlagPublisher<BridgeDescEvent>,

    /// Our current idea of our output, which we give out handles onto.
    current: Arc<BridgeDescList>,

    /// Bridges whose descriptors we are currently downloading.
    running: HashMap<BridgeKey, RunningInfo>,

    /// Bridges which we want to download,
    /// but we're waiting for `running` to be less than `effective_parallelism()`.
    queued: VecDeque<QueuedEntry>,

    /// Are we dormant?
    dormancy: Dormancy,

    /// Bridges that we have a descriptor for,
    /// and when they should be refetched due to validity expiry.
    ///
    /// This is indexed by `SystemTime` because that helps avoid undesirable behaviors
    /// when the system clock changes.
    refetch_schedule: BinaryHeap<RefetchEntry<SystemTime, ()>>,

    /// Bridges that failed earlier, and when they should be retried.
    retry_schedule: BinaryHeap<RefetchEntry<Instant, RetryDelay>>,

    /// Earliest time from either `retry_schedule` or `refetch_schedule`
    ///
    /// `None` means "wait indefinitely".
    earliest_timeout: postage::watch::Sender<Option<Instant>>,
}
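// Lifecycle sketch (illustrative, restating the "Possible states" above): for
// one bridge, in terms of which structures mention it at each step:
//
//     set_bridges([b])       ->  queued
//     process()              ->  running              (download task spawned)
//     download succeeded     ->  current = Ok  + refetch_schedule
//     refetch time reached   ->  queued -> running    (requeued by timeout_task)
//     download failed        ->  current = Err + retry_schedule
//     retry time reached     ->  queued -> running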
impl Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Helper to format one bridge entry somewhere
        fn fmt_bridge(
            f: &mut fmt::Formatter,
            b: &BridgeConfig,
            info: &(dyn Display + '_),
        ) -> fmt::Result {
            let info = info.to_string(); // fmt::Formatter doesn't enforce precision, so do this
            writeln!(f, "    {:80.80} | {}", info, b)
        }

        /// Helper to format one of the schedules
        fn fmt_schedule<TT: Ord + Copy + Debug, RD>(
            f: &mut fmt::Formatter,
            summary: &str,
            name: &str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
        ) -> fmt::Result {
            writeln!(f, "  {}:", name)?;
            for b in schedule {
                fmt_bridge(f, &b.bridge, &format_args!("{} {:?}", summary, &b.when))?;
            }
            Ok(())
        }

        // We are going to have to go multi-line because of the bridge lines,
        // so do completely bespoke formatting rather than `std::fmt::DebugStruct`
        // or a derive.
        writeln!(f, "State {{")?;
        // We'd like to print earliest_timeout but watch::Sender::borrow takes &mut
        writeln!(f, "  earliest_timeout: ???, ..,")?;
        writeln!(f, "  current:")?;
        for (b, v) in &*self.current {
            fmt_bridge(
                f,
                b,
                &match v {
                    Err(e) => Cow::from(format!("C Err {}", e)),
                    Ok(_) => "C Ok".into(),
                },
            )?;
        }
        writeln!(f, "  running:")?;
        for b in self.running.keys() {
            fmt_bridge(f, b, &"R")?;
        }
        writeln!(f, "  queued:")?;
        for qe in &self.queued {
            fmt_bridge(f, &qe.bridge, &"Q")?;
        }
        fmt_schedule(f, "FS", "refetch_schedule", &self.refetch_schedule)?;
        fmt_schedule(f, "TS", "retry_schedule", &self.retry_schedule)?;
        write!(f, "}}")?;

        Ok(())
    }
}

/// Value of the entry in `running`
#[derive(Debug)]
struct RunningInfo {
    /// For cancelling downloads no longer wanted
    join: JoinHandle,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in `queued`
#[derive(Debug)]
struct QueuedEntry {
    /// The bridge to fetch
    bridge: BridgeKey,

    /// If this previously failed, the persistent retry delay.
    retry_delay: Option<RetryDelay>,
}

/// Entry in one of the `*_schedule`s
///
/// Implements `Ord` and `Eq` but *only looking at the refetch time*.
/// So don't deduplicate by `[Partial]Eq`, or use as a key in a map.
#[derive(Debug)]
struct RefetchEntry<TT, RD> {
    /// When we should requeue this bridge for fetching
    ///
    /// Either [`Instant`] (in `retry_schedule`) or [`SystemTime`] (in `refetch_schedule`).
    when: TT,

    /// The bridge to refetch
    bridge: BridgeKey,

    /// Retry delay
    ///
    /// `RetryDelay` if we previously failed (ie, if this is a retry entry);
    /// otherwise `()`.
    retry_delay: RD,
}

impl<TT: Ord, RD> Ord for RefetchEntry<TT, RD> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.when.cmp(&other.when).reverse()
        // We don't care about the ordering of BridgeConfig or retry_delay.
        // Different BridgeConfig with the same fetch time will be fetched in "some order".
    }
}

impl<TT: Ord, RD> PartialOrd for RefetchEntry<TT, RD> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<TT: Ord, RD> PartialEq for RefetchEntry<TT, RD> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl<TT: Ord, RD> Eq for RefetchEntry<TT, RD> {}
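// Note on the reversed comparison above: `BinaryHeap` is a max-heap, so
// reversing the order on `when` makes `peek()`/`pop()` yield the *earliest*
// deadline first, which is what the schedules need.  Illustration with
// hypothetical values (a `bridge: BridgeKey` is assumed to be in scope):
//
//     let mut sched: BinaryHeap<RefetchEntry<Instant, ()>> = BinaryHeap::new();
//     let now = Instant::now();
//     for secs in [30, 10, 20] {
//         sched.push(RefetchEntry {
//             when: now + Duration::from_secs(secs),
//             bridge: bridge.clone(),
//             retry_delay: (),
//         });
//     }
//     // peek() is the entry due in 10s, the soonest deadline:
//     assert_eq!(sched.peek().unwrap().when, now + Duration::from_secs(10));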
/// Dummy task join handle
///
/// We would like to be able to cancel now-redundant downloads
/// using something like `tokio::task::JoinHandle::abort()`.
/// tor-rtcompat doesn't support that so we stub it for now.
///
/// Providing this stub means the place where the cancellation needs to take place
/// already has the appropriate call to our [`JoinHandle::abort`].
#[derive(Debug)]
struct JoinHandle;

impl JoinHandle {
    /// Would abort this async task, if we could do that.
    fn abort(&self) {}
}

impl<R: Runtime> BridgeDescMgr<R> {
    /// Create a new `BridgeDescMgr`
    ///
    /// This is the public constructor.
    //
    // TODO: That this constructor requires a DirMgr is rather odd.
    // In principle there is little reason why you need a DirMgr to make a BridgeDescMgr.
    // However, BridgeDescMgr needs a Store, and currently that is a private trait, and the
    // implementation is constructible only from the dirmgr's config.  This should probably be
    // tidied up somehow, at some point, perhaps by exposing `Store` and its configuration.
    pub fn new(
        config: &BridgeDescDownloadConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        dormancy: Dormancy,
    ) -> Result<Self, StartupError> {
        Self::new_internal(runtime, circmgr, store.store, config, dormancy, ())
    }
}
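// Usage sketch (illustrative; `runtime`, `store`, `circmgr`, and `bridges` are
// assumed to be in scope, and `BridgeDescProvider` must be imported so that
// `set_bridges`/`bridges` are callable):
//
//     let mgr = BridgeDescMgr::new(
//         &BridgeDescDownloadConfig::default(),
//         runtime,
//         store,
//         circmgr,
//         Dormancy::Active,
//     )?;
//     mgr.set_bridges(&bridges);     // start tracking (and fetching) these
//     let descs = mgr.bridges();     // snapshot: Arc<BridgeDescList>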
/// If download was successful, what we obtained
///
/// Generated by `process_document`, from a downloaded (or cached) textual descriptor.
#[derive(Debug)]
struct Downloaded {
    /// The bridge descriptor, fully parsed and verified
    desc: BridgeDesc,

    /// When we should start a refresh for this descriptor
    ///
    /// This is derived from the expiry time,
    /// and clamped according to limits in the configuration.
    refetch: SystemTime,
}

impl<R: Runtime, M: Mockable<R>> BridgeDescMgr<R, M> {
    /// Actual constructor, which takes a mockable
    //
    // Allow passing `runtime` by value, which is usual API for this kind of setup function.
    #[allow(clippy::needless_pass_by_value)]
    fn new_internal(
        runtime: R,
        circmgr: M::CircMgr,
        store: Arc<Mutex<DynStore>>,
        config: &BridgeDescDownloadConfig,
        dormancy: Dormancy,
        mockable: M,
    ) -> Result<Self, StartupError> {
        /// Convenience alias
        fn default<T: Default>() -> T {
            Default::default()
        }

        let config = config.clone().into();
        let (earliest_timeout, timeout_update) = postage::watch::channel();

        let state = Mutex::new(State {
            config,
            subscribers: default(),
            current: default(),
            running: default(),
            queued: default(),
            dormancy,
            retry_schedule: default(),
            refetch_schedule: default(),
            earliest_timeout,
        });
        let mgr = Arc::new(Manager {
            state,
            runtime: runtime.clone(),
            circmgr,
            store,
            mockable,
        });

        runtime
            .spawn(timeout_task(
                runtime.clone(),
                Arc::downgrade(&mgr),
                timeout_update,
            ))
            .map_err(|cause| StartupError::Spawn {
                spawning: "timeout task",
                cause: cause.into(),
            })?;

        Ok(BridgeDescMgr { mgr })
    }

    /// Consistency check convenience wrapper
    #[cfg(test)]
    fn check_consistency<'i, I>(&self, input_bridges: Option<I>)
    where
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        self.mgr
            .lock_only()
            .check_consistency(&self.mgr.runtime, input_bridges);
    }

    /// Set whether this `BridgeDescMgr` is active
    // TODO this should instead be handled by a central mechanism; see TODO on Dormancy
    pub fn set_dormancy(&self, dormancy: Dormancy) {
        self.mgr.lock_then_process().dormancy = dormancy;
    }
}

impl<R: Runtime, M: Mockable<R>> BridgeDescProvider for BridgeDescMgr<R, M> {
    fn bridges(&self) -> Arc<BridgeDescList> {
        self.mgr.lock_only().current.clone()
    }

    fn events(&self) -> BoxStream<'static, BridgeDescEvent> {
        let stream = self.mgr.lock_only().subscribers.subscribe();
        Box::pin(stream) as _
    }

    fn set_bridges(&self, new_bridges: &[BridgeConfig]) {
        /// Helper: Called for each bridge that is currently Tracked.
        ///
        /// Checks if `new_bridges` has `bridge`.  If so, removes it from `new_bridges`,
        /// and returns `true`, indicating that this bridge should be kept.
        ///
        /// If not, returns `false`, indicating that this bridge should be removed,
        /// and logs a message.
        fn note_found_keep_p(
            new_bridges: &mut HashSet<BridgeKey>,
            bridge: &BridgeKey,
            was_state: &str,
        ) -> bool {
            let keep = new_bridges.remove(bridge);
            if !keep {
                debug!(r#"forgetting bridge ({}) "{}""#, was_state, bridge);
            }
            keep
        }

        /// Helper: filters `*_schedule` so that it contains only things in `new_bridges`,
        /// removing them as we go.
        fn filter_schedule<TT: Ord + Copy, RD>(
            new_bridges: &mut HashSet<BridgeKey>,
            schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
            was_state: &str,
        ) {
            schedule.retain_ext(|b| note_found_keep_p(new_bridges, &b.bridge, was_state));
        }

        let mut state = self.mgr.lock_then_process();
        let state = &mut **state;

        // We go through our own data structures, comparing them with `new_bridges`.
        // Entries in our own structures that aren't in `new_bridges` are removed.
        // Entries that *are* are removed from `new_bridges`.
        // Eventually `new_bridges` is just the list of new bridges to *add*.
        let mut new_bridges: HashSet<_> = new_bridges.iter().cloned().collect();

        // Is there anything in `current` that ought to be deleted?
        if state.current.keys().any(|b| !new_bridges.contains(b)) {
            // Found a bridge in `current` but not `new`.
            // We need to remove it (and any others like it) from `current`.
            //
            // Disturbs the invariant *Schedules*:
            // After this maybe the schedules have entries they shouldn't.
            let current: BridgeDescList = state
                .current
                .iter()
                .filter(|(b, _)| new_bridges.contains(&**b))
                .map(|(b, v)| (b.clone(), v.clone()))
                .collect();
            state.set_current_and_notify(current);
        } else {
            // Nothing is being removed, so we can keep `current`.
        }
        // Bridges being newly requested will be added to `current`
        // later, after they have been fetched.

        // Is there anything in running we should abort?
        state.running.retain(|b, ri| {
            let keep = note_found_keep_p(&mut new_bridges, b, "was downloading");
            if !keep {
                ri.join.abort();
            }
            keep
        });

        // Is there anything in queued we should forget about?
        state
            .queued
            .retain(|qe| note_found_keep_p(&mut new_bridges, &qe.bridge, "was queued"));

        // Restore the invariant *Schedules*, that the schedules contain only things in current,
        // by removing the same things from the schedules that we earlier removed from current.
        filter_schedule(
            &mut new_bridges,
            &mut state.retry_schedule,
            "previously failed",
        );
        filter_schedule(
            &mut new_bridges,
            &mut state.refetch_schedule,
            "previously downloaded",
        );

        // OK now we have the list of bridges to add (if any).
        state.queued.extend(new_bridges.into_iter().map(|bridge| {
            debug!(r#"added bridge, queueing for download "{}""#, &bridge);
            QueuedEntry {
                bridge,
                retry_delay: None,
            }
        }));

        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
        // to make further progress and restore the liveness properties.
    }
}
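// Consumer sketch (illustrative): `SomethingChanged` carries no delta, so a
// subscriber re-reads the whole list on each event (`provider` is assumed to
// be some `BridgeDescProvider`; `StreamExt` is already imported above):
//
//     let mut events = provider.events();
//     while let Some(_ev) = events.next().await {
//         let descs = provider.bridges(); // fresh Arc<BridgeDescList> snapshot
//         // ... react to added/removed/updated descriptors ...
//     }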
impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Obtain a lock on state, for functions that want to disrupt liveness properties
    ///
    /// When `StateGuard` is dropped, the liveness properties will be restored
    /// by making whatever progress is required.
    ///
    /// See [`State`].
    fn lock_then_process<'s>(self: &'s Arc<Self>) -> StateGuard<'s, R, M> {
        StateGuard {
            state: self.lock_only(),
            mgr: self,
        }
    }

    /// Obtains the lock on state.
    ///
    /// Caller ought not to modify state
    /// so as to invalidate invariants or liveness properties.
    /// Callers which are part of the algorithms in this crate
    /// ought to consider [`lock_then_process`](Manager::lock_then_process) instead.
    fn lock_only(&self) -> MutexGuard<State> {
        self.state.lock().expect("bridge desc manager poisoned")
    }
}

/// Writeable reference to [`State`], entitling the holder to disrupt liveness properties.
///
/// The holder must still maintain the invariants.
///
/// Obtained from [`Manager::lock_then_process`].  See [`State`].
#[derive(Educe, Deref, DerefMut)]
#[educe(Debug)]
struct StateGuard<'s, R: Runtime, M: Mockable<R>> {
    /// Reference to the mutable state
    #[deref]
    #[deref_mut]
    state: MutexGuard<'s, State>,

    /// Reference to the outer container
    ///
    /// Allows the holder to obtain a `'static` (owned) handle `Arc<Manager>`,
    /// for use by spawned tasks.
    #[educe(Debug(ignore))]
    mgr: &'s Arc<Manager<R, M>>,
}

impl<R: Runtime, M: Mockable<R>> Drop for StateGuard<'_, R, M> {
    fn drop(&mut self) {
        self.state.process(self.mgr);
    }
}
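// Pattern sketch (illustrative): every mutator follows this shape; `process`
// runs automatically when the guard is dropped at end of scope:
//
//     let mut state = mgr.lock_then_process();
//     state.queued.push_back(QueuedEntry { bridge, retry_delay: None });
//     // guard drops here -> State::process restores *Running* and *Timeout*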
impl State {
    /// Ensure progress is made, by restoring all the liveness invariants
    ///
    /// This includes launching circuits as needed.
    fn process<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        // Restore liveness property *Running*
        self.consider_launching(mgr);

        let now_wall = mgr.runtime.wallclock();

        // Mitigate clock warping
        //
        // If the earliest `SystemTime` is more than `max_refetch` away,
        // the clock must have warped.  If that happens we clamp
        // them all to `max_refetch`.
        //
        // (This is not perfect but will mitigate the worst effects by ensuring
        // that we do *something* at least every `max_refetch`, in the worst case,
        // other than just getting completely stuck.)
        let max_refetch_wall = now_wall + self.config.max_refetch;
        if self
            .refetch_schedule
            .peek()
            .map(|re| re.when > max_refetch_wall)
            == Some(true)
        {
            info!("bridge descriptor manager: clock warped, clamping refetch times");
            self.refetch_schedule = self
                .refetch_schedule
                .drain()
                .map(|mut re| {
                    re.when = max_refetch_wall;
                    re
                })
                .collect();
        }

        // Restore liveness property *Timeout*
        // postage::watch will tell the timeout task about the new wake-up time.
        let new_earliest_timeout = [
            // First retry.  These are std Instant.
            self.retry_schedule.peek().map(|re| re.when),
            // First refetch.  These are SystemTime, so we must convert them.
            self.refetch_schedule.peek().map(|re| {
                // If duration_since gives Err, that means when is before now,
                // ie we should not be waiting: the wait duration should be 0.
                let wait = re.when.duration_since(now_wall).unwrap_or_default();

                mgr.runtime.now() + wait
            }),
        ]
        .into_iter()
        .flatten()
        .min();
        *self.earliest_timeout.borrow_mut() = new_earliest_timeout;
    }

    /// Launch download attempts if we can
    ///
    /// Specifically: if we have things in `queued`, and `running` is shorter than
    /// `effective_parallelism()`, we launch task(s) to attempt download(s).
    ///
    /// Restores liveness invariant *Running*.
    ///
    /// Idempotent.  Forms part of `process`.
    #[allow(clippy::blocks_in_conditions)]
    fn consider_launching<R: Runtime, M: Mockable<R>>(&mut self, mgr: &Arc<Manager<R, M>>) {
        let mut to_remove = vec![];

        while self.running.len() < self.effective_parallelism() {
            let QueuedEntry {
                bridge,
                retry_delay,
            } = match self.queued.pop_front() {
                Some(qe) => qe,
                None => break,
            };
            match mgr
                .runtime
                .spawn({
                    let config = self.config.clone();
                    let bridge = bridge.clone();
                    let inner = mgr.clone();
                    let mockable = inner.mockable.clone();

                    // The task which actually downloads a descriptor.
                    async move {
                        let got =
                            AssertUnwindSafe(inner.download_descriptor(mockable, &bridge, &config))
                                .catch_unwind()
                                .await
                                .unwrap_or_else(|_| {
                                    Err(internal!("download descriptor task panicked!").into())
                                });
                        match &got {
                            Ok(_) => debug!(r#"download succeeded for "{}""#, bridge),
                            Err(err) => debug!(r#"download failed for "{}": {}"#, bridge, err),
                        };
                        let mut state = inner.lock_then_process();
                        state.record_download_outcome(bridge, got);
                        // `StateGuard`, from `lock_then_process`, gets dropped here, and runs
                        // `process`, to make further progress and restore the liveness properties.
                    }
                })
                .map(|()| JoinHandle)
            {
                Ok(join) => {
                    self.running
                        .insert(bridge, RunningInfo { join, retry_delay });
                }
                Err(_) => {
                    // Spawn failed.
                    //
                    // We are going to forget about this bridge.
                    // And we're going to do that without notifying anyone.
                    // We *do* want to remove it from `current` because simply forgetting
                    // about a refetch could leave expired data there.
                    // We amortize this, so we don't do a lot of O(n^2) work on shutdown.
                    to_remove.push(bridge);
                }
            }
        }

        if !to_remove.is_empty() {
            self.modify_current(|current| {
                for bridge in to_remove {
                    current.remove(&bridge);
                }
            });
        }
    }

    /// Modify `current` and notify subscribers
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    ///
    /// The implementation actually involves cloning `current`,
    /// so it is best to amortize calls to this function.
    fn modify_current<T, F: FnOnce(&mut BridgeDescList) -> T>(&mut self, f: F) -> T {
        let mut current = (*self.current).clone();
        let r = f(&mut current);
        self.set_current_and_notify(current);
        r
    }

    /// Set `current` to a value and notify
    ///
    /// Helper function which modifies only `current`, not any of the rest of the state.
    /// It is the caller's responsibility to ensure that the invariants are upheld.
    fn set_current_and_notify<BDL: Into<Arc<BridgeDescList>>>(&mut self, new: BDL) {
        self.current = new.into();
        self.subscribers.publish(BridgeDescEvent::SomethingChanged);
    }

    /// Obtain the currently-desired level of parallelism
    ///
    /// Helper function.  The return value depends on the mutable state and also the `config`.
    ///
    /// This is how we implement dormancy.
    fn effective_parallelism(&self) -> usize {
        match self.dormancy {
            Dormancy::Active => usize::from(u8::from(self.config.parallelism)),
            Dormancy::Dormant => 0,
        }
    }
}
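// Illustration (hypothetical calls): dormancy is implemented purely as
// "effective parallelism zero", so making the manager dormant stops new
// launches while leaving `queued` intact:
//
//     mgr.set_dormancy(Dormancy::Dormant);
//     mgr.set_bridges(&bridges);           // bridges are queued; nothing runs
//     mgr.set_dormancy(Dormancy::Active);  // process() now launches downloads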
impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
    /// Record a download outcome.
    ///
    /// Final act of the descriptor download task.
    /// `got` is from [`download_descriptor`](Manager::download_descriptor).
    fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
        let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
            Some(ri) => ri,
            None => {
                debug!("bridge descriptor download completed for no-longer-configured bridge");
                return;
            }
        };

        let insert = match got {
            Ok(Downloaded { desc, refetch }) => {
                // Successful download.  Schedule the refetch, and we'll insert Ok.

                self.refetch_schedule.push(RefetchEntry {
                    when: refetch,
                    bridge: bridge.clone(),
                    retry_delay: (),
                });

                Ok(desc)
            }
            Err(err) => {
                // Failed.  Schedule the retry, and we'll insert Err.

                let mut retry_delay =
                    retry_delay.unwrap_or_else(|| RetryDelay::from_duration(self.config.retry));

                let retry = err.retry_time();
                // Convert the error's retry time to an absolute Instant,
                // using the persistent retry delay if the error doesn't say.
                let now = self.mgr.runtime.now();
                let retry = retry.absolute(now, || retry_delay.next_delay(&mut rand::rng()));
                // Retry no later than max_refetch.  That way if a bridge is
                // misconfigured we will see it be fixed eventually.
                let retry = {
                    let earliest = now;
                    let latest = || now + self.config.max_refetch;
                    match retry {
                        AbsRetryTime::Immediate => earliest,
                        AbsRetryTime::Never => latest(),
                        AbsRetryTime::At(i) => i.clamp(earliest, latest()),
                    }
                };
                self.retry_schedule.push(RefetchEntry {
                    when: retry,
                    bridge: bridge.clone(),
                    retry_delay,
                });

                Err(Box::new(err) as _)
            }
        };

        self.modify_current(|current| current.insert(bridge, insert));
    }
}

impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
    /// Downloads a descriptor.
    ///
    /// The core of the descriptor download task
    /// launched by `State::consider_launching`.
    ///
    /// Uses `Mockable::download` to actually get the document.
    /// So most of this function is parsing and checking.
    ///
    /// The returned value is precisely the `got` input to
    /// [`record_download_outcome`](StateGuard::record_download_outcome).
    async fn download_descriptor(
        &self,
        mockable: M,
        bridge: &BridgeConfig,
        config: &BridgeDescDownloadConfig,
    ) -> Result<Downloaded, Error> {
        // Convenience aliases, capturing the usual parameters from our variables.
        let process_document = |text| process_document(&self.runtime, config, text);
        let store = || {
            self.store
                .lock()
                .map_err(|_| internal!("bridge descriptor store poisoned"))
        };
        let cache_entry: Option<CachedBridgeDescriptor> = (|| store()?.lookup_bridgedesc(bridge))()
            .unwrap_or_else(|err| {
                error_report!(
                    err,
                    r#"bridge descriptor cache lookup failed, for "{}""#,
                    sensitive(bridge),
                );
                None
            });

        let now = self.runtime.wallclock();
        let cached_good: Option<Downloaded> = if let Some(cached) = &cache_entry {
            if cached.fetched > now {
                // was fetched "in the future"
                None
            } else {
                // let's see if it's any use
                match process_document(&cached.document) {
                    Err(err) => {
                        // We had a doc in the cache but our attempt to use it failed.
                        // We wouldn't have written a bad cache entry.
                        // So one of the following must be true:
                        //  * We were buggy or are stricter now or something
                        //  * The document was valid but its validity time has expired
                        // In any case we can't reuse it.
                        // (This happens in normal operation, when a document expires.)
                        trace!(r#"cached document for "{}" invalid: {}"#, &bridge, err);
                        None
                    }
                    Ok(got) => {
                        // The cached document looks valid.
                        // But how long ago did we fetch it?
                        // We need to enforce max_refetch even for still-valid documents.
                        if now.duration_since(cached.fetched).ok() <= Some(config.max_refetch) {
                            // Was fetched recently, too.  We can just reuse it.
                            return Ok(got);
                        }
                        Some(got)
                    }
                }
            }
        } else {
            None
        };
        // If cached_good is Some, we found a plausible cache entry; if we got here, it was
        // past its max_refetch.  So in that case we want to send a request with
        // if-modified-since.  If we get Not Modified, we can reuse it (and update the fetched time).
        let if_modified_since = cached_good
            .as_ref()
            .map(|got| got.desc.as_ref().published());

        debug!(
            r#"starting download for "{}"{}"#,
            bridge,
            match if_modified_since {
                Some(ims) => format!(
                    " if-modified-since {}",
                    humantime::format_rfc3339_seconds(ims),
                ),
                None => "".into(),
            }
        );
        let text = mockable
            .clone()
            .download(&self.runtime, &self.circmgr, bridge, if_modified_since)
            .await?;
        let (document, got) = if let Some(text) = text {
            let got = process_document(&text)?;
            (text, got)
        } else if let Some(cached) = cached_good {
            (
                cache_entry
                    .expect("cached_good but not cache_entry")
                    .document,
                cached,
            )
        } else {
            return Err(internal!("download gave None but no if-modified-since").into());
        };
        // IEFI catches cache store errors, which we log but don't do anything else with
        (|| {
            let cached = CachedBridgeDescriptor {
                document,
                fetched: now, // this is from before we started the fetch, which is correct
            };

            // Calculate when the cache should forget about this.
            // We want to add a bit of slop for the purposes of mild clock skew handling,
            // etc., and the prefetch time is a good proxy for that.
            let until = got
                .refetch
                .checked_add(config.prefetch)
                .unwrap_or(got.refetch /*uh*/);

            store()?.store_bridgedesc(bridge, cached, until)?;
            Ok(())
        })()
        .unwrap_or_else(|err: crate::Error| {
            error_report!(err, "failed to cache downloaded bridge descriptor",);
        });

        Ok(got)
    }
}
/// Processes and analyses a textual descriptor document into a `Downloaded`
///
/// Parses it, checks the signature, checks the document validity times,
/// and if that's all good, calculates when we will want to refetch it.
fn process_document<R: Runtime>(
    runtime: &R,
    config: &BridgeDescDownloadConfig,
    text: &str,
) -> Result<Downloaded, Error> {
    let desc = RouterDesc::parse(text)?;
    // We *could* just trust this because we have trustworthy provenance:
    // we know that the channel machinery authenticated the identity keys in `bridge`.
    // But let's do some cross-checking anyway.
    // `check_signature` checks the self-signature.
    let desc = desc.check_signature().map_err(Arc::new)?;
    let now = runtime.wallclock();
    desc.is_valid_at(&now)?;
    // Justification that use of "dangerously" is correct:
    // 1. We have checked this just above, so it is valid now.
    // 2. We are extracting the timeout and implement our own refetch logic using expires.
    let (desc, (_, expires)) = desc.dangerously_into_parts();
    // Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
    // The following situations can result in a nominally-expired descriptor being used:
    //
    // 1. We primarily enforce the timeout by looking at the expiry time,
    //    subtracting a configured constant, and scheduling the start of a refetch then.
    //    If it takes us longer to do the retry than the prefetch constant,
    //    we'll still be providing the old descriptor to consumers in the meantime.
    //
    // 2. We apply a minimum time before we will refetch a descriptor.
    //    So if the validity time is unreasonably short, we'll use it beyond that time.
    //
    // 3. Clock warping could confuse this algorithm.  This is inevitable because we
    //    are relying on calendar times (SystemTime) in the descriptor, and because
    //    we don't have a mechanism for being told about clock warps rather than the
    //    passage of time.
    //
    // We think this is all OK given that a bridge descriptor is used for trying to
    // connect to the bridge itself.  In particular, we don't want to completely trust
    // bridges to control our retry logic.
    let refetch = match expires {
        Some(expires) => expires
            .checked_sub(config.prefetch)
            .ok_or(Error::ExtremeValidityTime)?,
        None => now
            .checked_add(config.max_refetch)
            .ok_or(Error::ExtremeValidityTime)?,
    };
    let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);

    let desc = BridgeDesc::new(Arc::new(desc));

    Ok(Downloaded { desc, refetch })
}
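// Worked example (illustrative arithmetic, using the `Default` config values):
// the effective formula above is
// refetch = clamp(expires - prefetch, now + min_refetch, now + max_refetch).
//
//     // Descriptor valid for 18 hours: expires - prefetch is ~17h43m away,
//     // so the upper bound wins and we refetch after max_refetch (3h).
//     // Descriptor valid for 30 minutes: expires - prefetch is only 800s away,
//     // so the lower bound wins and we refetch after min_refetch (1h),
//     // i.e. we tolerate briefly using it past its expiry, as documented
//     // on `min_refetch`.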
/// Task which waits for the timeout, and requeues bridges that need to be refetched
///
/// This task's job is to execute the wakeup instructions provided via `updates`.
///
/// `updates` is the receiving end of [`State`]'s `earliest_timeout`,
/// which is maintained to be the earliest time any of the schedules says we should wake up
/// (liveness property *Timeout*).
async fn timeout_task<R: Runtime, M: Mockable<R>>(
    runtime: R,
    inner: Weak<Manager<R, M>>,
    update: postage::watch::Receiver<Option<Instant>>,
) {
    /// Requeue things in `*_schedule` whose time for action has arrived
    ///
    /// `retry_delay_map` converts `retry_delay` from the schedule (`RetryDelay` or `()`)
    /// into the `Option` which appears in [`QueuedEntry`].
    ///
    /// Helper function.  Idempotent.
    fn requeue_as_required<TT: Ord + Copy + Debug, RD, RDM: Fn(RD) -> Option<RetryDelay>>(
        queued: &mut VecDeque<QueuedEntry>,
        schedule: &mut BinaryHeap<RefetchEntry<TT, RD>>,
        now: TT,
        retry_delay_map: RDM,
    ) {
        while let Some(ent) = schedule.peek() {
            if ent.when > now {
                break;
            }
            let re = schedule.pop().expect("schedule became empty!");
            let bridge = re.bridge;
            let retry_delay = retry_delay_map(re.retry_delay);

            queued.push_back(QueuedEntry {
                bridge,
                retry_delay,
            });
        }
    }
    let mut next_wakeup = Some(runtime.now());
    let mut update = update.fuse();
    loop {
        select! {
            // Someone modified the schedules, and sent us a new earliest timeout
            changed = update.next() => {
                // changed is Option<Option< >>.
                // The outer Option is from the Stream impl for watch::Receiver - None means EOF.
                // The inner Option is Some(wakeup_time), or None meaning "wait indefinitely"
                next_wakeup = if let Some(changed) = changed {
                    changed
                } else {
                    // Oh, actually, the watch::Receiver is EOF - we're to shut down
                    break
                }
            },
            // Wait until the specified earliest wakeup time
            () = async {
                if let Some(next_wakeup) = next_wakeup {
                    let now = runtime.now();
                    if next_wakeup > now {
                        let duration = next_wakeup - now;
                        runtime.sleep(duration).await;
                    }
                } else {
                    #[allow(clippy::semicolon_if_nothing_returned)] // rust-clippy/issues/9729
                    { future::pending().await }
                }
            }.fuse() => {
                // We have reached the pre-programmed time.  Check what needs doing.
                let inner = if let Some(i) = inner.upgrade() { i } else { break; };
                let mut state = inner.lock_then_process();
                let state = &mut **state; // Do the DerefMut once so we can borrow fields

                requeue_as_required(
                    &mut state.queued,
                    &mut state.refetch_schedule,
                    runtime.wallclock(),
                    |()| None,
                );

                requeue_as_required(
                    &mut state.queued,
                    &mut state.retry_schedule,
                    runtime.now(),
                    Some,
                );

                // `StateGuard`, from `lock_then_process`, gets dropped here, and runs `process`,
                // to make further progress and restore the liveness properties.
            }
        }
    }
}
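// Protocol note (summary of the above, not new behavior): `State::process`
// computes the earliest deadline across both schedules and writes it into
// `earliest_timeout`; this task observes that write via the watch channel and
// re-arms its sleep.  `None` means "sleep indefinitely until told otherwise";
// dropping the sender (the manager going away) ends the stream and the task
// exits.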
/// Error which occurs during bridge descriptor manager startup
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum StartupError {
    /// No circuit manager in the directory manager
    #[error(
        "tried to create bridge descriptor manager from directory manager with no circuit manager"
    )]
    MissingCircMgr,
    /// Unable to spawn task
    //
    // TODO lots of our Errors have a variant exactly like this.
    // Maybe we should make a struct tor_error::SpawnError.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}
impl HasKind for StartupError {
    fn kind(&self) -> ErrorKind {
        use ErrorKind as EK;
        use StartupError as SE;
        match self {
            SE::MissingCircMgr => EK::Internal,
            SE::Spawn { cause, .. } => cause.kind(),
        }
    }
}
/// An error which occurred trying to obtain the descriptor for a particular bridge
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// Couldn't establish a circuit to the bridge
    #[error("Failed to establish circuit")]
    CircuitFailed(#[from] tor_circmgr::Error),
    /// Couldn't establish a directory stream to the bridge
    #[error("Failed to establish directory stream")]
    StreamFailed(#[source] tor_proto::Error),
    /// Directory request failed
    #[error("Directory request failed")]
    RequestFailed(#[from] tor_dirclient::RequestFailedError),
    /// Failed to parse descriptor in response
    #[error("Failed to parse descriptor in response")]
    ParseFailed(#[from] tor_netdoc::Error),
    /// Signature check failed
    #[error("Signature check failed")]
    SignatureCheckFailed(#[from] Arc<signature::Error>),
    /// Obtained descriptor but it is outside its validity time
    #[error("Descriptor is outside its validity time, as supplied")]
    BadValidityTime(#[from] tor_checkable::TimeValidityError),
    /// A bridge descriptor has very extreme validity times
    /// such that our refetch time calculations overflow.
    #[error("Descriptor validity time range is too extreme for us to cope with")]
    ExtremeValidityTime,
    /// There was a programming error somewhere in our code, or the calling code.
    #[error("Programming error")]
    Bug(#[from] tor_error::Bug),
    /// Error used for testing
    #[cfg(test)]
    #[error("Error for testing, {0:?}, retry at {1:?}")]
    TestError(&'static str, RetryTime),
}

impl HasKind for Error {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        let bridge_protocol_violation = EK::TorAccessFailed;
        match self {
            // We trust that tor_circmgr returns TorAccessFailed when it ought to.
            E::CircuitFailed(e) => e.kind(),
            E::StreamFailed(e) => e.kind(),
            E::RequestFailed(e) => e.kind(),
            E::ParseFailed(..) => bridge_protocol_violation,
            E::SignatureCheckFailed(..) => bridge_protocol_violation,
            E::ExtremeValidityTime => bridge_protocol_violation,
            E::BadValidityTime(..) => EK::ClockSkew,
            E::Bug(e) => e.kind(),
            #[cfg(test)]
            E::TestError(..) => EK::Internal,
        }
    }
}

impl HasRetryTime for Error {
    fn retry_time(&self) -> RetryTime {
        use Error as E;
        use RetryTime as R;
        match self {
            // Errors with their own retry times
            E::CircuitFailed(e) => e.retry_time(),
            // Remote misbehavior, maybe the network is being strange?
            E::StreamFailed(..) => R::AfterWaiting,
            E::RequestFailed(..) => R::AfterWaiting,
            // Remote misconfiguration, detected *after* we successfully made the channel
            // (so not a network problem).  We'll say "never" for RetryTime,
            // even though in fact we will retry in at most `max_refetch`.
            E::ParseFailed(..) => R::Never,
            E::SignatureCheckFailed(..) => R::Never,
            E::BadValidityTime(..) => R::Never,
            E::ExtremeValidityTime => R::Never,
            // Probably, things are broken here, rather than remotely.
            E::Bug(..) => R::Never,
            #[cfg(test)]
            E::TestError(_, retry) => *retry,
        }
    }
}

impl BridgeDescError for Error {}

impl State {
    /// Consistency check (for testing)
    ///
    /// `input` should be what was passed to `set_bridges` (or `None` if not known).
    ///
    /// Does not make any changes.
    /// Only takes `&mut` because `postage::watch::Sender::borrow` wants it.
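    ///
    /// A minimal usage sketch, as it might appear in a test
    /// (the `state`, `runtime`, and `bridges` bindings are hypothetical):
    ///
    /// ```ignore
    /// // After letting the manager settle, check every invariant,
    /// // telling the checker what was passed to `set_bridges`:
    /// state.check_consistency(&runtime, Some(&bridges));
    /// ```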
    #[cfg(test)]
    fn check_consistency<'i, R, I>(&mut self, runtime: &R, input: Option<I>)
    where
        R: Runtime,
        I: IntoIterator<Item = &'i BridgeKey>,
    {
        /// Where we found a thing was Tracked
        #[derive(Debug, Clone, Copy, Eq, PartialEq)]
        enum Where {
            /// Found in `running`
            Running,
            /// Found in `queued`
            Queued,
            /// Found in the schedule `sch`
            Schedule {
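                /// The name of the schedule, used in assertion messages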
                sch_name: &'static str,
                /// Starts out as `false`, set to `true` when we find this in `current`
                found_in_current: bool,
            },
        }
        /// Records the expected input from `input`, and what we have found so far
        struct Tracked {
            /// Were we told what the last `set_bridges` call got as input?
            known_input: bool,
            /// `Some` means we have seen this bridge in one of our records (other than `current`)
            tracked: HashMap<BridgeKey, Option<Where>>,
            /// Earliest instant found in any schedule
            earliest: Option<Instant>,
        }
        let mut tracked = if let Some(input) = input {
            let tracked = input.into_iter().map(|b| (b.clone(), None)).collect();
            Tracked {
                tracked,
                known_input: true,
                earliest: None,
            }
        } else {
            Tracked {
                tracked: HashMap::new(),
                known_input: false,
                earliest: None,
            }
        };
        impl Tracked {
            /// Note that `b` is Tracked
            fn note(&mut self, where_: Where, b: &BridgeKey) {
                match self.tracked.get(b) {
                    // Invariant *Tracked* - ie appears at most once
                    Some(Some(prev_where)) => {
                        panic!("duplicate {:?} {:?} {:?}", prev_where, where_, b);
                    }
                    // Invariant *Input* (every tracked bridge was in the input)
                    None if self.known_input => {
                        panic!("unexpected {:?} {:?}", where_, b);
                    }
                    // OK, we've not seen it before, note it as being here
                    _ => {
                        self.tracked.insert(b.clone(), Some(where_));
                    }
                }
            }
        }
        /// Walk `schedule` and update `tracked` (including `tracked.earliest`)
        ///
        /// Check invariants *Tracked* and *Schedule* wrt this schedule.
        #[cfg(test)]
        fn walk_sch<TT: Ord + Copy + Debug, RD, CT: Fn(TT) -> Instant>(
            tracked: &mut Tracked,
            sch_name: &'static str,
            schedule: &BinaryHeap<RefetchEntry<TT, RD>>,
            conv_time: CT,
        ) {
            let where_ = Where::Schedule {
                sch_name,
                found_in_current: false,
            };
            if let Some(first) = schedule.peek() {
                // Of course this is a heap, so scanning it is wasteful; but, indirectly,
                // this tests our implementation of `Ord` for `RefetchEntry`.
                for re in schedule {
                    tracked.note(where_, &re.bridge);
                }
                let scanned = schedule
                    .iter()
                    .map(|re| re.when)
                    .min()
                    .expect("schedule empty!");
                assert_eq!(scanned, first.when);
                tracked.earliest = Some(
                    [tracked.earliest, Some(conv_time(scanned))]
                        .into_iter()
                        .flatten()
                        .min()
                        .expect("flatten of chain Some was empty"),
                );
            }
        }
        // *Timeout* (prep)
        //
        // This will fail if there is clock skew, but won't mind if
        // the earliest refetch time is in the past.
        let now_wall = runtime.wallclock();
        let now_mono = runtime.now();
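        // Map a wallclock `SystemTime` onto the monotonic clock, by applying
        // its offset from `now_wall` to `now_mono`.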
        let adj_wall = |wallclock: SystemTime| {
            // Good grief what a palaver!
            if let Ok(ahead) = wallclock.duration_since(now_wall) {
                now_mono + ahead
            } else if let Ok(behind) = now_wall.duration_since(wallclock) {
                now_mono
                    .checked_sub(behind)
                    .expect("time subtraction underflow")
            } else {
                panic!("times should be totally ordered!")
            }
        };
        // *Tracked*
        //
        // We walk our data structures in turn
        for b in self.running.keys() {
            tracked.note(Where::Running, b);
        }
        for qe in &self.queued {
            tracked.note(Where::Queued, &qe.bridge);
        }
        walk_sch(&mut tracked, "refetch", &self.refetch_schedule, adj_wall);
        walk_sch(&mut tracked, "retry", &self.retry_schedule, |t| t);
        // *Current*
        for b in self.current.keys() {
            let found = tracked
                .tracked
                .get_mut(b)
                .and_then(Option::as_mut)
                .unwrap_or_else(|| panic!("current but untracked {:?}", b));
            if let Where::Schedule {
                found_in_current, ..
            } = found
            {
                *found_in_current = true;
            }
        }
        // *Input (sense: every input bridge is tracked)*
        //
        // (Will not cope if spawn ever failed, since that violates the invariant.)
        for (b, where_) in &tracked.tracked {
            match where_ {
                None => panic!("missing {}", &b),
                Some(Where::Schedule {
                    sch_name,
                    found_in_current,
                }) => {
                    assert!(found_in_current, "not-Schedule {} {}", &b, sch_name);
                }
                _ => {}
            }
        }
        // *Limit*
        let parallelism = self.effective_parallelism();
        assert!(self.running.len() <= parallelism);
        // *Running*
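        //
        // Either we are running at full parallelism, or nothing is left queued.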
        assert!(self.running.len() == parallelism || self.queued.is_empty());
        // *Timeout* (final)
        assert_eq!(tracked.earliest, *self.earliest_timeout.borrow());
    }
}