tor_dirmgr/event.rs
1//! Code for notifying other modules about changes in the directory.
2
3// TODO(nickm): After we have enough experience with this FlagPublisher, we
4// might want to make it a public interface. If we do it should probably move
5// into another crate.
6
7use std::{
8 fmt,
9 marker::PhantomData,
10 pin::Pin,
11 sync::{
12 atomic::{AtomicUsize, Ordering},
13 Arc,
14 },
15 task::Poll,
16 time::SystemTime,
17};
18
19use educe::Educe;
20use futures::{stream::Stream, Future, StreamExt};
21use itertools::chain;
22use paste::paste;
23use time::OffsetDateTime;
24use tor_basic_utils::skip_fmt;
25use tor_netdir::DirEvent;
26use tor_netdoc::doc::netstatus;
27
28#[cfg(feature = "bridge-client")]
29use tor_guardmgr::bridge::BridgeDescEvent;
30
31use crate::bootstrap::AttemptId;
32
33/// A trait to indicate something that can be published with [`FlagPublisher`].
34///
35/// Since the implementation of `FlagPublisher` requires that its events be
36/// represented as small integers, this trait is mainly about converting to and
37/// from those integers.
38pub(crate) trait FlagEvent: Sized {
39 /// The maximum allowed integer value that [`FlagEvent::to_index()`] can return
40 /// for this type.
41 ///
42 /// This is limited to u16 because the [`FlagPublisher`] uses a vector of all
43 /// known flags, and sometimes iterates over the whole vector.
44 const MAXIMUM: u16;
45 /// Convert this event into an index.
46 ///
47 /// For efficiency, indices should be small and densely packed.
48 fn to_index(self) -> u16;
49 /// Try to reconstruct an event from its index. Return None if the index is
50 /// out-of-bounds.
51 fn from_index(flag: u16) -> Option<Self>;
52}
53
54/// Implements [`FlagEvent`] for a C-like enum
55///
56/// Requiremets:
57///
58/// * `$ty` must implement [`strum::EnumCount`] [`strum::IntoEnumIterator`]
59///
60/// * `$ty` type must implement [`Into<u16>`] and [`TryFrom<u16>`]
61/// (for example using the `num_enum` crate).
62///
63/// * The discriminants must be densely allocated.
64/// This will be done automatically by the compiler
65/// if explicit discriminants are not specified.
66/// (This property is checked in a test.)
67///
68/// * The variants may not contain any data.
69/// This is required for correctness.
70/// We think it is checked if you use `num_enum::TryFromPrimitive`.
71///
72/// # Example
73///
74// Sadly, it does not appear to be possible to doctest a private macro.
75/// ```rust,ignore
76/// use num_enum::{IntoPrimitive, TryFromPrimitive};
77/// use strum::{EnumCount, EnumIter};
78///
79/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
80/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
81/// #[non_exhaustive]
82/// #[repr(u16)]
83/// pub enum DirEvent {
84/// NewConsensus,
85/// NewDescriptors,
86/// }
87///
88/// impl_FlagEvent!{ DirEvent }
89/// ```
90macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
91 impl FlagEvent for $ty {
92 const MAXIMUM: u16 = {
93 let count = <$ty as $crate::strum::EnumCount>::COUNT;
94 (count - 1) as u16
95 };
96 fn to_index(self) -> u16 {
97 self.into()
98 }
99 fn from_index(flag: u16) -> Option<Self> {
100 flag.try_into().ok()
101 }
102 }
103
104 #[test]
105 #[allow(non_snake_case)]
106 fn [< flagevent_test_variant_numbers_ $ty >]() {
107 for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
108 assert!(<$ty as FlagEvent>::to_index(variant) <=
109 <$ty as FlagEvent>::MAXIMUM,
110 "impl_FlagEvent only allowed if discriminators are dense");
111 }
112 }
113} } }
114
115impl_FlagEvent! { DirEvent }
116
117#[cfg(feature = "bridge-client")]
118impl_FlagEvent! { BridgeDescEvent }
119
120/// A publisher that broadcasts flag-level events to multiple subscribers.
121///
122/// Events with the same flag value may be coalesced: that is, if the same event
123/// is published ten times in a row, a subscriber may receive only a single
124/// notification of the event.
125///
126/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
127/// that can also broadcast events to everybody listening on the channel.
128/// Dropping the last handle closes all streams subscribed to it.
129pub(crate) struct FlagPublisher<F> {
130 /// Inner data shared by publishers and streams.
131 inner: Arc<Inner<F>>,
132}
133
134/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
135struct Inner<F> {
136 /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
137 event: event_listener::Event,
138 /// How many times has each event occurred, ever.
139 ///
140 /// (It is safe for this to wrap around.)
141 // TODO(nickm): I wish this could be an array, but const generics don't
142 // quite support that yet.
143 counts: Vec<AtomicUsize>, // I wish this could be an array.
144 /// How many publishers remain?
145 n_publishers: AtomicUsize,
146 /// Phantom member to provide correct covariance.
147 ///
148 /// The `fn` business is a covariance trick to include `F` without affecting
149 /// this object's Send/Sync status.
150 _phantom: PhantomData<fn(F) -> F>,
151}
152
153/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
154/// [`FlagPublisher`].
155pub(crate) struct FlagListener<F> {
156 /// What value of each flag's count have we seen most recently?
157 ///
158 /// Note that we count the event as "received" only once for each observed
159 /// change in the flag's count, even if that count has changed by more than
160 /// 1.
161 my_counts: Vec<usize>,
162 /// An an `EventListener` that will be notified when events are published,
163 /// or when the final publisher is dropped.
164 ///
165 /// We must always have one of these available _before_ we check any counts
166 /// in self.inner.
167 listener: event_listener::EventListener,
168 /// Reference to shared data.
169 inner: Arc<Inner<F>>,
170}
171
172impl<F: FlagEvent> Default for FlagPublisher<F> {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl<F: FlagEvent> FlagPublisher<F> {
179 /// Construct a new FlagPublisher.
180 pub(crate) fn new() -> Self {
181 // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
182 // require AtomicUsize to be Clone.
183 let counts = std::iter::repeat_with(AtomicUsize::default)
184 .take(F::MAXIMUM as usize + 1)
185 .collect();
186 FlagPublisher {
187 inner: Arc::new(Inner {
188 event: event_listener::Event::new(),
189 counts,
190 n_publishers: AtomicUsize::new(1),
191 _phantom: PhantomData,
192 }),
193 }
194 }
195
196 /// Create a new subscription to this FlagPublisher.
197 pub(crate) fn subscribe(&self) -> FlagListener<F> {
198 // We need to do this event.listen before we check the counts; otherwise
199 // we could have a sequence where: we check the count, then the
200 // publisher increments the count, then the publisher calls
201 // event.notify(), and we call event.listen(). That would cause us to
202 // miss the increment.
203 let listener = self.inner.event.listen();
204
205 FlagListener {
206 my_counts: self
207 .inner
208 .counts
209 .iter()
210 .map(|a| a.load(Ordering::SeqCst))
211 .collect(),
212 listener,
213 inner: Arc::clone(&self.inner),
214 }
215 }
216
217 /// Tell every listener that the provided flag has been published.
218 pub(crate) fn publish(&self, flag: F) {
219 self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
220 self.inner.event.notify(usize::MAX);
221 }
222}
223
224impl<F> Clone for FlagPublisher<F> {
225 fn clone(&self) -> FlagPublisher<F> {
226 self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
227 FlagPublisher {
228 inner: Arc::clone(&self.inner),
229 }
230 }
231}
232
233// We must implement Drop to keep count publishers, and so that when the last
234// publisher goes away, we can wake up every listener so that it notices that
235// the stream is now ended.
236impl<F> Drop for FlagPublisher<F> {
237 fn drop(&mut self) {
238 if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
239 // That was the last reference; we must notify the listeners.
240 self.inner.event.notify(usize::MAX);
241 }
242 }
243}
244
245impl<F: FlagEvent> Stream for FlagListener<F> {
246 type Item = F;
247
248 fn poll_next(
249 mut self: std::pin::Pin<&mut Self>,
250 cx: &mut std::task::Context<'_>,
251 ) -> std::task::Poll<Option<Self::Item>> {
252 loop {
253 // Notify the caller if any events are ready to fire.
254 for idx in 0..F::MAXIMUM as usize + 1 {
255 let cur = self.inner.counts[idx].load(Ordering::SeqCst);
256 // We don't have to use < here specifically, since any change
257 // indicates that the count has been modified. That lets us
258 // survive usize wraparound.
259 if cur != self.my_counts[idx] {
260 self.my_counts[idx] = cur;
261 return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
262 }
263 }
264
265 // At this point, notify the caller if there are no more publishers.
266 if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
267 return Poll::Ready(None);
268 }
269
270 if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
271 // Got a new notification; we must create a new event and continue the loop.
272 //
273 // See discussion in `FlagPublisher::subscribe()` for why we must always create
274 // this listener _before_ checking any flags.
275 self.listener = self.inner.event.listen();
276 } else {
277 // Nothing to do yet: put the listener back.
278 return Poll::Pending;
279 }
280 }
281 }
282}
283
284/// Description of the directory manager's current bootstrapping status.
285///
286/// This status does not necessarily increase monotonically: it can go backwards
287/// if (for example) our directory information expires before we're able to get
288/// new information.
289//
290// TODO(nickm): This type has gotten a bit large for being the type we send over
291// a `postage::watch`: perhaps we'd be better off having this information stored
292// in the guardmgr, and having only a summary of it sent over the
293// `postage::watch`. But for now, let's not, unless it shows up in profiles.
294#[derive(Clone, Debug, Default)]
295pub struct DirBootstrapStatus(StatusEnum);
296
297/// The contents of a DirBootstrapStatus.
298///
299/// This is a separate type since we don't want to make these variables public.
300#[derive(Clone, Debug, Default)]
301enum StatusEnum {
302 /// There is no active attempt to load or fetch a directory.
303 #[default]
304 NoActivity,
305 /// We have only one attempt to fetch a directory.
306 Single {
307 /// The currently active directory attempt.
308 ///
309 /// We're either using this directory now, or we plan to use it as soon
310 /// as it's complete enough.
311 current: StatusEntry,
312 },
313 /// We have an existing directory attempt, but it's stale, and we're
314 /// fetching a new one to replace it.
315 ///
316 /// Invariant: `current.id < next.id`
317 Replacing {
318 /// The previous attempt's status. It may still be trying to fetch
319 /// information if it has descriptors left to download.
320 current: StatusEntry,
321 /// The current attempt's status. We are not yet using this directory
322 /// for our activity, since it does not (yet) have enough information.
323 next: StatusEntry,
324 },
325}
326
327/// The status and identifier of a single attempt to download a full directory.
328#[derive(Clone, Debug)]
329struct StatusEntry {
330 /// The identifier for this attempt.
331 id: AttemptId,
332 /// The latest status.
333 status: DirStatus,
334}
335
336/// The status for a single directory.
337#[derive(Clone, Debug, Default, derive_more::Display)]
338#[display("{0}", progress)]
339pub(crate) struct DirStatus {
340 /// How much of the directory do we currently have?
341 progress: DirProgress,
342 /// How many resets have been forced while fetching this directory?
343 n_resets: usize,
344 /// How many errors have we encountered since last we advanced the
345 /// 'progress' on this directory?
346 n_errors: usize,
347 /// How many times has an `update_progress` call not actually moved us
348 /// forward since we last advanced the 'progress' on this directory?
349 n_stalls: usize,
350}
351
352/// How much progress have we made in downloading a given directory?
353///
354/// This is a separate type so that we don't make the variants public.
355#[derive(Clone, Debug, Educe)]
356#[educe(Default)]
357pub(crate) enum DirProgress {
358 /// We don't have any information yet.
359 #[educe(Default)]
360 NoConsensus {
361 /// If present, we are fetching a consensus whose valid-after time
362 /// postdates this time.
363 #[allow(dead_code)]
364 after: Option<SystemTime>,
365 },
366 /// We've downloaded a consensus, but we haven't validated it yet.
367 FetchingCerts {
368 /// The actual declared lifetime of the consensus.
369 lifetime: netstatus::Lifetime,
370 /// The lifetime for which we are willing to use this consensus. (This
371 /// may be broader than `lifetime`.)
372 usable_lifetime: netstatus::Lifetime,
373 /// A fraction (in (numerator,denominator) format) of the certificates
374 /// we have for this consensus.
375 n_certs: (u16, u16),
376 },
377 /// We've validated a consensus and we're fetching (or have fetched) its
378 /// microdescriptors.
379 Validated {
380 /// The actual declared lifetime of the consensus.
381 lifetime: netstatus::Lifetime,
382 /// The lifetime for which we are willing to use this consensus. (This
383 /// may be broader than `lifetime`.)
384 usable_lifetime: netstatus::Lifetime,
385 /// A fraction (in (numerator,denominator) form) of the microdescriptors
386 /// that we have for this consensus.
387 n_mds: (u32, u32),
388 /// True iff we've decided that the consensus is usable.
389 usable: bool,
390 // TODO(nickm) Someday we could add a field about whether any primary
391 // guards are missing microdescriptors, to give a better explanation for
392 // the case where we won't switch our consensus because of that.
393 },
394}
395
396/// A reported diagnostic for what kind of trouble we've seen while trying to
397/// bootstrap a directory.
398///
399/// These blockages types are not yet terribly specific: if you encounter one,
400/// it's probably a good idea to check the logs to see what's really going on.
401///
402/// If you encounter connection blockage _and_ directory blockage at the same
403/// time, the connection blockage is almost certainly the real problem.
404//
405// TODO(nickm): At present these diagnostics aren't very helpful; they say too
406// much about _how we know_ that the process has gone wrong, but not so much
407// about _what the problem is_. In the future, we may wish to look more closely
408// at what _kind_ of errors or resets we've seen, so we can report better
409// information. Probably, however, we should only do that after we get some
410// experience with which problems people encounter in practice, and what
411// diagnostics would be useful for them.
412#[derive(Clone, Debug, derive_more::Display)]
413#[non_exhaustive]
414pub enum DirBlockage {
415 /// We've been downloading information without error, but we haven't
416 /// actually been getting anything that we want.
417 ///
418 /// This might indicate that there's a problem with information propagating
419 /// through the Tor network, or it might indicate that a bogus consensus or
420 /// a bad clock has tricked us into asking for something that nobody has.
421 #[display("Can't make progress.")]
422 Stalled,
423 /// We've gotten a lot of errors without making forward progress on our
424 /// bootstrap attempt.
425 ///
426 /// This might indicate that something's wrong with the Tor network, or that
427 /// there's something buggy with our ability to handle directory responses.
428 /// It might also indicate a malfunction on our directory guards, or a bug
429 /// on our retry logic.
430 #[display("Too many errors without making progress.")]
431 TooManyErrors,
432 /// We've reset our bootstrap attempt a lot of times.
433 ///
434 /// This either indicates that we have been failing a lot for one of the
435 /// other reasons above, or that we keep getting served a consensus which
436 /// turns out, upon trying to fetch certificates, not to be usable. It can
437 /// also indicate a bug in our retry logic.
438 #[display("Had to reset bootstrapping too many times.")]
439 TooManyResets,
440}
441
442impl fmt::Display for DirProgress {
443 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444 /// Format this time in a format useful for displaying
445 /// lifetime boundaries.
446 fn fmt_time(t: SystemTime) -> String {
447 use once_cell::sync::Lazy;
448 /// Formatter object for lifetime boundaries.
449 ///
450 /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
451 /// sub-second times here, and using non-UTC offsets is confusing
452 /// in this context.
453 static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
454 time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
455 .expect("Invalid time format")
456 });
457 OffsetDateTime::from(t)
458 .format(&FORMAT)
459 .unwrap_or_else(|_| "(could not format)".into())
460 }
461
462 match &self {
463 DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
464 DirProgress::FetchingCerts { n_certs, .. } => write!(
465 f,
466 "fetching authority certificates ({}/{})",
467 n_certs.0, n_certs.1
468 ),
469 DirProgress::Validated {
470 usable: false,
471 n_mds,
472 ..
473 } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
474 DirProgress::Validated {
475 usable: true,
476 lifetime,
477 ..
478 } => write!(
479 f,
480 "usable, fresh until {}, and valid until {}",
481 fmt_time(lifetime.fresh_until()),
482 fmt_time(lifetime.valid_until())
483 ),
484 }
485 }
486}
487
488impl fmt::Display for DirBootstrapStatus {
489 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490 match &self.0 {
491 StatusEnum::NoActivity => write!(f, "not downloading")?,
492 StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
493 StatusEnum::Replacing { current, next } => write!(
494 f,
495 "directory is {}; next directory is {}",
496 current.status, next.status
497 )?,
498 }
499 Ok(())
500 }
501}
502
503impl DirBootstrapStatus {
504 /// Return the current DirStatus.
505 ///
506 /// This is the _most complete_ status. If we have any usable status, it is
507 /// this one.
508 fn current(&self) -> Option<&DirStatus> {
509 match &self.0 {
510 StatusEnum::NoActivity => None,
511 StatusEnum::Single { current } => Some(¤t.status),
512 StatusEnum::Replacing { current, .. } => Some(¤t.status),
513 }
514 }
515
516 /// Return the next DirStatus, if there is one.
517 fn next(&self) -> Option<&DirStatus> {
518 match &self.0 {
519 StatusEnum::Replacing { next, .. } => Some(&next.status),
520 _ => None,
521 }
522 }
523
524 /// Return the contained `DirStatus`es, in order: `current`, then `next`
525 #[allow(clippy::implied_bounds_in_impls)]
526 fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
527 chain!(self.current(), self.next(),)
528 }
529
530 /// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
531 #[allow(clippy::implied_bounds_in_impls)]
532 fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
533 let (current, next) = match &mut self.0 {
534 StatusEnum::NoActivity => (None, None),
535 StatusEnum::Single { current } => (Some(current), None),
536 StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
537 };
538 chain!(current, next,)
539 }
540
541 /// Return the fraction of completion for directory download, in a form
542 /// suitable for a progress bar at some particular time.
543 ///
544 /// This value is not monotonic, and can go down as one directory is
545 /// replaced with another.
546 ///
547 /// Callers _should not_ depend on the specific meaning of any particular
548 /// fraction; we may change these fractions in the future.
549 pub fn frac_at(&self, when: SystemTime) -> f32 {
550 self.statuses()
551 .filter_map(|st| st.frac_at(when))
552 .next()
553 .unwrap_or(0.0)
554 }
555
556 /// Return true if this status indicates that we have a current usable
557 /// directory.
558 pub fn usable_at(&self, now: SystemTime) -> bool {
559 if let Some(current) = self.current() {
560 current.progress.usable() && current.okay_to_use_at(now)
561 } else {
562 false
563 }
564 }
565
566 /// If there is a problem with our attempts to bootstrap, return a
567 /// corresponding DirBlockage.
568 pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
569 if let Some(current) = self.current() {
570 if current.progress.usable() && current.declared_live_at(now) {
571 // The current directory is sufficient, and not even a little bit
572 // expired. There is no problem.
573 return None;
574 }
575 }
576
577 // Any blockage in "current" is more serious, so return that if there is one
578 self.statuses().filter_map(|st| st.blockage()).next()
579 }
580
581 /// Return the appropriate DirStatus for `AttemptId`, constructing it if
582 /// necessary.
583 ///
584 /// Return None if all relevant attempts are more recent than this Id.
585 #[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
586 fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
587 // First, ensure that we have a *recent enough* attempt
588 // Look for the latest attempt, and see if it's new enough; if not, start a new one.
589 if self
590 .entries_mut()
591 .rev()
592 .take(1)
593 .find(|entry| entry.id >= attempt_id)
594 .is_none()
595 {
596 let current = match std::mem::take(&mut self.0) {
597 StatusEnum::NoActivity => None,
598 StatusEnum::Single { current } => Some(current),
599 StatusEnum::Replacing { current, .. } => Some(current),
600 };
601 // If we have a `current` already, we keep it, and restart `next`.
602 let next = StatusEntry::new(attempt_id);
603 self.0 = match current {
604 None => StatusEnum::Single { current: next },
605 Some(current) => StatusEnum::Replacing { current, next },
606 };
607 }
608
609 // Find the entry with `attempt_id` and return it.
610 // (Despite the above, there might not be one: maybe `attempt_id` is old.)
611 self.entries_mut()
612 .find(|entry| entry.id == attempt_id)
613 .map(|entry| &mut entry.status)
614 }
615
616 /// If the "next" status is usable, replace the current status with it.
617 fn advance_status(&mut self) {
618 // TODO: should make sure that the compiler is smart enough to optimize
619 // this mem::take() and replacement away, and turn it into a conditional
620 // replacement?
621 self.0 = match std::mem::take(&mut self.0) {
622 StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
623 StatusEnum::Single { current: next }
624 }
625 other => other,
626 };
627 }
628
629 /// Update this status by replacing the `DirProgress` in its current status
630 /// (or its next status) with `new_status`, as appropriate.
631 pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
632 if let Some(status) = self.mut_status_for(attempt_id) {
633 let old_frac = status.frac();
634 status.progress = new_progress;
635 let new_frac = status.frac();
636 if new_frac > old_frac {
637 // This download has made progress: clear our count of errors
638 // and stalls.
639 status.n_errors = 0;
640 status.n_stalls = 0;
641 } else {
642 // This download didn't make progress; increment the stall
643 // count.
644 status.n_stalls += 1;
645 }
646 self.advance_status();
647 }
648 }
649
650 /// Update this status by noting that some errors have occurred in a given
651 /// download attempt.
652 pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
653 if let Some(status) = self.mut_status_for(attempt_id) {
654 status.n_errors += n_errors;
655 }
656 }
657
658 /// Update this status by noting that we had to reset a given download attempt;
659 pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
660 if let Some(status) = self.mut_status_for(attempt_id) {
661 status.n_resets += 1;
662 }
663 }
664}
665
666impl StatusEntry {
667 /// Construct a new StatusEntry with a given attempt id, and no progress
668 /// reported.
669 fn new(id: AttemptId) -> Self {
670 Self {
671 id,
672 status: DirStatus::default(),
673 }
674 }
675}
676
677impl DirStatus {
678 /// Return the declared consensus lifetime for this directory, if we have one.
679 fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
680 match &self.progress {
681 DirProgress::NoConsensus { .. } => None,
682 DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
683 DirProgress::Validated { lifetime, .. } => Some(lifetime),
684 }
685 }
686
687 /// Return the consensus lifetime for this directory, if we have one, as
688 /// modified by our skew-tolerance settings.
689 fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
690 match &self.progress {
691 DirProgress::NoConsensus { .. } => None,
692 DirProgress::FetchingCerts {
693 usable_lifetime, ..
694 } => Some(usable_lifetime),
695 DirProgress::Validated {
696 usable_lifetime, ..
697 } => Some(usable_lifetime),
698 }
699 }
700
701 /// Return true if the directory is valid at the given time, as modified by
702 /// our clock skew settings.
703 fn okay_to_use_at(&self, when: SystemTime) -> bool {
704 self.usable_lifetime()
705 .map(|lt| lt.valid_at(when))
706 .unwrap_or(false)
707 }
708
709 /// Return true if the directory is valid at the given time, _unmodified_ by our
710 /// clock skew settings.
711 fn declared_live_at(&self, when: SystemTime) -> bool {
712 self.declared_lifetime()
713 .map(|lt| lt.valid_at(when))
714 .unwrap_or(false)
715 }
716
717 /// As `frac`, but return None if this consensus is not valid at the given time,
718 /// and down-rate expired consensuses that we're still willing to use.
719 fn frac_at(&self, when: SystemTime) -> Option<f32> {
720 if self
721 .declared_lifetime()
722 .map(|lt| lt.valid_at(when))
723 .unwrap_or(false)
724 {
725 // We're officially okay to use this directory.
726 Some(self.frac())
727 } else if self.okay_to_use_at(when) {
728 // This directory is a little expired, but only a little.
729 Some(self.frac() * 0.9)
730 } else {
731 None
732 }
733 }
734
735 /// Return the fraction of completion for directory download, in a form
736 /// suitable for a progress bar.
737 ///
738 /// This is monotonically increasing for a single directory, but can go down
739 /// as one directory is replaced with another.
740 ///
741 /// Callers _should not_ depend on the specific meaning of any particular
742 /// fraction; we may change these fractions in the future.
743 fn frac(&self) -> f32 {
744 // We arbitrarily decide that 25% is downloading the consensus, 10% is
745 // downloading the certificates, and the remaining 65% is downloading
746 // the microdescriptors until we become usable. We may want to re-tune that in the future, but
747 // the documentation of this function should allow us to do so.
748 match &self.progress {
749 DirProgress::NoConsensus { .. } => 0.0,
750 DirProgress::FetchingCerts { n_certs, .. } => {
751 0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
752 }
753 DirProgress::Validated {
754 usable: false,
755 n_mds,
756 ..
757 } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
758 DirProgress::Validated { usable: true, .. } => 1.0,
759 }
760 }
761
762 /// If we think there is a problem with our bootstrapping process, return a
763 /// [`DirBlockage`] to describe it.
764 ///
765 /// The caller may want to also check `usable_at` to avoid reporting trouble
766 /// if the directory is currently usable.
767 fn blockage(&self) -> Option<DirBlockage> {
768 /// How many resets are sufficient for us to report a blockage?
769 const RESET_THRESHOLD: usize = 2;
770 /// How many errors are sufficient for us to report a blockage?
771 const ERROR_THRESHOLD: usize = 6;
772 /// How many no-progress download attempts are sufficient for us to
773 /// report a blockage?
774 const STALL_THRESHOLD: usize = 8;
775
776 if self.n_resets >= RESET_THRESHOLD {
777 Some(DirBlockage::TooManyResets)
778 } else if self.n_errors >= ERROR_THRESHOLD {
779 Some(DirBlockage::TooManyErrors)
780 } else if self.n_stalls >= STALL_THRESHOLD {
781 Some(DirBlockage::Stalled)
782 } else {
783 None
784 }
785 }
786}
787
788impl DirProgress {
789 /// Return true if this progress indicates a usable directory.
790 fn usable(&self) -> bool {
791 matches!(self, DirProgress::Validated { usable: true, .. })
792 }
793}
794
795/// A stream of [`DirBootstrapStatus`] events.
796#[derive(Clone, Educe)]
797#[educe(Debug)]
798pub struct DirBootstrapEvents {
799 /// The `postage::watch::Receiver` that we're wrapping.
800 ///
801 /// We wrap this type so that we don't expose its entire API, and so that we
802 /// can migrate to some other implementation in the future if we want.
803 #[educe(Debug(method = "skip_fmt"))]
804 pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
805}
806
807impl Stream for DirBootstrapEvents {
808 type Item = DirBootstrapStatus;
809
810 fn poll_next(
811 mut self: Pin<&mut Self>,
812 cx: &mut std::task::Context<'_>,
813 ) -> Poll<Option<Self::Item>> {
814 self.inner.poll_next_unpin(cx)
815 }
816}
817
818#[cfg(test)]
819mod test {
820 // @@ begin test lint list maintained by maint/add_warning @@
821 #![allow(clippy::bool_assert_comparison)]
822 #![allow(clippy::clone_on_copy)]
823 #![allow(clippy::dbg_macro)]
824 #![allow(clippy::mixed_attributes_style)]
825 #![allow(clippy::print_stderr)]
826 #![allow(clippy::print_stdout)]
827 #![allow(clippy::single_char_pattern)]
828 #![allow(clippy::unwrap_used)]
829 #![allow(clippy::unchecked_duration_subtraction)]
830 #![allow(clippy::useless_vec)]
831 #![allow(clippy::needless_pass_by_value)]
832 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
833 use std::time::Duration;
834
835 use super::*;
836 use float_eq::assert_float_eq;
837 use futures::stream::StreamExt;
838 use tor_rtcompat::test_with_all_runtimes;
839
840 #[test]
841 fn subscribe_and_publish() {
842 test_with_all_runtimes!(|_rt| async {
843 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
844 let mut sub1 = publish.subscribe();
845 publish.publish(DirEvent::NewConsensus);
846 let mut sub2 = publish.subscribe();
847 let ev = event_listener::Event::new();
848 let lis = ev.listen();
849
850 futures::join!(
851 async {
852 // sub1 was created in time to see this event...
853 let val1 = sub1.next().await;
854 assert_eq!(val1, Some(DirEvent::NewConsensus));
855 ev.notify(1); // Tell the third task below to drop the publisher.
856 let val2 = sub1.next().await;
857 assert_eq!(val2, None);
858 },
859 async {
860 let val = sub2.next().await;
861 assert_eq!(val, None);
862 },
863 async {
864 lis.await;
865 drop(publish);
866 }
867 );
868 });
869 }
870
871 #[test]
872 fn receive_two() {
873 test_with_all_runtimes!(|_rt| async {
874 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
875
876 let mut sub = publish.subscribe();
877 let ev = event_listener::Event::new();
878 let ev_lis = ev.listen();
879 futures::join!(
880 async {
881 let val1 = sub.next().await;
882 assert_eq!(val1, Some(DirEvent::NewDescriptors));
883 ev.notify(1);
884 let val2 = sub.next().await;
885 assert_eq!(val2, Some(DirEvent::NewConsensus));
886 },
887 async {
888 publish.publish(DirEvent::NewDescriptors);
889 ev_lis.await;
890 publish.publish(DirEvent::NewConsensus);
891 }
892 );
893 });
894 }
895
896 #[test]
897 fn two_publishers() {
898 test_with_all_runtimes!(|_rt| async {
899 let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
900 let publish2 = publish1.clone();
901
902 let mut sub = publish1.subscribe();
903 let ev1 = event_listener::Event::new();
904 let ev2 = event_listener::Event::new();
905 let ev1_lis = ev1.listen();
906 let ev2_lis = ev2.listen();
907 futures::join!(
908 async {
909 let mut count = [0_usize; 2];
910 // These awaits guarantee that we will see at least one event flag of each
911 // type, before the stream is dropped.
912 ev1_lis.await;
913 ev2_lis.await;
914 while let Some(e) = sub.next().await {
915 count[e.to_index() as usize] += 1;
916 }
917 assert!(count[0] > 0);
918 assert!(count[1] > 0);
919 assert!(count[0] <= 100);
920 assert!(count[1] <= 100);
921 },
922 async {
923 for _ in 0..100 {
924 publish1.publish(DirEvent::NewDescriptors);
925 ev1.notify(1);
926 tor_rtcompat::task::yield_now().await;
927 }
928 drop(publish1);
929 },
930 async {
931 for _ in 0..100 {
932 publish2.publish(DirEvent::NewConsensus);
933 ev2.notify(1);
934 tor_rtcompat::task::yield_now().await;
935 }
936 drop(publish2);
937 }
938 );
939 });
940 }
941
942 #[test]
943 fn receive_after_publishers_are_gone() {
944 test_with_all_runtimes!(|_rt| async {
945 let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
946
947 let mut sub = publish.subscribe();
948
949 publish.publish(DirEvent::NewConsensus);
950 drop(publish);
951 let v = sub.next().await;
952 assert_eq!(v, Some(DirEvent::NewConsensus));
953 let v = sub.next().await;
954 assert!(v.is_none());
955 });
956 }
957
958 #[test]
959 fn failed_conversion() {
960 assert_eq!(DirEvent::from_index(999), None);
961 }
962
963 #[test]
964 fn dir_status_basics() {
965 let now = SystemTime::now();
966 let hour = Duration::new(3600, 0);
967
968 let nothing = DirStatus {
969 progress: DirProgress::NoConsensus { after: None },
970 ..Default::default()
971 };
972 let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
973 let unval = DirStatus {
974 progress: DirProgress::FetchingCerts {
975 lifetime: lifetime.clone(),
976 usable_lifetime: lifetime,
977 n_certs: (3, 5),
978 },
979 ..Default::default()
980 };
981 let lifetime =
982 netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
983 let with_c = DirStatus {
984 progress: DirProgress::Validated {
985 lifetime: lifetime.clone(),
986 usable_lifetime: lifetime,
987 n_mds: (30, 40),
988 usable: false,
989 },
990 ..Default::default()
991 };
992
993 // lifetime()
994 assert!(nothing.usable_lifetime().is_none());
995 assert_eq!(unval.usable_lifetime().unwrap().valid_after(), now);
996 assert_eq!(
997 with_c.usable_lifetime().unwrap().valid_until(),
998 now + hour * 3
999 );
1000
1001 // frac() (It's okay if we change the actual numbers here later; the
1002 // current ones are more or less arbitrary.)
1003 const TOL: f32 = 0.00001;
1004 assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
1005 assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
1006 assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
1007
1008 // frac_at()
1009 let t1 = now + hour / 2;
1010 let t2 = t1 + hour * 2;
1011 assert!(nothing.frac_at(t1).is_none());
1012 assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
1013 assert!(with_c.frac_at(t1).is_none());
1014 assert!(nothing.frac_at(t2).is_none());
1015 assert!(unval.frac_at(t2).is_none());
1016 assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
1017 }
1018
1019 #[test]
1020 fn dir_status_display() {
1021 use time::macros::datetime;
1022 let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1023 let hour = Duration::new(3600, 0);
1024 let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1025
1026 let ds = DirStatus {
1027 progress: DirProgress::NoConsensus { after: None },
1028 ..Default::default()
1029 };
1030 assert_eq!(ds.to_string(), "fetching a consensus");
1031
1032 let ds = DirStatus {
1033 progress: DirProgress::FetchingCerts {
1034 lifetime: lifetime.clone(),
1035 usable_lifetime: lifetime.clone(),
1036 n_certs: (3, 5),
1037 },
1038 ..Default::default()
1039 };
1040 assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
1041
1042 let ds = DirStatus {
1043 progress: DirProgress::Validated {
1044 lifetime: lifetime.clone(),
1045 usable_lifetime: lifetime.clone(),
1046 n_mds: (30, 40),
1047 usable: false,
1048 },
1049 ..Default::default()
1050 };
1051 assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
1052
1053 let ds = DirStatus {
1054 progress: DirProgress::Validated {
1055 lifetime: lifetime.clone(),
1056 usable_lifetime: lifetime,
1057 n_mds: (30, 40),
1058 usable: true,
1059 },
1060 ..Default::default()
1061 };
1062 assert_eq!(
1063 ds.to_string(),
1064 "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
1065 );
1066 }
1067
1068 #[test]
1069 fn bootstrap_status() {
1070 use time::macros::datetime;
1071 let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
1072 let hour = Duration::new(3600, 0);
1073 let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
1074 let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
1075
1076 let dp1 = DirProgress::Validated {
1077 lifetime: lifetime.clone(),
1078 usable_lifetime: lifetime.clone(),
1079 n_mds: (3, 40),
1080 usable: true,
1081 };
1082 let dp2 = DirProgress::Validated {
1083 lifetime: lifetime2.clone(),
1084 usable_lifetime: lifetime2.clone(),
1085 n_mds: (5, 40),
1086 usable: false,
1087 };
1088 let attempt1 = AttemptId::next();
1089 let attempt2 = AttemptId::next();
1090
1091 let bs = DirBootstrapStatus(StatusEnum::Replacing {
1092 current: StatusEntry {
1093 id: attempt1,
1094 status: DirStatus {
1095 progress: dp1.clone(),
1096 ..Default::default()
1097 },
1098 },
1099 next: StatusEntry {
1100 id: attempt2,
1101 status: DirStatus {
1102 progress: dp2.clone(),
1103 ..Default::default()
1104 },
1105 },
1106 });
1107
1108 assert_eq!(bs.to_string(),
1109 "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
1110 );
1111
1112 const TOL: f32 = 0.00001;
1113 assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
1114 assert_float_eq!(
1115 bs.frac_at(t1 + hour * 3 + hour / 2),
1116 0.35 + 0.65 * 0.125,
1117 abs <= TOL
1118 );
1119
1120 // Now try updating.
1121
1122 // Case 1: we have a usable directory and the updated status isn't usable.
1123 let mut bs = bs;
1124 let dp3 = DirProgress::Validated {
1125 lifetime: lifetime2.clone(),
1126 usable_lifetime: lifetime2.clone(),
1127 n_mds: (10, 40),
1128 usable: false,
1129 };
1130
1131 bs.update_progress(attempt2, dp3);
1132 assert!(matches!(
1133 bs.next().unwrap(),
1134 DirStatus {
1135 progress: DirProgress::Validated {
1136 n_mds: (10, 40),
1137 ..
1138 },
1139 ..
1140 }
1141 ));
1142
1143 // Case 2: The new directory _is_ usable and newer. It will replace the old one.
1144 let ds4 = DirStatus {
1145 progress: DirProgress::Validated {
1146 lifetime: lifetime2.clone(),
1147 usable_lifetime: lifetime2.clone(),
1148 n_mds: (20, 40),
1149 usable: true,
1150 },
1151 ..Default::default()
1152 };
1153 bs.update_progress(attempt2, ds4.progress);
1154 assert!(bs.next().is_none());
1155 assert_eq!(
1156 bs.current()
1157 .unwrap()
1158 .usable_lifetime()
1159 .unwrap()
1160 .valid_after(),
1161 lifetime2.valid_after()
1162 );
1163
1164 // Case 3: The new directory is usable but older. Nothing will happen.
1165 bs.update_progress(attempt1, dp1);
1166 assert!(bs.next().as_ref().is_none());
1167 assert_ne!(
1168 bs.current()
1169 .unwrap()
1170 .usable_lifetime()
1171 .unwrap()
1172 .valid_after(),
1173 lifetime.valid_after()
1174 );
1175
1176 // Case 4: starting with an unusable directory, we always replace.
1177 let mut bs = DirBootstrapStatus::default();
1178 assert!(!dp2.usable());
1179 assert!(bs.current().is_none());
1180 bs.update_progress(attempt2, dp2);
1181 assert!(bs.current().unwrap().usable_lifetime().is_some());
1182 }
1183}