tor_dirmgr/event.rs
1//! Code for notifying other modules about changes in the directory.
2
3// TODO(nickm): After we have enough experience with this FlagPublisher, we
4// might want to make it a public interface. If we do it should probably move
5// into another crate.
6
7use std::{
8 fmt,
9 marker::PhantomData,
10 pin::Pin,
11 sync::{
12 atomic::{AtomicUsize, Ordering},
13 Arc,
14 },
15 task::Poll,
16 time::SystemTime,
17};
18
19use educe::Educe;
20use futures::{stream::Stream, Future, StreamExt};
21use itertools::chain;
22use paste::paste;
23use time::OffsetDateTime;
24use tor_basic_utils::skip_fmt;
25use tor_netdir::DirEvent;
26use tor_netdoc::doc::netstatus;
27
28#[cfg(feature = "bridge-client")]
29use tor_guardmgr::bridge::BridgeDescEvent;
30
31use crate::bootstrap::AttemptId;
32
/// Something that can be broadcast through a [`FlagPublisher`].
///
/// `FlagPublisher` keeps one counter per possible event, indexed by a small
/// integer, so implementing this trait is essentially a matter of mapping
/// events to and from those integers.
pub(crate) trait FlagEvent
where
    Self: Sized,
{
    /// The largest index that [`FlagEvent::to_index()`] may ever return for
    /// this type.
    ///
    /// This is a `u16` because [`FlagPublisher`] allocates one slot per
    /// possible index, and sometimes walks over all of them.
    const MAXIMUM: u16;

    /// Map this event to its index.
    ///
    /// Indices should be small and densely packed, for efficiency.
    fn to_index(self) -> u16;

    /// Map an index back to its event, or return `None` if the index does not
    /// correspond to any event.
    fn from_index(flag: u16) -> Option<Self>;
}
53
/// Implements [`FlagEvent`] for a C-like (fieldless) enum.
///
/// Requirements:
///
/// * `$ty` must implement both [`strum::EnumCount`] and
///   [`strum::IntoEnumIterator`].
///
/// * `$ty` must implement [`Into<u16>`] and [`TryFrom<u16>`]
///   (for example, by deriving them with the `num_enum` crate).
///
/// * The discriminants must be densely allocated.
///   The compiler does this automatically when no explicit discriminants
///   are given.
///   (A test generated by this macro checks the property.)
///
/// * The variants must not carry any data.
///   Correctness depends on this.
///   We believe `num_enum::TryFromPrimitive` enforces it.
///
/// # Example
///
// Sadly, it does not appear to be possible to doctest a private macro.
/// ```rust,ignore
/// use num_enum::{IntoPrimitive, TryFromPrimitive};
/// use strum::{EnumCount, EnumIter};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
/// #[non_exhaustive]
/// #[repr(u16)]
/// pub enum DirEvent {
///     NewConsensus,
///     NewDescriptors,
/// }
///
/// impl_FlagEvent!{ DirEvent }
/// ```
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
    impl FlagEvent for $ty {
        // With dense discriminants, the indices run from 0 to COUNT - 1.
        const MAXIMUM: u16 = {
            let n_variants = <$ty as $crate::strum::EnumCount>::COUNT;
            (n_variants - 1) as u16
        };

        fn to_index(self) -> u16 {
            <$ty as Into<u16>>::into(self)
        }

        fn from_index(flag: u16) -> Option<Self> {
            <$ty as TryFrom<u16>>::try_from(flag).ok()
        }
    }

    #[test]
    #[allow(non_snake_case)]
    fn [< flagevent_test_variant_numbers_ $ty >]() {
        let upper_bound = <$ty as FlagEvent>::MAXIMUM;
        for each_variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
            let index = <$ty as FlagEvent>::to_index(each_variant);
            assert!(index <= upper_bound,
                    "impl_FlagEvent only allowed if discriminators are dense");
        }
    }
} } }
114
115impl_FlagEvent! { DirEvent }
116
117#[cfg(feature = "bridge-client")]
118impl_FlagEvent! { BridgeDescEvent }
119
120/// A publisher that broadcasts flag-level events to multiple subscribers.
121///
122/// Events with the same flag value may be coalesced: that is, if the same event
123/// is published ten times in a row, a subscriber may receive only a single
124/// notification of the event.
125///
126/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
127/// that can also broadcast events to everybody listening on the channel.
128/// Dropping the last handle closes all streams subscribed to it.
129pub(crate) struct FlagPublisher<F> {
130 /// Inner data shared by publishers and streams.
131 inner: Arc<Inner<F>>,
132}
133
134/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
135struct Inner<F> {
136 /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
137 event: event_listener::Event,
138 /// How many times has each event occurred, ever.
139 ///
140 /// (It is safe for this to wrap around.)
141 // TODO(nickm): I wish this could be an array, but const generics don't
142 // quite support that yet.
143 counts: Vec<AtomicUsize>, // I wish this could be an array.
144 /// How many publishers remain?
145 n_publishers: AtomicUsize,
146 /// Phantom member to provide correct covariance.
147 ///
148 /// The `fn` business is a covariance trick to include `F` without affecting
149 /// this object's Send/Sync status.
150 _phantom: PhantomData<fn(F) -> F>,
151}
152
153/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
154/// [`FlagPublisher`].
155pub(crate) struct FlagListener<F> {
156 /// What value of each flag's count have we seen most recently?
157 ///
158 /// Note that we count the event as "received" only once for each observed
159 /// change in the flag's count, even if that count has changed by more than
160 /// 1.
161 my_counts: Vec<usize>,
162 /// An an `EventListener` that will be notified when events are published,
163 /// or when the final publisher is dropped.
164 ///
165 /// We must always have one of these available _before_ we check any counts
166 /// in self.inner.
167 listener: event_listener::EventListener,
168 /// Reference to shared data.
169 inner: Arc<Inner<F>>,
170}
171
172impl<F: FlagEvent> Default for FlagPublisher<F> {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl<F: FlagEvent> FlagPublisher<F> {
179 /// Construct a new FlagPublisher.
180 pub(crate) fn new() -> Self {
181 // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182 // require AtomicUsize to be Clone.
183 let counts = std::iter::repeat_with(AtomicUsize::default)
184 .take(F::MAXIMUM as usize + 1)
185 .collect();
186 FlagPublisher {
187 inner: Arc::new(Inner {
188 event: event_listener::Event::new(),
189 counts,
190 n_publishers: AtomicUsize::new(1),
191 _phantom: PhantomData,
192 }),
193 }
194 }
195
196 /// Create a new subscription to this FlagPublisher.
197 pub(crate) fn subscribe(&self) -> FlagListener<F> {
198 // We need to do this event.listen before we check the counts; otherwise
199 // we could have a sequence where: we check the count, then the
200 // publisher increments the count, then the publisher calls
201 // event.notify(), and we call event.listen(). That would cause us to
202 // miss the increment.
203 let listener = self.inner.event.listen();
204
205 FlagListener {
206 my_counts: self
207 .inner
208 .counts
209 .iter()
210 .map(|a| a.load(Ordering::SeqCst))
211 .collect(),
212 listener,
213 inner: Arc::clone(&self.inner),
214 }
215 }
216
217 /// Tell every listener that the provided flag has been published.
218 pub(crate) fn publish(&self, flag: F) {
219 self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220 self.inner.event.notify(usize::MAX);
221 }
222}
223
224impl<F> Clone for FlagPublisher<F> {
225 fn clone(&self) -> FlagPublisher<F> {
226 self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227 FlagPublisher {
228 inner: Arc::clone(&self.inner),
229 }
230 }
231}
232
233// We must implement Drop to keep count publishers, and so that when the last
234// publisher goes away, we can wake up every listener so that it notices that
235// the stream is now ended.
236impl<F> Drop for FlagPublisher<F> {
237 fn drop(&mut self) {
238 if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239 // That was the last reference; we must notify the listeners.
240 self.inner.event.notify(usize::MAX);
241 }
242 }
243}
244
245impl<F: FlagEvent> Stream for FlagListener<F> {
246 type Item = F;
247
248 fn poll_next(
249 mut self: std::pin::Pin<&mut Self>,
250 cx: &mut std::task::Context<'_>,
251 ) -> std::task::Poll<Option<Self::Item>> {
252 loop {
253 // Notify the caller if any events are ready to fire.
254 for idx in 0..F::MAXIMUM as usize + 1 {
255 let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256 // We don't have to use < here specifically, since any change
257 // indicates that the count has been modified. That lets us
258 // survive usize wraparound.
259 if cur != self.my_counts[idx] {
260 self.my_counts[idx] = cur;
261 return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262 }
263 }
264
265 // At this point, notify the caller if there are no more publishers.
266 if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267 return Poll::Ready(None);
268 }
269
270 if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271 // Got a new notification; we must create a new event and continue the loop.
272 //
273 // See discussion in `FlagPublisher::subscribe()` for why we must always create
274 // this listener _before_ checking any flags.
275 self.listener = self.inner.event.listen();
276 } else {
277 // Nothing to do yet: put the listener back.
278 return Poll::Pending;
279 }
280 }
281 }
282}
283
284/// Description of the directory manager's current bootstrapping status.
285///
286/// This status does not necessarily increase monotonically: it can go backwards
287/// if (for example) our directory information expires before we're able to get
288/// new information.
289//
290// TODO(nickm): This type has gotten a bit large for being the type we send over
291// a `postage::watch`: perhaps we'd be better off having this information stored
292// in the guardmgr, and having only a summary of it sent over the
293// `postage::watch`. But for now, let's not, unless it shows up in profiles.
294#[derive(Clone, Debug, Default)]
295pub struct DirBootstrapStatus(StatusEnum);
296
297/// The contents of a DirBootstrapStatus.
298///
299/// This is a separate type since we don't want to make these variables public.
300#[derive(Clone, Debug, Default)]
301enum StatusEnum {
302 /// There is no active attempt to load or fetch a directory.
303 #[default]
304 NoActivity,
305 /// We have only one attempt to fetch a directory.
306 Single {
307 /// The currently active directory attempt.
308 ///
309 /// We're either using this directory now, or we plan to use it as soon
310 /// as it's complete enough.
311 current: StatusEntry,
312 },
313 /// We have an existing directory attempt, but it's stale, and we're
314 /// fetching a new one to replace it.
315 ///
316 /// Invariant: `current.id < next.id`
317 Replacing {
318 /// The previous attempt's status. It may still be trying to fetch
319 /// information if it has descriptors left to download.
320 current: StatusEntry,
321 /// The current attempt's status. We are not yet using this directory
322 /// for our activity, since it does not (yet) have enough information.
323 next: StatusEntry,
324 },
325}
326
327/// The status and identifier of a single attempt to download a full directory.
328#[derive(Clone, Debug)]
329struct StatusEntry {
330 /// The identifier for this attempt.
331 id: AttemptId,
332 /// The latest status.
333 status: DirStatus,
334}
335
336/// The status for a single directory.
337#[derive(Clone, Debug, Default, derive_more::Display)]
338#[display("{0}", progress)]
339pub(crate) struct DirStatus {
340 /// How much of the directory do we currently have?
341 progress: DirProgress,
342 /// How many resets have been forced while fetching this directory?
343 n_resets: usize,
344 /// How many errors have we encountered since last we advanced the
345 /// 'progress' on this directory?
346 n_errors: usize,
347 /// How many times has an `update_progress` call not actually moved us
348 /// forward since we last advanced the 'progress' on this directory?
349 n_stalls: usize,
350}
351
352/// How much progress have we made in downloading a given directory?
353///
354/// This is a separate type so that we don't make the variants public.
355#[derive(Clone, Debug, Educe)]
356#[educe(Default)]
357pub(crate) enum DirProgress {
358 /// We don't have any information yet.
359 #[educe(Default)]
360 NoConsensus {
361 /// If present, we are fetching a consensus whose valid-after time
362 /// postdates this time.
363 #[allow(dead_code)]
364 after: Option<SystemTime>,
365 },
366 /// We've downloaded a consensus, but we haven't validated it yet.
367 FetchingCerts {
368 /// The actual declared lifetime of the consensus.
369 lifetime: netstatus::Lifetime,
370 /// The lifetime for which we are willing to use this consensus. (This
371 /// may be broader than `lifetime`.)
372 usable_lifetime: netstatus::Lifetime,
373 /// A fraction (in (numerator,denominator) format) of the certificates
374 /// we have for this consensus.
375 n_certs: (u16, u16),
376 },
377 /// We've validated a consensus and we're fetching (or have fetched) its
378 /// microdescriptors.
379 Validated {
380 /// The actual declared lifetime of the consensus.
381 lifetime: netstatus::Lifetime,
382 /// The lifetime for which we are willing to use this consensus. (This
383 /// may be broader than `lifetime`.)
384 usable_lifetime: netstatus::Lifetime,
385 /// A fraction (in (numerator,denominator) form) of the microdescriptors
386 /// that we have for this consensus.
387 n_mds: (u32, u32),
388 /// True iff we've decided that the consensus is usable.
389 usable: bool,
390 // TODO(nickm) Someday we could add a field about whether any primary
391 // guards are missing microdescriptors, to give a better explanation for
392 // the case where we won't switch our consensus because of that.
393 },
394}
395
396/// A reported diagnostic for what kind of trouble we've seen while trying to
397/// bootstrap a directory.
398///
399/// These blockages types are not yet terribly specific: if you encounter one,
400/// it's probably a good idea to check the logs to see what's really going on.
401///
402/// If you encounter connection blockage _and_ directory blockage at the same
403/// time, the connection blockage is almost certainly the real problem.
404//
405// TODO(nickm): At present these diagnostics aren't very helpful; they say too
406// much about _how we know_ that the process has gone wrong, but not so much
407// about _what the problem is_. In the future, we may wish to look more closely
408// at what _kind_ of errors or resets we've seen, so we can report better
409// information. Probably, however, we should only do that after we get some
410// experience with which problems people encounter in practice, and what
411// diagnostics would be useful for them.
412#[derive(Clone, Debug, derive_more::Display)]
413#[non_exhaustive]
414pub enum DirBlockage {
415 /// We've been downloading information without error, but we haven't
416 /// actually been getting anything that we want.
417 ///
418 /// This might indicate that there's a problem with information propagating
419 /// through the Tor network, or it might indicate that a bogus consensus or
420 /// a bad clock has tricked us into asking for something that nobody has.
421 #[display("Can't make progress.")]
422 Stalled,
423 /// We've gotten a lot of errors without making forward progress on our
424 /// bootstrap attempt.
425 ///
426 /// This might indicate that something's wrong with the Tor network, or that
427 /// there's something buggy with our ability to handle directory responses.
428 /// It might also indicate a malfunction on our directory guards, or a bug
429 /// on our retry logic.
430 #[display("Too many errors without making progress.")]
431 TooManyErrors,
432 /// We've reset our bootstrap attempt a lot of times.
433 ///
434 /// This either indicates that we have been failing a lot for one of the
435 /// other reasons above, or that we keep getting served a consensus which
436 /// turns out, upon trying to fetch certificates, not to be usable. It can
437 /// also indicate a bug in our retry logic.
438 #[display("Had to reset bootstrapping too many times.")]
439 TooManyResets,
440}
441
442impl fmt::Display for DirProgress {
443 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444 /// Format this time in a format useful for displaying
445 /// lifetime boundaries.
446 fn fmt_time(t: SystemTime) -> String {
447 use std::sync::LazyLock;
448 /// Formatter object for lifetime boundaries.
449 ///
450 /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451 /// sub-second times here, and using non-UTC offsets is confusing
452 /// in this context.
453 static FORMAT: LazyLock<Vec<time::format_description::FormatItem>> =
454 LazyLock::new(|| {
455 time::format_description::parse(
456 "[year]-[month]-[day] [hour]:[minute]:[second] UTC",
457 )
458 .expect("Invalid time format")
459 });
460 OffsetDateTime::from(t)
461 .format(&FORMAT)
462 .unwrap_or_else(|_| "(could not format)".into())
463 }
464
465 match &self {
466 DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
467 DirProgress::FetchingCerts { n_certs, .. } => write!(
468 f,
469 "fetching authority certificates ({}/{})",
470 n_certs.0, n_certs.1
471 ),
472 DirProgress::Validated {
473 usable: false,
474 n_mds,
475 ..
476 } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
477 DirProgress::Validated {
478 usable: true,
479 lifetime,
480 ..
481 } => write!(
482 f,
483 "usable, fresh until {}, and valid until {}",
484 fmt_time(lifetime.fresh_until()),
485 fmt_time(lifetime.valid_until())
486 ),
487 }
488 }
489}
490
491impl fmt::Display for DirBootstrapStatus {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 match &self.0 {
494 StatusEnum::NoActivity => write!(f, "not downloading")?,
495 StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
496 StatusEnum::Replacing { current, next } => write!(
497 f,
498 "directory is {}; next directory is {}",
499 current.status, next.status
500 )?,
501 }
502 Ok(())
503 }
504}
505
506impl DirBootstrapStatus {
507 /// Return the current DirStatus.
508 ///
509 /// This is the _most complete_ status. If we have any usable status, it is
510 /// this one.
511 fn current(&self) -> Option<&DirStatus> {
512 match &self.0 {
513 StatusEnum::NoActivity => None,
514 StatusEnum::Single { current } => Some(¤t.status),
515 StatusEnum::Replacing { current, .. } => Some(¤t.status),
516 }
517 }
518
519 /// Return the next DirStatus, if there is one.
520 fn next(&self) -> Option<&DirStatus> {
521 match &self.0 {
522 StatusEnum::Replacing { next, .. } => Some(&next.status),
523 _ => None,
524 }
525 }
526
527 /// Return the contained `DirStatus`es, in order: `current`, then `next`
528 #[allow(clippy::implied_bounds_in_impls)]
529 fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
530 chain!(self.current(), self.next(),)
531 }
532
533 /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
534 #[allow(clippy::implied_bounds_in_impls)]
535 fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
536 let (current, next) = match &mut self.0 {
537 StatusEnum::NoActivity => (None, None),
538 StatusEnum::Single { current } => (Some(current), None),
539 StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
540 };
541 chain!(current, next,)
542 }
543
544 /// Return the fraction of completion for directory download, in a form
545 /// suitable for a progress bar at some particular time.
546 ///
547 /// This value is not monotonic, and can go down as one directory is
548 /// replaced with another.
549 ///
550 /// Callers _should not_ depend on the specific meaning of any particular
551 /// fraction; we may change these fractions in the future.
552 pub fn frac_at(&self, when: SystemTime) -> f32 {
553 self.statuses()
554 .filter_map(|st| st.frac_at(when))
555 .next()
556 .unwrap_or(0.0)
557 }
558
559 /// Return true if this status indicates that we have a current usable
560 /// directory.
561 pub fn usable_at(&self, now: SystemTime) -> bool {
562 if let Some(current) = self.current() {
563 current.progress.usable() && current.okay_to_use_at(now)
564 } else {
565 false
566 }
567 }
568
569 /// If there is a problem with our attempts to bootstrap, return a
570 /// corresponding DirBlockage.
571 pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
572 if let Some(current) = self.current() {
573 if current.progress.usable() && current.declared_live_at(now) {
574 // The current directory is sufficient, and not even a little bit
575 // expired. There is no problem.
576 return None;
577 }
578 }
579
580 // Any blockage in "current" is more serious, so return that if there is one
581 self.statuses().filter_map(|st| st.blockage()).next()
582 }
583
584 /// Return the appropriate DirStatus for `AttemptId`, constructing it if
585 /// necessary.
586 ///
587 /// Return None if all relevant attempts are more recent than this Id.
588 #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
589 fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
590 // First, ensure that we have a *recent enough* attempt
591 // Look for the latest attempt, and see if it's new enough; if not, start a new one.
592 if self
593 .entries_mut()
594 .rev()
595 .take(1)
596 .find(|entry| entry.id >= attempt_id)
597 .is_none()
598 {
599 let current = match std::mem::take(&mut self.0) {
600 StatusEnum::NoActivity => None,
601 StatusEnum::Single { current } => Some(current),
602 StatusEnum::Replacing { current, .. } => Some(current),
603 };
604 // If we have a `current` already, we keep it, and restart `next`.
605 let next = StatusEntry::new(attempt_id);
606 self.0 = match current {
607 None => StatusEnum::Single { current: next },
608 Some(current) => StatusEnum::Replacing { current, next },
609 };
610 }
611
612 // Find the entry with `attempt_id` and return it.
613 // (Despite the above, there might not be one: maybe `attempt_id` is old.)
614 self.entries_mut()
615 .find(|entry| entry.id == attempt_id)
616 .map(|entry| &mut entry.status)
617 }
618
619 /// If the "next" status is usable, replace the current status with it.
620 fn advance_status(&mut self) {
621 // TODO: should make sure that the compiler is smart enough to optimize
622 // this mem::take() and replacement away, and turn it into a conditional
623 // replacement?
624 self.0 = match std::mem::take(&mut self.0) {
625 StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
626 StatusEnum::Single { current: next }
627 }
628 other => other,
629 };
630 }
631
632 /// Update this status by replacing the `DirProgress` in its current status
633 /// (or its next status) with `new_status`, as appropriate.
634 pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
635 if let Some(status) = self.mut_status_for(attempt_id) {
636 let old_frac = status.frac();
637 status.progress = new_progress;
638 let new_frac = status.frac();
639 if new_frac > old_frac {
640 // This download has made progress: clear our count of errors
641 // and stalls.
642 status.n_errors = 0;
643 status.n_stalls = 0;
644 } else {
645 // This download didn't make progress; increment the stall
646 // count.
647 status.n_stalls += 1;
648 }
649 self.advance_status();
650 }
651 }
652
653 /// Update this status by noting that some errors have occurred in a given
654 /// download attempt.
655 pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
656 if let Some(status) = self.mut_status_for(attempt_id) {
657 status.n_errors += n_errors;
658 }
659 }
660
661 /// Update this status by noting that we had to reset a given download attempt;
662 pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
663 if let Some(status) = self.mut_status_for(attempt_id) {
664 status.n_resets += 1;
665 }
666 }
667}
668
669impl StatusEntry {
670 /// Construct a new StatusEntry with a given attempt id, and no progress
671 /// reported.
672 fn new(id: AttemptId) -> Self {
673 Self {
674 id,
675 status: DirStatus::default(),
676 }
677 }
678}
679
680impl DirStatus {
681 /// Return the declared consensus lifetime for this directory, if we have one.
682 fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
683 match &self.progress {
684 DirProgress::NoConsensus { .. } => None,
685 DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
686 DirProgress::Validated { lifetime, .. } => Some(lifetime),
687 }
688 }
689
690 /// Return the consensus lifetime for this directory, if we have one, as
691 /// modified by our skew-tolerance settings.
692 fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
693 match &self.progress {
694 DirProgress::NoConsensus { .. } => None,
695 DirProgress::FetchingCerts {
696 usable_lifetime, ..
697 } => Some(usable_lifetime),
698 DirProgress::Validated {
699 usable_lifetime, ..
700 } => Some(usable_lifetime),
701 }
702 }
703
704 /// Return true if the directory is valid at the given time, as modified by
705 /// our clock skew settings.
706 fn okay_to_use_at(&self, when: SystemTime) -> bool {
707 self.usable_lifetime()
708 .map(|lt| lt.valid_at(when))
709 .unwrap_or(false)
710 }
711
712 /// Return true if the directory is valid at the given time, _unmodified_ by our
713 /// clock skew settings.
714 fn declared_live_at(&self, when: SystemTime) -> bool {
715 self.declared_lifetime()
716 .map(|lt| lt.valid_at(when))
717 .unwrap_or(false)
718 }
719
720 /// As `frac`, but return None if this consensus is not valid at the given time,
721 /// and down-rate expired consensuses that we're still willing to use.
722 fn frac_at(&self, when: SystemTime) -> Option<f32> {
723 if self
724 .declared_lifetime()
725 .map(|lt| lt.valid_at(when))
726 .unwrap_or(false)
727 {
728 // We're officially okay to use this directory.
729 Some(self.frac())
730 } else if self.okay_to_use_at(when) {
731 // This directory is a little expired, but only a little.
732 Some(self.frac() * 0.9)
733 } else {
734 None
735 }
736 }
737
738 /// Return the fraction of completion for directory download, in a form
739 /// suitable for a progress bar.
740 ///
741 /// This is monotonically increasing for a single directory, but can go down
742 /// as one directory is replaced with another.
743 ///
744 /// Callers _should not_ depend on the specific meaning of any particular
745 /// fraction; we may change these fractions in the future.
746 fn frac(&self) -> f32 {
747 // We arbitrarily decide that 25% is downloading the consensus, 10% is
748 // downloading the certificates, and the remaining 65% is downloading
749 // the microdescriptors until we become usable. We may want to re-tune that in the future, but
750 // the documentation of this function should allow us to do so.
751 match &self.progress {
752 DirProgress::NoConsensus { .. } => 0.0,
753 DirProgress::FetchingCerts { n_certs, .. } => {
754 0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
755 }
756 DirProgress::Validated {
757 usable: false,
758 n_mds,
759 ..
760 } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
761 DirProgress::Validated { usable: true, .. } => 1.0,
762 }
763 }
764
765 /// If we think there is a problem with our bootstrapping process, return a
766 /// [`DirBlockage`] to describe it.
767 ///
768 /// The caller may want to also check `usable_at` to avoid reporting trouble
769 /// if the directory is currently usable.
770 fn blockage(&self) -> Option<DirBlockage> {
771 /// How many resets are sufficient for us to report a blockage?
772 const RESET_THRESHOLD: usize = 2;
773 /// How many errors are sufficient for us to report a blockage?
774 const ERROR_THRESHOLD: usize = 6;
775 /// How many no-progress download attempts are sufficient for us to
776 /// report a blockage?
777 const STALL_THRESHOLD: usize = 8;
778
779 if self.n_resets >= RESET_THRESHOLD {
780 Some(DirBlockage::TooManyResets)
781 } else if self.n_errors >= ERROR_THRESHOLD {
782 Some(DirBlockage::TooManyErrors)
783 } else if self.n_stalls >= STALL_THRESHOLD {
784 Some(DirBlockage::Stalled)
785 } else {
786 None
787 }
788 }
789}
790
791impl DirProgress {
792 /// Return true if this progress indicates a usable directory.
793 fn usable(&self) -> bool {
794 matches!(self, DirProgress::Validated { usable: true, .. })
795 }
796}
797
798/// A stream of [`DirBootstrapStatus`] events.
799#[derive(Clone, Educe)]
800#[educe(Debug)]
801pub struct DirBootstrapEvents {
802 /// The `postage::watch::Receiver` that we're wrapping.
803 ///
804 /// We wrap this type so that we don't expose its entire API, and so that we
805 /// can migrate to some other implementation in the future if we want.
806 #[educe(Debug(method = "skip_fmt"))]
807 pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
808}
809
810impl Stream for DirBootstrapEvents {
811 type Item = DirBootstrapStatus;
812
813 fn poll_next(
814 mut self: Pin<&mut Self>,
815 cx: &mut std::task::Context<'_>,
816 ) -> Poll<Option<Self::Item>> {
817 self.inner.poll_next_unpin(cx)
818 }
819}
820
821#[cfg(test)]
822mod test {
823 // @@ begin test lint list maintained by maint/add_warning @@
824 #![allow(clippy::bool_assert_comparison)]
825 #![allow(clippy::clone_on_copy)]
826 #![allow(clippy::dbg_macro)]
827 #![allow(clippy::mixed_attributes_style)]
828 #![allow(clippy::print_stderr)]
829 #![allow(clippy::print_stdout)]
830 #![allow(clippy::single_char_pattern)]
831 #![allow(clippy::unwrap_used)]
832 #![allow(clippy::unchecked_duration_subtraction)]
833 #![allow(clippy::useless_vec)]
834 #![allow(clippy::needless_pass_by_value)]
835 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
836 use std::time::Duration;
837
838 use super::*;
839 use float_eq::assert_float_eq;
840 use futures::stream::StreamExt;
841 use tor_rtcompat::test_with_all_runtimes;
842
843 #[test]
844 fn subscribe_and_publish() {
845 test_with_all_runtimes!(|_rt| async {
846 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
847 let mut sub1 = publish.subscribe();
848 publish.publish(DirEvent::NewConsensus);
849 let mut sub2 = publish.subscribe();
850 let ev = event_listener::Event::new();
851 let lis = ev.listen();
852
853 futures::join!(
854 async {
855 // sub1 was created in time to see this event...
856 let val1 = sub1.next().await;
857 assert_eq!(val1, Some(DirEvent::NewConsensus));
858 ev.notify(1); // Tell the third task below to drop the publisher.
859 let val2 = sub1.next().await;
860 assert_eq!(val2, None);
861 },
862 async {
863 let val = sub2.next().await;
864 assert_eq!(val, None);
865 },
866 async {
867 lis.await;
868 drop(publish);
869 }
870 );
871 });
872 }
873
874 #[test]
875 fn receive_two() {
876 test_with_all_runtimes!(|_rt| async {
877 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
878
879 let mut sub = publish.subscribe();
880 let ev = event_listener::Event::new();
881 let ev_lis = ev.listen();
882 futures::join!(
883 async {
884 let val1 = sub.next().await;
885 assert_eq!(val1, Some(DirEvent::NewDescriptors));
886 ev.notify(1);
887 let val2 = sub.next().await;
888 assert_eq!(val2, Some(DirEvent::NewConsensus));
889 },
890 async {
891 publish.publish(DirEvent::NewDescriptors);
892 ev_lis.await;
893 publish.publish(DirEvent::NewConsensus);
894 }
895 );
896 });
897 }
898
#[test]
fn two_publishers() {
    // Two clones of one publisher each publish a different event 100 times.
    // The subscriber must see each kind at least once (flags may coalesce, so
    // at most 100 times each), and the stream ends when both clones are gone.
    test_with_all_runtimes!(|_rt| async {
        let publisher_a: FlagPublisher<DirEvent> = FlagPublisher::new();
        let publisher_b = publisher_a.clone();

        let mut stream = publisher_a.subscribe();
        let a_published = event_listener::Event::new();
        let b_published = event_listener::Event::new();
        let a_listener = a_published.listen();
        let b_listener = b_published.listen();

        let consumer = async {
            let mut tally = [0_usize; 2];
            // Waiting for one notification from each producer guarantees
            // that both kinds of flag were set before the stream can end.
            a_listener.await;
            b_listener.await;
            while let Some(event) = stream.next().await {
                tally[usize::from(event.to_index())] += 1;
            }
            for seen in tally {
                assert!(seen > 0);
                assert!(seen <= 100);
            }
        };
        let producer_a = async {
            for _ in 0..100 {
                publisher_a.publish(DirEvent::NewDescriptors);
                a_published.notify(1);
                tor_rtcompat::task::yield_now().await;
            }
            drop(publisher_a);
        };
        let producer_b = async {
            for _ in 0..100 {
                publisher_b.publish(DirEvent::NewConsensus);
                b_published.notify(1);
                tor_rtcompat::task::yield_now().await;
            }
            drop(publisher_b);
        };

        futures::join!(consumer, producer_a, producer_b);
    });
}
944
#[test]
fn receive_after_publishers_are_gone() {
    // An event published before the last publisher is dropped is still
    // delivered; the stream only ends after that event has been drained.
    test_with_all_runtimes!(|_rt| async {
        let publisher: FlagPublisher<DirEvent> = FlagPublisher::new();
        let mut stream = publisher.subscribe();

        publisher.publish(DirEvent::NewConsensus);
        drop(publisher);

        assert_eq!(stream.next().await, Some(DirEvent::NewConsensus));
        assert!(stream.next().await.is_none());
    });
}
960
#[test]
fn failed_conversion() {
    // An index that corresponds to no `DirEvent` variant converts to `None`.
    let bogus_index: u16 = 999;
    assert_eq!(DirEvent::from_index(bogus_index), None);
}
965
#[test]
fn dir_status_basics() {
    let now = SystemTime::now();
    let hour = Duration::from_secs(3600);

    // Three snapshots of directory progress, from least to most advanced.
    let empty = DirStatus {
        progress: DirProgress::NoConsensus { after: None },
        ..Default::default()
    };
    let certs_pending = {
        // Consensus valid from `now` until `now + 2h`; 3 of 5 certificates.
        let lt = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
        DirStatus {
            progress: DirProgress::FetchingCerts {
                lifetime: lt.clone(),
                usable_lifetime: lt,
                n_certs: (3, 5),
            },
            ..Default::default()
        }
    };
    let mds_pending = {
        // Validated consensus valid from `now + 1h` until `now + 3h`;
        // 30 of 40 microdescriptors, not yet usable.
        let lt = netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
        DirStatus {
            progress: DirProgress::Validated {
                lifetime: lt.clone(),
                usable_lifetime: lt,
                n_mds: (30, 40),
                usable: false,
            },
            ..Default::default()
        }
    };

    // usable_lifetime(): absent without a consensus, else the consensus's.
    assert!(empty.usable_lifetime().is_none());
    assert_eq!(certs_pending.usable_lifetime().unwrap().valid_after(), now);
    assert_eq!(
        mds_pending.usable_lifetime().unwrap().valid_until(),
        now + hour * 3
    );

    // frac(): it's fine to change these numbers later; the current values
    // are more or less arbitrary.
    const TOL: f32 = 0.00001;
    assert_float_eq!(empty.frac(), 0.0, abs <= TOL);
    assert_float_eq!(certs_pending.frac(), 0.25 + 0.06, abs <= TOL);
    assert_float_eq!(mds_pending.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);

    // frac_at(): `None` at instants outside a status's lifetime.
    let half_past = now + hour / 2;
    let later = half_past + hour * 2;
    // At `now + 30min`, only the certificate-fetching status is in range.
    assert!(empty.frac_at(half_past).is_none());
    assert_float_eq!(certs_pending.frac_at(half_past).unwrap(), 0.25 + 0.06, abs <= TOL);
    assert!(mds_pending.frac_at(half_past).is_none());
    // At `now + 2h30min`, only the microdescriptor-fetching status is.
    assert!(empty.frac_at(later).is_none());
    assert!(certs_pending.frac_at(later).is_none());
    assert_float_eq!(mds_pending.frac_at(later).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
}
1021
#[test]
fn dir_status_display() {
    use time::macros::datetime;
    let start: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
    let hour = Duration::from_secs(3600);
    let lifetime = netstatus::Lifetime::new(start, start + hour, start + hour * 3).unwrap();

    // Each progress state, paired with the text its `DirStatus` should display.
    let cases = [
        (
            DirProgress::NoConsensus { after: None },
            "fetching a consensus",
        ),
        (
            DirProgress::FetchingCerts {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime.clone(),
                n_certs: (3, 5),
            },
            "fetching authority certificates (3/5)",
        ),
        (
            DirProgress::Validated {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime.clone(),
                n_mds: (30, 40),
                usable: false,
            },
            "fetching microdescriptors (30/40)",
        ),
        (
            DirProgress::Validated {
                lifetime: lifetime.clone(),
                usable_lifetime: lifetime,
                n_mds: (30, 40),
                usable: true,
            },
            "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC",
        ),
    ];

    for (progress, expected) in cases {
        let status = DirStatus {
            progress,
            ..Default::default()
        };
        assert_eq!(status.to_string(), expected);
    }
}
1070
#[test]
fn bootstrap_status() {
    use time::macros::datetime;
    let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
    let hour = Duration::new(3600, 0);
    // Two overlapping consensus lifetimes; `lifetime2` starts an hour after
    // `lifetime` and stays valid an hour longer.
    let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
    let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();

    // dp1: an older directory that is usable.
    // dp2: a newer directory that is not yet usable.
    let dp1 = DirProgress::Validated {
        lifetime: lifetime.clone(),
        usable_lifetime: lifetime,
        n_mds: (3, 40),
        usable: true,
    };
    let dp2 = DirProgress::Validated {
        lifetime: lifetime2.clone(),
        usable_lifetime: lifetime2.clone(),
        n_mds: (5, 40),
        usable: false,
    };
    let attempt1 = AttemptId::next();
    let attempt2 = AttemptId::next();

    // Begin in the middle of replacing dp1 (attempt1) with dp2 (attempt2).
    let bs = DirBootstrapStatus(StatusEnum::Replacing {
        current: StatusEntry {
            id: attempt1,
            status: DirStatus {
                progress: dp1.clone(),
                ..Default::default()
            },
        },
        next: StatusEntry {
            id: attempt2,
            status: DirStatus {
                progress: dp2.clone(),
                ..Default::default()
            },
        },
    });

    assert_eq!(
        bs.to_string(),
        "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
    );

    const TOL: f32 = 0.00001;
    // While the current directory is still valid, we count as fully bootstrapped.
    assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
    // After the current directory expires (t1 + 3h), progress reflects the
    // next directory's 5/40 microdescriptors.
    assert_float_eq!(
        bs.frac_at(t1 + hour * 3 + hour / 2),
        0.35 + 0.65 * 0.125,
        abs <= TOL
    );

    // Now try updating.

    // Case 1: we have a usable directory and the updated status isn't usable.
    // The pending ("next") directory's progress is updated in place.
    let mut bs = bs;
    let dp3 = DirProgress::Validated {
        lifetime: lifetime2.clone(),
        usable_lifetime: lifetime2.clone(),
        n_mds: (10, 40),
        usable: false,
    };

    bs.update_progress(attempt2, dp3);
    assert!(matches!(
        bs.next().unwrap(),
        DirStatus {
            progress: DirProgress::Validated {
                n_mds: (10, 40),
                ..
            },
            ..
        }
    ));

    // Case 2: The new directory _is_ usable and newer. It will replace the old one.
    let dp4 = DirProgress::Validated {
        lifetime: lifetime2.clone(),
        usable_lifetime: lifetime2.clone(),
        n_mds: (20, 40),
        usable: true,
    };
    bs.update_progress(attempt2, dp4);
    assert!(bs.next().is_none());
    assert_eq!(
        bs.current()
            .unwrap()
            .usable_lifetime()
            .unwrap()
            .valid_after(),
        lifetime2.valid_after()
    );

    // Case 3: The new directory is usable but older. Nothing will happen:
    // nothing becomes pending, and the current directory is still the newer one.
    bs.update_progress(attempt1, dp1);
    assert!(bs.next().is_none());
    assert_eq!(
        bs.current()
            .unwrap()
            .usable_lifetime()
            .unwrap()
            .valid_after(),
        lifetime2.valid_after()
    );

    // Case 4: starting with no usable directory, we always replace, even
    // with a status that is not itself usable.
    let mut bs = DirBootstrapStatus::default();
    assert!(!dp2.usable());
    assert!(bs.current().is_none());
    bs.update_progress(attempt2, dp2);
    assert!(bs.current().unwrap().usable_lifetime().is_some());
}
1186}