tor_dirmgr/
event.rs

1//! Code for notifying other modules about changes in the directory.
2
3// TODO(nickm): After we have enough experience with this FlagPublisher, we
4// might want to make it a public interface. If we do it should probably move
5// into another crate.
6
7use std::{
8    fmt,
9    marker::PhantomData,
10    pin::Pin,
11    sync::{
12        atomic::{AtomicUsize, Ordering},
13        Arc,
14    },
15    task::Poll,
16    time::SystemTime,
17};
18
19use educe::Educe;
20use futures::{stream::Stream, Future, StreamExt};
21use itertools::chain;
22use paste::paste;
23use time::OffsetDateTime;
24use tor_basic_utils::skip_fmt;
25use tor_netdir::DirEvent;
26use tor_netdoc::doc::netstatus;
27
28#[cfg(feature = "bridge-client")]
29use tor_guardmgr::bridge::BridgeDescEvent;
30
31use crate::bootstrap::AttemptId;
32
/// A trait to indicate something that can be published with [`FlagPublisher`].
///
/// Since the implementation of `FlagPublisher` requires that its events be
/// represented as small integers, this trait is mainly about converting to and
/// from those integers.
///
/// `to_index` and `from_index` are expected to be inverses of one another for
/// every index in `0..=MAXIMUM`: a [`FlagListener`] converts a changed counter
/// position back into an event with `from_index`, and treats `None` there as
/// an internal error.
pub(crate) trait FlagEvent: Sized {
    /// The maximum allowed integer value that [`FlagEvent::to_index()`] can return
    /// for this type.
    ///
    /// This is limited to u16 because the [`FlagPublisher`] uses a vector of all
    /// known flags, and sometimes iterates over the whole vector.
    ///
    /// (The publisher allocates `MAXIMUM + 1` counters, one per index.)
    const MAXIMUM: u16;
    /// Convert this event into an index.
    ///
    /// For efficiency, indices should be small and densely packed.
    ///
    /// The result must not exceed [`FlagEvent::MAXIMUM`]; the publisher uses it
    /// to index its counter vector directly.
    fn to_index(self) -> u16;
    /// Try to reconstruct an event from its index.  Return None if the index is
    /// out-of-bounds.
    fn from_index(flag: u16) -> Option<Self>;
}
53
/// Implements [`FlagEvent`] for a C-like enum
///
/// Requirements:
///
///  * `$ty` must implement [`strum::EnumCount`] and [`strum::IntoEnumIterator`]
///
///  * `$ty` type must implement [`Into<u16>`] and [`TryFrom<u16>`]
///    (for example using the `num_enum` crate).
///
///  * The discriminants must be densely allocated.
///    This will be done automatically by the compiler
///    if explicit discriminants are not specified.
///    (This property is checked in a test.)
///
///  * The variants may not contain any data.
///    This is required for correctness.
///    We think it is checked if you use `num_enum::TryFromPrimitive`.
///
/// Besides the trait implementation, the macro also emits a `#[test]` function
/// named `flagevent_test_variant_numbers_$ty`, which asserts that every
/// variant's index is no greater than `MAXIMUM`.
///
/// # Example
///
// Sadly, it does not appear to be possible to doctest a private macro.
/// ```rust,ignore
/// use num_enum::{IntoPrimitive, TryFromPrimitive};
/// use strum::{EnumCount, EnumIter};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
/// #[non_exhaustive]
/// #[repr(u16)]
/// pub enum DirEvent {
///     NewConsensus,
///     NewDescriptors,
/// }
///
/// impl_FlagEvent!{ DirEvent }
/// ```
// NOTE(review): the expansion names strum as `$crate::strum`, so it presumably
// relies on this crate making `strum` nameable at its root -- confirm.
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
    impl FlagEvent for $ty {
        // With dense discriminants starting at zero, the largest index is one
        // less than the number of variants.
        const MAXIMUM: u16 = {
            let count = <$ty as $crate::strum::EnumCount>::COUNT;
            (count - 1) as u16
        };
        fn to_index(self) -> u16 {
            self.into()
        }
        fn from_index(flag: u16) -> Option<Self> {
            // Any conversion failure is reported as "out of bounds".
            flag.try_into().ok()
        }
    }

    // `paste!` builds a distinct test name for each `$ty`; the resulting name
    // contains the (CamelCase) type name, hence the lint exemption.
    #[test]
    #[allow(non_snake_case)]
    fn [< flagevent_test_variant_numbers_ $ty >]() {
        for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
            assert!(<$ty as FlagEvent>::to_index(variant) <=
                    <$ty as FlagEvent>::MAXIMUM,
                    "impl_FlagEvent only allowed if discriminators are dense");
        }
    }
} } }
114
// Make `DirEvent` publishable through a `FlagPublisher`.
impl_FlagEvent! { DirEvent }

