1
//! Code for notifying other modules about changes in the directory.
2

            
3
// TODO(nickm): After we have enough experience with this FlagPublisher, we
4
// might want to make it a public interface. If we do it should probably move
5
// into another crate.
6

            
7
use std::{
8
    fmt,
9
    marker::PhantomData,
10
    pin::Pin,
11
    sync::{
12
        atomic::{AtomicUsize, Ordering},
13
        Arc,
14
    },
15
    task::Poll,
16
    time::SystemTime,
17
};
18

            
19
use educe::Educe;
20
use futures::{stream::Stream, Future, StreamExt};
21
use itertools::chain;
22
use paste::paste;
23
use time::OffsetDateTime;
24
use tor_basic_utils::skip_fmt;
25
use tor_netdir::DirEvent;
26
use tor_netdoc::doc::netstatus;
27

            
28
#[cfg(feature = "bridge-client")]
29
use tor_guardmgr::bridge::BridgeDescEvent;
30

            
31
use crate::bootstrap::AttemptId;
32

            
33
/// A trait to indicate something that can be published with [`FlagPublisher`].
///
/// Since the implementation of `FlagPublisher` requires that its events be
/// represented as small integers, this trait is mainly about converting to and
/// from those integers.
pub(crate) trait FlagEvent: Sized {
    /// The maximum allowed integer value that [`FlagEvent::to_index()`] can return
    /// for this type.
    ///
    /// This is limited to u16 because the [`FlagPublisher`] uses a vector of all
    /// known flags, and sometimes iterates over the whole vector.
    const MAXIMUM: u16;
    /// Convert this event into an index.
    ///
    /// For efficiency, indices should be small and densely packed.
    fn to_index(self) -> u16;
    /// Try to reconstruct an event from its index.  Return None if the index is
    /// out-of-bounds.
    fn from_index(flag: u16) -> Option<Self>;
}
53

            
54
/// Implements [`FlagEvent`] for a C-like enum
55
///
56
/// Requiremets:
57
///
58
///  * `$ty` must implement [`strum::EnumCount`] [`strum::IntoEnumIterator`]
59
///
60
///  * `$ty` type must implement [`Into<u16>`] and [`TryFrom<u16>`]
61
///    (for example using the `num_enum` crate).
62
///
63
///  * The discriminants must be densely allocated.
64
///    This will be done automatically by the compiler
65
///    if explicit discriminants are not specified.
66
///    (This property is checked in a test.)
67
///
68
///  * The variants may not contain any data.
69
///    This is required for correctness.
70
///    We think it is checked if you use `num_enum::TryFromPrimitive`.
71
///
72
/// # Example
73
///
74
// Sadly, it does not appear to be possible to doctest a private macro.
75
/// ```rust,ignore
76
/// use num_enum::{IntoPrimitive, TryFromPrimitive};
77
/// use strum::{EnumCount, EnumIter};
78
///
79
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
80
/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
81
/// #[non_exhaustive]
82
/// #[repr(u16)]
83
/// pub enum DirEvent {
84
///     NewConsensus,
85
///     NewDescriptors,
86
/// }
87
///
88
/// impl_FlagEvent!{ DirEvent }
89
/// ```
90
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
91
    impl FlagEvent for $ty {
92
        const MAXIMUM: u16 = {
93
            let count = <$ty as $crate::strum::EnumCount>::COUNT;
94
            (count - 1) as u16
95
        };
96
3348
        fn to_index(self) -> u16 {
97
3348
            self.into()
98
3348
        }
99
1710
        fn from_index(flag: u16) -> Option<Self> {
100
1710
            flag.try_into().ok()
101
1710
        }
102
    }
103

            
104
    #[test]
105
    #[allow(non_snake_case)]
106
4
    fn [< flagevent_test_variant_numbers_ $ty >]() {
107
12
        for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
108
8
            assert!(<$ty as FlagEvent>::to_index(variant) <=
109
8
                    <$ty as FlagEvent>::MAXIMUM,
110
                    "impl_FlagEvent only allowed if discriminators are dense");
111
        }
112
4
    }
