#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
#![doc = include_str!("../README.md")]
#![allow(renamed_and_removed_lints)]
#![allow(unknown_lints)]
#![warn(missing_docs)]
#![warn(noop_method_call)]
#![warn(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![deny(clippy::print_stderr)]
#![deny(clippy::print_stdout)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unchecked_duration_subtraction)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::mod_module_files)]
#![allow(clippy::let_unit_value)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::significant_drop_in_scrutinee)]
#![allow(clippy::result_large_err)]
#![allow(clippy::needless_raw_string_hashes)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::single_component_path_imports)]

pub mod authority;
mod bootstrap;
pub mod config;
mod docid;
mod docmeta;
mod err;
mod event;
mod retry;
mod shared_ref;
mod state;
mod storage;

#[cfg(feature = "bridge-client")]
pub mod bridgedesc;
#[cfg(feature = "dirfilter")]
pub mod filter;

use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::err::BootstrapAction;
#[cfg(not(feature = "experimental-api"))]
use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use bootstrap::AttemptId;
use event::DirProgress;
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::{info_report, into_internal, warn_report};
use tor_netdir::params::NetParameters;
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};

use async_trait::async_trait;
use futures::{stream::BoxStream, task::SpawnExt};
use oneshot_fused_workaround as oneshot;
use tor_netdoc::doc::netstatus::ProtoStatuses;
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};

use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};

use crate::state::{DirState, NetDirChange};
pub use authority::{Authority, AuthorityBuilder};
pub use config::{
    DirMgrConfig, DirTolerance, DirToleranceBuilder, DownloadScheduleConfig,
    DownloadScheduleConfigBuilder, NetworkConfig, NetworkConfigBuilder,
};
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
pub use storage::DocumentText;
pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder};
pub use tor_netdir::Timeliness;

use strum;

/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Storage manager for directory information: a handle to the on-disk cache,
/// shared between a [`DirMgr`] and other directory-related components.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The underlying persistent store.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Marker for the runtime type that will use this store.
    pub(crate) runtime: PhantomData<R>,
}

impl<R: Runtime> DirMgrStore<R> {
    /// Open the storage as specified in `config`.  If `offline` is true, the
    /// store does not need to be writable.
    pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
        let store = Arc::new(Mutex::new(config.open_store(offline)?));
        drop(runtime);
        let runtime = PhantomData;
        Ok(DirMgrStore { store, runtime })
    }
}
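
// A minimal construction sketch (illustration only; assumes the `tor_rtcompat`
// `PreferredRuntime` and a default `DirMgrConfig`, neither of which is required
// by this API):
//
//     let runtime = tor_rtcompat::PreferredRuntime::create()?;
//     let config = DirMgrConfig::default();
//     let store = DirMgrStore::new(&config, runtime.clone(), /* offline */ false)?;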

/// A directory provider that can be bootstrapped and reconfigured at runtime,
/// in addition to the read-only operations of [`NetDirProvider`].
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to change our configuration to `new_config`, in the manner
    /// specified by `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap this provider, downloading directory information as needed.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events describing our
    /// bootstrapping progress.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] that can be used to manage the download
    /// process, if there is one.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
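
// A minimal sketch of watching bootstrap progress through a `DirProvider`
// (illustration only; assumes some `provider: Arc<dyn DirProvider>` and an
// async context, neither of which is defined here):
//
//     use futures::StreamExt;
//     let mut events = provider.bootstrap_events();
//     while let Some(status) = events.next().await {
//         tracing::debug!(?status, "directory bootstrap progress");
//     }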

impl<R: Runtime> NetDirProvider for DirMgr<R> {
    fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
        use tor_netdir::Error as NetDirError;
        let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
        // Decide which lifetime to enforce: the consensus's own lifetime for
        // `Strict`, an extended lifetime for `Timely`, or none for `Unchecked`.
        let lifetime = match timeliness {
            Timeliness::Strict => netdir.lifetime().clone(),
            Timeliness::Timely => self
                .config
                .get()
                .tolerance
                .extend_lifetime(netdir.lifetime()),
            Timeliness::Unchecked => return Ok(netdir),
        };
        let now = SystemTime::now();
        if lifetime.valid_after() > now {
            Err(NetDirError::DirNotYetValid)
        } else if lifetime.valid_until() < now {
            Err(NetDirError::DirExpired)
        } else {
            Ok(netdir)
        }
    }

    fn events(&self) -> BoxStream<'static, DirEvent> {
        Box::pin(self.events.subscribe())
    }

    fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
        if let Some(netdir) = self.netdir.get() {
            // We have a directory, so we use its parameters.
            netdir
        } else {
            // We have no directory, so we fall back to the default parameters
            // (as overridden by our configuration).
            self.default_parameters
                .lock()
                .expect("Poisoned lock")
                .clone()
        }
    }

    fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
        self.protocols.lock().expect("Poisoned lock").clone()
    }
}

#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        DirMgr::reconfigure(self, new_config, how)
    }

    async fn bootstrap(&self) -> Result<()> {
        DirMgr::bootstrap(self).await
    }

    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
        Box::pin(DirMgr::bootstrap_events(self))
    }

    fn download_task_handle(&self) -> Option<TaskHandle> {
        Some(self.task_handle.clone())
    }
}

/// A directory manager: downloads directory information from the Tor network,
/// caches it on disk, and keeps an up-to-date [`NetDir`] available to the rest
/// of the application.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to validate
    /// them, and so on.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our persistent cache of directory information.
    store: Arc<Mutex<DynStore>>,
    /// Our latest sufficiently bootstrapped directory, if we have one, shared
    /// with observers.
    netdir: Arc<SharedMutArc<NetDir>>,

    /// Our latest knowledge of the recommended and required protocols, along
    /// with the time when we learned it.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// Default network parameters to use when we have no directory.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// A publisher handle that we use to inform other subsystems about changes
    /// in the status of our directory.
    events: event::FlagPublisher<DirEvent>,

    /// A sender handle used to publish changes in our bootstrapping status.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// A receiver handle for our bootstrapping status, which observers can clone.
    receive_status: DirBootstrapEvents,

    /// A circuit manager, if this `DirMgr` supports downloading.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,

    /// Whether this `DirMgr` runs in offline mode (never downloading).
    offline: bool,

    /// A flag to tell us whether a bootstrap attempt has begun.
    bootstrap_started: AtomicBool,

    /// An optional filter applied to directory documents before we use them.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// A task schedule for the download task; `None` while the task is running.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// A handle used to pause, resume, or otherwise manage the download task.
    task_handle: TaskHandle,
}

/// Where a directory document came from.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// We loaded the document from our on-disk cache.
    LocalCache,
    /// We downloaded the document from a directory server.
    DirServer {
        /// Information about the request we made to the server, if available.
        source: Option<SourceInfo>,
    },
}

impl std::fmt::Display for DocSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DocSource::LocalCache => write!(f, "local cache"),
            DocSource::DirServer { source: None } => write!(f, "directory server"),
            DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
        }
    }
}

impl<R: Runtime> DirMgr<R> {
    /// Try to load the directory from the cache, without making any network
    /// requests.  Return an error if no usable directory is present.
    pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
        let store = DirMgrStore::new(&config, runtime.clone(), true)?;
        let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);

        let attempt = AttemptId::next();
        trace!(%attempt, "Trying to load a full directory from cache");
        let outcome = dirmgr.load_directory(attempt).await;
        trace!(%attempt, "Load result: {outcome:?}");
        let _success = outcome?;

        dirmgr
            .netdir(Timeliness::Timely)
            .map_err(|_| Error::DirectoryNotPresent)
    }

    /// Return a usable network directory, either loading it from the cache or
    /// bootstrapping it over the network as needed.
    pub async fn load_or_bootstrap_once(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<NetDir>> {
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
        dirmgr
            .timely_netdir()
            .map_err(|_| Error::DirectoryNotPresent)
    }

    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
    ///
    /// The caller must invoke [`bootstrap`](Self::bootstrap) before the
    /// directory becomes usable.
    pub fn create_unbootstrapped(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DirMgr::from_config(
            config,
            runtime,
            store,
            Some(circmgr),
            false,
        )?))
    }

    /// Bootstrap a `DirMgr` that was created in online mode but not yet
    /// bootstrapped.
    ///
    /// This function does not return until we have a usable directory; after
    /// that, a background task keeps the directory up to date.  Calling it
    /// more than once has no additional effect.
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // Attempt to replace a `false` value with `true`: if the value was not
        // `false` when we tried, then bootstrap() has been called before.
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // If we fail to bootstrap, reset the flag so that a later attempt can
        // try again.
        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
            v.store(false, Ordering::SeqCst);
        });

        let schedule = {
            let sched = self.task_schedule.lock().expect("poisoned lock").take();
            match sched {
                Some(sched) => sched,
                None => {
                    // The schedule was already taken: the download task is
                    // already running.
                    debug!("Attempted to bootstrap twice; ignoring.");
                    return Ok(());
                }
            }
        };

        // Try to load from the cache first.
        let attempt_id = AttemptId::next();
        trace!(attempt=%attempt_id, "Starting to bootstrap directory");
        let have_directory = self.load_directory(attempt_id).await?;

        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether or not we loaded a directory from the cache, we now start
        // downloading in the background.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // Use a scope guard to return the task schedule when this task
                // exits, so that a later bootstrap attempt can reuse it.
                let mut schedule = scopeguard::guard(schedule, |schedule| {
                    if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
                        *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
                    }
                });

                // Don't warn on Error::ManagerDropped: that just means the
                // DirMgr itself has been shut down.
                if let Err(e) =
                    Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while waiting for bootstrap"),
                    }
                } else if let Err(e) =
                    Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
                        .await
                {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn_report!(e, "Unrecovered error while downloading"),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Disarm the guard: the bootstrap flag should stay set.
                    let _ = ScopeGuard::into_inner(reset_bootstrap_started);
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }

    /// Return true if we have started the bootstrapping process (and have not
    /// since given up).
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }

    /// Create and bootstrap a new `DirMgr` based on the given configuration.
    pub async fn bootstrap_from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Arc<CircMgr<R>>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;

        dirmgr.bootstrap().await?;

        Ok(dirmgr)
    }

    /// Keep reloading the directory from the cache until either we can take
    /// ownership (read-write access) of the cache, or the `DirMgr` is dropped.
    ///
    /// If `on_complete` is present, send a message on it once we have a
    /// usable directory.
    async fn reload_until_owner(
        weak: &Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        attempt_id: AttemptId,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut logged = false;
        let mut bootstrapped;
        {
            let dirmgr = upgrade_weak_ref(weak)?;
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock, so we can do our own bootstrapping.
                    if logged {
                        info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
                }
            }

            // Somebody else owns the cache and should be updating it.  Wait a
            // bit, then try again.
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                // If we don't have a directory yet, check a lot more often.
                std::time::Duration::new(5, 0)
            };
            schedule.sleep(pause).await?;

            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory(attempt_id).await? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }

    /// Run forever, downloading directory information as needed and restarting
    /// the bootstrap process from scratch whenever its reset time elapses.
    ///
    /// If `on_complete` is present, send a message on it as soon as we have a
    /// usable directory.
    async fn download_forever(
        weak: Weak<Self>,
        schedule: &mut TaskSchedule<R>,
        mut attempt_id: AttemptId,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        let mut state: Box<dyn DirState> = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            Box::new(state::GetConsensusState::new(
                dirmgr.runtime.clone(),
                dirmgr.config.get(),
                CacheUsage::CacheOkay,
                Some(dirmgr.netdir.clone()),
                #[cfg(feature = "dirfilter")]
                dirmgr
                    .filter
                    .clone()
                    .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
            ))
        };

        trace!("Entering download loop.");

        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                // Fetch the current retry configuration.
                dirmgr.config.get().schedule.retry_bootstrap
            };
            let mut retry_delay = retry_config.schedule();

            'retry_attempt: for try_num in retry_config.attempts() {
                trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
                let outcome = bootstrap::download(
                    Weak::clone(&weak),
                    &mut state,
                    schedule,
                    attempt_id,
                    &mut on_complete,
                )
                .await;
                trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");

                if let Err(err) = outcome {
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info_report!(err, "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)");
                        break 'retry_attempt;
                    }

                    match err.bootstrap_action() {
                        BootstrapAction::Nonfatal => {
                            return Err(into_internal!(
                                "Nonfatal error should not have propagated here"
                            )(err)
                            .into());
                        }
                        BootstrapAction::Reset => {}
                        BootstrapAction::Fatal => return Err(err),
                    }

                    let delay = retry_delay.next_delay(&mut rand::rng());
                    warn_report!(
                        err,
                        "Unable to download a usable directory. (We will restart in {})",
                        humantime::format_duration(delay),
                    );
                    {
                        let dirmgr = upgrade_weak_ref(&weak)?;
                        dirmgr.note_reset(attempt_id);
                    }
                    schedule.sleep(delay).await?;
                    state = state.reset();
                } else {
                    info!(attempt=%attempt_id, "Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // We ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if anybody is listening.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => {
                    trace!("Sleeping until {}", time::OffsetDateTime::from(t));
                    schedule.sleep_until_wallclock(t).await?;
                }
                None => return Ok(()),
            }
            attempt_id = bootstrap::AttemptId::next();
            trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
            state = state.reset();
        }
    }

    /// Return our circuit manager, or an error if we don't support downloading.
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
        self.circmgr.clone().ok_or(Error::NoDownloadSupport)
    }

    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior depends on the value of `how`.  The cache location, the
    /// cache permissions, and the set of directory authorities cannot be
    /// changed on a running `DirMgr`.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        let config = self.config.get();
        // We don't support changing these options while running.
        if new_config.cache_dir != config.cache_dir {
            how.cannot_change("storage.cache_dir")?;
        }
        if new_config.cache_trust != config.cache_trust {
            how.cannot_change("storage.permissions")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params != config.override_net_params;

        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(&new_config.override_net_params);
                Ok(())
            });
            {
                let mut params = self.default_parameters.lock().expect("lock failed");
                *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
            }

            // Report the parameter change to observers as a new-consensus event.
            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }

    /// Return a stream of [`DirBootstrapStatus`] events describing changes in
    /// our bootstrapping progress.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        self.receive_status.clone()
    }

    /// Update our bootstrap status for `attempt_id` with the latest `progress`.
    fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.update_progress(attempt_id, progress);
    }

    /// Update our status tracker to note that `n_errors` errors occurred
    /// during the attempt `attempt_id`.
    fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
        if n_errors == 0 {
            return;
        }
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.note_errors(attempt_id, n_errors);
    }

    /// Update our status tracker to note that the attempt `attempt_id` is
    /// being reset.
    fn note_reset(&self, attempt_id: AttemptId) {
        let mut sender = self.send_status.lock().expect("poisoned lock");
        let mut status = sender.borrow_mut();

        status.note_reset(attempt_id);
    }

    /// Try to upgrade our storage handle from read-only to read-write access.
    /// Return true if we now have read-write access.
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
        self.store
            .lock()
            .expect("Directory storage lock poisoned")
            .upgrade_to_readwrite()
    }

    /// Return a reference to our store, if it is currently read-write.
    #[cfg(test)]
    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
        let rw = !self
            .store
            .lock()
            .expect("Directory storage lock poisoned")
            .is_readonly();
        if rw {
            Some(&self.store)
        } else {
            None
        }
    }

    /// Construct a `DirMgr` from a `DirMgrConfig`.
    ///
    /// If `offline` is true, this `DirMgr` will never go online.
    #[allow(clippy::unnecessary_wraps)]
    fn from_config(
        config: DirMgrConfig,
        runtime: R,
        store: DirMgrStore<R>,
        circmgr: Option<Arc<CircMgr<R>>>,
        offline: bool,
    ) -> Result<Self> {
        let netdir = Arc::new(SharedMutArc::new());
        let events = event::FlagPublisher::new();
        let default_parameters = NetParameters::from_map(&config.override_net_params);
        let default_parameters = Mutex::new(Arc::new(default_parameters));

        let (send_status, receive_status) = postage::watch::channel();
        let send_status = Mutex::new(send_status);
        let receive_status = DirBootstrapEvents {
            inner: receive_status,
        };
        #[cfg(feature = "dirfilter")]
        let filter = config.extensions.filter.clone();

        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
        let task_schedule = Mutex::new(Some(task_schedule));

        // Load any cached protocol recommendations from the store.
        let protocols = {
            let store = store.store.lock().expect("lock poisoned");
            store
                .cached_protocol_recommendations()?
                .map(|(t, p)| (t, Arc::new(p)))
        };

        Ok(DirMgr {
            config: config.into(),
            store: store.store,
            netdir,
            protocols: Mutex::new(protocols),
            default_parameters,
            events,
            send_status,
            receive_status,
            circmgr,
            runtime,
            offline,
            bootstrap_started: AtomicBool::new(false),
            #[cfg(feature = "dirfilter")]
            filter,
            task_schedule,
            task_handle,
        })
    }

    /// Try to load our directory from the cache.  Return true if we now have
    /// a usable directory.
    async fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
        let state = state::GetConsensusState::new(
            self.runtime.clone(),
            self.config.get(),
            CacheUsage::CacheOnly,
            None,
            #[cfg(feature = "dirfilter")]
            self.filter
                .clone()
                .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
        );
        let _ = bootstrap::load(Arc::clone(self), Box::new(state), attempt_id).await?;

        Ok(self.netdir.get().is_some())
    }

    /// Return an asynchronous stream of [`DirEvent`]s, to be notified whenever
    /// the directory changes.
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
        self.events.subscribe()
    }

    /// Try to load the text of a single document described by `doc` from
    /// storage.
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
        use itertools::Itertools;
        let mut result = HashMap::new();
        let query: DocQuery = (*doc).into();
        let store = self.store.lock().expect("store lock poisoned");
        query.load_from_store_into(&mut result, &**store)?;
        let item = result.into_iter().at_most_one().map_err(|_| {
            Error::CacheCorruption("Found more than one entry in storage for given docid")
        })?;
        if let Some((docid, doctext)) = item {
            if &docid != doc {
                return Err(Error::CacheCorruption(
                    "Item from storage had incorrect docid.",
                ));
            }
            Ok(Some(doctext))
        } else {
            Ok(None)
        }
    }

    /// Load the text for a collection of documents from storage, returning
    /// whichever of them we find.
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
    where
        T: IntoIterator<Item = DocId>,
    {
        let partitioned = docid::partition_by_type(docs);
        let mut result = HashMap::new();
        let store = self.store.lock().expect("store lock poisoned");
        for (_, query) in partitioned.into_iter() {
            query.load_from_store_into(&mut result, &**store)?;
        }
        Ok(result)
    }

    /// Given a request we sent and the text of a response we received, check
    /// whether the response is a consensus diff.  If so, apply the diff to the
    /// consensus it refers to and return the expanded consensus; otherwise
    /// return the text unchanged.
    ///
    /// Return an error if we received a diff that we cannot use.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                if let Some(old_d) = req.old_consensus_digests().next() {
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // Either we didn't ask for a diff, or we no longer have the
                // old consensus it applies to.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        Ok(text)
    }

    /// Apply the netdir changes requested by `state`, if any: replacing the
    /// current directory, adding microdescriptors, or recording new protocol
    /// recommendations.
    #[allow(clippy::cognitive_complexity)]
    fn apply_netdir_changes(
        self: &Arc<Self>,
        state: &mut Box<dyn DirState>,
        store: &mut dyn Store,
    ) -> Result<()> {
        if let Some(change) = state.get_netdir_change() {
            match change {
                NetDirChange::AttemptReplace {
                    netdir,
                    consensus_meta,
                } => {
                    // If we have a circuit manager, make sure the new netdir
                    // is sufficient for it before we switch over.
                    if let Some(ref cm) = self.circmgr {
                        if !cm
                            .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
                        {
                            debug!("Got a new NetDir, but it doesn't have enough guards yet.");
                            return Ok(());
                        }
                    }
                    // Refuse to replace our directory with an older one.
                    let is_stale = {
                        self.netdir
                            .get()
                            .map(|x| {
                                x.lifetime().valid_after()
                                    > netdir
                                        .as_ref()
                                        .expect("AttemptReplace had None")
                                        .lifetime()
                                        .valid_after()
                            })
                            .unwrap_or(false)
                    };
                    if is_stale {
                        warn!("Got a new NetDir, but it's older than the one we currently have!");
                        return Err(Error::NetDirOlder);
                    }
                    let cfg = self.config.get();
                    let mut netdir = netdir.take().expect("AttemptReplace had None");
                    netdir.replace_overridden_parameters(&cfg.override_net_params);
                    self.netdir.replace(netdir);
                    self.events.publish(DirEvent::NewConsensus);
                    self.events.publish(DirEvent::NewDescriptors);

                    info!("Marked consensus usable.");
                    if !store.is_readonly() {
                        store.mark_consensus_usable(consensus_meta)?;
                        // Now that a consensus is usable, expire old information.
                        store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                    }
                    Ok(())
                }
                NetDirChange::AddMicrodescs(mds) => {
                    self.netdir.mutate(|netdir| {
                        for md in mds.drain(..) {
                            netdir.add_microdesc(md);
                        }
                        Ok(())
                    })?;
                    self.events.publish(DirEvent::NewDescriptors);
                    Ok(())
                }
                NetDirChange::SetRequiredProtocol { timestamp, protos } => {
                    if !store.is_readonly() {
                        store.update_protocol_recommendations(timestamp, protos.as_ref())?;
                    }
                    let mut pr = self.protocols.lock().expect("Poisoned lock");
                    *pr = Some((timestamp, protos));
                    self.events.publish(DirEvent::NewProtocolRecommendation);
                    Ok(())
                }
            }
        } else {
            Ok(())
        }
    }
}

/// How "ready" must a directory be before we use it?
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// The directory must be fully downloaded.
    Complete,
    /// The directory must be usable for building circuits, though it need not
    /// be complete.
    Usable,
}

/// Try to upgrade a weak reference to a DirMgr, returning an error if the
/// DirMgr has been dropped.
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
}

/// Given a time `now` and our tolerance settings, return the earliest
/// `valid_after` time that we should accept for a consensus, rounded down to
/// the start of an hour and then advanced by one hour.
pub(crate) fn default_consensus_cutoff(
    now: SystemTime,
    tolerance: &DirTolerance,
) -> Result<SystemTime> {
    /// Always allow a consensus that became valid up to this long ago, even if
    /// our configured tolerance is smaller.
    const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
    let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance);
    let cutoff = time::OffsetDateTime::from(now - allow_skew);
    // Round the cutoff down to the start of its hour, then add an hour, so
    // that the value we send is coarse-grained.
    let (h, _m, _s) = cutoff.to_hms();
    let cutoff = cutoff.replace_time(
        time::Time::from_hms(h, 0, 0)
            .map_err(tor_error::into_internal!("Failed clock calculation"))?,
    );
    let cutoff = cutoff + Duration::from_secs(3600);

    Ok(cutoff.into())
}
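
// Worked example of the rounding above (illustration only): with `now` at
// 12:35:00 and an effective skew of three hours, the raw cutoff is 09:35:00;
// rounding down to the hour gives 09:00:00, and adding an hour yields a final
// cutoff of 10:00:00.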

/// Return the directory-related protocols that this crate supports as a client.
pub fn supported_client_protocols() -> tor_protover::Protocols {
    use tor_protover::named::*;
    [
        DIRCACHE_CONSDIFF,
    ]
    .into_iter()
    .collect()
}

#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_basic_utils::test_rng::testing_rng;
    use tor_netdoc::doc::netstatus::ConsensusFlavor;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
    use tor_rtcompat::SleepProvider;

    #[test]
    fn protocols() {
        let pr = supported_client_protocols();
        let expected = "DirCache=2".parse().unwrap();
        assert_eq!(pr, expected);
    }

    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig {
            cache_dir: dir.path().into(),
            ..Default::default()
        };
        let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();

        (dir, dirmgr)
    }

    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.netdir(Timeliness::Unchecked).is_err());
        });
    }

    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);

            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(!res.contains_key(&d_bogus));
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }

    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            let (_tempdir, mgr) = new_mgr(rt);
            let config = DirMgrConfig::default();

            // Try with an empty store.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let tolerance = DirTolerance::default().post_valid_tolerance;
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    let date = r.last_consensus_date().unwrap();
                    assert!(date >= now - tolerance);
                    assert!(date <= now - tolerance + Duration::from_secs(3600));
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again; the request should reference the stored consensus.
            let req = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_consensus_request(
                    now,
                    ConsensusFlavor::Microdesc,
                    &**store,
                    &config,
                )
                .unwrap()
            };
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }

    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = testing_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
            let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
            let config = DirMgrConfig::default();

            // Try an authcert.
            let query = DocId::AuthCert(certid1);
            let store = mgr.store.lock().unwrap();
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of microdescriptors.
            let reqs =
                bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
                    .unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of router descriptors.
            #[cfg(feature = "routerdesc")]
            {
                let reqs = bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &rd_ids,
                    &**store,
                    &config,
                )
                .unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }

    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let day = Duration::from_secs(86400);
            let config = DirMgrConfig::default();

            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]);
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
                    .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // A consensus request whose response doesn't look like a diff
            // should also be passed through unchanged.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now store a consensus so that we can ask for a diff against it.
            {
                let mut store = mgr.store.lock().unwrap();
                let d_in = [0x99; 32];
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Something that isn't a diff still passes through unchanged, even
            // though we now have a consensus in the store.
            let r = {
                let store = mgr.store.lock().unwrap();
                bootstrap::make_requests_for_documents(
                    &mgr.runtime,
                    &[latest_id],
                    &**store,
                    &config,
                )
                .unwrap()
            };
            let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Now try a real diff.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // A diff whose claimed result digest doesn't match should be rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(&r[0], diff);
            assert!(expanded.is_err());
        });
    }
}