// Likewise for `BridgeDescEvent`, which is only imported when the
// `bridge-client` feature is enabled.
#[cfg(feature = "bridge-client")]
impl_FlagEvent! { BridgeDescEvent }
119
/// A publisher that broadcasts flag-level events to multiple subscribers.
///
/// Events with the same flag value may be coalesced: that is, if the same event
/// is published ten times in a row, a subscriber may receive only a single
/// notification of the event.
///
/// FlagPublisher supports an MPMC model: cloning a `FlagPublisher` creates a
/// new handle that can also broadcast events to everybody listening on the
/// channel.  Dropping the last handle closes all streams subscribed to it.
pub(crate) struct FlagPublisher<F> {
    /// Inner data shared by publishers and streams.
    ///
    /// The number of live publisher handles is tracked separately inside
    /// `Inner` (in `n_publishers`), since listeners hold a reference to the
    /// same `Arc`.
    inner: Arc<Inner<F>>,
}
133
/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
struct Inner<F> {
    /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
    ///
    /// It is also notified when the last publisher is dropped.
    event: event_listener::Event,
    /// How many times has each event occurred, ever.
    ///
    /// Indexed by [`FlagEvent::to_index()`]; there are `F::MAXIMUM + 1` entries.
    ///
    /// (It is safe for this to wrap around.)
    // TODO(nickm): I wish this could be an array, but const generics don't
    // quite support that yet.
    counts: Vec<AtomicUsize>,
    /// How many publishers remain?
    n_publishers: AtomicUsize,
    /// Phantom member to tie this type to `F` without storing an `F`.
    ///
    /// The `fn` business is a trick to include `F` without affecting this
    /// object's Send/Sync status: a function pointer is `Send` and `Sync`
    /// no matter what `F` is.
    ///
    /// NOTE(review): `fn(F) -> F` mentions `F` in both argument and return
    /// position, which makes this type invariant (not covariant) in `F`.
    _phantom: PhantomData<fn(F) -> F>,
}
152
/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
/// [`FlagPublisher`].
///
/// The stream ends once no publishers remain and every change already
/// recorded in the shared counters has been reported.
pub(crate) struct FlagListener<F> {
    /// What value of each flag's count have we seen most recently?
    ///
    /// Note that we count the event as "received" only once for each observed
    /// change in the flag's count, even if that count has changed by more than
    /// 1.
    my_counts: Vec<usize>,
    /// An `EventListener` that will be notified when events are published,
    /// or when the final publisher is dropped.
    ///
    /// We must always have one of these available _before_ we check any counts
    /// in self.inner.
    listener: event_listener::EventListener,
    /// Reference to shared data.
    inner: Arc<Inner<F>>,
}
171
172impl<F: FlagEvent> Default for FlagPublisher<F> {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl<F: FlagEvent> FlagPublisher<F> {
179    /// Construct a new FlagPublisher.
180    pub(crate) fn new() -> Self {
181        // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182        // require AtomicUsize to be Clone.
183        let counts = std::iter::repeat_with(AtomicUsize::default)
184            .take(F::MAXIMUM as usize + 1)
185            .collect();
186        FlagPublisher {
187            inner: Arc::new(Inner {
188                event: event_listener::Event::new(),
189                counts,
190                n_publishers: AtomicUsize::new(1),
191                _phantom: PhantomData,
192            }),
193        }
194    }
195
196    /// Create a new subscription to this FlagPublisher.
197    pub(crate) fn subscribe(&self) -> FlagListener<F> {
198        // We need to do this event.listen before we check the counts; otherwise
199        // we could have a sequence where: we check the count, then the
200        // publisher increments the count, then the publisher calls
201        // event.notify(), and we call event.listen(). That would cause us to
202        // miss the increment.
203        let listener = self.inner.event.listen();
204
205        FlagListener {
206            my_counts: self
207                .inner
208                .counts
209                .iter()
210                .map(|a| a.load(Ordering::SeqCst))
211                .collect(),
212            listener,
213            inner: Arc::clone(&self.inner),
214        }
215    }
216
217    /// Tell every listener that the provided flag has been published.
218    pub(crate) fn publish(&self, flag: F) {
219        self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220        self.inner.event.notify(usize::MAX);
221    }
222}
223
224impl<F> Clone for FlagPublisher<F> {
225    fn clone(&self) -> FlagPublisher<F> {
226        self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227        FlagPublisher {
228            inner: Arc::clone(&self.inner),
229        }
230    }
231}
232
233// We must implement Drop to keep count publishers, and so that when the last
234// publisher goes away, we can wake up every listener  so that it notices that
235// the stream is now ended.
236impl<F> Drop for FlagPublisher<F> {
237    fn drop(&mut self) {
238        if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239            // That was the last reference; we must notify the listeners.
240            self.inner.event.notify(usize::MAX);
241        }
242    }
243}
244
245impl<F: FlagEvent> Stream for FlagListener<F> {
246    type Item = F;
247
248    fn poll_next(
249        mut self: std::pin::Pin<&mut Self>,
250        cx: &mut std::task::Context<'_>,
251    ) -> std::task::Poll<Option<Self::Item>> {
252        loop {
253            // Notify the caller if any events are ready to fire.
254            for idx in 0..F::MAXIMUM as usize + 1 {
255                let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256                // We don't have to use < here specifically, since any change
257                // indicates that the count has been modified. That lets us
258                // survive usize wraparound.
259                if cur != self.my_counts[idx] {
260                    self.my_counts[idx] = cur;
261                    return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262                }
263            }
264
265            // At this point, notify the caller if there are no more publishers.
266            if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267                return Poll::Ready(None);
268            }
269
270            if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271                // Got a new notification; we must create a new event and continue the loop.
272                //
273                // See discussion in `FlagPublisher::subscribe()` for why we must always create
274                // this listener _before_ checking any flags.
275                self.listener = self.inner.event.listen();
276            } else {
277                // Nothing to do yet: put the listener back.
278                return Poll::Pending;
279            }
280        }
281    }
282}
283
/// Description of the directory manager's current bootstrapping status.
///
/// This status does not necessarily increase monotonically: it can go backwards
/// if (for example) our directory information expires before we're able to get
/// new information.
///
/// The default value represents "no activity": no attempt to load or fetch a
/// directory is being tracked.
//
// TODO(nickm): This type has gotten a bit large for being the type we send over
// a `postage::watch`: perhaps we'd be better off having this information stored
// in the guardmgr, and having only a summary of it sent over the
// `postage::watch`.  But for now, let's not, unless it shows up in profiles.
#[derive(Clone, Debug, Default)]
pub struct DirBootstrapStatus(StatusEnum);
296
/// The contents of a DirBootstrapStatus.
///
/// This is a separate type since we don't want to make these variables public.
///
/// It tracks at most two attempts at a time: the one we are using (or plan to
/// use), and optionally a newer one that is meant to replace it.
#[derive(Clone, Debug, Default)]
enum StatusEnum {
    /// There is no active attempt to load or fetch a directory.
    #[default]
    NoActivity,
    /// We have only one attempt to fetch a directory.
    Single {
        /// The currently active directory attempt.
        ///
        /// We're either using this directory now, or we plan to use it as soon
        /// as it's complete enough.
        current: StatusEntry,
    },
    /// We have an existing directory attempt, but it's stale, and we're
    /// fetching a new one to replace it.
    ///
    /// Invariant: `current.id < next.id`
    Replacing {
        /// The previous attempt's status.  It may still be trying to fetch
        /// information if it has descriptors left to download.
        current: StatusEntry,
        /// The newer attempt's status.  We are not yet using this directory
        /// for our activity, since it does not (yet) have enough information.
        ///
        /// Once its progress becomes usable, it is promoted to be the sole
        /// `current` entry.
        next: StatusEntry,
    },
}
326
/// The status and identifier of a single attempt to download a full directory.
#[derive(Clone, Debug)]
struct StatusEntry {
    /// The identifier for this attempt.
    ///
    /// Identifiers are compared to decide which attempt is newer: a larger
    /// id supersedes a smaller one.
    id: AttemptId,
    /// The latest status.
    status: DirStatus,
}
335
/// The status for a single directory.
///
/// Besides the download progress itself, this tracks the counters (resets,
/// errors, stalls) that are used to decide whether to report a
/// [`DirBlockage`].
// NOTE(review): the `display` attribute appears to make this type format as
// its `progress` field alone -- confirm against the `derive_more` version in use.
#[derive(Clone, Debug, Default, derive_more::Display)]
#[display("{0}", progress)]
pub(crate) struct DirStatus {
    /// How much of the directory do we currently have?
    progress: DirProgress,
    /// How many resets have been forced while fetching this directory?
    n_resets: usize,
    /// How many errors have we encountered since last we advanced the
    /// 'progress' on this directory?
    n_errors: usize,
    /// How many times has an `update_progress` call not actually moved us
    /// forward since we last advanced the 'progress' on this directory?
    n_stalls: usize,
}
351
/// How much progress have we made in downloading a given directory?
///
/// This is a separate type so that we don't make the variants public.
///
/// The default value is `NoConsensus` with no `after` time.
#[derive(Clone, Debug, Educe)]
#[educe(Default)]
pub(crate) enum DirProgress {
    /// We don't have any information yet.
    #[educe(Default)]
    NoConsensus {
        /// If present, we are fetching a consensus whose valid-after time
        /// postdates this time.
        // Not read anywhere in this module, hence the lint exemption.
        #[allow(dead_code)]
        after: Option<SystemTime>,
    },
    /// We've downloaded a consensus, but we haven't validated it yet.
    FetchingCerts {
        /// The actual declared lifetime of the consensus.
        lifetime: netstatus::Lifetime,
        /// The lifetime for which we are willing to use this consensus.  (This
        /// may be broader than `lifetime`.)
        usable_lifetime: netstatus::Lifetime,
        /// A fraction (in (numerator,denominator) format) of the certificates
        /// we have for this consensus.
        n_certs: (u16, u16),
    },
    /// We've validated a consensus and we're fetching (or have fetched) its
    /// microdescriptors.
    Validated {
        /// The actual declared lifetime of the consensus.
        lifetime: netstatus::Lifetime,
        /// The lifetime for which we are willing to use this consensus.  (This
        /// may be broader than `lifetime`.)
        usable_lifetime: netstatus::Lifetime,
        /// A fraction (in (numerator,denominator) form) of the microdescriptors
        /// that we have for this consensus.
        n_mds: (u32, u32),
        /// True iff we've decided that the consensus is usable.
        usable: bool,
        // TODO(nickm) Someday we could add a field about whether any primary
        // guards are missing microdescriptors, to give a better explanation for
        // the case where we won't switch our consensus because of that.
    },
}
395
/// A reported diagnostic for what kind of trouble we've seen while trying to
/// bootstrap a directory.
///
/// These blockage types are not yet terribly specific: if you encounter one,
/// it's probably a good idea to check the logs to see what's really going on.
///
/// If you encounter connection blockage _and_ directory blockage at the same
/// time, the connection blockage is almost certainly the real problem.
//
// TODO(nickm): At present these diagnostics aren't very helpful; they say too
// much about _how we know_ that the process has gone wrong, but not so much
// about _what the problem is_.  In the future, we may wish to look more closely
// at what _kind_ of errors or resets we've seen, so we can report better
// information. Probably, however, we should only do that after we get some
// experience with which problems people encounter in practice, and what
// diagnostics would be useful for them.
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum DirBlockage {
    /// We've been downloading information without error, but we haven't
    /// actually been getting anything that we want.
    ///
    /// This might indicate that there's a problem with information propagating
    /// through the Tor network, or it might indicate that a bogus consensus or
    /// a bad clock has tricked us into asking for something that nobody has.
    #[display("Can't make progress.")]
    Stalled,
    /// We've gotten a lot of errors without making forward progress on our
    /// bootstrap attempt.
    ///
    /// This might indicate that something's wrong with the Tor network, or that
    /// there's something buggy with our ability to handle directory responses.
    /// It might also indicate a malfunction on our directory guards, or a bug
    /// in our retry logic.
    #[display("Too many errors without making progress.")]
    TooManyErrors,
    /// We've reset our bootstrap attempt a lot of times.
    ///
    /// This either indicates that we have been failing a lot for one of the
    /// other reasons above, or that we keep getting served a consensus which
    /// turns out, upon trying to fetch certificates, not to be usable.  It can
    /// also indicate a bug in our retry logic.
    #[display("Had to reset bootstrapping too many times.")]
    TooManyResets,
}
441
442impl fmt::Display for DirProgress {
443    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444        /// Format this time in a format useful for displaying
445        /// lifetime boundaries.
446        fn fmt_time(t: SystemTime) -> String {
447            use std::sync::LazyLock;
448            /// Formatter object for lifetime boundaries.
449            ///
450            /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451            /// sub-second times here, and using non-UTC offsets is confusing
452            /// in this context.
453            static FORMAT: LazyLock<Vec<time::format_description::FormatItem>> =
454                LazyLock::new(|| {
455                    time::format_description::parse(
456                        "[year]-[month]-[day] [hour]:[minute]:[second] UTC",
457                    )
458                    .expect("Invalid time format")
459                });
460            OffsetDateTime::from(t)
461                .format(&FORMAT)
462                .unwrap_or_else(|_| "(could not format)".into())
463        }
464
465        match &self {
466            DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
467            DirProgress::FetchingCerts { n_certs, .. } => write!(
468                f,
469                "fetching authority certificates ({}/{})",
470                n_certs.0, n_certs.1
471            ),
472            DirProgress::Validated {
473                usable: false,
474                n_mds,
475                ..
476            } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
477            DirProgress::Validated {
478                usable: true,
479                lifetime,
480                ..
481            } => write!(
482                f,
483                "usable, fresh until {}, and valid until {}",
484                fmt_time(lifetime.fresh_until()),
485                fmt_time(lifetime.valid_until())
486            ),
487        }
488    }
489}
490
491impl fmt::Display for DirBootstrapStatus {
492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493        match &self.0 {
494            StatusEnum::NoActivity => write!(f, "not downloading")?,
495            StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
496            StatusEnum::Replacing { current, next } => write!(
497                f,
498                "directory is {}; next directory is {}",
499                current.status, next.status
500            )?,
501        }
502        Ok(())
503    }
504}
505
506impl DirBootstrapStatus {
507    /// Return the current DirStatus.
508    ///
509    /// This is the _most complete_ status.  If we have any usable status, it is
510    /// this one.
511    fn current(&self) -> Option<&DirStatus> {
512        match &self.0 {
513            StatusEnum::NoActivity => None,
514            StatusEnum::Single { current } => Some(&current.status),
515            StatusEnum::Replacing { current, .. } => Some(&current.status),
516        }
517    }
518
519    /// Return the next DirStatus, if there is one.
520    fn next(&self) -> Option<&DirStatus> {
521        match &self.0 {
522            StatusEnum::Replacing { next, .. } => Some(&next.status),
523            _ => None,
524        }
525    }
526
527    /// Return the contained `DirStatus`es, in order: `current`, then `next`
528    #[allow(clippy::implied_bounds_in_impls)]
529    fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
530        chain!(self.current(), self.next(),)
531    }
532
533    /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
534    #[allow(clippy::implied_bounds_in_impls)]
535    fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
536        let (current, next) = match &mut self.0 {
537            StatusEnum::NoActivity => (None, None),
538            StatusEnum::Single { current } => (Some(current), None),
539            StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
540        };
541        chain!(current, next,)
542    }
543
544    /// Return the fraction of completion for directory download, in a form
545    /// suitable for a progress bar at some particular time.
546    ///
547    /// This value is not monotonic, and can go down as one directory is
548    /// replaced with another.
549    ///
550    /// Callers _should not_ depend on the specific meaning of any particular
551    /// fraction; we may change these fractions in the future.
552    pub fn frac_at(&self, when: SystemTime) -> f32 {
553        self.statuses()
554            .filter_map(|st| st.frac_at(when))
555            .next()
556            .unwrap_or(0.0)
557    }
558
559    /// Return true if this status indicates that we have a current usable
560    /// directory.
561    pub fn usable_at(&self, now: SystemTime) -> bool {
562        if let Some(current) = self.current() {
563            current.progress.usable() && current.okay_to_use_at(now)
564        } else {
565            false
566        }
567    }
568
569    /// If there is a problem with our attempts to bootstrap, return a
570    /// corresponding DirBlockage.  
571    pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
572        if let Some(current) = self.current() {
573            if current.progress.usable() && current.declared_live_at(now) {
574                // The current directory is sufficient, and not even a little bit
575                // expired. There is no problem.
576                return None;
577            }
578        }
579
580        // Any blockage in "current" is more serious, so return that if there is one
581        self.statuses().filter_map(|st| st.blockage()).next()
582    }
583
584    /// Return the appropriate DirStatus for `AttemptId`, constructing it if
585    /// necessary.
586    ///
587    /// Return None if all relevant attempts are more recent than this Id.
588    #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
589    fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
590        // First, ensure that we have a *recent enough* attempt
591        // Look for the latest attempt, and see if it's new enough; if not, start a new one.
592        if self
593            .entries_mut()
594            .rev()
595            .take(1)
596            .find(|entry| entry.id >= attempt_id)
597            .is_none()
598        {
599            let current = match std::mem::take(&mut self.0) {
600                StatusEnum::NoActivity => None,
601                StatusEnum::Single { current } => Some(current),
602                StatusEnum::Replacing { current, .. } => Some(current),
603            };
604            // If we have a `current` already, we keep it, and restart `next`.
605            let next = StatusEntry::new(attempt_id);
606            self.0 = match current {
607                None => StatusEnum::Single { current: next },
608                Some(current) => StatusEnum::Replacing { current, next },
609            };
610        }
611
612        // Find the entry with `attempt_id` and return it.
613        // (Despite the above, there might not be one: maybe `attempt_id` is old.)
614        self.entries_mut()
615            .find(|entry| entry.id == attempt_id)
616            .map(|entry| &mut entry.status)
617    }
618
619    /// If the "next" status is usable, replace the current status with it.
620    fn advance_status(&mut self) {
621        // TODO: should make sure that the compiler is smart enough to optimize
622        // this mem::take() and replacement away, and turn it into a conditional
623        // replacement?
624        self.0 = match std::mem::take(&mut self.0) {
625            StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
626                StatusEnum::Single { current: next }
627            }
628            other => other,
629        };
630    }
631
632    /// Update this status by replacing the `DirProgress` in its current status
633    /// (or its next status) with `new_status`, as appropriate.
634    pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
635        if let Some(status) = self.mut_status_for(attempt_id) {
636            let old_frac = status.frac();
637            status.progress = new_progress;
638            let new_frac = status.frac();
639            if new_frac > old_frac {
640                // This download has made progress: clear our count of errors
641                // and stalls.
642                status.n_errors = 0;
643                status.n_stalls = 0;
644            } else {
645                // This download didn't make progress; increment the stall
646                // count.
647                status.n_stalls += 1;
648            }
649            self.advance_status();
650        }
651    }
652
653    /// Update this status by noting that some errors have occurred in a given
654    /// download attempt.
655    pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
656        if let Some(status) = self.mut_status_for(attempt_id) {
657            status.n_errors += n_errors;
658        }
659    }
660
661    /// Update this status by noting that we had to reset a given download attempt;
662    pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
663        if let Some(status) = self.mut_status_for(attempt_id) {
664            status.n_resets += 1;
665        }
666    }
667}
668
669impl StatusEntry {
670    /// Construct a new StatusEntry with a given attempt id, and no progress
671    /// reported.
672    fn new(id: AttemptId) -> Self {
673        Self {
674            id,
675            status: DirStatus::default(),
676        }
677    }
678}
679
680impl DirStatus {
681    /// Return the declared consensus lifetime for this directory, if we have one.
682    fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
683        match &self.progress {
684            DirProgress::NoConsensus { .. } => None,
685            DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
686            DirProgress::Validated { lifetime, .. } => Some(lifetime),
687        }
688    }
689
690    /// Return the consensus lifetime for this directory, if we have one, as
691    /// modified by our skew-tolerance settings.
692    fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
693        match &self.progress {
694            DirProgress::NoConsensus { .. } => None,
695            DirProgress::FetchingCerts {
696                usable_lifetime, ..
697            } => Some(usable_lifetime),
698            DirProgress::Validated {
699                usable_lifetime, ..
700            } => Some(usable_lifetime),
701        }
702    }
703
704    /// Return true if the directory is valid at the given time, as modified by
705    /// our clock skew settings.
706    fn okay_to_use_at(&self, when: SystemTime) -> bool {
707        self.usable_lifetime()
708            .map(|lt| lt.valid_at(when))
709            .unwrap_or(false)
710    }
711
712    /// Return true if the directory is valid at the given time, _unmodified_ by our
713    /// clock skew settings.
714    fn declared_live_at(&self, when: SystemTime) -> bool {
715        self.declared_lifetime()
716            .map(|lt| lt.valid_at(when))
717            .unwrap_or(false)
718    }
719
720    /// As `frac`, but return None if this consensus is not valid at the given time,
721    /// and down-rate expired consensuses that we're still willing to use.
722    fn frac_at(&self, when: SystemTime) -> Option<f32> {
723        if self
724            .declared_lifetime()
725            .map(|lt| lt.valid_at(when))
726            .unwrap_or(false)
727        {
728            // We're officially okay to use this directory.
729            Some(self.frac())
730        } else if self.okay_to_use_at(when) {
731            // This directory is a little expired, but only a little.
732            Some(self.frac() * 0.9)
733        } else {
734            None
735        }
736    }
737
738    /// Return the fraction of completion for directory download, in a form
739    /// suitable for a progress bar.
740    ///
741    /// This is monotonically increasing for a single directory, but can go down
742    /// as one directory is replaced with another.
743    ///
744    /// Callers _should not_ depend on the specific meaning of any particular
745    /// fraction; we may change these fractions in the future.
746    fn frac(&self) -> f32 {
747        // We arbitrarily decide that 25% is downloading the consensus, 10% is
748        // downloading the certificates, and the remaining 65% is downloading
749        // the microdescriptors until we become usable.  We may want to re-tune that in the future, but
750        // the documentation of this function should allow us to do so.
751        match &self.progress {
752            DirProgress::NoConsensus { .. } => 0.0,
753            DirProgress::FetchingCerts { n_certs, .. } => {
754                0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
755            }
756            DirProgress::Validated {
757                usable: false,
758                n_mds,
759                ..
760            } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
761            DirProgress::Validated { usable: true, .. } => 1.0,
762        }
763    }
764
765    /// If we think there is a problem with our bootstrapping process, return a
766    /// [`DirBlockage`] to describe it.
767    ///
768    /// The caller may want to also check `usable_at` to avoid reporting trouble
769    /// if the directory is currently usable.
770    fn blockage(&self) -> Option<DirBlockage> {
771        /// How many resets are sufficient for us to report a blockage?
772        const RESET_THRESHOLD: usize = 2;
773        /// How many errors are sufficient for us to report a blockage?
774        const ERROR_THRESHOLD: usize = 6;
775        /// How many no-progress download attempts are sufficient for us to
776        /// report a blockage?
777        const STALL_THRESHOLD: usize = 8;
778
779        if self.n_resets >= RESET_THRESHOLD {
780            Some(DirBlockage::TooManyResets)
781        } else if self.n_errors >= ERROR_THRESHOLD {
782            Some(DirBlockage::TooManyErrors)
783        } else if self.n_stalls >= STALL_THRESHOLD {
784            Some(DirBlockage::Stalled)
785        } else {
786            None
787        }
788    }
789}
790
791impl DirProgress {
792    /// Return true if this progress indicates a usable directory.
793    fn usable(&self) -> bool {
794        matches!(self, DirProgress::Validated { usable: true, .. })
795    }
796}
797
/// A stream of [`DirBootstrapStatus`] events.
///
/// This is a thin wrapper around a `postage::watch::Receiver`; its [`Stream`]
/// implementation forwards each poll directly to that receiver.
//
// NOTE(review): a watch channel presumably retains only the most recent
// value, so a slow consumer may skip intermediate statuses -- confirm against
// the `postage` documentation before relying on seeing every update.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct DirBootstrapEvents {
    /// The `postage::watch::Receiver` that we're wrapping.
    ///
    /// We wrap this type so that we don't expose its entire API, and so that we
    /// can migrate to some other implementation in the future if we want.
    ///
    /// In `Debug` output this field is formatted with `skip_fmt` instead of
    /// the receiver's own `Debug` implementation.
    #[educe(Debug(method = "skip_fmt"))]
    pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
}
809
810impl Stream for DirBootstrapEvents {
811    type Item = DirBootstrapStatus;
812
813    fn poll_next(
814        mut self: Pin<&mut Self>,
815        cx: &mut std::task::Context<'_>,
816    ) -> Poll<Option<Self::Item>> {
817        self.inner.poll_next_unpin(cx)
818    }
819}
820
821#[cfg(test)]
822mod test {
823    // @@ begin test lint list maintained by maint/add_warning @@
824    #![allow(clippy::bool_assert_comparison)]
825    #![allow(clippy::clone_on_copy)]
826    #![allow(clippy::dbg_macro)]
827    #![allow(clippy::mixed_attributes_style)]
828    #![allow(clippy::print_stderr)]
829    #![allow(clippy::print_stdout)]
830    #![allow(clippy::single_char_pattern)]
831    #![allow(clippy::unwrap_used)]
832    #![allow(clippy::unchecked_duration_subtraction)]
833    #![allow(clippy::useless_vec)]
834    #![allow(clippy::needless_pass_by_value)]
835    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
836    use std::time::Duration;
837
838    use super::*;
839    use float_eq::assert_float_eq;
840    use futures::stream::StreamExt;
841    use tor_rtcompat::test_with_all_runtimes;
842
    /// A subscriber only sees flags published after it subscribed, and every
    /// subscriber's stream ends once the publisher has been dropped.
    #[test]
    fn subscribe_and_publish() {
        test_with_all_runtimes!(|_rt| async {
            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
            // sub1 subscribes before the event is published; sub2 afterwards.
            let mut sub1 = publish.subscribe();
            publish.publish(DirEvent::NewConsensus);
            let mut sub2 = publish.subscribe();
            // Lets the first task below tell the third one when it is time to
            // drop the publisher.
            let ev = event_listener::Event::new();
            let lis = ev.listen();

            futures::join!(
                async {
                    // sub1 was created in time to see this event...
                    let val1 = sub1.next().await;
                    assert_eq!(val1, Some(DirEvent::NewConsensus));
                    ev.notify(1); // Tell the third task below to drop the publisher.
                    // ...and after the publisher is dropped, its stream ends.
                    let val2 = sub1.next().await;
                    assert_eq!(val2, None);
                },
                async {
                    // sub2 subscribed after the event was published, so the
                    // first thing it observes is the end of the stream.
                    let val = sub2.next().await;
                    assert_eq!(val, None);
                },
                async {
                    // Keep the publisher alive until sub1 has received the
                    // event; dropping it is what ends both streams.
                    lis.await;
                    drop(publish);
                }
            );
        });
    }
873
    /// A single subscriber receives two different events, in the order in
    /// which they were published.
    #[test]
    fn receive_two() {
        test_with_all_runtimes!(|_rt| async {
            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();

            let mut sub = publish.subscribe();
            // Used by the receiving task to release the second publication
            // only after the first event has been received.
            let ev = event_listener::Event::new();
            let ev_lis = ev.listen();
            futures::join!(
                async {
                    // Receiver: take the first event, then allow the second
                    // one to be published and take that too.
                    let val1 = sub.next().await;
                    assert_eq!(val1, Some(DirEvent::NewDescriptors));
                    ev.notify(1);
                    let val2 = sub.next().await;
                    assert_eq!(val2, Some(DirEvent::NewConsensus));
                },
                async {
                    // Publisher: the second event is held back until the
                    // receiver signals that it has seen the first.
                    publish.publish(DirEvent::NewDescriptors);
                    ev_lis.await;
                    publish.publish(DirEvent::NewConsensus);
                }
            );
        });
    }
898
    /// Two clones of one publisher each publish their own flag 100 times; the
    /// single subscriber must see each flag at least once and at most 100
    /// times.
    #[test]
    fn two_publishers() {
        test_with_all_runtimes!(|_rt| async {
            let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
            let publish2 = publish1.clone();

            let mut sub = publish1.subscribe();
            // One notification channel per publisher, so the subscriber can
            // wait until each of them has published at least once.
            let ev1 = event_listener::Event::new();
            let ev2 = event_listener::Event::new();
            let ev1_lis = ev1.listen();
            let ev2_lis = ev2.listen();
            futures::join!(
                async {
                    // Per-flag tally, indexed by the event's flag index.
                    // (Assumes the two events used here have indices 0 and 1;
                    // any other index would panic on the access below.)
                    let mut count = [0_usize; 2];
                    // These awaits guarantee that we will see at least one event flag of each
                    // type, before the stream is dropped.
                    ev1_lis.await;
                    ev2_lis.await;
                    // Drain the stream until it ends; both publishers are
                    // dropped at the end of their tasks below.
                    while let Some(e) = sub.next().await {
                        count[e.to_index() as usize] += 1;
                    }
                    assert!(count[0] > 0);
                    assert!(count[1] > 0);
                    // Each flag is published exactly 100 times, so neither
                    // tally can exceed that.
                    // NOTE(review): fewer than 100 is presumably allowed
                    // because repeated publications of one flag may be merged
                    // before the subscriber polls -- confirm against
                    // `FlagPublisher`.
                    assert!(count[0] <= 100);
                    assert!(count[1] <= 100);
                },
                async {
                    // First publisher: publish, signal, and yield so the other
                    // tasks get a chance to run between publications.
                    for _ in 0..100 {
                        publish1.publish(DirEvent::NewDescriptors);
                        ev1.notify(1);
                        tor_rtcompat::task::yield_now().await;
                    }
                    drop(publish1);
                },
                async {
                    // Second publisher: same pattern with the other flag.
                    for _ in 0..100 {
                        publish2.publish(DirEvent::NewConsensus);
                        ev2.notify(1);
                        tor_rtcompat::task::yield_now().await;
                    }
                    drop(publish2);
                }
            );
        });
    }
944
945    #[test]
946    fn receive_after_publishers_are_gone() {
947        test_with_all_runtimes!(|_rt| async {
948            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
949
950            let mut sub = publish.subscribe();
951
952            publish.publish(DirEvent::NewConsensus);
953            drop(publish);
954            let v = sub.next().await;
955            assert_eq!(v, Some(DirEvent::NewConsensus));
956            let v = sub.next().await;
957            assert!(v.is_none());
958        });
959    }
960
961    #[test]
962    fn failed_conversion() {
963        assert_eq!(DirEvent::from_index(999), None);
964    }
965
966    #[test]
967    fn dir_status_basics() {
968        let now = SystemTime::now();
969        let hour = Duration::new(3600, 0);
970
971        let nothing = DirStatus {
972            progress: DirProgress::NoConsensus { after: None },
973            ..Default::default()
974        };
975        let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
976        let unval = DirStatus {
977            progress: DirProgress::FetchingCerts {
978                lifetime: lifetime.clone(),
979                usable_lifetime: lifetime,
980                n_certs: (3, 5),
981            },
982            ..Default::default()
983        };
984        let lifetime =
985            netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
986        let with_c = DirStatus {
987            progress: DirProgress::Validated {
988                lifetime: lifetime.clone(),
989                usable_lifetime: lifetime,
990                n_mds: (30, 40),
991                usable: false,
992            },
993            ..Default::default()
994        };
995
996        // lifetime()
997        assert!(nothing.usable_lifetime().is_none());
998        assert_eq!(unval.usable_lifetime().unwrap().valid_after(), now);
999        assert_eq!(
1000            with_c.usable_lifetime().unwrap().valid_until(),
1001            now + hour * 3
1002        );
1003
1004        // frac() (It's okay if we change the actual numbers here later; the
1005        // current ones are more or less arbitrary.)
1006        const TOL: f32 = 0.00001;
1007        assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
1008        assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
1009        assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
1010
1011        // frac_at()
1012        let t1 = now + hour / 2;
1013        let t2 = t1 + hour * 2;
1014        assert!(nothing.frac_at(t1).is_none());
1015        assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
1016        assert!(with_c.frac_at(t1).is_none());
1017        assert!(nothing.frac_at(t2).is_none());
1018        assert!(unval.frac_at(t2).is_none());
1019        assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
1020    }
1021
1022    #[test]
1023    fn dir_status_display() {
1024        use time::macros::datetime;
1025        let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1026        let hour = Duration::new(3600, 0);
1027        let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1028
1029        let ds = DirStatus {
1030            progress: DirProgress::NoConsensus { after: None },
1031            ..Default::default()
1032        };
1033        assert_eq!(ds.to_string(), "fetching a consensus");
1034
1035        let ds = DirStatus {
1036            progress: DirProgress::FetchingCerts {
1037                lifetime: lifetime.clone(),
1038                usable_lifetime: lifetime.clone(),
1039                n_certs: (3, 5),
1040            },
1041            ..Default::default()
1042        };
1043        assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
1044
1045        let ds = DirStatus {
1046            progress: DirProgress::Validated {
1047                lifetime: lifetime.clone(),
1048                usable_lifetime: lifetime.clone(),
1049                n_mds: (30, 40),
1050                usable: false,
1051            },
1052            ..Default::default()
1053        };
1054        assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
1055
1056        let ds = DirStatus {
1057            progress: DirProgress::Validated {
1058                lifetime: lifetime.clone(),
1059                usable_lifetime: lifetime,
1060                n_mds: (30, 40),
1061                usable: true,
1062            },
1063            ..Default::default()
1064        };
1065        assert_eq!(
1066            ds.to_string(),
1067            "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
1068        );
1069    }
1070
1071    #[test]
1072    fn bootstrap_status() {
1073        use time::macros::datetime;
1074        let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1075        let hour = Duration::new(3600, 0);
1076        let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1077        let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
1078
1079        let dp1 = DirProgress::Validated {
1080            lifetime: lifetime.clone(),
1081            usable_lifetime: lifetime.clone(),
1082            n_mds: (3, 40),
1083            usable: true,
1084        };
1085        let dp2 = DirProgress::Validated {
1086            lifetime: lifetime2.clone(),
1087            usable_lifetime: lifetime2.clone(),
1088            n_mds: (5, 40),
1089            usable: false,
1090        };
1091        let attempt1 = AttemptId::next();
1092        let attempt2 = AttemptId::next();
1093
1094        let bs = DirBootstrapStatus(StatusEnum::Replacing {
1095            current: StatusEntry {
1096                id: attempt1,
1097                status: DirStatus {
1098                    progress: dp1.clone(),
1099                    ..Default::default()
1100                },
1101            },
1102            next: StatusEntry {
1103                id: attempt2,
1104                status: DirStatus {
1105                    progress: dp2.clone(),
1106                    ..Default::default()
1107                },
1108            },
1109        });
1110
1111        assert_eq!(bs.to_string(),
1112            "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
1113        );
1114
1115        const TOL: f32 = 0.00001;
1116        assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
1117        assert_float_eq!(
1118            bs.frac_at(t1 + hour * 3 + hour / 2),
1119            0.35 + 0.65 * 0.125,
1120            abs <= TOL
1121        );
1122
1123        // Now try updating.
1124
1125        // Case 1: we have a usable directory and the updated status isn't usable.
1126        let mut bs = bs;
1127        let dp3 = DirProgress::Validated {
1128            lifetime: lifetime2.clone(),
1129            usable_lifetime: lifetime2.clone(),
1130            n_mds: (10, 40),
1131            usable: false,
1132        };
1133
1134        bs.update_progress(attempt2, dp3);
1135        assert!(matches!(
1136            bs.next().unwrap(),
1137            DirStatus {
1138                progress: DirProgress::Validated {
1139                    n_mds: (10, 40),
1140                    ..
1141                },
1142                ..
1143            }
1144        ));
1145
1146        // Case 2: The new directory _is_ usable and newer.  It will replace the old one.
1147        let ds4 = DirStatus {
1148            progress: DirProgress::Validated {
1149                lifetime: lifetime2.clone(),
1150                usable_lifetime: lifetime2.clone(),
1151                n_mds: (20, 40),
1152                usable: true,
1153            },
1154            ..Default::default()
1155        };
1156        bs.update_progress(attempt2, ds4.progress);
1157        assert!(bs.next().is_none());
1158        assert_eq!(
1159            bs.current()
1160                .unwrap()
1161                .usable_lifetime()
1162                .unwrap()
1163                .valid_after(),
1164            lifetime2.valid_after()
1165        );
1166
1167        // Case 3: The new directory is usable but older. Nothing will happen.
1168        bs.update_progress(attempt1, dp1);
1169        assert!(bs.next().as_ref().is_none());
1170        assert_ne!(
1171            bs.current()
1172                .unwrap()
1173                .usable_lifetime()
1174                .unwrap()
1175                .valid_after(),
1176            lifetime.valid_after()
1177        );
1178
1179        // Case 4: starting with an unusable directory, we always replace.
1180        let mut bs = DirBootstrapStatus::default();
1181        assert!(!dp2.usable());
1182        assert!(bs.current().is_none());
1183        bs.update_progress(attempt2, dp2);
1184        assert!(bs.current().unwrap().usable_lifetime().is_some());
1185    }
1186}