113
} } }
114

            
115
impl_FlagEvent! { DirEvent }
116

            
117
#[cfg(feature = "bridge-client")]
118
impl_FlagEvent! { BridgeDescEvent }
119

            
120
/// A publisher that broadcasts flag-level events to multiple subscribers.
121
///
122
/// Events with the same flag value may be coalesced: that is, if the same event
123
/// is published ten times in a row, a subscriber may receive only a single
124
/// notification of the event.
125
///
126
/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
127
/// that can also broadcast events to everybody listening on the channel.
128
///  Dropping the last handle closes all streams subscribed to it.
129
pub(crate) struct FlagPublisher<F> {
130
    /// Inner data shared by publishers and streams.
131
    inner: Arc<Inner<F>>,
132
}
133

            
134
/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
135
struct Inner<F> {
136
    /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
137
    event: event_listener::Event,
138
    /// How many times has each event occurred, ever.
139
    ///
140
    /// (It is safe for this to wrap around.)
141
    // TODO(nickm): I wish this could be an array, but const generics don't
142
    // quite support that yet.
143
    counts: Vec<AtomicUsize>, // I wish this could be an array.
144
    /// How many publishers remain?
145
    n_publishers: AtomicUsize,
146
    /// Phantom member to provide correct covariance.
147
    ///
148
    /// The `fn` business is a covariance trick to include `F` without affecting
149
    /// this object's Send/Sync status.
150
    _phantom: PhantomData<fn(F) -> F>,
151
}
152

            
153
/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
154
/// [`FlagPublisher`].
155
pub(crate) struct FlagListener<F> {
156
    /// What value of each flag's count have we seen most recently?  
157
    ///
158
    /// Note that we count the event as "received" only once for each observed
159
    /// change in the flag's count, even if that count has changed by more than
160
    /// 1.
161
    my_counts: Vec<usize>,
162
    /// An an `EventListener` that will be notified when events are published,
163
    /// or when the final publisher is dropped.
164
    ///
165
    /// We must always have one of these available _before_ we check any counts
166
    /// in self.inner.
167
    listener: event_listener::EventListener,
168
    /// Reference to shared data.
169
    inner: Arc<Inner<F>>,
170
}
171

            
172
impl<F: FlagEvent> Default for FlagPublisher<F> {
    /// Equivalent to [`FlagPublisher::new`].
    fn default() -> Self {
        Self::new()
    }
}
177

            
178
impl<F: FlagEvent> FlagPublisher<F> {
179
    /// Construct a new FlagPublisher.
180
102
    pub(crate) fn new() -> Self {
181
102
        // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182
102
        // require AtomicUsize to be Clone.
183
102
        let counts = std::iter::repeat_with(AtomicUsize::default)
184
102
            .take(F::MAXIMUM as usize + 1)
185
102
            .collect();
186
102
        FlagPublisher {
187
102
            inner: Arc::new(Inner {
188
102
                event: event_listener::Event::new(),
189
102
                counts,
190
102
                n_publishers: AtomicUsize::new(1),
191
102
                _phantom: PhantomData,
192
102
            }),
193
102
        }
194
102
    }
195

            
196
    /// Create a new subscription to this FlagPublisher.
197
412
    pub(crate) fn subscribe(&self) -> FlagListener<F> {
198
412
        // We need to do this event.listen before we check the counts; otherwise
199
412
        // we could have a sequence where: we check the count, then the
200
412
        // publisher increments the count, then the publisher calls
201
412
        // event.notify(), and we call event.listen(). That would cause us to
202
412
        // miss the increment.
203
412
        let listener = self.inner.event.listen();
204
412

            
205
412
        FlagListener {
206
412
            my_counts: self
207
412
                .inner
208
412
                .counts
209
412
                .iter()
210
1212
                .map(|a| a.load(Ordering::SeqCst))
211
412
                .collect(),
212
412
            listener,
213
412
            inner: Arc::clone(&self.inner),
214
412
        }
215
412
    }
216

            
217
    /// Tell every listener that the provided flag has been published.
218
1740
    pub(crate) fn publish(&self, flag: F) {
219
1740
        self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220
1740
        self.inner.event.notify(usize::MAX);
221
1740
    }
222
}
223

            
224
impl<F> Clone for FlagPublisher<F> {
225
8
    fn clone(&self) -> FlagPublisher<F> {
226
8
        self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227
8
        FlagPublisher {
228
8
            inner: Arc::clone(&self.inner),
229
8
        }
230
8
    }
231
}
232

            
233
// We must implement Drop to keep count publishers, and so that when the last
234
// publisher goes away, we can wake up every listener  so that it notices that
235
// the stream is now ended.
236
impl<F> Drop for FlagPublisher<F> {
237
70
    fn drop(&mut self) {
238
70
        if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239
62
            // That was the last reference; we must notify the listeners.
240
62
            self.inner.event.notify(usize::MAX);
241
62
        }
242
70
    }
243
}
244

            
245
impl<F: FlagEvent> Stream for FlagListener<F> {
246
    type Item = F;
247

            
248
3136
    fn poll_next(
249
3136
        mut self: std::pin::Pin<&mut Self>,
250
3136
        cx: &mut std::task::Context<'_>,
251
3136
    ) -> std::task::Poll<Option<Self::Item>> {
252
        loop {
253
            // Notify the caller if any events are ready to fire.
254
9136
            for idx in 0..F::MAXIMUM as usize + 1 {
255
9136
                let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256
9136
                // We don't have to use < here specifically, since any change
257
9136
                // indicates that the count has been modified. That lets us
258
9136
                // survive usize wraparound.
259
9136
                if cur != self.my_counts[idx] {
260
1708
                    self.my_counts[idx] = cur;
261
1708
                    return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262
7428
                }
263
            }
264

            
265
            // At this point, notify the caller if there are no more publishers.
266
2300
            if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267
32
                return Poll::Ready(None);
268
2268
            }
269
2268

            
270
2268
            if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271
872
                // Got a new notification; we must create a new event and continue the loop.
272
872
                //
273
872
                // See discussion in `FlagPublisher::subscribe()` for why we must always create
274
872
                // this listener _before_ checking any flags.
275
872
                self.listener = self.inner.event.listen();
276
872
            } else {
277
                // Nothing to do yet: put the listener back.
278
1396
                return Poll::Pending;
279
            }
280
        }
281
3136
    }
