Lines
87.17 %
Functions
49.81 %
Branches
100 %
//! Code for notifying other modules about changes in the directory.
// TODO(nickm): After we have enough experience with this FlagPublisher, we
// might want to make it a public interface. If we do it should probably move
// into another crate.
use std::{
fmt,
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Poll,
time::SystemTime,
};
use educe::Educe;
use futures::{stream::Stream, Future, StreamExt};
use itertools::chain;
use paste::paste;
use time::OffsetDateTime;
use tor_basic_utils::skip_fmt;
use tor_netdir::DirEvent;
use tor_netdoc::doc::netstatus;
#[cfg(feature = "bridge-client")]
use tor_guardmgr::bridge::BridgeDescEvent;
use crate::bootstrap::AttemptId;
/// A trait to indicate something that can be published with [`FlagPublisher`].
///
/// Since the implementation of `FlagPublisher` requires that its events be
/// represented as small integers, this trait is mainly about converting to and
/// from those integers.
pub(crate) trait FlagEvent: Sized {
/// The maximum allowed integer value that [`FlagEvent::to_index()`] can return
/// for this type.
/// This is limited to u16 because the [`FlagPublisher`] uses a vector of all
/// known flags, and sometimes iterates over the whole vector.
const MAXIMUM: u16;
/// Convert this event into an index.
/// For efficiency, indices should be small and densely packed.
fn to_index(self) -> u16;
/// Try to reconstruct an event from its index. Return None if the index is
/// out-of-bounds.
fn from_index(flag: u16) -> Option<Self>;
}
/// Implements [`FlagEvent`] for a C-like enum
/// Requiremets:
/// * `$ty` must implement [`strum::EnumCount`] [`strum::IntoEnumIterator`]
/// * `$ty` type must implement [`Into<u16>`] and [`TryFrom<u16>`]
/// (for example using the `num_enum` crate).
/// * The discriminants must be densely allocated.
/// This will be done automatically by the compiler
/// if explicit discriminants are not specified.
/// (This property is checked in a test.)
/// * The variants may not contain any data.
/// This is required for correctness.
/// We think it is checked if you use `num_enum::TryFromPrimitive`.
/// # Example
// Sadly, it does not appear to be possible to doctest a private macro.
/// ```rust,ignore
/// use num_enum::{IntoPrimitive, TryFromPrimitive};
/// use strum::{EnumCount, EnumIter};
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// #[derive(EnumIter, EnumCount, IntoPrimitive, TryFromPrimitive)]
/// #[non_exhaustive]
/// #[repr(u16)]
/// pub enum DirEvent {
/// NewConsensus,
/// NewDescriptors,
/// }
/// impl_FlagEvent!{ DirEvent }
/// ```
macro_rules! impl_FlagEvent { { $ty:ident } => { paste!{
impl FlagEvent for $ty {
const MAXIMUM: u16 = {
let count = <$ty as $crate::strum::EnumCount>::COUNT;
(count - 1) as u16
fn to_index(self) -> u16 {
self.into()
fn from_index(flag: u16) -> Option<Self> {
flag.try_into().ok()
#[test]
#[allow(non_snake_case)]
fn [< flagevent_test_variant_numbers_ $ty >]() {
for variant in <$ty as $crate::strum::IntoEnumIterator>::iter() {
assert!(<$ty as FlagEvent>::to_index(variant) <=
<$ty as FlagEvent>::MAXIMUM,
"impl_FlagEvent only allowed if discriminators are dense");
} } }
impl_FlagEvent! { DirEvent }
impl_FlagEvent! { BridgeDescEvent }
/// A publisher that broadcasts flag-level events to multiple subscribers.
/// Events with the same flag value may be coalesced: that is, if the same event
/// is published ten times in a row, a subscriber may receive only a single
/// notification of the event.
/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
/// that can also broadcast events to everybody listening on the channel.
/// Dropping the last handle closes all streams subscribed to it.
pub(crate) struct FlagPublisher<F> {
/// Inner data shared by publishers and streams.
inner: Arc<Inner<F>>,
/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
struct Inner<F> {
/// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
event: event_listener::Event,
/// How many times has each event occurred, ever.
/// (It is safe for this to wrap around.)
// TODO(nickm): I wish this could be an array, but const generics don't
// quite support that yet.
counts: Vec<AtomicUsize>, // I wish this could be an array.
/// How many publishers remain?
n_publishers: AtomicUsize,
/// Phantom member to provide correct covariance.
/// The `fn` business is a covariance trick to include `F` without affecting
/// this object's Send/Sync status.
_phantom: PhantomData<fn(F) -> F>,
/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
/// [`FlagPublisher`].
pub(crate) struct FlagListener<F> {
/// What value of each flag's count have we seen most recently?
/// Note that we count the event as "received" only once for each observed
/// change in the flag's count, even if that count has changed by more than
/// 1.
my_counts: Vec<usize>,
/// An an `EventListener` that will be notified when events are published,
/// or when the final publisher is dropped.
/// We must always have one of these available _before_ we check any counts
/// in self.inner.
listener: event_listener::EventListener,
/// Reference to shared data.
impl<F: FlagEvent> Default for FlagPublisher<F> {
fn default() -> Self {
Self::new()
impl<F: FlagEvent> FlagPublisher<F> {
/// Construct a new FlagPublisher.
pub(crate) fn new() -> Self {
// We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
// require AtomicUsize to be Clone.
let counts = std::iter::repeat_with(AtomicUsize::default)
.take(F::MAXIMUM as usize + 1)
.collect();
FlagPublisher {
inner: Arc::new(Inner {
event: event_listener::Event::new(),
counts,
n_publishers: AtomicUsize::new(1),
_phantom: PhantomData,
}),
/// Create a new subscription to this FlagPublisher.
pub(crate) fn subscribe(&self) -> FlagListener<F> {
// We need to do this event.listen before we check the counts; otherwise
// we could have a sequence where: we check the count, then the
// publisher increments the count, then the publisher calls
// event.notify(), and we call event.listen(). That would cause us to
// miss the increment.
let listener = self.inner.event.listen();
FlagListener {
my_counts: self
.inner
.counts
.iter()
.map(|a| a.load(Ordering::SeqCst))
.collect(),
listener,
inner: Arc::clone(&self.inner),
/// Tell every listener that the provided flag has been published.
pub(crate) fn publish(&self, flag: F) {
self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
self.inner.event.notify(usize::MAX);
impl<F> Clone for FlagPublisher<F> {
fn clone(&self) -> FlagPublisher<F> {
self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
// We must implement Drop to keep count publishers, and so that when the last
// publisher goes away, we can wake up every listener so that it notices that
// the stream is now ended.
impl<F> Drop for FlagPublisher<F> {
fn drop(&mut self) {
if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
// That was the last reference; we must notify the listeners.
impl<F: FlagEvent> Stream for FlagListener<F> {
type Item = F;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
// Notify the caller if any events are ready to fire.
for idx in 0..F::MAXIMUM as usize + 1 {
let cur = self.inner.counts[idx].load(Ordering::SeqCst);
// We don't have to use < here specifically, since any change
// indicates that the count has been modified. That lets us
// survive usize wraparound.
if cur != self.my_counts[idx] {
self.my_counts[idx] = cur;
return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
// At this point, notify the caller if there are no more publishers.
if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
return Poll::Ready(None);
if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
// Got a new notification; we must create a new event and continue the loop.
//
// See discussion in `FlagPublisher::subscribe()` for why we must always create
// this listener _before_ checking any flags.
self.listener = self.inner.event.listen();
} else {
// Nothing to do yet: put the listener back.
return Poll::Pending;
/// Description of the directory manager's current bootstrapping status.
/// This status does not necessarily increase monotonically: it can go backwards
/// if (for example) our directory information expires before we're able to get
/// new information.
// TODO(nickm): This type has gotten a bit large for being the type we send over
// a `postage::watch`: perhaps we'd be better off having this information stored
// in the guardmgr, and having only a summary of it sent over the
// `postage::watch`. But for now, let's not, unless it shows up in profiles.
#[derive(Clone, Debug, Default)]
pub struct DirBootstrapStatus(StatusEnum);
/// The contents of a DirBootstrapStatus.
/// This is a separate type since we don't want to make these variables public.
enum StatusEnum {
/// There is no active attempt to load or fetch a directory.
#[default]
NoActivity,
/// We have only one attempt to fetch a directory.
Single {
/// The currently active directory attempt.
/// We're either using this directory now, or we plan to use it as soon
/// as it's complete enough.
current: StatusEntry,
/// We have an existing directory attempt, but it's stale, and we're
/// fetching a new one to replace it.
/// Invariant: `current.id < next.id`
Replacing {
/// The previous attempt's status. It may still be trying to fetch
/// information if it has descriptors left to download.
/// The current attempt's status. We are not yet using this directory
/// for our activity, since it does not (yet) have enough information.
next: StatusEntry,
/// The status and identifier of a single attempt to download a full directory.
#[derive(Clone, Debug)]
struct StatusEntry {
/// The identifier for this attempt.
id: AttemptId,
/// The latest status.
status: DirStatus,
/// The status for a single directory.
#[derive(Clone, Debug, Default, derive_more::Display)]
#[display("{0}", progress)]
pub(crate) struct DirStatus {
/// How much of the directory do we currently have?
progress: DirProgress,
/// How many resets have been forced while fetching this directory?
n_resets: usize,
/// How many errors have we encountered since last we advanced the
/// 'progress' on this directory?
n_errors: usize,
/// How many times has an `update_progress` call not actually moved us
/// forward since we last advanced the 'progress' on this directory?
n_stalls: usize,
/// How much progress have we made in downloading a given directory?
/// This is a separate type so that we don't make the variants public.
#[derive(Clone, Debug, Educe)]
#[educe(Default)]
pub(crate) enum DirProgress {
/// We don't have any information yet.
NoConsensus {
/// If present, we are fetching a consensus whose valid-after time
/// postdates this time.
#[allow(dead_code)]
after: Option<SystemTime>,
/// We've downloaded a consensus, but we haven't validated it yet.
FetchingCerts {
/// The actual declared lifetime of the consensus.
lifetime: netstatus::Lifetime,
/// The lifetime for which we are willing to use this consensus. (This
/// may be broader than `lifetime`.)
usable_lifetime: netstatus::Lifetime,
/// A fraction (in (numerator,denominator) format) of the certificates
/// we have for this consensus.
n_certs: (u16, u16),
/// We've validated a consensus and we're fetching (or have fetched) its
/// microdescriptors.
Validated {
/// A fraction (in (numerator,denominator) form) of the microdescriptors
/// that we have for this consensus.
n_mds: (u32, u32),
/// True iff we've decided that the consensus is usable.
usable: bool,
// TODO(nickm) Someday we could add a field about whether any primary
// guards are missing microdescriptors, to give a better explanation for
// the case where we won't switch our consensus because of that.
/// A reported diagnostic for what kind of trouble we've seen while trying to
/// bootstrap a directory.
/// These blockages types are not yet terribly specific: if you encounter one,
/// it's probably a good idea to check the logs to see what's really going on.
/// If you encounter connection blockage _and_ directory blockage at the same
/// time, the connection blockage is almost certainly the real problem.
// TODO(nickm): At present these diagnostics aren't very helpful; they say too
// much about _how we know_ that the process has gone wrong, but not so much
// about _what the problem is_. In the future, we may wish to look more closely
// at what _kind_ of errors or resets we've seen, so we can report better
// information. Probably, however, we should only do that after we get some
// experience with which problems people encounter in practice, and what
// diagnostics would be useful for them.
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum DirBlockage {
/// We've been downloading information without error, but we haven't
/// actually been getting anything that we want.
/// This might indicate that there's a problem with information propagating
/// through the Tor network, or it might indicate that a bogus consensus or
/// a bad clock has tricked us into asking for something that nobody has.
#[display("Can't make progress.")]
Stalled,
/// We've gotten a lot of errors without making forward progress on our
/// bootstrap attempt.
/// This might indicate that something's wrong with the Tor network, or that
/// there's something buggy with our ability to handle directory responses.
/// It might also indicate a malfunction on our directory guards, or a bug
/// on our retry logic.
#[display("Too many errors without making progress.")]
TooManyErrors,
/// We've reset our bootstrap attempt a lot of times.
/// This either indicates that we have been failing a lot for one of the
/// other reasons above, or that we keep getting served a consensus which
/// turns out, upon trying to fetch certificates, not to be usable. It can
/// also indicate a bug in our retry logic.
#[display("Had to reset bootstrapping too many times.")]
TooManyResets,
impl fmt::Display for DirProgress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Format this time in a format useful for displaying
/// lifetime boundaries.
fn fmt_time(t: SystemTime) -> String {
use once_cell::sync::Lazy;
/// Formatter object for lifetime boundaries.
/// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
/// sub-second times here, and using non-UTC offsets is confusing
/// in this context.
static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
.expect("Invalid time format")
});
OffsetDateTime::from(t)
.format(&FORMAT)
.unwrap_or_else(|_| "(could not format)".into())
match &self {
DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
DirProgress::FetchingCerts { n_certs, .. } => write!(
f,
"fetching authority certificates ({}/{})",
n_certs.0, n_certs.1
),
DirProgress::Validated {
usable: false,
n_mds,
..
} => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
usable: true,
lifetime,
} => write!(
"usable, fresh until {}, and valid until {}",
fmt_time(lifetime.fresh_until()),
fmt_time(lifetime.valid_until())
impl fmt::Display for DirBootstrapStatus {
match &self.0 {
StatusEnum::NoActivity => write!(f, "not downloading")?,
StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
StatusEnum::Replacing { current, next } => write!(
"directory is {}; next directory is {}",
current.status, next.status
)?,
Ok(())
impl DirBootstrapStatus {
/// Return the current DirStatus.
/// This is the _most complete_ status. If we have any usable status, it is
/// this one.
fn current(&self) -> Option<&DirStatus> {
StatusEnum::NoActivity => None,
StatusEnum::Single { current } => Some(¤t.status),
StatusEnum::Replacing { current, .. } => Some(¤t.status),
/// Return the next DirStatus, if there is one.
fn next(&self) -> Option<&DirStatus> {
StatusEnum::Replacing { next, .. } => Some(&next.status),
_ => None,
/// Return the contained `DirStatus`es, in order: `current`, then `next`
#[allow(clippy::implied_bounds_in_impls)]
fn statuses(&self) -> impl Iterator<Item = &DirStatus> + DoubleEndedIterator {
chain!(self.current(), self.next(),)
/// Return the contained `StatusEntry`s mutably, in order: `current`, then `next`
fn entries_mut(&mut self) -> impl Iterator<Item = &mut StatusEntry> + DoubleEndedIterator {
let (current, next) = match &mut self.0 {
StatusEnum::NoActivity => (None, None),
StatusEnum::Single { current } => (Some(current), None),
StatusEnum::Replacing { current, next } => (Some(current), Some(next)),
chain!(current, next,)
/// Return the fraction of completion for directory download, in a form
/// suitable for a progress bar at some particular time.
/// This value is not monotonic, and can go down as one directory is
/// replaced with another.
/// Callers _should not_ depend on the specific meaning of any particular
/// fraction; we may change these fractions in the future.
pub fn frac_at(&self, when: SystemTime) -> f32 {
self.statuses()
.filter_map(|st| st.frac_at(when))
.next()
.unwrap_or(0.0)
/// Return true if this status indicates that we have a current usable
/// directory.
pub fn usable_at(&self, now: SystemTime) -> bool {
if let Some(current) = self.current() {
current.progress.usable() && current.okay_to_use_at(now)
false
/// If there is a problem with our attempts to bootstrap, return a
/// corresponding DirBlockage.
pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
if current.progress.usable() && current.declared_live_at(now) {
// The current directory is sufficient, and not even a little bit
// expired. There is no problem.
return None;
// Any blockage in "current" is more serious, so return that if there is one
self.statuses().filter_map(|st| st.blockage()).next()
/// Return the appropriate DirStatus for `AttemptId`, constructing it if
/// necessary.
/// Return None if all relevant attempts are more recent than this Id.
#[allow(clippy::search_is_some)] // tpo/core/arti/-/merge_requests/599#note_2816368
fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
// First, ensure that we have a *recent enough* attempt
// Look for the latest attempt, and see if it's new enough; if not, start a new one.
if self
.entries_mut()
.rev()
.take(1)
.find(|entry| entry.id >= attempt_id)
.is_none()
{
let current = match std::mem::take(&mut self.0) {
StatusEnum::Single { current } => Some(current),
StatusEnum::Replacing { current, .. } => Some(current),
// If we have a `current` already, we keep it, and restart `next`.
let next = StatusEntry::new(attempt_id);
self.0 = match current {
None => StatusEnum::Single { current: next },
Some(current) => StatusEnum::Replacing { current, next },
// Find the entry with `attempt_id` and return it.
// (Despite the above, there might not be one: maybe `attempt_id` is old.)
self.entries_mut()
.find(|entry| entry.id == attempt_id)
.map(|entry| &mut entry.status)
/// If the "next" status is usable, replace the current status with it.
fn advance_status(&mut self) {
// TODO: should make sure that the compiler is smart enough to optimize
// this mem::take() and replacement away, and turn it into a conditional
// replacement?
self.0 = match std::mem::take(&mut self.0) {
StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
StatusEnum::Single { current: next }
other => other,
/// Update this status by replacing the `DirProgress` in its current status
/// (or its next status) with `new_status`, as appropriate.
pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
if let Some(status) = self.mut_status_for(attempt_id) {
let old_frac = status.frac();
status.progress = new_progress;
let new_frac = status.frac();
if new_frac > old_frac {
// This download has made progress: clear our count of errors
// and stalls.
status.n_errors = 0;
status.n_stalls = 0;
// This download didn't make progress; increment the stall
// count.
status.n_stalls += 1;
self.advance_status();
/// Update this status by noting that some errors have occurred in a given
/// download attempt.
pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
status.n_errors += n_errors;
/// Update this status by noting that we had to reset a given download attempt;
pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
status.n_resets += 1;
impl StatusEntry {
/// Construct a new StatusEntry with a given attempt id, and no progress
/// reported.
fn new(id: AttemptId) -> Self {
Self {
id,
status: DirStatus::default(),
impl DirStatus {
/// Return the declared consensus lifetime for this directory, if we have one.
fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.progress {
DirProgress::NoConsensus { .. } => None,
DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
DirProgress::Validated { lifetime, .. } => Some(lifetime),
/// Return the consensus lifetime for this directory, if we have one, as
/// modified by our skew-tolerance settings.
fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
DirProgress::FetchingCerts {
usable_lifetime, ..
} => Some(usable_lifetime),
/// Return true if the directory is valid at the given time, as modified by
/// our clock skew settings.
fn okay_to_use_at(&self, when: SystemTime) -> bool {
self.usable_lifetime()
.map(|lt| lt.valid_at(when))
.unwrap_or(false)
/// Return true if the directory is valid at the given time, _unmodified_ by our
/// clock skew settings.
fn declared_live_at(&self, when: SystemTime) -> bool {
self.declared_lifetime()
/// As `frac`, but return None if this consensus is not valid at the given time,
/// and down-rate expired consensuses that we're still willing to use.
fn frac_at(&self, when: SystemTime) -> Option<f32> {
.declared_lifetime()
// We're officially okay to use this directory.
Some(self.frac())
} else if self.okay_to_use_at(when) {
// This directory is a little expired, but only a little.
Some(self.frac() * 0.9)
None
/// suitable for a progress bar.
/// This is monotonically increasing for a single directory, but can go down
/// as one directory is replaced with another.
fn frac(&self) -> f32 {
// We arbitrarily decide that 25% is downloading the consensus, 10% is
// downloading the certificates, and the remaining 65% is downloading
// the microdescriptors until we become usable. We may want to re-tune that in the future, but
// the documentation of this function should allow us to do so.
DirProgress::NoConsensus { .. } => 0.0,
DirProgress::FetchingCerts { n_certs, .. } => {
0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
} => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
DirProgress::Validated { usable: true, .. } => 1.0,
/// If we think there is a problem with our bootstrapping process, return a
/// [`DirBlockage`] to describe it.
/// The caller may want to also check `usable_at` to avoid reporting trouble
/// if the directory is currently usable.
fn blockage(&self) -> Option<DirBlockage> {
/// How many resets are sufficient for us to report a blockage?
const RESET_THRESHOLD: usize = 2;
/// How many errors are sufficient for us to report a blockage?
const ERROR_THRESHOLD: usize = 6;
/// How many no-progress download attempts are sufficient for us to
/// report a blockage?
const STALL_THRESHOLD: usize = 8;
if self.n_resets >= RESET_THRESHOLD {
Some(DirBlockage::TooManyResets)
} else if self.n_errors >= ERROR_THRESHOLD {
Some(DirBlockage::TooManyErrors)
} else if self.n_stalls >= STALL_THRESHOLD {
Some(DirBlockage::Stalled)
impl DirProgress {
/// Return true if this progress indicates a usable directory.
fn usable(&self) -> bool {
matches!(self, DirProgress::Validated { usable: true, .. })
/// A stream of [`DirBootstrapStatus`] events.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct DirBootstrapEvents {
/// The `postage::watch::Receiver` that we're wrapping.
/// We wrap this type so that we don't expose its entire API, and so that we
/// can migrate to some other implementation in the future if we want.
#[educe(Debug(method = "skip_fmt"))]
pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
impl Stream for DirBootstrapEvents {
type Item = DirBootstrapStatus;
mut self: Pin<&mut Self>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use std::time::Duration;
use super::*;
use float_eq::assert_float_eq;
use futures::stream::StreamExt;
use tor_rtcompat::test_with_all_runtimes;
fn subscribe_and_publish() {
test_with_all_runtimes!(|_rt| async {
let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
let mut sub1 = publish.subscribe();
publish.publish(DirEvent::NewConsensus);
let mut sub2 = publish.subscribe();
let ev = event_listener::Event::new();
let lis = ev.listen();
futures::join!(
async {
// sub1 was created in time to see this event...
let val1 = sub1.next().await;
assert_eq!(val1, Some(DirEvent::NewConsensus));
ev.notify(1); // Tell the third task below to drop the publisher.
let val2 = sub1.next().await;
assert_eq!(val2, None);
let val = sub2.next().await;
assert_eq!(val, None);
lis.await;
drop(publish);
);
fn receive_two() {
let mut sub = publish.subscribe();
let ev_lis = ev.listen();
let val1 = sub.next().await;
assert_eq!(val1, Some(DirEvent::NewDescriptors));
ev.notify(1);
let val2 = sub.next().await;
assert_eq!(val2, Some(DirEvent::NewConsensus));
publish.publish(DirEvent::NewDescriptors);
ev_lis.await;
fn two_publishers() {
let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
let publish2 = publish1.clone();
let mut sub = publish1.subscribe();
let ev1 = event_listener::Event::new();
let ev2 = event_listener::Event::new();
let ev1_lis = ev1.listen();
let ev2_lis = ev2.listen();
let mut count = [0_usize; 2];
// These awaits guarantee that we will see at least one event flag of each
// type, before the stream is dropped.
ev1_lis.await;
ev2_lis.await;
while let Some(e) = sub.next().await {
count[e.to_index() as usize] += 1;
assert!(count[0] > 0);
assert!(count[1] > 0);
assert!(count[0] <= 100);
assert!(count[1] <= 100);
for _ in 0..100 {
publish1.publish(DirEvent::NewDescriptors);
ev1.notify(1);
tor_rtcompat::task::yield_now().await;
drop(publish1);
publish2.publish(DirEvent::NewConsensus);
ev2.notify(1);
drop(publish2);
fn receive_after_publishers_are_gone() {
let v = sub.next().await;
assert_eq!(v, Some(DirEvent::NewConsensus));
assert!(v.is_none());
fn failed_conversion() {
assert_eq!(DirEvent::from_index(999), None);
fn dir_status_basics() {
let now = SystemTime::now();
let hour = Duration::new(3600, 0);
let nothing = DirStatus {
progress: DirProgress::NoConsensus { after: None },
..Default::default()
let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
let unval = DirStatus {
progress: DirProgress::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_certs: (3, 5),
let lifetime =
netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
let with_c = DirStatus {
progress: DirProgress::Validated {
n_mds: (30, 40),
// lifetime()
assert!(nothing.usable_lifetime().is_none());
assert_eq!(unval.usable_lifetime().unwrap().valid_after(), now);
assert_eq!(
with_c.usable_lifetime().unwrap().valid_until(),
now + hour * 3
// frac() (It's okay if we change the actual numbers here later; the
// current ones are more or less arbitrary.)
const TOL: f32 = 0.00001;
assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
// frac_at()
let t1 = now + hour / 2;
let t2 = t1 + hour * 2;
assert!(nothing.frac_at(t1).is_none());
assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
assert!(with_c.frac_at(t1).is_none());
assert!(nothing.frac_at(t2).is_none());
assert!(unval.frac_at(t2).is_none());
assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
fn dir_status_display() {
use time::macros::datetime;
let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let ds = DirStatus {
assert_eq!(ds.to_string(), "fetching a consensus");
usable_lifetime: lifetime.clone(),
assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
ds.to_string(),
"usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
fn bootstrap_status() {
let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
let dp1 = DirProgress::Validated {
n_mds: (3, 40),
let dp2 = DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (5, 40),
let attempt1 = AttemptId::next();
let attempt2 = AttemptId::next();
let bs = DirBootstrapStatus(StatusEnum::Replacing {
current: StatusEntry {
id: attempt1,
status: DirStatus {
progress: dp1.clone(),
next: StatusEntry {
id: attempt2,
progress: dp2.clone(),
assert_eq!(bs.to_string(),
"directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
assert_float_eq!(
bs.frac_at(t1 + hour * 3 + hour / 2),
0.35 + 0.65 * 0.125,
abs <= TOL
// Now try updating.
// Case 1: we have a usable directory and the updated status isn't usable.
let mut bs = bs;
let dp3 = DirProgress::Validated {
n_mds: (10, 40),
bs.update_progress(attempt2, dp3);
assert!(matches!(
bs.next().unwrap(),
DirStatus {
));
// Case 2: The new directory _is_ usable and newer. It will replace the old one.
let ds4 = DirStatus {
n_mds: (20, 40),
bs.update_progress(attempt2, ds4.progress);
assert!(bs.next().is_none());
bs.current()
.unwrap()
.usable_lifetime()
.valid_after(),
lifetime2.valid_after()
// Case 3: The new directory is usable but older. Nothing will happen.
bs.update_progress(attempt1, dp1);
assert!(bs.next().as_ref().is_none());
assert_ne!(
lifetime.valid_after()
// Case 4: starting with an unusable directory, we always replace.
let mut bs = DirBootstrapStatus::default();
assert!(!dp2.usable());
assert!(bs.current().is_none());
bs.update_progress(attempt2, dp2);
assert!(bs.current().unwrap().usable_lifetime().is_some());