tor_dirmgr/
event.rs

1//! Code for notifying other modules about changes in the directory.
2
3// TODO(nickm): After we have enough experience with this FlagPublisher, we
4// might want to make it a public interface. If we do it should probably move
5// into another crate.
6
7use std::{
8    fmt,
9    marker::PhantomData,
10    pin::Pin,
11    sync::{
12        atomic::{AtomicUsize, Ordering},
13        Arc,
14    },
15    task::Poll,
16    time::SystemTime,
17};
18
19use educe::Educe;
20use futures::{stream::Stream, Future, StreamExt};
21use itertools::chain;
22use paste::paste;
23use time::OffsetDateTime;
24use tor_basic_utils::skip_fmt;
25use tor_netdir::DirEvent;
26use tor_netdoc::doc::netstatus;
27
28#[cfg(feature = "bridge-client")]
29use tor_guardmgr::bridge::BridgeDescEvent;
30
31use crate::bootstrap::AttemptId;
32
/// Something that a [`FlagPublisher`] is able to broadcast.
///
/// `FlagPublisher` needs its events to be representable as small integers,
/// so this trait is mostly a pair of conversions between an event type and
/// those integers.
pub(crate) trait FlagEvent: Sized {
    /// The largest value that [`FlagEvent::to_index()`] is allowed to return
    /// for this type.
    ///
    /// The bound is a `u16` because [`FlagPublisher`] keeps a vector with one
    /// entry per known flag, and sometimes walks that whole vector.
    const MAXIMUM: u16;

    /// Turn this event into its index.
    ///
    /// Indices should be small and densely packed, for efficiency.
    fn to_index(self) -> u16;