282
}
283

            
284
/// Description of the directory manager's current bootstrapping status.
285
///
286
/// This status does not necessarily increase monotonically: it can go backwards
287
/// if (for example) our directory information expires before we're able to get
288
/// new information.
289
//
290
// TODO(nickm): This type has gotten a bit large for being the type we send over
291
// a `postage::watch`: perhaps we'd be better off having this information stored
292
// in the guardmgr, and having only a summary of it sent over the
293
// `postage::watch`.  But for now, let's not, unless it shows up in profiles.
294
#[derive(Clone, Debug, Default)]
295
pub struct DirBootstrapStatus(StatusEnum);
296

            
297
/// The contents of a DirBootstrapStatus.
298
///
299
/// This is a separate type since we don't want to make these variables public.
300
#[derive(Clone, Debug, Default)]
301
enum StatusEnum {
302
    /// There is no active attempt to load or fetch a directory.
303
    #[default]
304
    NoActivity,
305
    /// We have only one attempt to fetch a directory.
306
    Single {
307
        /// The currently active directory attempt.
308
        ///
309
        /// We're either using this directory now, or we plan to use it as soon
310
        /// as it's complete enough.
311
        current: StatusEntry,
312
    },
313
    /// We have an existing directory attempt, but it's stale, and we're
314
    /// fetching a new one to replace it.
315
    ///
316
    /// Invariant: `current.id < next.id`
317
    Replacing {
318
        /// The previous attempt's status.  It may still be trying to fetch
319
        /// information if it has descriptors left to download.
320
        current: StatusEntry,
321
        /// The current attempt's status.  We are not yet using this directory
322
        /// for our activity, since it does not (yet) have enough information.
323
        next: StatusEntry,
324
    },
325
}
326

            
327
/// The status and identifier of a single attempt to download a full directory.
328
#[derive(Clone, Debug)]
329
struct StatusEntry {
330
    /// The identifier for this attempt.
331
    id: AttemptId,
332
    /// The latest status.
333
    status: DirStatus,
334
}
335

            
336
/// The status for a single directory.
337
#[derive(Clone, Debug, Default, derive_more::Display)]
338
#[display("{0}", progress)]
339
pub(crate) struct DirStatus {
340
    /// How much of the directory do we currently have?
341
    progress: DirProgress,
342
    /// How many resets have been forced while fetching this directory?
343
    n_resets: usize,
344
    /// How many errors have we encountered since last we advanced the
345
    /// 'progress' on this directory?
346
    n_errors: usize,
347
    /// How many times has an `update_progress` call not actually moved us
348
    /// forward since we last advanced the 'progress' on this directory?
349
    n_stalls: usize,
350
}
351

            
352
/// How much progress have we made in downloading a given directory?
353
///
354
/// This is a separate type so that we don't make the variants public.
355
56
#[derive(Clone, Debug, Educe)]
356
#[educe(Default)]
357
pub(crate) enum DirProgress {
358
    /// We don't have any information yet.
359
    #[educe(Default)]
360
    NoConsensus {
361
        /// If present, we are fetching a consensus whose valid-after time
362
        /// postdates this time.
363
        #[allow(dead_code)]
364
        after: Option<SystemTime>,
365
    },
366
    /// We've downloaded a consensus, but we haven't validated it yet.
367
    FetchingCerts {
368
        /// The actual declared lifetime of the consensus.
369
        lifetime: netstatus::Lifetime,
370
        /// The lifetime for which we are willing to use this consensus.  (This
371
        /// may be broader than `lifetime`.)
372
        usable_lifetime: netstatus::Lifetime,
373
        /// A fraction (in (numerator,denominator) format) of the certificates
374
        /// we have for this consensus.
375
        n_certs: (u16, u16),
376
    },
377
    /// We've validated a consensus and we're fetching (or have fetched) its
378
    /// microdescriptors.
379
    Validated {
380
        /// The actual declared lifetime of the consensus.
381
        lifetime: netstatus::Lifetime,
382
        /// The lifetime for which we are willing to use this consensus.  (This
383
        /// may be broader than `lifetime`.)
384
        usable_lifetime: netstatus::Lifetime,
385
        /// A fraction (in (numerator,denominator) form) of the microdescriptors
386
        /// that we have for this consensus.
387
        n_mds: (u32, u32),
388
        /// True iff we've decided that the consensus is usable.
389
        usable: bool,
390
        // TODO(nickm) Someday we could add a field about whether any primary
391
        // guards are missing microdescriptors, to give a better explanation for
392
        // the case where we won't switch our consensus because of that.
393
    },
394
}
395

            
396
/// A reported diagnostic for what kind of trouble we've seen while trying to
397
/// bootstrap a directory.
398
///
399
/// These blockages types are not yet terribly specific: if you encounter one,
400
/// it's probably a good idea to check the logs to see what's really going on.
401
///
402
/// If you encounter connection blockage _and_ directory blockage at the same
403
/// time, the connection blockage is almost certainly the real problem.
404
//
405
// TODO(nickm): At present these diagnostics aren't very helpful; they say too
406
// much about _how we know_ that the process has gone wrong, but not so much
407
// about _what the problem is_.  In the future, we may wish to look more closely
408
// at what _kind_ of errors or resets we've seen, so we can report better
409
// information. Probably, however, we should only do that after we get some
410
// experience with which problems people encounter in practice, and what
411
// diagnostics would be useful for them.
412
#[derive(Clone, Debug, derive_more::Display)]
413
#[non_exhaustive]
414
pub enum DirBlockage {
415
    /// We've been downloading information without error, but we haven't
416
    /// actually been getting anything that we want.
417
    ///
418
    /// This might indicate that there's a problem with information propagating
419
    /// through the Tor network, or it might indicate that a bogus consensus or
420
    /// a bad clock has tricked us into asking for something that nobody has.
421
    #[display("Can't make progress.")]
422
    Stalled,
423
    /// We've gotten a lot of errors without making forward progress on our
424
    /// bootstrap attempt.
425
    ///
426
    /// This might indicate that something's wrong with the Tor network, or that
427
    /// there's something buggy with our ability to handle directory responses.
428
    /// It might also indicate a malfunction on our directory guards, or a bug
429
    /// on our retry logic.
430
    #[display("Too many errors without making progress.")]
431
    TooManyErrors,
432
    /// We've reset our bootstrap attempt a lot of times.
433
    ///
434
    /// This either indicates that we have been failing a lot for one of the
435
    /// other reasons above, or that we keep getting served a consensus which
436
    /// turns out, upon trying to fetch certificates, not to be usable.  It can
437
    /// also indicate a bug in our retry logic.
438
    #[display("Had to reset bootstrapping too many times.")]
439
    TooManyResets,
440
}
441

            
442
impl fmt::Display for DirProgress {
443
22
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444
        /// Format this time in a format useful for displaying
445
        /// lifetime boundaries.
446
8
        fn fmt_time(t: SystemTime) -> String {
447
            use once_cell::sync::Lazy;
448
            /// Formatter object for lifetime boundaries.
449
            ///
450
            /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451
            /// sub-second times here, and using non-UTC offsets is confusing
452
            /// in this context.
453
2
            static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
454
2
                time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
455
2
                    .expect("Invalid time format")
456
2
            });
