1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::single_component_path_imports)]
51
52pub mod authority;
53mod bootstrap;
54pub mod config;
55mod docid;
56mod docmeta;
57mod err;
58mod event;
59mod retry;
60mod shared_ref;
61mod state;
62mod storage;
63
64#[cfg(feature = "bridge-client")]
65pub mod bridgedesc;
66#[cfg(feature = "dirfilter")]
67pub mod filter;
68
69use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70use crate::err::BootstrapAction;
71#[cfg(not(feature = "experimental-api"))]
72use crate::shared_ref::SharedMutArc;
73#[cfg(feature = "experimental-api")]
74pub use crate::shared_ref::SharedMutArc;
75use crate::storage::{DynStore, Store};
76use bootstrap::AttemptId;
77use event::DirProgress;
78use postage::watch;
79pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
80use scopeguard::ScopeGuard;
81use tor_circmgr::CircMgr;
82use tor_dirclient::SourceInfo;
83use tor_error::{info_report, into_internal, warn_report};
84use tor_netdir::params::NetParameters;
85use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
86
87use async_trait::async_trait;
88use futures::{stream::BoxStream, task::SpawnExt};
89use oneshot_fused_workaround as oneshot;
90use tor_netdoc::doc::netstatus::ProtoStatuses;
91use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
92use tor_rtcompat::Runtime;
93use tracing::{debug, info, trace, warn};
94
95use std::marker::PhantomData;
96use std::sync::atomic::{AtomicBool, Ordering};
97use std::sync::{Arc, Mutex};
98use std::time::Duration;
99use std::{collections::HashMap, sync::Weak};
100use std::{fmt::Debug, time::SystemTime};
101
102use crate::state::{DirState, NetDirChange};
103pub use authority::{Authority, AuthorityBuilder};
104pub use config::{
105 DirMgrConfig, DirTolerance, DirToleranceBuilder, DownloadScheduleConfig,
106 DownloadScheduleConfigBuilder, NetworkConfig, NetworkConfigBuilder,
107};
108pub use docid::DocId;
109pub use err::Error;
110pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
111pub use storage::DocumentText;
112pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder};
113pub use tor_netdir::Timeliness;
114
115use strum;
117
/// A `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
120
/// A handle on the document store used by a [`DirMgr`].
///
/// The store sits behind an `Arc<Mutex<..>>`, so cloning this handle yields
/// another reference to the same underlying store.
#[derive(Clone)]
pub struct DirMgrStore<R: Runtime> {
    /// The shared, lock-protected document store.
    pub(crate) store: Arc<Mutex<crate::DynStore>>,

    /// Marker tying this handle to the runtime type `R`; no runtime value is
    /// kept.
    pub(crate) runtime: PhantomData<R>,
}
135
136impl<R: Runtime> DirMgrStore<R> {
137 pub fn new(config: &DirMgrConfig, runtime: R, offline: bool) -> Result<Self> {
139 let store = Arc::new(Mutex::new(config.open_store(offline)?));
140 drop(runtime);
141 let runtime = PhantomData;
142 Ok(DirMgrStore { store, runtime })
143 }
144}
145
/// A [`NetDirProvider`] that can additionally be reconfigured and
/// bootstrapped, and that reports its bootstrap progress.
#[async_trait]
pub trait DirProvider: NetDirProvider {
    /// Try to replace the current configuration with `new_config`.
    ///
    /// `how` selects the reconfiguration policy (see
    /// `tor_config::Reconfigure`).
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap this directory provider.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] values describing bootstrap
    /// progress.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;

    /// Return a [`TaskHandle`] for this provider's download task, if it has
    /// one.
    ///
    /// The default implementation returns `None`.
    fn download_task_handle(&self) -> Option<TaskHandle> {
        None
    }
}
173
174impl<R: Runtime> NetDirProvider for DirMgr<R> {
177 fn netdir(&self, timeliness: Timeliness) -> tor_netdir::Result<Arc<NetDir>> {
178 use tor_netdir::Error as NetDirError;
179 let netdir = self.netdir.get().ok_or(NetDirError::NoInfo)?;
180 let lifetime = match timeliness {
181 Timeliness::Strict => netdir.lifetime().clone(),
182 Timeliness::Timely => self
183 .config
184 .get()
185 .tolerance
186 .extend_lifetime(netdir.lifetime()),
187 Timeliness::Unchecked => return Ok(netdir),
188 };
189 let now = SystemTime::now();
190 if lifetime.valid_after() > now {
191 Err(NetDirError::DirNotYetValid)
192 } else if lifetime.valid_until() < now {
193 Err(NetDirError::DirExpired)
194 } else {
195 Ok(netdir)
196 }
197 }
198
199 fn events(&self) -> BoxStream<'static, DirEvent> {
200 Box::pin(self.events.subscribe())
201 }
202
203 fn params(&self) -> Arc<dyn AsRef<tor_netdir::params::NetParameters>> {
204 if let Some(netdir) = self.netdir.get() {
205 netdir
211 } else {
212 self.default_parameters
216 .lock()
217 .expect("Poisoned lock")
218 .clone()
219 }
220 }
226
227 fn protocol_statuses(&self) -> Option<(SystemTime, Arc<ProtoStatuses>)> {
228 self.protocols.lock().expect("Poisoned lock").clone()
229 }
230}
231
232#[async_trait]
233impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
234 fn reconfigure(
235 &self,
236 new_config: &DirMgrConfig,
237 how: tor_config::Reconfigure,
238 ) -> std::result::Result<(), tor_config::ReconfigureError> {
239 DirMgr::reconfigure(self, new_config, how)
240 }
241
242 async fn bootstrap(&self) -> Result<()> {
243 DirMgr::bootstrap(self).await
244 }
245
246 fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
247 Box::pin(DirMgr::bootstrap_events(self))
248 }
249
250 fn download_task_handle(&self) -> Option<TaskHandle> {
251 Some(self.task_handle.clone())
252 }
253}
254
/// A directory manager: loads directory information from a store, downloads
/// it when allowed, and hands the result out as a [`NetDir`].
pub struct DirMgr<R: Runtime> {
    /// The current configuration; replaced in place by `reconfigure`.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// The lock-protected document store, taken from a [`DirMgrStore`].
    store: Arc<Mutex<DynStore>>,
    /// The directory currently in use, if any.
    ///
    /// A clone of this `Arc` is also handed to the download state machine.
    netdir: Arc<SharedMutArc<NetDir>>,