    /// Try to rebuild an event from its index.
    ///
    /// Returns `None` when `flag` is out of bounds.
    fn from_index(flag: u16) -> Option<Self>;
}
53
54/// Implements [`FlagEvent`] for a C-like enum
55///
56/// Requiremets:
57///
58///  * `$ty` must implement [`strum::EnumCount`] [`strum::IntoEnumIterator`]
59///
60///  * `$ty` type must implement [`Into<u16>`] and [`TryFrom<u16>`]
61///    (for example using the `num_enum` crate).
62///
63///  * The discriminants must be densely allocated.
64///    This will be done automatically by the compiler
65///    if explicit discriminants are not specified.
66///    (This property is checked in a test.)
67///
68///  * The variants may not contain any data.
69///    This is required for correctness.
70///    We think it is checked if you use `num_enum::TryFromPrimitive`.
71///
72/// # Example
73///
74// Sadly, it does not appear to be possible to doctest a private macro.
75/// ```rust,ignore
76/// use num_enum::{IntoPrimitive, TryFromPrimitive};
77/// use strum::{EnumCount, EnumIter};
78///
79/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
80/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
81/// #[non_exhaustive]
82/// #[repr(u16)]
83/// pub enum DirEvent {
84///     NewConsensus,
85///     NewDescriptors,
86/// }
87///
88/// impl_FlagEvent!{ DirEvent }
89/// ```
90macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
91    impl FlagEvent for $ty {
92        const MAXIMUM: u16 = {
93            let count = <$ty as $crate::strum::EnumCount>::COUNT;
94            (count - 1) as u16
95        };
96        fn to_index(self) -> u16 {
97            self.into()
98        }
99        fn from_index(flag: u16) -> Option<Self> {
100            flag.try_into().ok()
101        }
102    }
103
104    #[test]
105    #[allow(non_snake_case)]
106    fn [< flagevent_test_variant_numbers_ $ty >]() {
107        for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
108            assert!(<$ty as FlagEvent>::to_index(variant) <=
109                    <$ty as FlagEvent>::MAXIMUM,
110                    "impl_FlagEvent only allowed if discriminators are dense");
111        }
112    }
113} } }
114
115impl_FlagEvent! { DirEvent }
116
117#[cfg(feature = "bridge-client")]
118impl_FlagEvent! { BridgeDescEvent }
119
120/// A publisher that broadcasts flag-level events to multiple subscribers.
121///
122/// Events with the same flag value may be coalesced: that is, if the same event
123/// is published ten times in a row, a subscriber may receive only a single
124/// notification of the event.
125///
126/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
127/// that can also broadcast events to everybody listening on the channel.
128///  Dropping the last handle closes all streams subscribed to it.
129pub(crate) struct FlagPublisher<F> {
130    /// Inner data shared by publishers and streams.
131    inner: Arc<Inner<F>>,
132}
133
134/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
135struct Inner<F> {
136    /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
137    event: event_listener::Event,
138    /// How many times has each event occurred, ever.
139    ///
140    /// (It is safe for this to wrap around.)
141    // TODO(nickm): I wish this could be an array, but const generics don't
142    // quite support that yet.
143    counts: Vec<AtomicUsize>, // I wish this could be an array.
144    /// How many publishers remain?
145    n_publishers: AtomicUsize,
146    /// Phantom member to provide correct covariance.
147    ///
148    /// The `fn` business is a covariance trick to include `F` without affecting
149    /// this object's Send/Sync status.
150    _phantom: PhantomData<fn(F) -> F>,
151}
152
153/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
154/// [`FlagPublisher`].
155pub(crate) struct FlagListener<F> {
156    /// What value of each flag's count have we seen most recently?  
157    ///
158    /// Note that we count the event as "received" only once for each observed
159    /// change in the flag's count, even if that count has changed by more than
160    /// 1.
161    my_counts: Vec<usize>,
162    /// An an `EventListener` that will be notified when events are published,
163    /// or when the final publisher is dropped.
164    ///
165    /// We must always have one of these available _before_ we check any counts
166    /// in self.inner.
167    listener: event_listener::EventListener,
168    /// Reference to shared data.
169    inner: Arc<Inner<F>>,
170}
171
172impl<F: FlagEvent> Default for FlagPublisher<F> {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl<F: FlagEvent> FlagPublisher<F> {
179    /// Construct a new FlagPublisher.
180    pub(crate) fn new() -> Self {
181        // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182        // require AtomicUsize to be Clone.
183        let counts = std::iter::repeat_with(AtomicUsize::default)
184            .take(F::MAXIMUM as usize + 1)
185            .collect();
186        FlagPublisher {
187            inner: Arc::new(Inner {
188                event: event_listener::Event::new(),
189                counts,
190                n_publishers: AtomicUsize::new(1),
191                _phantom: PhantomData,
192            }),
193        }
194    }
195
196    /// Create a new subscription to this FlagPublisher.
197    pub(crate) fn subscribe(&self) -> FlagListener<F> {
198        // We need to do this event.listen before we check the counts; otherwise
199        // we could have a sequence where: we check the count, then the
200        // publisher increments the count, then the publisher calls
201        // event.notify(), and we call event.listen(). That would cause us to
202        // miss the increment.
203        let listener = self.inner.event.listen();
204
205        FlagListener {
206            my_counts: self
207                .inner
208                .counts
209                .iter()
210                .map(|a| a.load(Ordering::SeqCst))
211                .collect(),
212            listener,
213            inner: Arc::clone(&self.inner),
214        }
215    }
216
217    /// Tell every listener that the provided flag has been published.
218    pub(crate) fn publish(&self, flag: F) {
219        self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220        self.inner.event.notify(usize::MAX);
221    }
222}
223
224impl<F> Clone for FlagPublisher<F> {
225    fn clone(&self) -> FlagPublisher<F> {
226        self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227        FlagPublisher {
228            inner: Arc::clone(&self.inner),
229        }
230    }
231}
232
233// We must implement Drop to keep count publishers, and so that when the last
234// publisher goes away, we can wake up every listener  so that it notices that
235// the stream is now ended.
236impl<F> Drop for FlagPublisher<F> {
237    fn drop(&mut self) {
238        if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239            // That was the last reference; we must notify the listeners.
240            self.inner.event.notify(usize::MAX);
241        }
242    }
243}
244
245impl<F: FlagEvent> Stream for FlagListener<F> {
246    type Item = F;
247
248    fn poll_next(
249        mut self: std::pin::Pin<&mut Self>,
250        cx: &mut std::task::Context<'_>,
251    ) -> std::task::Poll<Option<Self::Item>> {
252        loop {
253            // Notify the caller if any events are ready to fire.
254            for idx in 0..F::MAXIMUM as usize + 1 {
255                let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256                // We don't have to use < here specifically, since any change
257                // indicates that the count has been modified. That lets us
258                // survive usize wraparound.
259                if cur != self.my_counts[idx] {
260                    self.my_counts[idx] = cur;
261                    return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262                }
263            }
264
265            // At this point, notify the caller if there are no more publishers.
266            if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267                return Poll::Ready(None);
268            }
269
270            if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271                // Got a new notification; we must create a new event and continue the loop.
272                //
273                // See discussion in `FlagPublisher::subscribe()` for why we must always create
274                // this listener _before_ checking any flags.
275                self.listener = self.inner.event.listen();
276            } else {
277                // Nothing to do yet: put the listener back.
278                return Poll::Pending;
279            }
280        }
281    }
282}
283
284/// Description of the directory manager's current bootstrapping status.
285///
286/// This status does not necessarily increase monotonically: it can go backwards
287/// if (for example) our directory information expires before we're able to get
288/// new information.
289//
290// TODO(nickm): This type has gotten a bit large for being the type we send over
291// a `postage::watch`: perhaps we'd be better off having this information stored
292// in the guardmgr, and having only a summary of it sent over the
293// `postage::watch`.  But for now, let's not, unless it shows up in profiles.
294#[derive(Clone, Debug, Default)]
295pub struct DirBootstrapStatus(StatusEnum);
296
297/// The contents of a DirBootstrapStatus.
298///
299/// This is a separate type since we don't want to make these variables public.
300#[derive(Clone, Debug, Default)]
301enum StatusEnum {
302    /// There is no active attempt to load or fetch a directory.
303    #[default]
304    NoActivity,
305    /// We have only one attempt to fetch a directory.
306    Single {
307        /// The currently active directory attempt.
308        ///
309        /// We're either using this directory now, or we plan to use it as soon
310        /// as it's complete enough.
311        current: StatusEntry,
312    },
313    /// We have an existing directory attempt, but it's stale, and we're
314    /// fetching a new one to replace it.
315    ///
316    /// Invariant: `current.id < next.id`
317    Replacing {
318        /// The previous attempt's status.  It may still be trying to fetch
319        /// information if it has descriptors left to download.
320        current: StatusEntry,
321        /// The current attempt's status.  We are not yet using this directory
322        /// for our activity, since it does not (yet) have enough information.
323        next: StatusEntry,
324    },
325}
326
327/// The status and identifier of a single attempt to download a full directory.
328#[derive(Clone, Debug)]
329struct StatusEntry {
330    /// The identifier for this attempt.
331    id: AttemptId,
332    /// The latest status.
333    status: DirStatus,
334}
335
336/// The status for a single directory.
337#[derive(Clone, Debug, Default, derive_more::Display)]
338#[display("{0}", progress)]
339pub(crate) struct DirStatus {
340    /// How much of the directory do we currently have?
341    progress: DirProgress,
342    /// How many resets have been forced while fetching this directory?
343    n_resets: usize,
344    /// How many errors have we encountered since last we advanced the
345    /// 'progress' on this directory?
346    n_errors: usize,
347    /// How many times has an `update_progress` call not actually moved us
348    /// forward since we last advanced the 'progress' on this directory?
349    n_stalls: usize,
350}
351
352/// How much progress have we made in downloading a given directory?
353///
354/// This is a separate type so that we don't make the variants public.
355#[derive(Clone, Debug, Educe)]
356#[educe(Default)]
357pub(crate) enum DirProgress {
358    /// We don't have any information yet.
359    #[educe(Default)]
360    NoConsensus {
361        /// If present, we are fetching a consensus whose valid-after time
362        /// postdates this time.
363        #[allow(dead_code)]
364        after: Option<SystemTime>,
365    },
366    /// We've downloaded a consensus, but we haven't validated it yet.
367    FetchingCerts {
368        /// The actual declared lifetime of the consensus.
369        lifetime: netstatus::Lifetime,
370        /// The lifetime for which we are willing to use this consensus.  (This
371        /// may be broader than `lifetime`.)
372        usable_lifetime: netstatus::Lifetime,
373        /// A fraction (in (numerator,denominator) format) of the certificates
374        /// we have for this consensus.
375        n_certs: (u16, u16),
376    },
377    /// We've validated a consensus and we're fetching (or have fetched) its
378    /// microdescriptors.
379    Validated {
380        /// The actual declared lifetime of the consensus.
381        lifetime: netstatus::Lifetime,
382        /// The lifetime for which we are willing to use this consensus.  (This
383        /// may be broader than `lifetime`.)
384        usable_lifetime: netstatus::Lifetime,
385        /// A fraction (in (numerator,denominator) form) of the microdescriptors
386        /// that we have for this consensus.
387        n_mds: (u32, u32),
388        /// True iff we've decided that the consensus is usable.
389        usable: bool,
390        // TODO(nickm) Someday we could add a field about whether any primary
391        // guards are missing microdescriptors, to give a better explanation for
392        // the case where we won't switch our consensus because of that.
393    },
394}
395
396/// A reported diagnostic for what kind of trouble we've seen while trying to
397/// bootstrap a directory.
398///
399/// These blockages types are not yet terribly specific: if you encounter one,
400/// it's probably a good idea to check the logs to see what's really going on.
401///
402/// If you encounter connection blockage _and_ directory blockage at the same
403/// time, the connection blockage is almost certainly the real problem.
404//
405// TODO(nickm): At present these diagnostics aren't very helpful; they say too
406// much about _how we know_ that the process has gone wrong, but not so much
407// about _what the problem is_.  In the future, we may wish to look more closely
408// at what _kind_ of errors or resets we've seen, so we can report better
409// information. Probably, however, we should only do that after we get some
410// experience with which problems people encounter in practice, and what
411// diagnostics would be useful for them.
412#[derive(Clone, Debug, derive_more::Display)]
413#[non_exhaustive]
414pub enum DirBlockage {
415    /// We've been downloading information without error, but we haven't
416    /// actually been getting anything that we want.
417    ///
418    /// This might indicate that there's a problem with information propagating
419    /// through the Tor network, or it might indicate that a bogus consensus or
420    /// a bad clock has tricked us into asking for something that nobody has.
421    #[display("Can't make progress.")]
422    Stalled,
423    /// We've gotten a lot of errors without making forward progress on our
424    /// bootstrap attempt.
425    ///
426    /// This might indicate that something's wrong with the Tor network, or that
427    /// there's something buggy with our ability to handle directory responses.
428    /// It might also indicate a malfunction on our directory guards, or a bug
429    /// on our retry logic.
430    #[display("Too many errors without making progress.")]
431    TooManyErrors,
432    /// We've reset our bootstrap attempt a lot of times.
433    ///
434    /// This either indicates that we have been failing a lot for one of the
435    /// other reasons above, or that we keep getting served a consensus which
436    /// turns out, upon trying to fetch certificates, not to be usable.  It can
437    /// also indicate a bug in our retry logic.
438    #[display("Had to reset bootstrapping too many times.")]
439    TooManyResets,
440}
441
442impl fmt::Display for DirProgress {
443    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444        /// Format this time in a format useful for displaying
445        /// lifetime boundaries.
446        fn fmt_time(t: SystemTime) -> String {
447            use once_cell::sync::Lazy;
448            /// Formatter object for lifetime boundaries.
449            ///
450            /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451            /// sub-second times here, and using non-UTC offsets is confusing
452            /// in this context.
453            static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
454                time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
455                    .expect("Invalid time format")
456            });
457            OffsetDateTime::from(t)
458                .format(&FORMAT)
459                .unwrap_or_else(|_| "(could not format)".into())
460        }
461
462        match &self {
463            DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
464            DirProgress::FetchingCerts { n_certs, .. } => write!(
465                f,
466                "fetching authority certificates ({}/{})",
467                n_certs.0, n_certs.1
468            ),
469            DirProgress::Validated {
470                usable: false,
471                n_mds,
472                ..
473            } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
474            DirProgress::Validated {
475                usable: true,
476                lifetime,
477                ..
478            } => write!(
479                f,
480                "usable, fresh until {}, and valid until {}",
481                fmt_time(lifetime.fresh_until()),
482                fmt_time(lifetime.valid_until())
483            ),
484        }
485    }
486}
487
488impl fmt::Display for DirBootstrapStatus {
489    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490        match &self.0 {
491            StatusEnum::NoActivity => write!(f, "not downloading")?,
492            StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
493            StatusEnum::Replacing { current, next } => write!(
494                f,
495                "directory is {}; next directory is {}",
496                current.status, next.status
497            )?,
498        }
499        Ok(())
500    }
501}
502
503impl DirBootstrapStatus {
504    /// Return the current DirStatus.
505    ///
506    /// This is the _most complete_ status.  If we have any usable status, it is
507    /// this one.
508    fn current(&self) -> Option<&DirStatus> {
509        match &self.0 {
510            StatusEnum::NoActivity => None,
511            StatusEnum::Single { current } => Some(&current.status),
512            StatusEnum::Replacing { current, .. } => Some(&current.status),
513        }
514    }
515
516    /// Return the next DirStatus, if there is one.
517    fn next(&self) -> Option<&DirStatus> {
518        match &self.0 {
519            StatusEnum::Replacing { next, .. } => Some(&next.status),
520            _ => None,
521        }
522    }
523
524    /// Return the contained `DirStatus`es, in order: `current`, then `next`
525    #[allow(clippy::implied_bounds_in_impls)]
526    fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
527        chain!(self.current(), self.next(),)
528    }
529
530    /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
531    #[allow(clippy::implied_bounds_in_impls)]
532    fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
533        let (current, next) = match &mut self.0 {
534            StatusEnum::NoActivity => (None, None),
535            StatusEnum::Single { current } => (Some(current), None),
536            StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
537        };
538        chain!(current, next,)
539    }
540
541    /// Return the fraction of completion for directory download, in a form
542    /// suitable for a progress bar at some particular time.
543    ///
544    /// This value is not monotonic, and can go down as one directory is
545    /// replaced with another.
546    ///
547    /// Callers _should not_ depend on the specific meaning of any particular
548    /// fraction; we may change these fractions in the future.
549    pub fn frac_at(&self, when: SystemTime) -> f32 {
550        self.statuses()
551            .filter_map(|st| st.frac_at(when))
552            .next()
553            .unwrap_or(0.0)
554    }
555
556    /// Return true if this status indicates that we have a current usable
557    /// directory.
558    pub fn usable_at(&self, now: SystemTime) -> bool {
559        if let Some(current) = self.current() {
560            current.progress.usable() && current.okay_to_use_at(now)
561        } else {
562            false
563        }
564    }
565
566    /// If there is a problem with our attempts to bootstrap, return a
567    /// corresponding DirBlockage.  
568    pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
569        if let Some(current) = self.current() {
570            if current.progress.usable() && current.declared_live_at(now) {
571                // The current directory is sufficient, and not even a little bit
572                // expired. There is no problem.
573                return None;
574            }
575        }
576
577        // Any blockage in "current" is more serious, so return that if there is one
578        self.statuses().filter_map(|st| st.blockage()).next()
579    }
580
581    /// Return the appropriate DirStatus for `AttemptId`, constructing it if
582    /// necessary.
583    ///
584    /// Return None if all relevant attempts are more recent than this Id.
585    #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
586    fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
587        // First, ensure that we have a *recent enough* attempt
588        // Look for the latest attempt, and see if it's new enough; if not, start a new one.
589        if self
590            .entries_mut()
591            .rev()
592            .take(1)
593            .find(|entry| entry.id >= attempt_id)
594            .is_none()
595        {
596            let current = match std::mem::take(&mut self.0) {
597                StatusEnum::NoActivity => None,
598                StatusEnum::Single { current } => Some(current),
599                StatusEnum::Replacing { current, .. } => Some(current),
600            };
601            // If we have a `current` already, we keep it, and restart `next`.
602            let next = StatusEntry::new(attempt_id);
603            self.0 = match current {
604                None => StatusEnum::Single { current: next },
605                Some(current) => StatusEnum::Replacing { current, next },
606            };
607        }
608
609        // Find the entry with `attempt_id` and return it.
610        // (Despite the above, there might not be one: maybe `attempt_id` is old.)
611        self.entries_mut()
612            .find(|entry| entry.id == attempt_id)
613            .map(|entry| &mut entry.status)
614    }
615
616    /// If the "next" status is usable, replace the current status with it.
617    fn advance_status(&mut self) {
618        // TODO: should make sure that the compiler is smart enough to optimize
619        // this mem::take() and replacement away, and turn it into a conditional
620        // replacement?
621        self.0 = match std::mem::take(&mut self.0) {
622            StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
623                StatusEnum::Single { current: next }
624            }
625            other => other,
626        };
627    }
628
629    /// Update this status by replacing the `DirProgress` in its current status
630    /// (or its next status) with `new_status`, as appropriate.
631    pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
632        if let Some(status) = self.mut_status_for(attempt_id) {
633            let old_frac = status.frac();
634            status.progress = new_progress;
635            let new_frac = status.frac();
636            if new_frac > old_frac {
637                // This download has made progress: clear our count of errors
638                // and stalls.
639                status.n_errors = 0;
640                status.n_stalls = 0;
641            } else {
642                // This download didn't make progress; increment the stall
643                // count.
644                status.n_stalls += 1;
645            }
646            self.advance_status();
647        }
648    }
649
650    /// Update this status by noting that some errors have occurred in a given
651    /// download attempt.
652    pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
653        if let Some(status) = self.mut_status_for(attempt_id) {
654            status.n_errors += n_errors;
655        }
656    }
657
658    /// Update this status by noting that we had to reset a given download attempt;
659    pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
660        if let Some(status) = self.mut_status_for(attempt_id) {
661            status.n_resets += 1;
662        }
663    }
664}
665
666impl StatusEntry {
667    /// Construct a new StatusEntry with a given attempt id, and no progress
668    /// reported.
669    fn new(id: AttemptId) -> Self {
670        Self {
671            id,
672            status: DirStatus::default(),
673        }
674    }
675}
676
677impl DirStatus {
678    /// Return the declared consensus lifetime for this directory, if we have one.
679    fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
680        match &self.progress {
681            DirProgress::NoConsensus { .. } => None,
682            DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
683            DirProgress::Validated { lifetime, .. } => Some(lifetime),
684        }
685    }
686
687    /// Return the consensus lifetime for this directory, if we have one, as
688    /// modified by our skew-tolerance settings.
689    fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
690        match &self.progress {
691            DirProgress::NoConsensus { .. } => None,
692            DirProgress::FetchingCerts {
693                usable_lifetime, ..
694            } => Some(usable_lifetime),
695            DirProgress::Validated {
696                usable_lifetime, ..
697            } => Some(usable_lifetime),
698        }
699    }
700
701    /// Return true if the directory is valid at the given time, as modified by
702    /// our clock skew settings.
703    fn okay_to_use_at(&self, when: SystemTime) -> bool {
704        self.usable_lifetime()
705            .map(|lt| lt.valid_at(when))
706            .unwrap_or(false)
707    }
708
709    /// Return true if the directory is valid at the given time, _unmodified_ by our
710    /// clock skew settings.
711    fn declared_live_at(&self, when: SystemTime) -> bool {
712        self.declared_lifetime()
713            .map(|lt| lt.valid_at(when))
714            .unwrap_or(false)
715    }
716
717    /// As `frac`, but return None if this consensus is not valid at the given time,
718    /// and down-rate expired consensuses that we're still willing to use.
719    fn frac_at(&self, when: SystemTime) -> Option<f32> {
720        if self
721            .declared_lifetime()
722            .map(|lt| lt.valid_at(when))
723            .unwrap_or(false)
724        {
725            // We're officially okay to use this directory.
726            Some(self.frac())
727        } else if self.okay_to_use_at(when) {
728            // This directory is a little expired, but only a little.
729            Some(self.frac() * 0.9)
730        } else {
731            None
732        }
733    }
734
735    /// Return the fraction of completion for directory download, in a form
736    /// suitable for a progress bar.
737    ///
738    /// This is monotonically increasing for a single directory, but can go down
739    /// as one directory is replaced with another.
740    ///
741    /// Callers _should not_ depend on the specific meaning of any particular
742    /// fraction; we may change these fractions in the future.
743    fn frac(&self) -> f32 {
744        // We arbitrarily decide that 25% is downloading the consensus, 10% is
745        // downloading the certificates, and the remaining 65% is downloading
746        // the microdescriptors until we become usable.  We may want to re-tune that in the future, but
747        // the documentation of this function should allow us to do so.
748        match &self.progress {
749            DirProgress::NoConsensus { .. } => 0.0,
750            DirProgress::FetchingCerts { n_certs, .. } => {
751                0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
752            }
753            DirProgress::Validated {
754                usable: false,
755                n_mds,
756                ..
757            } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
758            DirProgress::Validated { usable: true, .. } => 1.0,
759        }
760    }
761
762    /// If we think there is a problem with our bootstrapping process, return a
763    /// [`DirBlockage`] to describe it.
764    ///
765    /// The caller may want to also check `usable_at` to avoid reporting trouble
766    /// if the directory is currently usable.
767    fn blockage(&self) -> Option<DirBlockage> {
768        /// How many resets are sufficient for us to report a blockage?
769        const RESET_THRESHOLD: usize = 2;
770        /// How many errors are sufficient for us to report a blockage?
771        const ERROR_THRESHOLD: usize = 6;
772        /// How many no-progress download attempts are sufficient for us to
773        /// report a blockage?
774        const STALL_THRESHOLD: usize = 8;
775
776        if self.n_resets >= RESET_THRESHOLD {
777            Some(DirBlockage::TooManyResets)
778        } else if self.n_errors >= ERROR_THRESHOLD {
779            Some(DirBlockage::TooManyErrors)
780        } else if self.n_stalls >= STALL_THRESHOLD {
781            Some(DirBlockage::Stalled)
782        } else {
783            None
784        }
785    }
786}
787
788impl DirProgress {
789    /// Return true if this progress indicates a usable directory.
790    fn usable(&self) -> bool {
791        matches!(self, DirProgress::Validated { usable: true, .. })
792    }
793}
794
795/// A stream of [`DirBootstrapStatus`] events.
796#[derive(Clone, Educe)]
797#[educe(Debug)]
798pub struct DirBootstrapEvents {
799    /// The `postage::watch::Receiver` that we're wrapping.
800    ///
801    /// We wrap this type so that we don't expose its entire API, and so that we
802    /// can migrate to some other implementation in the future if we want.
803    #[educe(Debug(method = "skip_fmt"))]
804    pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
805}
806
807impl Stream for DirBootstrapEvents {
808    type Item = DirBootstrapStatus;
809
810    fn poll_next(
811        mut self: Pin<&mut Self>,
812        cx: &mut std::task::Context<'_>,
813    ) -> Poll<Option<Self::Item>> {
814        self.inner.poll_next_unpin(cx)
815    }
816}
817
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use std::time::Duration;

    use super::*;
    use float_eq::assert_float_eq;
    use futures::stream::StreamExt;
    use tor_rtcompat::test_with_all_runtimes;

    /// Absolute tolerance used when comparing progress fractions.
    const TOL: f32 = 0.00001;

    /// One hour, the unit in which the test lifetimes are expressed.
    const HOUR: Duration = Duration::from_secs(3600);

    /// Build a [`DirStatus`] holding `progress`, with every other field left
    /// at its default.
    fn status_of(progress: DirProgress) -> DirStatus {
        DirStatus {
            progress,
            ..Default::default()
        }
    }

    #[test]
    fn subscribe_and_publish() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();
            let mut early = publisher.subscribe();
            publisher.publish(DirEvent::NewConsensus);
            let mut late = publisher.subscribe();
            let drop_signal = event_listener::Event::new();
            let drop_requested = drop_signal.listen();

            futures::join!(
                async {
                    // `early` was subscribed before the publish, so it gets
                    // the event...
                    assert_eq!(early.next().await, Some(DirEvent::NewConsensus));
                    // ...then asks the third task below to drop the publisher...
                    drop_signal.notify(1);
                    // ...after which its stream ends.
                    assert_eq!(early.next().await, None);
                },
                async {
                    // `late` was subscribed after the publish; it only ever
                    // observes the end of the stream.
                    assert_eq!(late.next().await, None);
                },
                async {
                    drop_requested.await;
                    drop(publisher);
                }
            );
        });
    }

    #[test]
    fn receive_two() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();

            let mut subscriber = publisher.subscribe();
            let first_seen = event_listener::Event::new();
            let first_seen_listener = first_seen.listen();
            futures::join!(
                async {
                    assert_eq!(subscriber.next().await, Some(DirEvent::NewDescriptors));
                    // Let the publishing task know it may send the second event.
                    first_seen.notify(1);
                    assert_eq!(subscriber.next().await, Some(DirEvent::NewConsensus));
                },
                async {
                    publisher.publish(DirEvent::NewDescriptors);
                    first_seen_listener.await;
                    publisher.publish(DirEvent::NewConsensus);
                }
            );
        });
    }

    #[test]
    fn two_publishers() {
        /// How many times each publisher publishes its event.
        const ROUNDS: usize = 100;

        test_with_all_runtimes!(|_rt| async {
            let publisher_a: FlagPublisher<DirEvent> = FlagPublisher::new();
            let publisher_b = publisher_a.clone();

            let mut subscriber = publisher_a.subscribe();
            let a_started = event_listener::Event::new();
            let b_started = event_listener::Event::new();
            let a_listener = a_started.listen();
            let b_listener = b_started.listen();
            futures::join!(
                async {
                    let mut seen = [0_usize; 2];
                    // Waiting on both listeners guarantees that at least one
                    // flag of each kind has been published before we start
                    // draining the stream.
                    a_listener.await;
                    b_listener.await;
                    while let Some(event) = subscriber.next().await {
                        seen[event.to_index() as usize] += 1;
                    }
                    assert!(seen.iter().all(|&n| n > 0));
                    assert!(seen.iter().all(|&n| n <= ROUNDS));
                },
                async {
                    for _ in 0..ROUNDS {
                        publisher_a.publish(DirEvent::NewDescriptors);
                        a_started.notify(1);
                        tor_rtcompat::task::yield_now().await;
                    }
                    drop(publisher_a);
                },
                async {
                    for _ in 0..ROUNDS {
                        publisher_b.publish(DirEvent::NewConsensus);
                        b_started.notify(1);
                        tor_rtcompat::task::yield_now().await;
                    }
                    drop(publisher_b);
                }
            );
        });
    }

    #[test]
    fn receive_after_publishers_are_gone() {
        test_with_all_runtimes!(|_rt| async {
            let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();

            let mut subscriber = publisher.subscribe();

            publisher.publish(DirEvent::NewConsensus);
            drop(publisher);

            // The event published before the drop is still delivered...
            assert_eq!(subscriber.next().await, Some(DirEvent::NewConsensus));
            // ...and only then does the stream end.
            assert!(subscriber.next().await.is_none());
        });
    }

    #[test]
    fn failed_conversion() {
        assert!(DirEvent::from_index(999).is_none());
    }

    #[test]
    fn dir_status_basics() {
        let now = SystemTime::now();

        let no_consensus = status_of(DirProgress::NoConsensus { after: None });

        let certs_lifetime = netstatus::Lifetime::new(now, now + HOUR, now + HOUR * 2).unwrap();
        let fetching_certs = status_of(DirProgress::FetchingCerts {
            lifetime: certs_lifetime.clone(),
            usable_lifetime: certs_lifetime,
            n_certs: (3, 5),
        });

        let mds_lifetime =
            netstatus::Lifetime::new(now + HOUR, now + HOUR * 2, now + HOUR * 3).unwrap();
        let fetching_mds = status_of(DirProgress::Validated {
            lifetime: mds_lifetime.clone(),
            usable_lifetime: mds_lifetime,
            n_mds: (30, 40),
            usable: false,
        });

        // usable_lifetime()
        assert!(no_consensus.usable_lifetime().is_none());
        assert_eq!(fetching_certs.usable_lifetime().unwrap().valid_after(), now);
        assert_eq!(
            fetching_mds.usable_lifetime().unwrap().valid_until(),
            now + HOUR * 3
        );

        // frac() (The exact numbers are more or less arbitrary, and may be
        // changed later.)
        assert_float_eq!(no_consensus.frac(), 0.0, abs <= TOL);
        assert_float_eq!(fetching_certs.frac(), 0.25 + 0.06, abs <= TOL);
        assert_float_eq!(fetching_mds.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);

        // frac_at()
        let half_past = now + HOUR / 2;
        let two_hours_later = half_past + HOUR * 2;
        assert!(no_consensus.frac_at(half_past).is_none());
        assert_float_eq!(
            fetching_certs.frac_at(half_past).unwrap(),
            0.25 + 0.06,
            abs <= TOL
        );
        assert!(fetching_mds.frac_at(half_past).is_none());
        assert!(no_consensus.frac_at(two_hours_later).is_none());
        assert!(fetching_certs.frac_at(two_hours_later).is_none());
        assert_float_eq!(
            fetching_mds.frac_at(two_hours_later).unwrap(),
            0.35 + 0.65 * 0.75,
            abs <= TOL
        );
    }

    #[test]
    fn dir_status_display() {
        use time::macros::datetime;
        let start: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
        let lifetime = netstatus::Lifetime::new(start, start + HOUR, start + HOUR * 3).unwrap();

        // Each progress value, paired with the text it should be displayed as.
        let cases = [
            (
                DirProgress::NoConsensus { after: None },
                "fetching a consensus",
            ),
            (
                DirProgress::FetchingCerts {
                    lifetime: lifetime.clone(),
                    usable_lifetime: lifetime.clone(),
                    n_certs: (3, 5),
                },
                "fetching authority certificates (3/5)",
            ),
            (
                DirProgress::Validated {
                    lifetime: lifetime.clone(),
                    usable_lifetime: lifetime.clone(),
                    n_mds: (30, 40),
                    usable: false,
                },
                "fetching microdescriptors (30/40)",
            ),
            (
                DirProgress::Validated {
                    lifetime: lifetime.clone(),
                    usable_lifetime: lifetime,
                    n_mds: (30, 40),
                    usable: true,
                },
                "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC",
            ),
        ];

        for (progress, expected) in cases {
            assert_eq!(status_of(progress).to_string(), expected);
        }
    }

    #[test]
    fn bootstrap_status() {
        use time::macros::datetime;
        let start: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
        let old_lifetime =
            netstatus::Lifetime::new(start, start + HOUR, start + HOUR * 3).unwrap();
        let new_lifetime =
            netstatus::Lifetime::new(start + HOUR, start + HOUR * 2, start + HOUR * 4).unwrap();

        let usable_old = DirProgress::Validated {
            lifetime: old_lifetime.clone(),
            usable_lifetime: old_lifetime.clone(),
            n_mds: (3, 40),
            usable: true,
        };
        let unusable_new = DirProgress::Validated {
            lifetime: new_lifetime.clone(),
            usable_lifetime: new_lifetime.clone(),
            n_mds: (5, 40),
            usable: false,
        };
        let old_attempt = AttemptId::next();
        let new_attempt = AttemptId::next();

        let mut bs = DirBootstrapStatus(StatusEnum::Replacing {
            current: StatusEntry {
                id: old_attempt,
                status: status_of(usable_old.clone()),
            },
            next: StatusEntry {
                id: new_attempt,
                status: status_of(unusable_new.clone()),
            },
        });

        assert_eq!(
            bs.to_string(),
            "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
        );

        // While the current directory is valid, we report full completion.
        assert_float_eq!(bs.frac_at(start + HOUR / 2), 1.0, abs <= TOL);
        // Half an hour past the current directory's `valid_until`, the
        // expected value matches the next directory's 5/40 microdescriptors.
        assert_float_eq!(
            bs.frac_at(start + HOUR * 3 + HOUR / 2),
            0.35 + 0.65 * 0.125,
            abs <= TOL
        );

        // Now try updating.

        // Case 1: we have a usable directory and the updated status isn't usable.
        bs.update_progress(
            new_attempt,
            DirProgress::Validated {
                lifetime: new_lifetime.clone(),
                usable_lifetime: new_lifetime.clone(),
                n_mds: (10, 40),
                usable: false,
            },
        );
        assert!(matches!(
            bs.next().unwrap(),
            DirStatus {
                progress: DirProgress::Validated {
                    n_mds: (10, 40),
                    ..
                },
                ..
            }
        ));

        // Case 2: The new directory _is_ usable and newer.  It will replace the old one.
        bs.update_progress(
            new_attempt,
            DirProgress::Validated {
                lifetime: new_lifetime.clone(),
                usable_lifetime: new_lifetime.clone(),
                n_mds: (20, 40),
                usable: true,
            },
        );
        assert!(bs.next().is_none());
        let current_valid_after = bs
            .current()
            .unwrap()
            .usable_lifetime()
            .unwrap()
            .valid_after();
        assert_eq!(current_valid_after, new_lifetime.valid_after());

        // Case 3: The new directory is usable but older. Nothing will happen.
        bs.update_progress(old_attempt, usable_old);
        assert!(bs.next().is_none());
        let current_valid_after = bs
            .current()
            .unwrap()
            .usable_lifetime()
            .unwrap()
            .valid_after();
        assert_ne!(current_valid_after, old_lifetime.valid_after());

        // Case 4: starting with an unusable directory, we always replace.
        let mut fresh = DirBootstrapStatus::default();
        assert!(!unusable_new.usable());
        assert!(fresh.current().is_none());
        fresh.update_progress(new_attempt, unusable_new);
        assert!(fresh.current().unwrap().usable_lifetime().is_some());
    }
}