457
8
            OffsetDateTime::from(t)
458
8
                .format(&FORMAT)
459
8
                .unwrap_or_else(|_| "(could not format)".into())
460
8
        }
461

            
462
22
        match &self {
463
4
            DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
464
6
            DirProgress::FetchingCerts { n_certs, .. } => write!(
465
6
                f,
466
6
                "fetching authority certificates ({}/{})",
467
6
                n_certs.0, n_certs.1
468
6
            ),
469
            DirProgress::Validated {
470
                usable: false,
471
8
                n_mds,
472
8
                ..
473
8
            } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
474
            DirProgress::Validated {
475
                usable: true,
476
4
                lifetime,
477
4
                ..
478
4
            } => write!(
479
4
                f,
480
4
                "usable, fresh until {}, and valid until {}",
481
4
                fmt_time(lifetime.fresh_until()),
482
4
                fmt_time(lifetime.valid_until())
483
4
            ),
484
        }
485
22
    }
486
}
487

            
488
impl fmt::Display for DirBootstrapStatus {
489
317
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490
317
        match &self.0 {
491
315
            StatusEnum::NoActivity => write!(f, "not downloading")?,
492
            StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
493
2
            StatusEnum::Replacing { current, next } => write!(
494
2
                f,
495
2
                "directory is {}; next directory is {}",
496
2
                current.status, next.status
497
2
            )?,
498
        }
499
317
        Ok(())
500
317
    }
501
}
502

            
503
impl DirBootstrapStatus {
504
    /// Return the current DirStatus.
505
    ///
506
    /// This is the _most complete_ status.  If we have any usable status, it is
507
    /// this one.
508
957
    fn current(&self) -> Option<&DirStatus> {
509
957
        match &self.0 {
510
947
            StatusEnum::NoActivity => None,
511
6
            StatusEnum::Single { current } => Some(&current.status),
512
4
            StatusEnum::Replacing { current, .. } => Some(&current.status),
513
        }
514
957
    }
515

            
516
    /// Return the next DirStatus, if there is one.
517
640
    fn next(&self) -> Option<&DirStatus> {
518
640
        match &self.0 {
519
6
            StatusEnum::Replacing { next, .. } => Some(&next.status),
520
634
            _ => None,
521
        }
522
640
    }
523

            
524
    /// Return the contained `DirStatus`es, in order: `current`, then `next`
525
    #[allow(clippy::implied_bounds_in_impls)]
526
634
    fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
527
634
        chain!(self.current(), self.next(),)
528
634
    }
529

            
530
    /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
531
    #[allow(clippy::implied_bounds_in_impls)]
532
76
    fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
533
76
        let (current, next) = match &mut self.0 {
534
6
            StatusEnum::NoActivity => (None, None),
535
62
            StatusEnum::Single { current } => (Some(current), None),
536
8
            StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
537
        };
538
76
        chain!(current, next,)
539
76
    }
540

            
541
    /// Return the fraction of completion for directory download, in a form
542
    /// suitable for a progress bar at some particular time.
543
    ///
544
    /// This value is not monotonic, and can go down as one directory is
545
    /// replaced with another.
546
    ///
547
    /// Callers _should not_ depend on the specific meaning of any particular
548
    /// fraction; we may change these fractions in the future.
549
319
    pub fn frac_at(&self, when: SystemTime) -> f32 {
550
319
        self.statuses()
551
323
            .filter_map(|st| st.frac_at(when))
552
319
            .next()
553
319
            .unwrap_or(0.0)
554
319
    }
555

            
556
    /// Return true if this status indicates that we have a current usable
557
    /// directory.
558
    pub fn usable_at(&self, now: SystemTime) -> bool {
559
        if let Some(current) = self.current() {
560
            current.progress.usable() && current.okay_to_use_at(now)
561
        } else {
562
            false
563
        }
564
    }