    /// The most recent protocol recommendations with their timestamp.
    ///
    /// Seeded from the store's cached recommendations at construction time.
    protocols: Mutex<Option<(SystemTime, Arc<ProtoStatuses>)>>,

    /// Parameters returned by `params()` while no directory is available;
    /// built from the configured `override_net_params`.
    default_parameters: Mutex<Arc<NetParameters>>,

    /// Publisher used to announce [`DirEvent`]s to subscribers.
    events: event::FlagPublisher<DirEvent>,

    /// Sending half of the bootstrap-status watch channel.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// Receiving half of the bootstrap-status watch channel; cloned for each
    /// caller of `bootstrap_events`.
    receive_status: DirBootstrapEvents,

    /// The circuit manager, if one was supplied.
    ///
    /// When this is `None`, `circmgr()` fails with
    /// `Error::NoDownloadSupport`.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// The runtime used to spawn the background task and build states.
    runtime: R,

    /// If true, `bootstrap()` refuses to run and returns
    /// `Error::OfflineMode`.
    offline: bool,

    /// Flag claimed by `bootstrap()` with a compare-and-swap, so that only
    /// one bootstrap attempt proceeds at a time.
    bootstrap_started: AtomicBool,

    /// The directory filter taken from the configuration; when unset, a
    /// `NilFilter` is used in its place.
    #[cfg(feature = "dirfilter")]
    filter: crate::filter::FilterConfig,

    /// Schedule for the background task.
    ///
    /// `bootstrap()` takes it out of this slot; the task puts it back when
    /// it exits.
    task_schedule: Mutex<Option<TaskSchedule<R>>>,

