#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![allow(renamed_and_removed_lints)]
#![allow(unknown_lints)]
#![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_duration_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
#![allow(clippy::let_unit_value)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)]
#![allow(clippy::result_large_err)]
#![allow(clippy::needless_raw_string_hashes)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::single_component_path_imports)]
pub mod authority;
mod bootstrap;
pub mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod retry;
mod shared_ref;
mod state;
mod storage;

#[cfg(feature = "bridge-client")]
pub mod bridgedesc;
#[cfg(feature = "dirfilter")]
pub mod filter;
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::err::BootstrapAction;
#[cfg(not(feature = "experimental-api"))]
use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use bootstrap::AttemptId;
use event::DirProgress;
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::{info_report, into_internal, warn_report};
use tor_netdir::params::NetParameters;
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};

use async_trait::async_trait;
use futures::{stream::BoxStream, task::SpawnExt};
use oneshot_fused_workaround as oneshot;
use tor_netdoc::doc::netstatus::ProtoStatuses;
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};

use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};

use crate::state::{DirState, NetDirChange};
pub use authority::{Authority, AuthorityBuilder};
pub use config::{
    DirMgrConfig, DirTolerance, DirToleranceBuilder, DownloadScheduleConfig,
    DownloadScheduleConfigBuilder, NetworkConfig, NetworkConfigBuilder,
};
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
pub use storage::DocumentText;
pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder};
pub use tor_netdir::Timeliness;

use strum;
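/// A Result as returned by this crate.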
pub type Result<T> = std::result::Result<T, Error>;
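/// A shareable handle to the on-disk directory cache used by a [`DirMgr`].
///
/// Cloning a `DirMgrStore` clones the handle; all clones refer to the same
/// underlying store.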
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    pub(crate) runtime: PhantomData<R>,
}

impl<R: Runtime> DirMgrStore<R> {
    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
        let store = Arc::new(Mutex::new(config.open_store(offline)?));
        drop(runtime);
        let runtime = PhantomData;
        Ok(DirMgrStore { store, runtime })
    }
}
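/// A [`NetDirProvider`] that can additionally bootstrap itself, accept
/// configuration changes, and report its bootstrap progress as a stream.
///
/// Illustrative sketch of consuming the progress stream (assumes `provider`
/// is an already-constructed implementation such as `Arc<DirMgr<R>>`):
///
/// ```ignore
/// use futures::StreamExt;
///
/// async fn watch_bootstrap(provider: &impl DirProvider) {
///     let mut events = provider.bootstrap_events();
///     while let Some(status) = events.next().await {
///         tracing::debug!("directory bootstrap status: {:?}", status);
///     }
/// }
/// ```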
#[async_trait]
pub trait DirProvider: NetDirProvider {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    async fn bootstrap(&self) -> Result<()>;

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
impl<R: Runtime> NetDirProvider for DirMgr<R> {
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        let lifetime = match timeliness {
            Timeliness::Strict => netdir.lifetime().clone(),
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            Timeliness::Unchecked => return Ok(netdir),
        };
        let now = SystemTime::now();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }

    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }

    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        if let Some(netdir) = self.netdir.get() {
            netdir
        } else {
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
    }

    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
        self.protocols.lock().expect("Poisoned lock").clone()
    }
}
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}
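/// A directory manager: keeps directory documents bootstrapped, cached on
/// disk, and periodically refreshed, and publishes the resulting [`NetDir`]s
/// and related events to its users.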
pub struct DirMgr<R: Runtime> {
    config: tor_config::MutCfg<DirMgrConfig>,
    store: Arc<Mutex<DynStore>>,
    netdir: Arc<SharedMutArc<NetDir>>,

    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    default_parameters: Mutex<Arc<NetParameters>>,

    events: event::FlagPublisher<DirEvent>,

    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    receive_status: DirBootstrapEvents,

    circmgr: Option<Arc<CircMgr<R>>>,

    runtime: R,

    offline: bool,

    bootstrap_started: AtomicBool,

    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    task_handle: TaskHandle,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    LocalCache,
    DirServer {
        source: Option<SourceInfo>,
    },
}

impl std::fmt::Display for DocSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DocSource::LocalCache => write!(f, "local cache"),
            DocSource::DirServer { source: None } => write!(f, "directory server"),
            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
        }
    }
}

impl<R: Runtime> DirMgr<R> {
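    /// Try once to load a usable directory from the on-disk cache, without
    /// touching the network, and return it.
    ///
    /// Illustrative sketch (assumes an existing runtime `rt` and a prepared
    /// `DirMgrConfig`):
    ///
    /// ```ignore
    /// let netdir = DirMgr::load_once(rt, config).await?;
    /// ```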
    pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);

        let attempt = AttemptId::next();
        trace!(%attempt, "Trying to load a full directory from cache");
        let outcome = dirmgr.load_directory(attempt).await;
        trace!(%attempt, "Load result: {outcome:?}");
        let _success = outcome?;

        dirmgr
            .netdir(Timeliness::Timely)
            .map_err(|_| Error::DirectoryNotPresent)
    }
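    /// Bootstrap a directory (downloading whatever the cache is missing) and
    /// return it once it is usable, without keeping a `DirMgr` running to
    /// refresh it afterwards.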
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
        dirmgr
            .timely_netdir()
            .map_err(|_| Error::DirectoryNotPresent)
    }
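    /// Create a new `DirMgr` that is able to use the network, but that does
    /// not start bootstrapping until [`bootstrap`](Self::bootstrap) is called.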
    pub fn create_unbootstrapped(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DirMgr::from_config(
            config,
            runtime,
            store,
            Some(circmgr),
            false,
        )?))
    }
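    /// Begin bootstrapping: load what we can from the cache, then spawn a
    /// background task that downloads and refreshes the directory for as long
    /// as this `DirMgr` exists.
    ///
    /// The returned future resolves once there is enough directory
    /// information to build circuits; calling this more than once is a no-op.
    ///
    /// Illustrative sketch of the two-step flow (assumes `config`, `runtime`,
    /// `store`, and `circmgr` already exist):
    ///
    /// ```ignore
    /// let dirmgr = DirMgr::create_unbootstrapped(config, runtime, store, circmgr)?;
    /// // The handle can already be handed out as a NetDirProvider here.
    /// dirmgr.bootstrap().await?;
    /// ```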
    #[allow(clippy::cognitive_complexity)]
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id).await?;

        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
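    /// Create and bootstrap a `DirMgr` in a single step: equivalent to
    /// [`create_unbootstrapped`](Self::create_unbootstrapped) followed by
    /// [`bootstrap`](Self::bootstrap).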
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;

        dirmgr.bootstrap().await?;

        Ok(dirmgr)
    }
    #[allow(clippy::cognitive_complexity)]
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    if logged {
                        info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
                }
            }

            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id).await? {
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
    #[allow(clippy::cognitive_complexity)]
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                dirmgr.config.get().schedule.retry_bootstrap
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(err, "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)");
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
    }
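    /// Try to change our configuration to `new_config`, as allowed by `how`.
    ///
    /// Settings that cannot change at runtime (cache location, cache
    /// permissions, directory authorities) are rejected; changed network
    /// parameter overrides take effect immediately.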
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
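    /// Return a stream of [`DirBootstrapStatus`] events describing our
    /// progress towards a usable directory.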
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }

    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.update_progress(attempt_id, progress);
    }

    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
        if n_errors == 0 {
            return;
        }
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.note_errors(attempt_id, n_errors);
    }

    fn note_reset(&self, attempt_id: AttemptId) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.note_reset(attempt_id);
    }

    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }
    #[cfg(test)]
    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
        let rw = !self
            .store
            .lock()
            .expect("Directory storage lock poisoned")
            .is_readonly();
        if rw {
            Some(&self.store)
        } else {
            None
        }
    }
    #[allow(clippy::unnecessary_wraps)]
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }
    async fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        let _ = bootstrap::load(Arc::clone(self), Box::new(state), attempt_id).await?;

        Ok(self.netdir.get().is_some())
    }
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
        self.events.subscribe()
    }
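    /// Try to load the text of a single directory document `doc` from the
    /// cache, if it is present there.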
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query: DocQuery = (*doc).into();
        let store = self.store.lock().expect("store lock poisoned");
        query.load_from_store_into(&mut result, &**store)?;
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }
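    /// Try to load the text of multiple directory documents from the cache;
    /// documents that are not cached are simply omitted from the result.
    ///
    /// Illustrative sketch (`digest` is a hypothetical microdescriptor
    /// digest):
    ///
    /// ```ignore
    /// let docs = dirmgr.texts(vec![DocId::Microdesc(digest)])?;
    /// ```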
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
    where
        T: IntoIterator<Item = DocId>,
    {
        let partitioned = docid::partition_by_type(docs);
        let mut result = HashMap::new();
        let store = self.store.lock().expect("store lock poisoned");
        for (_, query) in partitioned.into_iter() {
            query.load_from_store_into(&mut result, &**store)?;
        }
        Ok(result)
    }
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    let is_stale = {
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }
}
#[derive(Debug, Copy, Clone)]
enum Readiness {
    Complete,
    Usable,
}

fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
}
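/// Compute the oldest consensus publication time that we should request by
/// default: `now` minus the post-validity tolerance (but at least three
/// hours), truncated to the hour and then advanced by one hour.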
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance);
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    let (h, _m, _s) = cutoff.to_hms();
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    let cutoff = cutoff + Duration::from_secs(3600);

    Ok(cutoff.into())
}
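/// Return the subprotocol capabilities that this crate supports in its role
/// as a directory client.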
pub fn supported_client_protocols() -> tor_protover::Protocols {
    use tor_protover::named::*;
    [
        DIRCACHE_CONSDIFF,
    ]
    .into_iter()
    .collect()
}
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::doc::netstatus::ConsensusFlavor;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
    use tor_rtcompat::SleepProvider;

    #[test]
    fn protocols() {
        let pr = supported_client_protocols();
        let expected = "DirCache=2".parse().unwrap();
        assert_eq!(pr, expected);
    }
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }

    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance;
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
            let config = DirMgrConfig::default();

            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32];
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
}