565

            
566
    /// If there is a problem with our attempts to bootstrap, return a
567
    /// corresponding DirBlockage.  
568
315
    pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
569
315
        if let Some(current) = self.current() {
570
            if current.progress.usable() && current.declared_live_at(now) {
571
                // The current directory is sufficient, and not even a little bit
572
                // expired. There is no problem.
573
                return None;
574
            }
575
315
        }
576

            
577
        // Any blockage in "current" is more serious, so return that if there is one
578
315
        self.statuses().filter_map(|st| st.blockage()).next()
579
315
    }
580

            
581
    /// Return the appropriate DirStatus for `AttemptId`, constructing it if
582
    /// necessary.
583
    ///
584
    /// Return None if all relevant attempts are more recent than this Id.
585
    #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
586
38
    fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
587
38
        // First, ensure that we have a *recent enough* attempt
588
38
        // Look for the latest attempt, and see if it's new enough; if not, start a new one.
589
38
        if self
590
38
            .entries_mut()
591
38
            .rev()
592
38
            .take(1)
593
54
            .find(|entry| entry.id >= attempt_id)
594
38
            .is_none()
595
        {
596
6
            let current = match std::mem::take(&mut self.0) {
597
6
                StatusEnum::NoActivity => None,
598
                StatusEnum::Single { current } => Some(current),
599
                StatusEnum::Replacing { current, .. } => Some(current),
600
            };
601
            // If we have a `current` already, we keep it, and restart `next`.
602
6
            let next = StatusEntry::new(attempt_id);
603
6
            self.0 = match current {
604
6
                None => StatusEnum::Single { current: next },
605
                Some(current) => StatusEnum::Replacing { current, next },
606
            };
607
32
        }
608

            
609
        // Find the entry with `attempt_id` and return it.
610
        // (Despite the above, there might not be one: maybe `attempt_id` is old.)
611
38
        self.entries_mut()
612
61
            .find(|entry| entry.id == attempt_id)
613
56
            .map(|entry| &mut entry.status)
614
38
    }
615

            
616
    /// If the "next" status is usable, replace the current status with it.
617
36
    fn advance_status(&mut self) {
618
36
        // TODO: should make sure that the compiler is smart enough to optimize
619
36
        // this mem::take() and replacement away, and turn it into a conditional
620
36
        // replacement?
621
36
        self.0 = match std::mem::take(&mut self.0) {
622
4
            StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
623
2
                StatusEnum::Single { current: next }
624
            }
625
34
            other => other,
626
        };
627
36
    }
628

            
629
    /// Update this status by replacing the `DirProgress` in its current status
630
    /// (or its next status) with `new_status`, as appropriate.
631
38
    pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
632
38
        if let Some(status) = self.mut_status_for(attempt_id) {
633
36
            let old_frac = status.frac();
634
36
            status.progress = new_progress;
635
36
            let new_frac = status.frac();
636
36
            if new_frac > old_frac {
637
6
                // This download has made progress: clear our count of errors
638
6
                // and stalls.
639
6
                status.n_errors = 0;
640
6
                status.n_stalls = 0;
641
30
            } else {
642
30
                // This download didn't make progress; increment the stall
643
30
                // count.
644
30
                status.n_stalls += 1;
645
30
            }
646
36
            self.advance_status();
647
2
        }
648
38
    }
649

            
650
    /// Update this status by noting that some errors have occurred in a given
651
    /// download attempt.
652
    pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
653
        if let Some(status) = self.mut_status_for(attempt_id) {
654
            status.n_errors += n_errors;
655
        }
656
    }
657

            
658
    /// Update this status by noting that we had to reset a given download attempt;
659
    pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
660
        if let Some(status) = self.mut_status_for(attempt_id) {
661
            status.n_resets += 1;
662
        }
663
    }
664
}
665

            
666
impl StatusEntry {
667
    /// Construct a new StatusEntry with a given attempt id, and no progress
668
    /// reported.
669
6
    fn new(id: AttemptId) -> Self {
670
6
        Self {
671
6
            id,
672
6
            status: DirStatus::default(),
673
6
        }
674
6
    }
675
}
676

            
677
impl DirStatus {
678
    /// Return the declared consensus lifetime for this directory, if we have one.
679
18
    fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
680
18
        match &self.progress {
681
4
            DirProgress::NoConsensus { .. } => None,
682
4
            DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
683
10
            DirProgress::Validated { lifetime, .. } => Some(lifetime),
684
        }
685
18
    }
686

            
687
    /// Return the consensus lifetime for this directory, if we have one, as
688
    /// modified by our skew-tolerance settings.
689
22
    fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
690
22
        match &self.progress {
691
6
            DirProgress::NoConsensus { .. } => None,
692
            DirProgress::FetchingCerts {
693
4
                usable_lifetime, ..
694
4
            } => Some(usable_lifetime),
695
            DirProgress::Validated {
696
12
                usable_lifetime, ..
697
12
            } => Some(usable_lifetime),
698
        }
699
22
    }
700

            
701
    /// Return true if the directory is valid at the given time, as modified by
702
    /// our clock skew settings.
703
10
    fn okay_to_use_at(&self, when: SystemTime) -> bool {
704
10
        self.usable_lifetime()
705
13
            .map(|lt| lt.valid_at(when))
706
10
            .unwrap_or(false)
707
10
    }