    /// Handle paired with `task_schedule`; exposed through
    /// `download_task_handle`.
    task_handle: TaskHandle,
}
333
/// The place a document was obtained from.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DocSource {
    /// The document came from the local cache.
    LocalCache,
    /// The document came from a directory server.
    DirServer {
        /// Information about that server, when available.
        source: Option<SourceInfo>,
    },
}
349
350impl std::fmt::Display for DocSource {
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 match self {
353 DocSource::LocalCache => write!(f, "local cache"),
354 DocSource::DirServer { source: None } => write!(f, "directory server"),
355 DocSource::DirServer { source: Some(info) } => write!(f, "directory server {}", info),
356 }
357 }
358}
359
360impl<R: Runtime> DirMgr<R> {
361 pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
371 let store = DirMgrStore::new(&config, runtime.clone(), true)?;
372 let dirmgr = Arc::new(Self::from_config(config, runtime, store, None, true)?);
373
374 let attempt = AttemptId::next();
376 trace!(%attempt, "Trying to load a full directory from cache");
377 let outcome = dirmgr.load_directory(attempt).await;
378 trace!(%attempt, "Load result: {outcome:?}");
379 let _success = outcome?;
380
381 dirmgr
382 .netdir(Timeliness::Timely)
383 .map_err(|_| Error::DirectoryNotPresent)
384 }
385
386 pub async fn load_or_bootstrap_once(
396 config: DirMgrConfig,
397 runtime: R,
398 store: DirMgrStore<R>,
399 circmgr: Arc<CircMgr<R>>,
400 ) -> Result<Arc<NetDir>> {
401 let dirmgr = DirMgr::bootstrap_from_config(config, runtime, store, circmgr).await?;
402 dirmgr
403 .timely_netdir()
404 .map_err(|_| Error::DirectoryNotPresent)
405 }
406
407 pub fn create_unbootstrapped(
411 config: DirMgrConfig,
412 runtime: R,
413 store: DirMgrStore<R>,
414 circmgr: Arc<CircMgr<R>>,
415 ) -> Result<Arc<Self>> {
416 Ok(Arc::new(DirMgr::from_config(
417 config,
418 runtime,
419 store,
420 Some(circmgr),
421 false,
422 )?))
423 }
424
425 #[allow(clippy::cognitive_complexity)] pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
447 if self.offline {
448 return Err(Error::OfflineMode);
449 }
450
451 if self
458 .bootstrap_started
459 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
460 .is_err()
461 {
462 debug!("Attempted to bootstrap twice; ignoring.");
463 return Ok(());
464 }
465
466 let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
469 v.store(false, Ordering::SeqCst);
470 });
471
472 let schedule = {
473 let sched = self.task_schedule.lock().expect("poisoned lock").take();
474 match sched {
475 Some(sched) => sched,
476 None => {
477 debug!("Attempted to bootstrap twice; ignoring.");
478 return Ok(());
479 }
480 }
481 };
482
483 let attempt_id = AttemptId::next();
485 trace!(attempt=%attempt_id, "Starting to bootstrap directory");
486 let have_directory = self.load_directory(attempt_id).await?;
487
488 let (mut sender, receiver) = if have_directory {
489 info!("Loaded a good directory from cache.");
490 (None, None)
491 } else {
492 info!("Didn't get usable directory from cache.");
493 let (sender, receiver) = oneshot::channel();
494 (Some(sender), Some(receiver))
495 };
496
497 let dirmgr_weak = Arc::downgrade(self);
499 self.runtime
500 .spawn(async move {
501 let mut schedule = scopeguard::guard(schedule, |schedule| {
508 if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
509 *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
510 }
511 });
512
513 if let Err(e) =
516 Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
517 .await
518 {
519 match e {
520 Error::ManagerDropped => {}
521 _ => warn_report!(e, "Unrecovered error while waiting for bootstrap",),
522 }
523 } else if let Err(e) =
524 Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
525 .await
526 {
527 match e {
528 Error::ManagerDropped => {}
529 _ => warn_report!(e, "Unrecovered error while downloading"),
530 }
531 }
532 })
533 .map_err(|e| Error::from_spawn("directory updater task", e))?;
534
535 if let Some(receiver) = receiver {
536 match receiver.await {
537 Ok(()) => {
538 info!("We have enough information to build circuits.");
539 let _ = ScopeGuard::into_inner(reset_bootstrap_started);
541 }
542 Err(_) => {
543 warn!("Bootstrapping task exited before finishing.");
544 return Err(Error::CantAdvanceState);
545 }
546 }
547 }
548 Ok(())
549 }
550
551 pub fn bootstrap_started(&self) -> bool {
553 self.bootstrap_started.load(Ordering::SeqCst)
554 }
555
556 pub async fn bootstrap_from_config(
559 config: DirMgrConfig,
560 runtime: R,
561 store: DirMgrStore<R>,
562 circmgr: Arc<CircMgr<R>>,
563 ) -> Result<Arc<Self>> {
564 let dirmgr = Self::create_unbootstrapped(config, runtime, store, circmgr)?;
565
566 dirmgr.bootstrap().await?;
567
568 Ok(dirmgr)
569 }
570
571 #[allow(clippy::cognitive_complexity)] async fn reload_until_owner(
580 weak: &Weak<Self>,
581 schedule: &mut TaskSchedule<R>,
582 attempt_id: AttemptId,
583 on_complete: &mut Option<oneshot::Sender<()>>,
584 ) -> Result<()> {
585 let mut logged = false;
586 let mut bootstrapped;
587 {
588 let dirmgr = upgrade_weak_ref(weak)?;
589 bootstrapped = dirmgr.netdir.get().is_some();
590 }
591
592 loop {
593 {
594 let dirmgr = upgrade_weak_ref(weak)?;
595 trace!("Trying to take ownership of the directory cache lock");
596 if dirmgr.try_upgrade_to_readwrite()? {
597 if logged {
601 info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
602 }
603 return Ok(());
604 }
605 }
606
607 if !logged {
608 logged = true;
609 if bootstrapped {
610 info!("Another process is managing the directory. We'll use its cache.");
611 } else {
612 info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
613 }
614 }
615
616 let pause = if bootstrapped {
619 std::time::Duration::new(120, 0)
620 } else {
621 std::time::Duration::new(5, 0)
622 };
623 schedule.sleep(pause).await?;
624 {
628 let dirmgr = upgrade_weak_ref(weak)?;
629 trace!("Trying to load from the directory cache");
630 if dirmgr.load_directory(attempt_id).await? {
631 if let Some(send_done) = on_complete.take() {
633 let _ = send_done.send(());
634 }
635 if !bootstrapped {
636 info!("The directory is now bootstrapped.");
637 }
638 bootstrapped = true;
639 }
640 }
641 }
642 }
643
644 #[allow(clippy::cognitive_complexity)] async fn download_forever(
650 weak: Weak<Self>,
651 schedule: &mut TaskSchedule<R>,
652 mut attempt_id: AttemptId,
653 mut on_complete: Option<oneshot::Sender<()>>,
654 ) -> Result<()> {
655 let mut state: Box<dyn DirState> = {
656 let dirmgr = upgrade_weak_ref(&weak)?;
657 Box::new(state::GetConsensusState::new(
658 dirmgr.runtime.clone(),
659 dirmgr.config.get(),
660 CacheUsage::CacheOkay,
661 Some(dirmgr.netdir.clone()),
662 #[cfg(feature = "dirfilter")]
663 dirmgr
664 .filter
665 .clone()
666 .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
667 ))
668 };
669
670 trace!("Entering download loop.");
671
672 loop {
673 let mut usable = false;
674
675 let retry_config = {
676 let dirmgr = upgrade_weak_ref(&weak)?;
677 dirmgr.config.get().schedule.retry_bootstrap
681 };
682 let mut retry_delay = retry_config.schedule();
683
684 'retry_attempt: for try_num in retry_config.attempts() {
685 trace!(attempt=%attempt_id, ?try_num, "Trying to download a directory.");
686 let outcome = bootstrap::download(
687 Weak::clone(&weak),
688 &mut state,
689 schedule,
690 attempt_id,
691 &mut on_complete,
692 )
693 .await;
694 trace!(attempt=%attempt_id, ?try_num, ?outcome, "Download is over.");
695
696 if let Err(err) = outcome {
697 if state.is_ready(Readiness::Usable) {
698 usable = true;
699 info_report!(err, "Unable to completely download a directory. (Nevertheless, the directory is usable, so we'll pause for now)");
700 break 'retry_attempt;
701 }
702
703 match err.bootstrap_action() {
704 BootstrapAction::Nonfatal => {
705 return Err(into_internal!(
706 "Nonfatal error should not have propagated here"
707 )(err)
708 .into());
709 }
710 BootstrapAction::Reset => {}
711 BootstrapAction::Fatal => return Err(err),
712 }
713
714 let delay = retry_delay.next_delay(&mut rand::rng());
715 warn_report!(
716 err,
717 "Unable to download a usable directory. (We will restart in {})",
718 humantime::format_duration(delay),
719 );
720 {
721 let dirmgr = upgrade_weak_ref(&weak)?;
722 dirmgr.note_reset(attempt_id);
723 }
724 schedule.sleep(delay).await?;
725 state = state.reset();
726 } else {
727 info!(attempt=%attempt_id, "Directory is complete.");
728 usable = true;
729 break 'retry_attempt;
730 }
731 }
732
733 if !usable {
734 warn!(
736 "We failed {} times to bootstrap a directory. We're going to give up.",
737 retry_config.n_attempts()
738 );
739 return Err(Error::CantAdvanceState);
740 } else {
741 if let Some(send_done) = on_complete.take() {
743 let _ = send_done.send(());
744 }
745 }
746
747 let reset_at = state.reset_time();
748 match reset_at {
749 Some(t) => {
750 trace!("Sleeping until {}", time::OffsetDateTime::from(t));
751 schedule.sleep_until_wallclock(t).await?;
752 }
753 None => return Ok(()),
754 }
755 attempt_id = bootstrap::AttemptId::next();
756 trace!(attempt=%attempt_id, "Beginning new attempt to bootstrap directory");
757 state = state.reset();
758 }
759 }
760
761 fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
763 self.circmgr.clone().ok_or(Error::NoDownloadSupport)
764 }
765
766 pub fn reconfigure(
770 &self,
771 new_config: &DirMgrConfig,
772 how: tor_config::Reconfigure,
773 ) -> std::result::Result<(), tor_config::ReconfigureError> {
774 let config = self.config.get();
775 if new_config.cache_dir != config.cache_dir {
780 how.cannot_change("storage.cache_dir")?;
781 }
782 if new_config.cache_trust != config.cache_trust {
783 how.cannot_change("storage.permissions")?;
784 }
785 if new_config.authorities() != config.authorities() {
786 how.cannot_change("network.authorities")?;
787 }
788
789 if how == tor_config::Reconfigure::CheckAllOrNothing {
790 return Ok(());
791 }
792
793 let params_changed = new_config.override_net_params != config.override_net_params;
794
795 self.config
796 .map_and_replace(|cfg| cfg.update_from_config(new_config));
797
798 if params_changed {
799 let _ignore_err = self.netdir.mutate(|netdir| {
800 netdir.replace_overridden_parameters(&new_config.override_net_params);
801 Ok(())
802 });
803 {
804 let mut params = self.default_parameters.lock().expect("lock failed");
805 *params = Arc::new(NetParameters::from_map(&new_config.override_net_params));
806 }
807
808 self.events.publish(DirEvent::NewConsensus);
811 }
812
813 Ok(())
814 }
815
816 pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
822 self.receive_status.clone()
823 }
824
825 fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
828 let mut sender = self.send_status.lock().expect("poisoned lock");
830 let mut status = sender.borrow_mut();
831
832 status.update_progress(attempt_id, progress);
833 }
834
835 fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
838 if n_errors == 0 {
839 return;
840 }
841 let mut sender = self.send_status.lock().expect("poisoned lock");
842 let mut status = sender.borrow_mut();
843
844 status.note_errors(attempt_id, n_errors);
845 }
846
847 fn note_reset(&self, attempt_id: AttemptId) {
849 let mut sender = self.send_status.lock().expect("poisoned lock");
850 let mut status = sender.borrow_mut();
851
852 status.note_reset(attempt_id);
853 }
854
855 fn try_upgrade_to_readwrite(&self) -> Result<bool> {
862 self.store
863 .lock()
864 .expect("Directory storage lock poisoned")
865 .upgrade_to_readwrite()
866 }
867
/// Return the store if it is currently writable, or `None` if it is
/// read-only. (Test-only helper.)
#[cfg(test)]
fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
    let readonly = self
        .store
        .lock()
        .expect("Directory storage lock poisoned")
        .is_readonly();
    if readonly {
        None
    } else {
        Some(&*self.store)
    }
}
883
884 #[allow(clippy::unnecessary_wraps)] fn from_config(
890 config: DirMgrConfig,
891 runtime: R,
892 store: DirMgrStore<R>,
893 circmgr: Option<Arc<CircMgr<R>>>,
894 offline: bool,
895 ) -> Result<Self> {
896 let netdir = Arc::new(SharedMutArc::new());
897 let events = event::FlagPublisher::new();
898 let default_parameters = NetParameters::from_map(&config.override_net_params);
899 let default_parameters = Mutex::new(Arc::new(default_parameters));
900
901 let (send_status, receive_status) = postage::watch::channel();
902 let send_status = Mutex::new(send_status);
903 let receive_status = DirBootstrapEvents {
904 inner: receive_status,
905 };
906 #[cfg(feature = "dirfilter")]
907 let filter = config.extensions.filter.clone();
908
909 let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
911 let task_schedule = Mutex::new(Some(task_schedule));
912
913 let protocols = {
916 let store = store.store.lock().expect("lock poisoned");
917 store
918 .cached_protocol_recommendations()?
919 .map(|(t, p)| (t, Arc::new(p)))
920 };
921
922 Ok(DirMgr {
923 config: config.into(),
924 store: store.store,
925 netdir,
926 protocols: Mutex::new(protocols),
927 default_parameters,
928 events,
929 send_status,
930 receive_status,
931 circmgr,
932 runtime,
933 offline,
934 bootstrap_started: AtomicBool::new(false),
935 #[cfg(feature = "dirfilter")]
936 filter,
937 task_schedule,
938 task_handle,
939 })
940 }
941
942 async fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
947 let state = state::GetConsensusState::new(
948 self.runtime.clone(),
949 self.config.get(),
950 CacheUsage::CacheOnly,
951 None,
952 #[cfg(feature = "dirfilter")]
953 self.filter
954 .clone()
955 .unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
956 );
957 let _ = bootstrap::load(Arc::clone(self), Box::new(state), attempt_id).await?;
958
959 Ok(self.netdir.get().is_some())
960 }
961
962 pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
969 self.events.subscribe()
970 }
971
972 pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
975 use itertools::Itertools;
976 let mut result = HashMap::new();
977 let query: DocQuery = (*doc).into();
978 let store = self.store.lock().expect("store lock poisoned");
979 query.load_from_store_into(&mut result, &**store)?;
980 let item = result.into_iter().at_most_one().map_err(|_| {
981 Error::CacheCorruption("Found more than one entry in storage for given docid")
982 })?;
983 if let Some((docid, doctext)) = item {
984 if &docid != doc {
985 return Err(Error::CacheCorruption(
986 "Item from storage had incorrect docid.",
987 ));
988 }
989 Ok(Some(doctext))
990 } else {
991 Ok(None)
992 }
993 }
994
995 pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
1000 where
1001 T: IntoIterator<Item = DocId>,
1002 {
1003 let partitioned = docid::partition_by_type(docs);
1004 let mut result = HashMap::new();
1005 let store = self.store.lock().expect("store lock poisoned");
1006 for (_, query) in partitioned.into_iter() {
1007 query.load_from_store_into(&mut result, &**store)?;
1008 }
1009 Ok(result)
1010 }
1011
1012 fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
1020 if let ClientRequest::Consensus(req) = req {
1021 if tor_consdiff::looks_like_diff(&text) {
1022 if let Some(old_d) = req.old_consensus_digests().next() {
1023 let db_val = {
1024 let s = self.store.lock().expect("Directory storage lock poisoned");
1025 s.consensus_by_sha3_digest_of_signed_part(old_d)?
1026 };
1027 if let Some((old_consensus, meta)) = db_val {
1028 info!("Applying a consensus diff");
1029 let new_consensus = tor_consdiff::apply_diff(
1030 old_consensus.as_str()?,
1031 &text,
1032 Some(*meta.sha3_256_of_signed()),
1033 )?;
1034 new_consensus.check_digest()?;
1035 return Ok(new_consensus.to_string());
1036 }
1037 }
1038 return Err(Error::Unwanted(
1039 "Received a consensus diff we did not ask for",
1040 ));
1041 }
1042 }
1043 Ok(text)
1044 }
1045
1046 #[allow(clippy::cognitive_complexity)]
1048 fn apply_netdir_changes(
1049 self: &Arc<Self>,
1050 state: &mut Box<dyn DirState>,
1051 store: &mut dyn Store,
1052 ) -> Result<()> {
1053 if let Some(change) = state.get_netdir_change() {
1054 match change {
1055 NetDirChange::AttemptReplace {
1056 netdir,
1057 consensus_meta,
1058 } => {
1059 if let Some(ref cm) = self.circmgr {
1062 if !cm
1063 .netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
1064 {
1065 debug!("Got a new NetDir, but it doesn't have enough guards yet.");
1066 return Ok(());
1067 }
1068 }
1069 let is_stale = {
1070 self.netdir
1072 .get()
1073 .map(|x| {
1074 x.lifetime().valid_after()
1075 > netdir
1076 .as_ref()
1077 .expect("AttemptReplace had None")
1078 .lifetime()
1079 .valid_after()
1080 })
1081 .unwrap_or(false)
1082 };
1083 if is_stale {
1084 warn!("Got a new NetDir, but it's older than the one we currently have!");
1085 return Err(Error::NetDirOlder);
1086 }
1087 let cfg = self.config.get();
1088 let mut netdir = netdir.take().expect("AttemptReplace had None");
1089 netdir.replace_overridden_parameters(&cfg.override_net_params);
1090 self.netdir.replace(netdir);
1091 self.events.publish(DirEvent::NewConsensus);
1092 self.events.publish(DirEvent::NewDescriptors);
1093
1094 info!("Marked consensus usable.");
1095 if !store.is_readonly() {
1096 store.mark_consensus_usable(consensus_meta)?;
1097 store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
1100 }
1101 Ok(())
1102 }
1103 NetDirChange::AddMicrodescs(mds) => {
1104 self.netdir.mutate(|netdir| {
1105 for md in mds.drain(..) {
1106 netdir.add_microdesc(md);
1107 }
1108 Ok(())
1109 })?;
1110 self.events.publish(DirEvent::NewDescriptors);
1111 Ok(())
1112 }
1113 NetDirChange::SetRequiredProtocol { timestamp, protos } => {
1114 if !store.is_readonly() {
1115 store.update_protocol_recommendations(timestamp, protos.as_ref())?;
1116 }
1117 let mut pr = self.protocols.lock().expect("Poisoned lock");
1118 *pr = Some((timestamp, protos));
1119 self.events.publish(DirEvent::NewProtocolRecommendation);
1120 Ok(())
1121 }
1122 }
1123 } else {
1124 Ok(())
1125 }
1126 }
1127}
1128
/// A degree of readiness that a directory state can be asked about, via
/// `DirState::is_ready`.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// Presumably: nothing further needs to be downloaded.
    /// NOTE(review): confirm against the `DirState` implementations.
    Complete,
    /// The directory can be used, even if the download did not finish.
    Usable,
}
1137
1138fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
1141 Weak::upgrade(weak).ok_or(Error::ManagerDropped)
1142}
1143
1144pub(crate) fn default_consensus_cutoff(
1147 now: SystemTime,
1148 tolerance: &DirTolerance,
1149) -> Result<SystemTime> {
1150 const MIN_AGE_TO_ALLOW: Duration = Duration::from_secs(3 * 3600);
1153 let allow_skew = std::cmp::max(MIN_AGE_TO_ALLOW, tolerance.post_valid_tolerance);
1154 let cutoff = time::OffsetDateTime::from(now - allow_skew);
1155 let (h, _m, _s) = cutoff.to_hms();
1162 let cutoff = cutoff.replace_time(
1163 time::Time::from_hms(h, 0, 0)
1164 .map_err(tor_error::into_internal!("Failed clock calculation"))?,
1165 );
1166 let cutoff = cutoff + Duration::from_secs(3600);
1167
1168 Ok(cutoff.into())
1169}
1170
1171pub fn supported_client_protocols() -> tor_protover::Protocols {
1174 use tor_protover::named::*;
1175 [
1178 DIRCACHE_CONSDIFF,
1180 ]
1181 .into_iter()
1182 .collect()
1183}
1184
1185#[cfg(test)]
1186mod test {
1187 #![allow(clippy::bool_assert_comparison)]
1189 #![allow(clippy::clone_on_copy)]
1190 #![allow(clippy::dbg_macro)]
1191 #![allow(clippy::mixed_attributes_style)]
1192 #![allow(clippy::print_stderr)]
1193 #![allow(clippy::print_stdout)]
1194 #![allow(clippy::single_char_pattern)]
1195 #![allow(clippy::unwrap_used)]
1196 #![allow(clippy::unchecked_duration_subtraction)]
1197 #![allow(clippy::useless_vec)]
1198 #![allow(clippy::needless_pass_by_value)]
1199 use super::*;
1201 use crate::docmeta::{AuthCertMeta, ConsensusMeta};
1202 use std::time::Duration;
1203 use tempfile::TempDir;
1204 use tor_basic_utils::test_rng::testing_rng;
1205 use tor_netdoc::doc::netstatus::ConsensusFlavor;
1206 use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};
1207 use tor_rtcompat::SleepProvider;
1208
/// The advertised client protocols must be exactly `DirCache=2`.
#[test]
fn protocols() {
    let expected: tor_protover::Protocols = "DirCache=2".parse().unwrap();
    assert_eq!(supported_client_protocols(), expected);
}
1215
1216 pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
1217 let dir = TempDir::new().unwrap();
1218 let config = DirMgrConfig {
1219 cache_dir: dir.path().into(),
1220 ..Default::default()
1221 };
1222 let store = DirMgrStore::new(&config, runtime.clone(), false).unwrap();
1223 let dirmgr = DirMgr::from_config(config, runtime, store, None, false).unwrap();
1224
1225 (dir, dirmgr)
1226 }
1227
/// A freshly built manager has neither a circuit manager nor a directory.
#[test]
fn failing_accessors() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let (_tempdir, mgr) = new_mgr(rt);

        let circmgr = mgr.circmgr();
        assert!(circmgr.is_err());

        let netdir = mgr.netdir(Timeliness::Unchecked);
        assert!(netdir.is_err());
    });
}
1237
/// Store a handful of fake documents, then read them back through
/// `DirMgr::text` and `DirMgr::texts`.
#[test]
fn load_and_store_internals() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let one_day = Duration::from_secs(86400);
        let now = rt.wallclock();
        let tomorrow = now + one_day;
        let later = tomorrow + one_day;

        let (_tempdir, mgr) = new_mgr(rt);

        // Made-up digests and key identities.
        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 20];
        let d5 = [12; 20];
        let certid1 = AuthCertKeyIds {
            id_fingerprint: d4.into(),
            sk_fingerprint: d5.into(),
        };
        let certid2 = AuthCertKeyIds {
            id_fingerprint: d5.into(),
            sk_fingerprint: d4.into(),
        };

        // Fill the store; the lock is released at the end of this block.
        {
            let mut store = mgr.store.lock().unwrap();

            let microdescs = [
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ];
            store.store_microdescs(&microdescs, now).unwrap();

            #[cfg(feature = "routerdesc")]
            store
                .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                .unwrap();

            store
                .store_authcerts(&[
                    (
                        AuthCertMeta::new(certid1, now, tomorrow),
                        "Fake certificate one",
                    ),
                    (
                        AuthCertMeta::new(certid2, now, tomorrow),
                        "Fake certificate two",
                    ),
                ])
                .unwrap();

            let cmeta = ConsensusMeta::new(
                Lifetime::new(now, tomorrow, later).unwrap(),
                [102; 32],
                [103; 32],
            );
            store
                .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                .unwrap();
        }

        // Single-document lookups.
        let micro1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
        assert_eq!(micro1.as_str(), Ok("Fake micro 1"));

        let latest = DocId::LatestConsensus {
            flavor: ConsensusFlavor::Microdesc,
            cache_usage: CacheUsage::CacheOkay,
        };
        let consensus = mgr.text(&latest).unwrap().unwrap();
        assert_eq!(consensus.as_str(), Ok("Fake consensus!"));

        let absent = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
        assert!(absent.is_none());

        // Multi-document lookup, including one id that is not stored.
        let d_bogus = DocId::Microdesc([255; 32]);
        let res = mgr
            .texts(vec![
                DocId::Microdesc(d2),
                DocId::Microdesc(d3),
                d_bogus,
                DocId::AuthCert(certid2),
                #[cfg(feature = "routerdesc")]
                DocId::RouterDesc(d5),
            ])
            .unwrap();

        let expected = [
            (DocId::Microdesc(d2), "Fake micro 2"),
            (DocId::Microdesc(d3), "Fake micro 3"),
            (DocId::AuthCert(certid2), "Fake certificate two"),
        ];
        for (id, want) in expected {
            assert_eq!(res.get(&id).unwrap().as_str(), Ok(want));
        }
        assert!(!res.contains_key(&d_bogus));
        #[cfg(feature = "routerdesc")]
        assert_eq!(
            res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
            Ok("Fake rd2")
        );
    });
}
1352
/// Check the consensus request we build, first with an empty store and
/// then with one consensus stored.
#[test]
fn make_consensus_request() {
    tor_rtcompat::test_with_one_runtime!(|rt| async {
        let one_day = Duration::from_secs(86400);
        let now = rt.wallclock();
        let tomorrow = now + one_day;
        let later = tomorrow + one_day;

        let (_tempdir, mgr) = new_mgr(rt);
        let config = DirMgrConfig::default();

        // Build a request against the current contents of the store. The
        // store lock is only held for the duration of one call.
        let build_request = || {
            let store = mgr.store.lock().unwrap();
            bootstrap::make_consensus_request(
                now,
                ConsensusFlavor::Microdesc,
                &**store,
                &config,
            )
            .unwrap()
        };

        // Empty store: no old digests, and a date based on the tolerance.
        let tolerance = DirTolerance::default().post_valid_tolerance;
        if let ClientRequest::Consensus(r) = build_request() {
            assert_eq!(r.old_consensus_digests().count(), 0);
            let date = r.last_consensus_date().unwrap();
            assert!(date >= now - tolerance);
            assert!(date <= now - tolerance + Duration::from_secs(3600));
        } else {
            panic!("Wrong request type");
        }

        // Store a consensus, so that the next request can refer to it.
        let d_prev = [42; 32];
        {
            let mut store = mgr.store.lock().unwrap();

            let cmeta = ConsensusMeta::new(
                Lifetime::new(now, tomorrow, later).unwrap(),
                d_prev,
                [103; 32],
            );
            store
                .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                .unwrap();
        }

        // Now the request names the stored consensus and its date.
        if let ClientRequest::Consensus(r) = build_request() {
            let ds: Vec<_> = r.old_consensus_digests().collect();
            assert_eq!(ds.len(), 1);
            assert_eq!(ds[0], &d_prev);
            assert_eq!(r.last_consensus_date(), Some(now));
        } else {
            panic!("Wrong request type");
        }
    });
}
1422
1423 #[test]
1424 fn make_other_requests() {
1425 tor_rtcompat::test_with_one_runtime!(|rt| async {
1426 use rand::Rng;
1427 let (_tempdir, mgr) = new_mgr(rt);
1428
1429 let certid1 = AuthCertKeyIds {
1430 id_fingerprint: [99; 20].into(),
1431 sk_fingerprint: [100; 20].into(),
1432 };
1433 let mut rng = testing_rng();
1434 #[cfg(feature = "routerdesc")]
1435 let rd_ids: Vec<DocId> = (0..1000).map(|_| DocId::RouterDesc(rng.random())).collect();
1436 let md_ids: Vec<DocId> = (0..1000).map(|_| DocId::Microdesc(rng.random())).collect();
1437 let config = DirMgrConfig::default();
1438
1439 let query = DocId::AuthCert(certid1);
1441 let store = mgr.store.lock().unwrap();
1442 let reqs =
1443 bootstrap::make_requests_for_documents(&mgr.runtime, &[query], &**store, &config)
1444 .unwrap();
1445 assert_eq!(reqs.len(), 1);
1446 let req = &reqs[0];
1447 if let ClientRequest::AuthCert(r) = req {
1448 assert_eq!(r.keys().next(), Some(&certid1));
1449 } else {
1450 panic!();
1451 }
1452
1453 let reqs =
1455 bootstrap::make_requests_for_documents(&mgr.runtime, &md_ids, &**store, &config)
1456 .unwrap();
1457 assert_eq!(reqs.len(), 2);
1458 assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));
1459
1460 #[cfg(feature = "routerdesc")]
1462 {
1463 let reqs = bootstrap::make_requests_for_documents(
1464 &mgr.runtime,
1465 &rd_ids,
1466 &**store,
1467 &config,
1468 )
1469 .unwrap();
1470 assert_eq!(reqs.len(), 2);
1471 assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
1472 }
1473 });
1474 }
1475
1476 #[test]
1477 fn expand_response() {
1478 tor_rtcompat::test_with_one_runtime!(|rt| async {
1479 let now = rt.wallclock();
1480 let day = Duration::from_secs(86400);
1481 let config = DirMgrConfig::default();
1482
1483 let (_tempdir, mgr) = new_mgr(rt);
1484
1485 let q = DocId::Microdesc([99; 32]);
1487 let r = {
1488 let store = mgr.store.lock().unwrap();
1489 bootstrap::make_requests_for_documents(&mgr.runtime, &[q], &**store, &config)
1490 .unwrap()
1491 };
1492 let expanded = mgr.expand_response_text(&r[0], "ABC".to_string());
1493 assert_eq!(&expanded.unwrap(), "ABC");
1494
1495 let latest_id = DocId::LatestConsensus {
1498 flavor: ConsensusFlavor::Microdesc,
1499 cache_usage: CacheUsage::CacheOkay,
1500 };
1501 let r = {
1502 let store = mgr.store.lock().unwrap();
1503 bootstrap::make_requests_for_documents(
1504 &mgr.runtime,
1505 &[latest_id],
1506 &**store,
1507 &config,
1508 )
1509 .unwrap()
1510 };
1511 let expanded = mgr.expand_response_text(&r[0], "DEF".to_string());
1512 assert_eq!(&expanded.unwrap(), "DEF");
1513
1514 {
1517 let mut store = mgr.store.lock().unwrap();
1518 let d_in = [0x99; 32]; let cmeta = ConsensusMeta::new(
1520 Lifetime::new(now, now + day, now + 2 * day).unwrap(),
1521 d_in,
1522 d_in,
1523 );
1524 store
1525 .store_consensus(
1526 &cmeta,
1527 ConsensusFlavor::Microdesc,
1528 false,
1529 "line 1\nline2\nline 3\n",
1530 )
1531 .unwrap();
1532 }
1533
1534 let r = {
1537 let store = mgr.store.lock().unwrap();
1538 bootstrap::make_requests_for_documents(
1539 &mgr.runtime,
1540 &[latest_id],
1541 &**store,
1542 &config,
1543 )
1544 .unwrap()
1545 };
1546 let expanded = mgr.expand_response_text(&r[0], "hello".to_string());
1547 assert_eq!(&expanded.unwrap(), "hello");
1548
1549 let diff = "network-status-diff-version 1
1551hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
15522c
1553replacement line
1554.
1555".to_string();
1556 let expanded = mgr.expand_response_text(&r[0], diff);
1557
1558 assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");
1559
1560 let diff = "network-status-diff-version 1
1562hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
15632c
1564replacement line
1565.
1566".to_string();
1567 let expanded = mgr.expand_response_text(&r[0], diff);
1568 assert!(expanded.is_err());
1569 });
1570 }
1571}