use crate::skew::SkewObservation;
use rand::seq::IteratorRandom;
use std::time::{Duration, Instant};
use tor_linkspec::HasRelayIds;
use super::{DirStatus, FallbackDir, FallbackDirBuilder};
use crate::fallback::default_fallbacks;
use crate::{ids::FallbackId, PickGuardError};
use tor_basic_utils::iter::{FilterCount, IteratorExt as _};
use tor_config::define_list_builder_helper;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FallbackList {
fallbacks: Vec<FallbackDir>,
}
impl<T: IntoIterator<Item = FallbackDir>> From<T> for FallbackList {
fn from(fallbacks: T) -> Self {
FallbackList {
fallbacks: fallbacks.into_iter().collect(),
}
}
}
define_list_builder_helper! {
pub struct FallbackListBuilder {
pub(crate) fallbacks: [FallbackDirBuilder],
}
built: FallbackList = FallbackList { fallbacks };
default = default_fallbacks();
}
impl FallbackList {
pub fn len(&self) -> usize {
self.fallbacks.len()
}
pub fn is_empty(&self) -> bool {
self.fallbacks.is_empty()
}
pub fn choose<R: rand::Rng>(&self, rng: &mut R) -> Result<&FallbackDir, PickGuardError> {
self.fallbacks
.iter()
.choose(rng)
.ok_or(PickGuardError::NoCandidatesAvailable)
}
}
#[derive(Debug, Clone)]
pub(crate) struct FallbackState {
fallbacks: Vec<Entry>,
}
#[derive(Debug, Clone)]
pub(super) struct Entry {
fallback: FallbackDir,
status: DirStatus,
clock_skew: Option<SkewObservation>,
}
const FALLBACK_RETRY_FLOOR: Duration = Duration::from_secs(150);
impl From<FallbackDir> for Entry {
fn from(fallback: FallbackDir) -> Self {
let status = DirStatus::new(FALLBACK_RETRY_FLOOR);
Entry {
fallback,
status,
clock_skew: None,
}
}
}
impl HasRelayIds for Entry {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
self.fallback.identity(key_type)
}
}
impl From<&FallbackList> for FallbackState {
fn from(list: &FallbackList) -> Self {
let mut fallbacks: Vec<Entry> = list.fallbacks.iter().map(|fb| fb.clone().into()).collect();
fallbacks.sort_by(|x, y| x.cmp_by_relay_ids(y));
fallbacks.dedup_by(|x, y| x.same_relay_ids(y));
FallbackState { fallbacks }
}
}
impl FallbackState {
pub(crate) fn choose<R: rand::Rng>(
&self,
rng: &mut R,
now: Instant,
filter: &crate::GuardFilter,
) -> Result<&FallbackDir, PickGuardError> {
if self.fallbacks.is_empty() {
return Err(PickGuardError::NoCandidatesAvailable);
}
let mut running = FilterCount::default();
let mut filtered = FilterCount::default();
self.fallbacks
.iter()
.filter_cnt(&mut running, |ent| ent.status.usable_at(now))
.filter_cnt(&mut filtered, |ent| filter.permits(&ent.fallback))
.choose(rng)
.map(|ent| &ent.fallback)
.ok_or_else(|| PickGuardError::AllFallbacksDown {
retry_at: self.next_retry(),
running,
filtered,
})
}
fn next_retry(&self) -> Option<Instant> {
self.fallbacks
.iter()
.filter_map(|ent| ent.status.next_retriable())
.min()
}
fn get(&self, id: &FallbackId) -> Option<&Entry> {
match self.fallbacks.binary_search_by(|e| e.cmp_by_relay_ids(id)) {
Ok(idx) => Some(&self.fallbacks[idx]),
Err(_) => None,
}
}
fn get_mut(&mut self, id: &FallbackId) -> Option<&mut Entry> {
match self.fallbacks.binary_search_by(|e| e.cmp_by_relay_ids(id)) {
Ok(idx) => Some(&mut self.fallbacks[idx]),
Err(_) => None,
}
}
pub(crate) fn contains(&self, id: &FallbackId) -> bool {
self.get(id).is_some()
}
pub(crate) fn note_success(&mut self, id: &FallbackId) {
if let Some(entry) = self.get_mut(id) {
entry.status.note_success();
}
}
pub(crate) fn note_failure(&mut self, id: &FallbackId, now: Instant) {
if let Some(entry) = self.get_mut(id) {
entry.status.note_failure(now);
}
}
pub(crate) fn take_status_from(&mut self, other: FallbackState) {
use itertools::EitherOrBoth::Both;
itertools::merge_join_by(self.fallbacks.iter_mut(), other.fallbacks, |a, b| {
a.fallback.cmp_by_relay_ids(&b.fallback)
})
.for_each(|entry| {
if let Both(entry, other) = entry {
debug_assert!(entry.fallback.same_relay_ids(&other.fallback));
entry.status = other.status;
}
});
}
pub(crate) fn note_skew(&mut self, id: &FallbackId, observation: SkewObservation) {
if let Some(entry) = self.get_mut(id) {
entry.clock_skew = Some(observation);
}
}
pub(crate) fn skew_observations(&self) -> impl Iterator<Item = &SkewObservation> {
self.fallbacks
.iter()
.filter_map(|fb| fb.clock_skew.as_ref())
}
}
#[cfg(test)]
mod test {
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
use super::*;
use rand::Rng;
use tor_basic_utils::test_rng::testing_rng;
fn rand_fb<R: Rng>(rng: &mut R) -> FallbackDir {
let ed: [u8; 32] = rng.gen();
let rsa: [u8; 20] = rng.gen();
let ip: u32 = rng.gen();
let mut bld = FallbackDir::builder();
bld.ed_identity(ed.into())
.rsa_identity(rsa.into())
.orports()
.push(std::net::SocketAddrV4::new(ip.into(), 9090).into());
bld.build().unwrap()
}
#[test]
fn construct_fallback_set() {
use rand::seq::SliceRandom;
use std::cmp::Ordering as O;
let mut rng = testing_rng();
let fbs = vec![
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
];
let fb_other = rand_fb(&mut rng);
let id_other = FallbackId::from_relay_ids(&fb_other);
let list: FallbackList = fbs.clone().into();
assert!(!list.is_empty());
assert_eq!(list.len(), 4);
let mut set: FallbackState = (&list).into();
assert_eq!(set.fallbacks.len(), 4);
assert_eq!(
set.fallbacks[0].cmp_by_relay_ids(&set.fallbacks[1]),
O::Less
);
assert_eq!(
set.fallbacks[1].cmp_by_relay_ids(&set.fallbacks[2]),
O::Less
);
assert_eq!(
set.fallbacks[2].cmp_by_relay_ids(&set.fallbacks[3]),
O::Less
);
for fb in fbs.iter() {
let id = FallbackId::from_relay_ids(fb);
assert_eq!(set.get_mut(&id).unwrap().cmp_by_relay_ids(&id), O::Equal);
}
assert!(set.get_mut(&id_other).is_none());
let mut redundant_fbs = fbs.clone();
redundant_fbs.extend(fbs.clone());
redundant_fbs.extend(fbs[0..2].iter().map(Clone::clone));
redundant_fbs[..].shuffle(&mut testing_rng());
let list2 = redundant_fbs.into();
assert_ne!(&list, &list2);
let set2: FallbackState = (&list2).into();
assert_eq!(set.fallbacks.len(), set2.fallbacks.len());
assert!(set
.fallbacks
.iter()
.zip(set2.fallbacks.iter())
.all(|(ent1, ent2)| ent1.same_relay_ids(ent2)));
}
#[test]
fn set_choose() {
dbg!("X");
let mut rng = testing_rng();
let fbs = vec![
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
];
let list: FallbackList = fbs.into();
let mut set: FallbackState = (&list).into();
let filter = crate::GuardFilter::unfiltered();
let mut counts = [0_usize; 4];
let now = Instant::now();
dbg!("A");
fn lookup_idx(set: &FallbackState, id: &impl HasRelayIds) -> Option<usize> {
set.fallbacks
.binary_search_by(|ent| ent.fallback.cmp_by_relay_ids(id))
.ok()
}
for _ in 0..100 {
let fb = set.choose(&mut rng, now, &filter).unwrap();
let idx = lookup_idx(&set, fb).unwrap();
counts[idx] += 1;
}
dbg!("B");
assert!(counts.iter().all(|v| *v > 0));
let ids: Vec<_> = set
.fallbacks
.iter()
.map(|ent| FallbackId::from_relay_ids(&ent.fallback))
.collect();
set.note_failure(&ids[2], now);
counts = [0; 4];
for _ in 0..100 {
let fb = set.choose(&mut rng, now, &filter).unwrap();
let idx = lookup_idx(&set, fb).unwrap();
counts[idx] += 1;
}
assert_eq!(counts.iter().filter(|v| **v > 0).count(), 3);
assert_eq!(counts[2], 0);
for id in ids.iter() {
set.note_failure(id, now);
}
assert!(matches!(
set.choose(&mut rng, now, &filter),
Err(PickGuardError::AllFallbacksDown { .. })
));
let empty_set = FallbackState::from(&FallbackList::from(vec![]));
assert!(matches!(
empty_set.choose(&mut rng, now, &filter),
Err(PickGuardError::NoCandidatesAvailable)
));
}
#[test]
fn test_status() {
let mut rng = testing_rng();
let fbs = vec![
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
rand_fb(&mut rng),
];
let list: FallbackList = fbs.clone().into();
let mut set: FallbackState = (&list).into();
let ids: Vec<_> = set
.fallbacks
.iter()
.map(|ent| FallbackId::from_relay_ids(&ent.fallback))
.collect();
let now = Instant::now();
assert!(set.next_retry().is_none());
set.note_failure(&ids[3], now);
assert!(set.fallbacks[3].status.next_retriable().unwrap() > now);
assert!(!set.fallbacks[3].status.usable_at(now));
assert_eq!(set.next_retry(), set.fallbacks[3].status.next_retriable());
set.note_failure(&ids[0], now);
assert!(set.fallbacks[0].status.next_retriable().unwrap() > now);
assert!(!set.fallbacks[0].status.usable_at(now));
assert_eq!(
set.next_retry().unwrap(),
std::cmp::min(
set.fallbacks[0].status.next_retriable().unwrap(),
set.fallbacks[3].status.next_retriable().unwrap()
)
);
set.note_success(&ids[0]);
assert!(set.fallbacks[0].status.next_retriable().is_none());
assert!(set.fallbacks[0].status.usable_at(now));
let mut fbs2: Vec<_> = fbs
.into_iter()
.filter(|fb| FallbackId::from_relay_ids(fb) != ids[2])
.collect();
let fbs_new = vec![rand_fb(&mut rng), rand_fb(&mut rng), rand_fb(&mut rng)];
fbs2.extend(fbs_new.clone());
let mut set2 = FallbackState::from(&FallbackList::from(fbs2.clone()));
set2.take_status_from(set); assert_eq!(set2.fallbacks.len(), 6); assert!(set2.get_mut(&ids[0]).unwrap().status.usable_at(now));
assert!(set2.get_mut(&ids[1]).unwrap().status.usable_at(now));
assert!(set2.get_mut(&ids[2]).is_none());
assert!(!set2.get_mut(&ids[3]).unwrap().status.usable_at(now));
for new_fb in fbs_new {
assert!(set2
.get_mut(&FallbackId::from_relay_ids(&new_fb))
.unwrap()
.status
.usable_at(now));
}
}
}