708

            
709
    /// Return true if the directory is valid at the given time, _unmodified_ by our
710
    /// clock skew settings.
711
    fn declared_live_at(&self, when: SystemTime) -> bool {
712
        self.declared_lifetime()
713
            .map(|lt| lt.valid_at(when))
714
            .unwrap_or(false)
715
    }
716

            
717
    /// As `frac`, but return None if this consensus is not valid at the given time,
718
    /// and down-rate expired consensuses that we're still willing to use.
719
18
    fn frac_at(&self, when: SystemTime) -> Option<f32> {
720
18
        if self
721
18
            .declared_lifetime()
722
25
            .map(|lt| lt.valid_at(when))
723
18
            .unwrap_or(false)
724
        {
725
            // We're officially okay to use this directory.
726
8
            Some(self.frac())
727
10
        } else if self.okay_to_use_at(when) {
728
            // This directory is a little expired, but only a little.
729
            Some(self.frac() * 0.9)
730
        } else {
731
10
            None
732
        }
733
18
    }
734

            
735
    /// Return the fraction of completion for directory download, in a form
736
    /// suitable for a progress bar.
737
    ///
738
    /// This is monotonically increasing for a single directory, but can go down
739
    /// as one directory is replaced with another.
740
    ///
741
    /// Callers _should not_ depend on the specific meaning of any particular
742
    /// fraction; we may change these fractions in the future.
743
86
    fn frac(&self) -> f32 {
744
86
        // We arbitrarily decide that 25% is downloading the consensus, 10% is
745
86
        // downloading the certificates, and the remaining 65% is downloading
746
86
        // the microdescriptors until we become usable.  We may want to re-tune that in the future, but
747
86
        // the documentation of this function should allow us to do so.
748
86
        match &self.progress {
749
64
            DirProgress::NoConsensus { .. } => 0.0,
750
4
            DirProgress::FetchingCerts { n_certs, .. } => {
751
4
                0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
752
            }
753
            DirProgress::Validated {
754
                usable: false,
755
14
                n_mds,
756
14
                ..
757
14
            } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
758
4
            DirProgress::Validated { usable: true, .. } => 1.0,
759
        }
760
86
    }
761

            
762
    /// If we think there is a problem with our bootstrapping process, return a
763
    /// [`DirBlockage`] to describe it.
764
    ///
765
    /// The caller may want to also check `usable_at` to avoid reporting trouble
766
    /// if the directory is currently usable.
767
    fn blockage(&self) -> Option<DirBlockage> {
768
        /// How many resets are sufficient for us to report a blockage?
769
        const RESET_THRESHOLD: usize = 2;
770
        /// How many errors are sufficient for us to report a blockage?
771
        const ERROR_THRESHOLD: usize = 6;
772
        /// How many no-progress download attempts are sufficient for us to
773
        /// report a blockage?
774
        const STALL_THRESHOLD: usize = 8;
775

            
776
        if self.n_resets >= RESET_THRESHOLD {
777
            Some(DirBlockage::TooManyResets)
778
        } else if self.n_errors >= ERROR_THRESHOLD {
779
            Some(DirBlockage::TooManyErrors)
780
        } else if self.n_stalls >= STALL_THRESHOLD {
781
            Some(DirBlockage::Stalled)
782
        } else {
783
            None
784
        }
785
    }
786
}
787

            
788
impl DirProgress {
789
    /// Return true if this progress indicates a usable directory.
790
6
    fn usable(&self) -> bool {
791
6
        matches!(self, DirProgress::Validated { usable: true, .. })
792
6
    }
793
}
794

            
795
/// A stream of [`DirBootstrapStatus`] events.
796
#[derive(Clone, Educe)]
797
#[educe(Debug)]
798
pub struct DirBootstrapEvents {
799
    /// The `postage::watch::Receiver` that we're wrapping.
800
    ///
801
    /// We wrap this type so that we don't expose its entire API, and so that we
802
    /// can migrate to some other implementation in the future if we want.
803
    #[educe(Debug(method = "skip_fmt"))]
804
    pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
805
}
806

            
807
impl Stream for DirBootstrapEvents {
808
    type Item = DirBootstrapStatus;
809

            
810
280
    fn poll_next(
811
280
        mut self: Pin<&mut Self>,
812
280
        cx: &mut std::task::Context<'_>,
813
280
    ) -> Poll<Option<Self::Item>> {
814
280
        self.inner.poll_next_unpin(cx)
815
280
    }
816
}
817

            
818
#[cfg(test)]
819
mod test {
820
    // @@ begin test lint list maintained by maint/add_warning @@
821
    #![allow(clippy::bool_assert_comparison)]
822
    #![allow(clippy::clone_on_copy)]
823
    #![allow(clippy::dbg_macro)]
824
    #![allow(clippy::mixed_attributes_style)]
825
    #![allow(clippy::print_stderr)]
826
    #![allow(clippy::print_stdout)]
827
    #![allow(clippy::single_char_pattern)]
828
    #![allow(clippy::unwrap_used)]
829
    #![allow(clippy::unchecked_duration_subtraction)]
830
    #![allow(clippy::useless_vec)]
831
    #![allow(clippy::needless_pass_by_value)]
832
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
833
    use std::time::Duration;
834

            
835
    use super::*;
836
    use float_eq::assert_float_eq;
837
    use futures::stream::StreamExt;
838
    use tor_rtcompat::test_with_all_runtimes;
839

            
840
    #[test]
    fn subscribe_and_publish() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();
            let mut early_sub = publisher.subscribe();
            publisher.publish(DirEvent::NewConsensus);
            let mut late_sub = publisher.subscribe();
            let drop_signal = event_listener::Event::new();
            let drop_listener = drop_signal.listen();

            // The subscriber that existed before the event was published.
            let early_task = async {
                // It was created in time to see the event...
                let first = early_sub.next().await;
                assert_eq!(first, Some(DirEvent::NewConsensus));
                // ...after which we let `dropper_task` drop the publisher,
                // and expect the stream to end.
                drop_signal.notify(1);
                let second = early_sub.next().await;
                assert_eq!(second, None);
            };

            // The subscriber created after the event was published: it is
            // expected to see only the end of the stream.
            let late_task = async {
                let only = late_sub.next().await;
                assert_eq!(only, None);
            };

            // Drops the publisher once `early_task` says so.
            let dropper_task = async {
                drop_listener.await;
                drop(publisher);
            };

            futures::join!(early_task, late_task, dropper_task);
        });
    }
870

            
871
    #[test]
    fn receive_two() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();

            let mut subscriber = publisher.subscribe();
            let first_seen = event_listener::Event::new();
            let first_seen_listener = first_seen.listen();

            // Reads two events, signalling the sender in between.
            let receiver_task = async {
                let first = subscriber.next().await;
                assert_eq!(first, Some(DirEvent::NewDescriptors));
                first_seen.notify(1);
                let second = subscriber.next().await;
                assert_eq!(second, Some(DirEvent::NewConsensus));
            };

            // Publishes the second event only after being signalled that the
            // first one has been received.
            let sender_task = async {
                publisher.publish(DirEvent::NewDescriptors);
                first_seen_listener.await;
                publisher.publish(DirEvent::NewConsensus);
            };

            futures::join!(receiver_task, sender_task);
        });
    }
895

            
896
    #[test]
    fn two_publishers() {
        test_with_all_runtimes!(|_rt| async {
            let publisher_a: FlagPublisher<DirEvent> = FlagPublisher::new();
            let publisher_b = publisher_a.clone();

            let mut subscriber = publisher_a.subscribe();
            let sent_a = event_listener::Event::new();
            let sent_b = event_listener::Event::new();
            let sent_a_listener = sent_a.listen();
            let sent_b_listener = sent_b.listen();

            // Counts how many times each event is seen, by event index.
            let counting_task = async {
                let mut seen = [0_usize; 2];
                // Waiting for both signals first ensures that at least one
                // flag of each type has been published before we start
                // reading, and so before the stream can end.
                sent_a_listener.await;
                sent_b_listener.await;
                while let Some(event) = subscriber.next().await {
                    seen[event.to_index() as usize] += 1;
                }
                assert!(seen.iter().all(|&n| n > 0));
                assert!(seen.iter().all(|&n| n <= 100));
            };

            // Publishes `NewDescriptors` a hundred times, then goes away.
            let descriptors_task = async {
                for _ in 0..100 {
                    publisher_a.publish(DirEvent::NewDescriptors);
                    sent_a.notify(1);
                    tor_rtcompat::task::yield_now().await;
                }
                drop(publisher_a);
            };

            // Publishes `NewConsensus` a hundred times, then goes away.
            let consensus_task = async {
                for _ in 0..100 {
                    publisher_b.publish(DirEvent::NewConsensus);
                    sent_b.notify(1);
                    tor_rtcompat::task::yield_now().await;
                }
                drop(publisher_b);
            };

            futures::join!(counting_task, descriptors_task, consensus_task);
        });
    }
941

            
942
    #[test]
    fn receive_after_publishers_are_gone() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();
            let mut subscriber = publisher.subscribe();

            // Publish one event, then drop the publisher before reading.
            publisher.publish(DirEvent::NewConsensus);
            drop(publisher);

            // The event published before the drop is still delivered...
            let pending = subscriber.next().await;
            assert_eq!(pending, Some(DirEvent::NewConsensus));
            // ...and after it the stream is finished.
            let finished = subscriber.next().await;
            assert!(finished.is_none());
        });
    }
957

            
958
    #[test]
    fn failed_conversion() {
        // An index that corresponds to no `DirEvent` must not convert.
        let converted = DirEvent::from_index(999);
        assert_eq!(converted, None);
    }
962

            
963
    #[test]
    fn dir_status_basics() {
        const TOL: f32 = 0.00001;

        let now = SystemTime::now();
        let hour = Duration::new(3600, 0);

        // Wrap a progress value in an otherwise-default status.
        let status_with = |progress: DirProgress| DirStatus {
            progress,
            ..Default::default()
        };

        let no_consensus = status_with(DirProgress::NoConsensus { after: None });

        // A lifetime running from `now` to two hours from now.
        let early = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
        let fetching_certs = status_with(DirProgress::FetchingCerts {
            lifetime: early.clone(),
            usable_lifetime: early,
            n_certs: (3, 5),
        });

        // A lifetime running from one hour from now to three hours from now.
        let late = netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
        let validated = status_with(DirProgress::Validated {
            lifetime: late.clone(),
            usable_lifetime: late,
            n_mds: (30, 40),
            usable: false,
        });

        // usable_lifetime()
        assert!(no_consensus.usable_lifetime().is_none());
        assert_eq!(fetching_certs.usable_lifetime().unwrap().valid_after(), now);
        assert_eq!(
            validated.usable_lifetime().unwrap().valid_until(),
            now + hour * 3
        );

        // frac() (The exact numbers are more or less arbitrary, and it's fine
        // for them to change later.)
        assert_float_eq!(no_consensus.frac(), 0.0, abs <= TOL);
        assert_float_eq!(fetching_certs.frac(), 0.25 + 0.06, abs <= TOL);
        assert_float_eq!(validated.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);

        // frac_at(), half an hour from now: only the early lifetime is valid.
        let half_hour_in = now + hour / 2;
        assert!(no_consensus.frac_at(half_hour_in).is_none());
        assert_float_eq!(
            fetching_certs.frac_at(half_hour_in).unwrap(),
            0.25 + 0.06,
            abs <= TOL
        );
        assert!(validated.frac_at(half_hour_in).is_none());

        // frac_at(), two hours after that: only the late lifetime is valid.
        let later = half_hour_in + hour * 2;
        assert!(no_consensus.frac_at(later).is_none());
        assert!(fetching_certs.frac_at(later).is_none());
        assert_float_eq!(
            validated.frac_at(later).unwrap(),
            0.35 + 0.65 * 0.75,
            abs <= TOL
        );
    }
    #[test]
    fn dir_status_display() {
        use time::macros::datetime;

        let start: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
        let hour = Duration::new(3600, 0);
        let lifetime = netstatus::Lifetime::new(start, start + hour, start + hour * 3).unwrap();

        // Format an otherwise-default status holding the given progress.
        let render = |progress: DirProgress| {
            DirStatus {
                progress,
                ..Default::default()
            }
            .to_string()
        };

        assert_eq!(
            render(DirProgress::NoConsensus { after: None }),
            "fetching a consensus"
        );

        assert_eq!(
            render(DirProgress::FetchingCerts {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime.clone(),
                n_certs: (3, 5),
            }),
            "fetching authority certificates (3/5)"
        );

        assert_eq!(
            render(DirProgress::Validated {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime.clone(),
                n_mds: (30, 40),
                usable: false,
            }),
            "fetching microdescriptors (30/40)"
        );

        assert_eq!(
            render(DirProgress::Validated {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime,
                n_mds: (30, 40),
                usable: true,
            }),
            "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
        );
    }
    #[test]
    fn bootstrap_status() {
        use time::macros::datetime;

        const TOL: f32 = 0.00001;

        let start: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
        let hour = Duration::new(3600, 0);
        let older = netstatus::Lifetime::new(start, start + hour, start + hour * 3).unwrap();
        let newer =
            netstatus::Lifetime::new(start + hour, start + hour * 2, start + hour * 4).unwrap();

        // Build a `Validated` progress that uses `lt` for both lifetimes.
        let validated = |lt: &netstatus::Lifetime, n_mds, usable| DirProgress::Validated {
            lifetime: lt.clone(),
            usable_lifetime: lt.clone(),
            n_mds,
            usable,
        };
        // Build a status entry holding `progress` and otherwise-default state.
        let entry = |id, progress| StatusEntry {
            id,
            status: DirStatus {
                progress,
                ..Default::default()
            },
        };

        let usable_old = validated(&older, (3, 40), true);
        let partial_new = validated(&newer, (5, 40), false);
        let old_attempt = AttemptId::next();
        let new_attempt = AttemptId::next();

        let mut status = DirBootstrapStatus(StatusEnum::Replacing {
            current: entry(old_attempt, usable_old.clone()),
            next: entry(new_attempt, partial_new.clone()),
        });

        assert_eq!(status.to_string(),
            "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
        );

        assert_float_eq!(status.frac_at(start + hour / 2), 1.0, abs <= TOL);
        assert_float_eq!(
            status.frac_at(start + hour * 3 + hour / 2),
            0.35 + 0.65 * 0.125,
            abs <= TOL
        );

        // Now try updating.

        // Case 1: we have a usable directory and the updated status isn't
        // usable.  The update shows up in the "next" status.
        status.update_progress(new_attempt, validated(&newer, (10, 40), false));
        assert!(matches!(
            status.next().unwrap(),
            DirStatus {
                progress: DirProgress::Validated {
                    n_mds: (10, 40),
                    ..
                },
                ..
            }
        ));

        // Case 2: The new directory _is_ usable and newer.  It will replace
        // the old one.
        status.update_progress(new_attempt, validated(&newer, (20, 40), true));
        assert!(status.next().is_none());
        assert_eq!(
            status
                .current()
                .unwrap()
                .usable_lifetime()
                .unwrap()
                .valid_after(),
            newer.valid_after()
        );

        // Case 3: The new directory is usable but older.  Nothing will happen.
        status.update_progress(old_attempt, usable_old);
        assert!(status.next().is_none());
        assert_ne!(
            status
                .current()
                .unwrap()
                .usable_lifetime()
                .unwrap()
                .valid_after(),
            older.valid_after()
        );

        // Case 4: starting with an unusable directory, we always replace.
        let mut status = DirBootstrapStatus::default();
        assert!(!partial_new.usable());
        assert!(status.current().is_none());
        status.update_progress(new_attempt, partial_new);
        assert!(status.current().unwrap().usable_lifetime().is_some());
    }
}