tor_dirmgr/
state.rs

1//! Implementation for the primary directory state machine.
2//!
3//! There are three (active) states that a download can be in: looking
4//! for a consensus ([`GetConsensusState`]), looking for certificates
5//! to validate that consensus ([`GetCertsState`]), and looking for
6//! microdescriptors ([`GetMicrodescsState`]).
7//!
8//! These states have no contact with the network, and are purely
9//! reactive to other code that drives them.  See the
10//! [`bootstrap`](crate::bootstrap) module for functions that actually
11//! load or download directory information.
12
13use std::collections::{HashMap, HashSet};
14use std::fmt::Debug;
15use std::mem;
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, SystemTime};
18use time::OffsetDateTime;
19use tor_basic_utils::RngExt as _;
20use tor_error::{internal, warn_report};
21use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
22use tor_netdoc::doc::authcert::UncheckedAuthCert;
23use tor_netdoc::doc::netstatus::{Lifetime, ProtoStatuses};
24use tracing::{debug, warn};
25
26use crate::event::DirProgress;
27
28use crate::storage::DynStore;
29use crate::{
30    docmeta::{AuthCertMeta, ConsensusMeta},
31    event,
32    retry::DownloadSchedule,
33    CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
34};
35use crate::{DocSource, SharedMutArc};
36use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
37#[cfg(feature = "geoip")]
38use tor_geoip::GeoipDb;
39use tor_llcrypto::pk::rsa::RsaIdentity;
40use tor_netdoc::doc::{
41    microdesc::{MdDigest, Microdesc},
42    netstatus::MdConsensus,
43};
44use tor_netdoc::{
45    doc::{
46        authcert::{AuthCert, AuthCertKeyIds},
47        microdesc::MicrodescReader,
48        netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
49    },
50    AllowAnnotations,
51};
52use tor_rtcompat::Runtime;
53
/// A change to the currently running `NetDir`, returned by the state machines in this module.
///
/// These are proposals only: the caller driving the state machine decides
/// whether to apply each change.
#[derive(Debug)]
pub(crate) enum NetDirChange<'a> {
    /// If the provided `NetDir` is suitable for use (i.e. the caller determines it can build
    /// circuits with it), replace the current `NetDir` with it.
    ///
    /// The caller must call `DirState::on_netdir_replaced` if the replace was successful.
    AttemptReplace {
        /// The netdir to replace the current one with, if it's usable.
        ///
        /// The `Option` is always `Some` when returned from the state machine; it's there
        /// so that the caller can call `.take()` to avoid cloning the netdir.
        netdir: &'a mut Option<NetDir>,
        /// The consensus metadata for this netdir.
        consensus_meta: &'a ConsensusMeta,
    },
    /// Add the provided microdescriptors to the current `NetDir`.
    AddMicrodescs(&'a mut Vec<Microdesc>),
    /// Replace the recommended set of subprotocols.
    SetRequiredProtocol {
        /// The time at which the protocol statuses were recommended.
        timestamp: SystemTime,
        /// The recommended set of protocols.
        protos: Arc<ProtoStatuses>,
    },
}
80
/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all.  Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing".  Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
pub(crate) trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're missing.
    ///
    /// If every document on this list were to be loaded or downloaded, then
    /// the state should either become "ready to advance", or "complete."
    ///
    /// This list should never _grow_ on a given state; only advancing
    /// or resetting the state should add new DocIds that weren't
    /// there before.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// If the state object wants to make changes to the currently running `NetDir`,
    /// return the proposed changes.
    ///
    /// (The default is to propose nothing.)
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        None
    }
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;
    /// Add one or more documents from our cache; returns 'true' if there
    /// was any change in this state.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()>;

    /// Add information that we have just downloaded to this state.
    ///
    /// This method receives a copy of the original request, and should reject
    /// any documents that do not pertain to it.
    ///
    /// If `storage` is provided, then we should write any accepted documents
    /// into `storage` so they can be saved in a cache.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()>;
    /// Return a summary of this state as a [`DirProgress`].
    fn bootstrap_progress(&self) -> event::DirProgress;
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> DownloadSchedule;
    /// If possible, advance to the next state.
    ///
    /// (Implementations in this module return `self` unchanged when
    /// `can_advance` is false.)
    fn advance(self: Box<Self>) -> Box<dyn DirState>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Box<dyn DirState>;
}
162
/// An object that can provide a previous netdir for the bootstrapping state machines to use.
///
/// (The state machines use this, e.g., to learn the `valid_after` time of a
/// directory we already have, so we only accept a newer consensus.)
pub(crate) trait PreviousNetDir: Send + Sync + 'static + Debug {
    /// Get the previous netdir, if there still is one.
    fn get_netdir(&self) -> Option<Arc<NetDir>>;
}
168
impl PreviousNetDir for SharedMutArc<NetDir> {
    fn get_netdir(&self) -> Option<Arc<NetDir>> {
        // Delegate to the shared cell; presumably `None` when no netdir has
        // been installed yet.
        self.get()
    }
}
174
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,

    /// If present, a time after which we want our consensus to have
    /// been published.
    //
    // TODO: This is not yet used everywhere it could be.  In the future maybe
    // it should be inserted into the DocId::LatestConsensus  alternative rather
    // than being recalculated in make_consensus_request,
    after: Option<SystemTime>,

    /// If present, our next state.
    ///
    /// (This is present once we have a consensus; it also acts as the
    /// "can we advance?" flag.)
    next: Option<GetCertsState<R>>,

    /// A list of RsaIdentity for the authorities that we believe in.
    ///
    /// No consensus can be valid unless it purports to be signed by
    /// more than half of these authorities.
    authority_ids: Vec<RsaIdentity>,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
212
213impl<R: Runtime> GetConsensusState<R> {
214    /// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as
215    /// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that
216    /// directory may be used to complete the next one.
217    pub(crate) fn new(
218        rt: R,
219        config: Arc<DirMgrConfig>,
220        cache_usage: CacheUsage,
221        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
222        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
223    ) -> Self {
224        let authority_ids = config
225            .authorities()
226            .iter()
227            .map(|auth| auth.v3ident)
228            .collect();
229        let after = prev_netdir
230            .as_ref()
231            .and_then(|x| x.get_netdir())
232            .map(|nd| nd.lifetime().valid_after());
233
234        GetConsensusState {
235            cache_usage,
236            after,
237            next: None,
238            authority_ids,
239            rt,
240            config,
241            prev_netdir,
242            #[cfg(feature = "dirfilter")]
243            filter,
244        }
245    }
246}
247
248impl<R: Runtime> DirState for GetConsensusState<R> {
249    fn describe(&self) -> String {
250        if self.next.is_some() {
251            "About to fetch certificates."
252        } else {
253            match self.cache_usage {
254                CacheUsage::CacheOnly => "Looking for a cached consensus.",
255                CacheUsage::CacheOkay => "Looking for a consensus.",
256                CacheUsage::MustDownload => "Downloading a consensus.",
257            }
258        }
259        .to_string()
260    }
261    fn missing_docs(&self) -> Vec<DocId> {
262        if self.can_advance() {
263            return Vec::new();
264        }
265        let flavor = ConsensusFlavor::Microdesc;
266        vec![DocId::LatestConsensus {
267            flavor,
268            cache_usage: self.cache_usage,
269        }]
270    }
271    fn is_ready(&self, _ready: Readiness) -> bool {
272        false
273    }
274    fn can_advance(&self) -> bool {
275        self.next.is_some()
276    }
277    fn bootstrap_progress(&self) -> DirProgress {
278        if let Some(next) = &self.next {
279            next.bootstrap_progress()
280        } else {
281            DirProgress::NoConsensus { after: self.after }
282        }
283    }
284    fn dl_config(&self) -> DownloadSchedule {
285        self.config.schedule.retry_consensus
286    }
287    fn add_from_cache(
288        &mut self,
289        docs: HashMap<DocId, DocumentText>,
290        changed: &mut bool,
291    ) -> Result<()> {
292        let text = match docs.into_iter().next() {
293            None => return Ok(()),
294            Some((
295                DocId::LatestConsensus {
296                    flavor: ConsensusFlavor::Microdesc,
297                    ..
298                },
299                text,
300            )) => text,
301            _ => return Err(Error::CacheCorruption("Not an md consensus")),
302        };
303
304        let source = DocSource::LocalCache;
305
306        self.add_consensus_text(
307            source,
308            text.as_str().map_err(Error::BadUtf8InCache)?,
309            None,
310            changed,
311        )?;
312        Ok(())
313    }
314    fn add_from_download(
315        &mut self,
316        text: &str,
317        request: &ClientRequest,
318        source: DocSource,
319        storage: Option<&Mutex<DynStore>>,
320        changed: &mut bool,
321    ) -> Result<()> {
322        let requested_newer_than = match request {
323            ClientRequest::Consensus(r) => r.last_consensus_date(),
324            _ => None,
325        };
326        let meta = self.add_consensus_text(source, text, requested_newer_than, changed)?;
327
328        if let Some(store) = storage {
329            let mut w = store.lock().expect("Directory storage lock poisoned");
330            w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
331        }
332        Ok(())
333    }
334    fn advance(self: Box<Self>) -> Box<dyn DirState> {
335        match self.next {
336            Some(next) => Box::new(next),
337            None => self,
338        }
339    }
340    fn reset_time(&self) -> Option<SystemTime> {
341        None
342    }
343    fn reset(self: Box<Self>) -> Box<dyn DirState> {
344        self
345    }
346}
347
impl<R: Runtime> GetConsensusState<R> {
    /// Helper: try to set the current consensus text from an input string
    /// `text`.  Refuse it if the authorities could never be correct, or if it
    /// is ill-formed.
    ///
    /// If `cutoff` is provided, treat any consensus older than `cutoff` as
    /// older-than-requested.
    ///
    /// On success, installs a `GetCertsState` into `self.next` and returns a
    /// reference to the accepted consensus's metadata.
    ///
    /// Errors from this method are not fatal to the download process.
    fn add_consensus_text(
        &mut self,
        source: DocSource,
        text: &str,
        cutoff: Option<SystemTime>,
        changed: &mut bool,
    ) -> Result<&ConsensusMeta> {
        // Try to parse it and get its metadata.
        let (consensus_meta, unvalidated) = {
            let (signedval, remainder, parsed) =
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
            // Optionally let a configured filter rewrite the parsed consensus.
            #[cfg(feature = "dirfilter")]
            let parsed = self.filter.filter_consensus(parsed)?;
            // Widen the validity interval by our configured tolerance before
            // the timeliness check below.
            let parsed = self.config.tolerance.extend_tolerance(parsed);
            let now = self.rt.wallclock();
            let timely = parsed.check_valid_at(&now)?;
            if let Some(cutoff) = cutoff {
                if timely.peek_lifetime().valid_after() < cutoff {
                    return Err(Error::Unwanted("consensus was older than requested"));
                }
            }
            let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
            (meta, timely)
        };

        // Check out what authorities we believe in, and see if enough
        // of them are purported to have signed this consensus.
        //
        // NOTE(review): the `as u16` cast assumes fewer than 2^16 configured
        // authorities — confirm this is bounded elsewhere.
        let n_authorities = self.authority_ids.len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);

        let id_refs: Vec<_> = self.authority_ids.iter().collect();
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
            return Err(Error::UnrecognizedAuthorities);
        }
        // Yes, we've added the consensus.  That's a change.
        *changed = true;

        // Make a set of all the certificates we want -- the subset of
        // those listed on the consensus that we would indeed accept as
        // authoritative.
        let desired_certs = unvalidated
            .signing_cert_ids()
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
            .collect();

        self.next = Some(GetCertsState {
            cache_usage: self.cache_usage,
            consensus_source: source,
            consensus: GetCertsConsensus::Unvalidated(unvalidated),
            consensus_meta,
            missing_certs: desired_certs,
            certs: Vec::new(),
            rt: self.rt.clone(),
            config: self.config.clone(),
            prev_netdir: self.prev_netdir.take(),
            protocol_statuses: None,
            #[cfg(feature = "dirfilter")]
            filter: self.filter.clone(),
        });

        // Unwrap should be safe because `next` was just assigned
        #[allow(clippy::unwrap_used)]
        Ok(&self.next.as_ref().unwrap().consensus_meta)
    }

    /// Return true if `id` is an authority identity we recognize
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
        self.authority_ids.iter().any(|auth| auth == id)
    }
}
427
/// The internal state of the consensus held by a GetCertsState.
///
/// (There are three variants, not two: `Failed` records that we attempted
/// validation and it did not succeed.)
///
/// This inner object is advanced by `try_checking_sigs`.
#[derive(Clone, Debug)]
enum GetCertsConsensus {
    /// We have an unvalidated consensus; we haven't checked its signatures.
    Unvalidated(UnvalidatedMdConsensus),
    /// A validated consensus: the signatures are fine and we can advance.
    Validated(MdConsensus),
    /// We failed to validate the consensus, even after getting enough certificates.
    Failed,
}
440
/// Second state: fetching or loading authority certificates.
///
/// TODO: we should probably do what C tor does, and try to use the
/// same directory that gave us the consensus.
///
/// TODO SECURITY: This needs better handling for the DOS attack where
/// we are given a bad consensus signed with fictional certificates
/// that we can never find.
#[derive(Clone, Debug)]
struct GetCertsState<R: Runtime> {
    /// The cache usage we had in mind when we began.  Used to reset.
    cache_usage: CacheUsage,
    /// Where did we get our consensus?
    consensus_source: DocSource,
    /// The consensus that we are trying to validate, or an error if we've given
    /// up on validating it.
    consensus: GetCertsConsensus,
    /// Metadata for the consensus.
    consensus_meta: ConsensusMeta,
    /// A set of the key IDs (identity/signing-key pairs) for the certificates
    /// we don't have yet.
    missing_certs: HashSet<AuthCertKeyIds>,
    /// A list of the certificates we've been able to load or download.
    certs: Vec<AuthCert>,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// If present, a set of protocols to install as our latest recommended set.
    ///
    /// (Set by `try_checking_sigs` once the consensus validates.)
    protocol_statuses: Option<(SystemTime, Arc<ProtoStatuses>)>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
481
482impl<R: Runtime> GetCertsState<R> {
483    /// Handle a certificate result returned by `tor_netdoc`: checking it for timeliness
484    /// and well-signedness.
485    ///
486    /// On success return the `AuthCert` and the string that represents it within the string `within`.
487    /// On failure, return an error.
488    fn check_parsed_certificate<'s>(
489        &self,
490        parsed: tor_netdoc::Result<UncheckedAuthCert>,
491        source: &DocSource,
492        within: &'s str,
493    ) -> Result<(AuthCert, &'s str)> {
494        let parsed = parsed.map_err(|e| Error::from_netdoc(source.clone(), e))?;
495        let cert_text = parsed
496            .within(within)
497            .expect("Certificate was not in input as expected");
498        let wellsigned = parsed.check_signature()?;
499        let now = self.rt.wallclock();
500        let timely_cert = self
501            .config
502            .tolerance
503            .extend_tolerance(wellsigned)
504            .check_valid_at(&now)?;
505        Ok((timely_cert, cert_text))
506    }
507
508    /// If we have enough certificates, and we have not yet checked the
509    /// signatures on the consensus, try checking them.
510    ///
511    /// If the consensus is valid, remove the unvalidated consensus from `self`
512    /// and put the validated consensus there instead.
513    ///
514    /// If the consensus is invalid, throw it out set a blocking error.
515    fn try_checking_sigs(&mut self) -> Result<()> {
516        use GetCertsConsensus as C;
517        // Temporary value; we'll replace the consensus field with something
518        // better before the method returns.
519        let mut consensus = C::Failed;
520        std::mem::swap(&mut consensus, &mut self.consensus);
521
522        let unvalidated = match consensus {
523            C::Unvalidated(uv) if uv.key_is_correct(&self.certs[..]).is_ok() => uv,
524            _ => {
525                // nothing to check at this point.  Either we already checked the consensus, or we don't yet have enough certificates.
526                self.consensus = consensus;
527                return Ok(());
528            }
529        };
530
531        let (new_consensus, outcome) = match unvalidated.check_signature(&self.certs[..]) {
532            Ok(validated) => (C::Validated(validated), Ok(())),
533            Err(cause) => (
534                C::Failed,
535                Err(Error::ConsensusInvalid {
536                    source: self.consensus_source.clone(),
537                    cause,
538                }),
539            ),
540        };
541        self.consensus = new_consensus;
542
543        // Update our protocol recommendations if we have a validated consensus,
544        // and if we haven't already updated our recommendations.
545        if let GetCertsConsensus::Validated(v) = &self.consensus {
546            if self.protocol_statuses.is_none() {
547                let protoset: &Arc<ProtoStatuses> = v.protocol_statuses();
548                self.protocol_statuses = Some((
549                    self.consensus_meta.lifetime().valid_after(),
550                    Arc::clone(protoset),
551                ));
552            }
553        }
554
555        outcome
556    }
557}
558
impl<R: Runtime> DirState for GetCertsState<R> {
    fn describe(&self) -> String {
        use GetCertsConsensus as C;
        match &self.consensus {
            C::Unvalidated(_) => {
                let total = self.certs.len() + self.missing_certs.len();
                format!(
                    "Downloading certificates for consensus (we are missing {}/{}).",
                    self.missing_certs.len(),
                    total
                )
            }
            C::Validated(_) => "Validated consensus; about to get microdescriptors".to_string(),
            C::Failed => "Failed to validate consensus".to_string(),
        }
    }
    fn missing_docs(&self) -> Vec<DocId> {
        self.missing_certs
            .iter()
            .map(|id| DocId::AuthCert(*id))
            .collect()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // Never usable at this stage: we still need microdescriptors.
        false
    }
    fn can_advance(&self) -> bool {
        // We can advance once the consensus signatures have checked out.
        matches!(self.consensus, GetCertsConsensus::Validated(_))
    }
    fn bootstrap_progress(&self) -> DirProgress {
        let n_certs = self.certs.len();
        let n_missing_certs = self.missing_certs.len();
        let total_certs = n_missing_certs + n_certs;
        DirProgress::FetchingCerts {
            lifetime: self.consensus_meta.lifetime().clone(),
            usable_lifetime: self
                .config
                .tolerance
                .extend_lifetime(self.consensus_meta.lifetime()),

            n_certs: (n_certs as u16, total_certs as u16),
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_certs
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        // Here we iterate over the documents we want, taking them from
        // our input and remembering them.
        let source = DocSource::LocalCache;
        // Problems with individual certificates are nonfatal: we remember the
        // first error, keep processing, and report it at the end.
        let mut nonfatal_error = None;
        for id in &self.missing_docs() {
            if let Some(cert) = docs.get(id) {
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
                let parsed = AuthCert::parse(text);
                match self.check_parsed_certificate(parsed, &source, text) {
                    Ok((cert, _text)) => {
                        self.missing_certs.remove(cert.key_ids());
                        self.certs.push(cert);
                        *changed = true;
                    }
                    Err(e) => {
                        nonfatal_error.get_or_insert(e);
                    }
                }
            }
        }
        if *changed {
            // We gained at least one certificate: see whether the consensus
            // can now be validated.
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        // Which certificate key IDs did we actually request?  Anything else
        // in the response gets discarded below.
        let asked_for: HashSet<_> = match request {
            ClientRequest::AuthCert(a) => a.keys().collect(),
            _ => return Err(internal!("expected an AuthCert request").into()),
        };

        // As in add_from_cache, per-certificate failures are nonfatal.
        let mut nonfatal_error = None;
        let mut newcerts = Vec::new();
        for cert in
            AuthCert::parse_multiple(text).map_err(|e| Error::from_netdoc(source.clone(), e))?
        {
            match self.check_parsed_certificate(cert, &source, text) {
                Ok((cert, cert_text)) => {
                    newcerts.push((cert, cert_text));
                }
                Err(e) => {
                    warn_report!(e, "Problem with certificate received from {}", &source);
                    nonfatal_error.get_or_insert(e);
                }
            }
        }

        // Now discard any certs we didn't ask for.
        let len_orig = newcerts.len();
        newcerts.retain(|(cert, _)| asked_for.contains(cert.key_ids()));
        if newcerts.len() != len_orig {
            warn!(
                "Discarding certificates from {} that we didn't ask for.",
                source
            );
            nonfatal_error.get_or_insert(Error::Unwanted("Certificate we didn't request"));
        }

        // We want to exit early if we aren't saving any certificates.
        if newcerts.is_empty() {
            return opt_err_to_result(nonfatal_error);
        }

        if let Some(store) = storage {
            // Write the certificates to the store.
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().expect("Directory storage lock poisoned");
            w.store_authcerts(&v[..])?;
        }

        // Remember the certificates in this state, and remove them
        // from our list of missing certs.
        for (cert, _) in newcerts {
            let ids = cert.key_ids();
            if self.missing_certs.contains(ids) {
                self.missing_certs.remove(ids);
                self.certs.push(cert);
                *changed = true;
            }
        }

        if *changed {
            // See whether the new certificates let us validate the consensus.
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }

    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        use GetCertsConsensus::*;
        match self.consensus {
            // Only a validated consensus lets us move on to microdescriptors.
            Validated(validated) => Box::new(GetMicrodescsState::new(
                self.cache_usage,
                validated,
                self.consensus_meta,
                self.rt,
                self.config,
                self.prev_netdir,
                #[cfg(feature = "dirfilter")]
                self.filter,
            )),
            _ => self,
        }
    }

    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        // Propose installing the consensus's recommended protocols, if set.
        self.protocol_statuses.as_ref().map(|(timestamp, protos)| {
            NetDirChange::SetRequiredProtocol {
                timestamp: *timestamp,
                protos: Arc::clone(protos),
            }
        })
    }

    fn reset_time(&self) -> Option<SystemTime> {
        // Certificates are only worth fetching while the consensus they sign
        // is still within its (tolerance-extended) validity period.
        Some(
            self.consensus_meta.lifetime().valid_until()
                + self.config.tolerance.post_valid_tolerance,
        )
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
        } else {
            // If we reset in this state, we should always go to "must
            // download": Either we've failed to get the certs we needed, or we
            // have found that the consensus wasn't valid.  Either case calls
            // for a fresh consensus download attempt.
            CacheUsage::MustDownload
        };

        Box::new(GetConsensusState::new(
            self.rt,
            self.config,
            cache_usage,
            self.prev_netdir,
            #[cfg(feature = "dirfilter")]
            self.filter,
        ))
    }
}
760
/// Final state: we're fetching or loading microdescriptors
#[derive(Debug, Clone)]
struct GetMicrodescsState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,
    /// Total number of microdescriptors listed in the consensus.
    n_microdescs: usize,
    /// The current status of our netdir.
    partial: PendingNetDir,
    /// Metadata for the current consensus.
    meta: ConsensusMeta,
    /// A pending list of microdescriptor digests whose
    /// "last-listed-at" times we should update.
    newly_listed: Vec<MdDigest>,
    /// A time after which we should try to replace this directory and
    /// find a new one.  Since this is randomized, we only compute it
    /// once.
    reset_time: SystemTime,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
792
/// Information about a network directory that might not be ready to become _the_ current network
/// directory.
#[derive(Debug, Clone)]
enum PendingNetDir {
    /// A NetDir for which we have a consensus, but not enough microdescriptors.
    Partial(PartialNetDir),
    /// A NetDir we're either trying to get our caller to replace, or that the caller
    /// has already taken from us.
    ///
    /// After the netdir gets taken, the `collected_microdescs` and `missing_microdescs`
    /// fields get used. Before then, we just do operations on the netdir.
    Yielding {
        /// The actual netdir. This starts out as `Some`, but our caller can `take()` it
        /// from us.
        netdir: Option<NetDir>,
        /// Microdescs we have collected in order to yield to our caller.
        collected_microdescs: Vec<Microdesc>,
        /// Which microdescs we need for the netdir that either is or used to be in `netdir`.
        ///
        /// NOTE(eta): This MUST always match the netdir's own idea of which microdescs we need.
        ///            We do this by copying the netdir's missing microdescs into here when we
        ///            instantiate it.
        ///            (This code assumes that it doesn't add more needed microdescriptors later!)
        missing_microdescs: HashSet<MdDigest>,
        /// The time at which we should renew this netdir, assuming we have
        /// driven it to a "usable" state.
        replace_dir_time: SystemTime,
    },
    /// A dummy placeholder value, so we can use `mem::replace` on this enum.
    Dummy,
}
824
825impl MdReceiver for PendingNetDir {
826    fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
827        match self {
828            PendingNetDir::Partial(partial) => partial.missing_microdescs(),
829            PendingNetDir::Yielding {
830                netdir,
831                missing_microdescs,
832                ..
833            } => {
834                if let Some(nd) = netdir.as_ref() {
835                    nd.missing_microdescs()
836                } else {
837                    Box::new(missing_microdescs.iter())
838                }
839            }
840            PendingNetDir::Dummy => unreachable!(),
841        }
842    }
843
844    fn add_microdesc(&mut self, md: Microdesc) -> bool {
845        match self {
846            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
847            PendingNetDir::Yielding {
848                netdir,
849                missing_microdescs,
850                collected_microdescs,
851                ..
852            } => {
853                let wanted = missing_microdescs.remove(md.digest());
854                if let Some(nd) = netdir.as_mut() {
855                    let nd_wanted = nd.add_microdesc(md);
856                    // This shouldn't ever happen; if it does, our invariants are violated.
857                    debug_assert_eq!(wanted, nd_wanted);
858                    nd_wanted
859                } else {
860                    collected_microdescs.push(md);
861                    wanted
862                }
863            }
864            PendingNetDir::Dummy => unreachable!(),
865        }
866    }
867
868    fn n_missing(&self) -> usize {
869        match self {
870            PendingNetDir::Partial(partial) => partial.n_missing(),
871            PendingNetDir::Yielding {
872                netdir,
873                missing_microdescs,
874                ..
875            } => {
876                if let Some(nd) = netdir.as_ref() {
877                    // This shouldn't ever happen; if it does, our invariants are violated.
878                    debug_assert_eq!(nd.n_missing(), missing_microdescs.len());
879                    nd.n_missing()
880                } else {
881                    missing_microdescs.len()
882                }
883            }
884            PendingNetDir::Dummy => unreachable!(),
885        }
886    }
887}
888
impl PendingNetDir {
    /// If this PendingNetDir is `Partial` and now has enough information to
    /// become a usable `NetDir`, upgrade it to `Yielding`.  Otherwise, leave
    /// it unchanged.
    fn upgrade_if_necessary(&mut self) {
        if matches!(self, PendingNetDir::Partial(..)) {
            // Temporarily swap in `Dummy` so that we can take the
            // `PartialNetDir` by value and try to convert it.
            match mem::replace(self, PendingNetDir::Dummy) {
                PendingNetDir::Partial(p) => match p.unwrap_if_sufficient() {
                    Ok(nd) => {
                        // Sufficient: snapshot what's still missing, pick a
                        // replacement time, and switch to `Yielding`.
                        let missing: HashSet<_> = nd.missing_microdescs().copied().collect();
                        let replace_dir_time = pick_download_time(nd.lifetime());
                        debug!(
                            "Consensus now usable, with {} microdescriptors missing. \
                                The current consensus is fresh until {}, and valid until {}. \
                                I've picked {} as the earliest time to replace it.",
                            missing.len(),
                            OffsetDateTime::from(nd.lifetime().fresh_until()),
                            OffsetDateTime::from(nd.lifetime().valid_until()),
                            OffsetDateTime::from(replace_dir_time)
                        );
                        *self = PendingNetDir::Yielding {
                            netdir: Some(nd),
                            collected_microdescs: vec![],
                            missing_microdescs: missing,
                            replace_dir_time,
                        };
                    }
                    Err(p) => {
                        // Not sufficient yet: put the partial netdir back.
                        *self = PendingNetDir::Partial(p);
                    }
                },
                _ => unreachable!(),
            }
        }
        // Whatever happened above, we must never leave `Dummy` behind.
        assert!(!matches!(self, PendingNetDir::Dummy));
    }
}
924
impl<R: Runtime> GetMicrodescsState<R> {
    /// Create a new [`GetMicrodescsState`] from a provided
    /// microdescriptor consensus.
    fn new(
        cache_usage: CacheUsage,
        consensus: MdConsensus,
        meta: ConsensusMeta,
        rt: R,
        config: Arc<DirMgrConfig>,
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
    ) -> Self {
        // Keep working on this consensus until it is past its validity window
        // (plus the configured post-valid tolerance).
        let reset_time = consensus.lifetime().valid_until() + config.tolerance.post_valid_tolerance;
        let n_microdescs = consensus.relays().len();

        let params = &config.override_net_params;
        #[cfg(not(feature = "geoip"))]
        let mut partial_dir = PartialNetDir::new(consensus, Some(params));
        // TODO(eta): Make this embedded database configurable using the `DirMgrConfig`.
        #[cfg(feature = "geoip")]
        let mut partial_dir =
            PartialNetDir::new_with_geoip(consensus, Some(params), &GeoipDb::new_embedded());

        // Reuse whatever we can from the previous directory, if we have one.
        if let Some(old_dir) = prev_netdir.as_ref().and_then(|x| x.get_netdir()) {
            partial_dir.fill_from_previous_netdir(old_dir);
        }

        // Always upgrade at least once: otherwise, we won't notice we're ready unless we
        // add a microdescriptor.
        let mut partial = PendingNetDir::Partial(partial_dir);
        partial.upgrade_if_necessary();

        GetMicrodescsState {
            cache_usage,
            n_microdescs,
            partial,
            meta,
            newly_listed: Vec::new(),
            reset_time,
            rt,
            config,
            prev_netdir,

            #[cfg(feature = "dirfilter")]
            filter,
        }
    }

    /// Add a bunch of microdescriptors to the in-progress netdir.
    ///
    /// Sets `*changed` to true if any microdescriptor was added.
    fn register_microdescs<I>(&mut self, mds: I, _source: &DocSource, changed: &mut bool)
    where
        I: IntoIterator<Item = Microdesc>,
    {
        // If a filter is configured, drop any microdescriptors it rejects.
        #[cfg(feature = "dirfilter")]
        let mds: Vec<Microdesc> = mds
            .into_iter()
            .filter_map(|m| self.filter.filter_md(m).ok())
            .collect();
        // The variant cannot change inside the loop below (`add_microdesc`
        // never upgrades), so it is safe to check it once here.
        let is_partial = matches!(self.partial, PendingNetDir::Partial(..));
        for md in mds {
            if is_partial {
                self.newly_listed.push(*md.digest());
            }
            self.partial.add_microdesc(md);
            *changed = true;
        }
        self.partial.upgrade_if_necessary();
    }
}
994
995impl<R: Runtime> DirState for GetMicrodescsState<R> {
996    fn describe(&self) -> String {
997        format!(
998            "Downloading microdescriptors (we are missing {}).",
999            self.partial.n_missing()
1000        )
1001    }
1002    fn missing_docs(&self) -> Vec<DocId> {
1003        self.partial
1004            .missing_microdescs()
1005            .map(|d| DocId::Microdesc(*d))
1006            .collect()
1007    }
1008    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
1009        match self.partial {
1010            PendingNetDir::Yielding {
1011                ref mut netdir,
1012                ref mut collected_microdescs,
1013                ..
1014            } => {
1015                if netdir.is_some() {
1016                    Some(NetDirChange::AttemptReplace {
1017                        netdir,
1018                        consensus_meta: &self.meta,
1019                    })
1020                } else {
1021                    collected_microdescs
1022                        .is_empty()
1023                        .then_some(NetDirChange::AddMicrodescs(collected_microdescs))
1024                }
1025            }
1026            _ => None,
1027        }
1028    }
1029    fn is_ready(&self, ready: Readiness) -> bool {
1030        match ready {
1031            Readiness::Complete => self.partial.n_missing() == 0,
1032            Readiness::Usable => {
1033                // We're "usable" if the calling code thought our netdir was usable enough to
1034                // steal it.
1035                matches!(self.partial, PendingNetDir::Yielding { ref netdir, .. } if netdir.is_none())
1036            }
1037        }
1038    }
1039    fn can_advance(&self) -> bool {
1040        false
1041    }
1042    fn bootstrap_progress(&self) -> DirProgress {
1043        let n_present = self.n_microdescs - self.partial.n_missing();
1044        DirProgress::Validated {
1045            lifetime: self.meta.lifetime().clone(),
1046            usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
1047            n_mds: (n_present as u32, self.n_microdescs as u32),
1048            usable: self.is_ready(Readiness::Usable),
1049        }
1050    }
1051    fn dl_config(&self) -> DownloadSchedule {
1052        self.config.schedule.retry_microdescs
1053    }
1054    fn add_from_cache(
1055        &mut self,
1056        docs: HashMap<DocId, DocumentText>,
1057        changed: &mut bool,
1058    ) -> Result<()> {
1059        let mut microdescs = Vec::new();
1060        for (id, text) in docs {
1061            if let DocId::Microdesc(digest) = id {
1062                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
1063                    if md.digest() == &digest {
1064                        microdescs.push(md);
1065                        continue;
1066                    }
1067                }
1068                warn!("Found a mismatched microdescriptor in cache; ignoring");
1069            }
1070        }
1071
1072        self.register_microdescs(microdescs, &DocSource::LocalCache, changed);
1073        Ok(())
1074    }
1075
1076    fn add_from_download(
1077        &mut self,
1078        text: &str,
1079        request: &ClientRequest,
1080        source: DocSource,
1081        storage: Option<&Mutex<DynStore>>,
1082        changed: &mut bool,
1083    ) -> Result<()> {
1084        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
1085            req.digests().collect()
1086        } else {
1087            return Err(internal!("expected a microdesc request").into());
1088        };
1089        let mut new_mds = Vec::new();
1090        let mut nonfatal_err = None;
1091
1092        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
1093            .map_err(|e| Error::from_netdoc(source.clone(), e))?
1094        {
1095            let anno = match anno {
1096                Err(e) => {
1097                    nonfatal_err.get_or_insert_with(|| Error::from_netdoc(source.clone(), e));
1098                    continue;
1099                }
1100                Ok(a) => a,
1101            };
1102            let txt = anno
1103                .within(text)
1104                .expect("microdesc not from within text as expected");
1105            let md = anno.into_microdesc();
1106            if !requested.contains(md.digest()) {
1107                warn!(
1108                    "Received microdescriptor from {} we did not ask for: {:?}",
1109                    source,
1110                    md.digest()
1111                );
1112                nonfatal_err.get_or_insert(Error::Unwanted("un-requested microdescriptor"));
1113                continue;
1114            }
1115            new_mds.push((txt, md));
1116        }
1117
1118        let mark_listed = self.meta.lifetime().valid_after();
1119        if let Some(store) = storage {
1120            let mut s = store
1121                .lock()
1122                //.get_mut()
1123                .expect("Directory storage lock poisoned");
1124            if !self.newly_listed.is_empty() {
1125                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
1126                self.newly_listed.clear();
1127            }
1128            if !new_mds.is_empty() {
1129                s.store_microdescs(
1130                    &new_mds
1131                        .iter()
1132                        .map(|(text, md)| (*text, md.digest()))
1133                        .collect::<Vec<_>>(),
1134                    mark_listed,
1135                )?;
1136            }
1137        }
1138
1139        self.register_microdescs(new_mds.into_iter().map(|(_, md)| md), &source, changed);
1140
1141        opt_err_to_result(nonfatal_err)
1142    }
1143    fn advance(self: Box<Self>) -> Box<dyn DirState> {
1144        self
1145    }
1146    fn reset_time(&self) -> Option<SystemTime> {
1147        // TODO(nickm): The reset logic is a little wonky here: we don't truly
1148        // want to _reset_ this state at `replace_dir_time`.  In fact, we ought
1149        // to be able to have multiple states running in parallel: one filling
1150        // in the mds for an old consensus, and one trying to fetch a better
1151        // one.  That's likely to require some amount of refactoring of the
1152        // bootstrap code.
1153
1154        Some(match self.partial {
1155            // If the client has taken a completed netdir, the netdir is now
1156            // usable: We can reset our download attempt when we choose to try
1157            // to replace this directory.
1158            PendingNetDir::Yielding {
1159                replace_dir_time,
1160                netdir: None,
1161                ..
1162            } => replace_dir_time,
1163            // We don't have a completed netdir: Keep trying to fill this one in
1164            // until it is _definitely_ unusable.  (Our clock might be skewed;
1165            // there might be no up-to-date consensus.)
1166            _ => self.reset_time,
1167        })
1168    }
1169    fn reset(self: Box<Self>) -> Box<dyn DirState> {
1170        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
1171            // Cache only means we can't ever download.
1172            CacheUsage::CacheOnly
1173        } else if self.is_ready(Readiness::Usable) {
1174            // If we managed to bootstrap a usable consensus, then we won't
1175            // accept our next consensus from the cache.
1176            CacheUsage::MustDownload
1177        } else {
1178            // If we didn't manage to bootstrap a usable consensus, then we can
1179            // indeed try again with the one in the cache.
1180            // TODO(nickm) is this right?
1181            CacheUsage::CacheOkay
1182        };
1183        Box::new(GetConsensusState::new(
1184            self.rt,
1185            self.config,
1186            cache_usage,
1187            self.prev_netdir,
1188            #[cfg(feature = "dirfilter")]
1189            self.filter,
1190        ))
1191    }
1192}
1193
/// Choose a random download time to replace a consensus whose lifetime
/// is `lifetime`.
///
/// The time is drawn uniformly at random from the range computed by
/// [`client_download_range`].
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
    let (lowbound, uncertainty) = client_download_range(lifetime);
    lowbound + rand::rng().gen_range_infallible(..=uncertainty)
}
1200
1201/// Based on the lifetime for a consensus, return the time range during which
1202/// clients should fetch the next one.
1203fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
1204    let valid_after = lt.valid_after();
1205    let valid_until = lt.valid_until();
1206    let voting_interval = lt.voting_period();
1207    let whole_lifetime = valid_until
1208        .duration_since(valid_after)
1209        .expect("valid-after must precede valid-until");
1210
1211    // From dir-spec:
1212    // "This time is chosen uniformly at random from the interval
1213    // between the time 3/4 into the first interval after the
1214    // consensus is no longer fresh, and 7/8 of the time remaining
1215    // after that before the consensus is invalid."
1216    let lowbound = voting_interval + (voting_interval * 3) / 4;
1217    let remainder = whole_lifetime - lowbound;
1218    let uncertainty = (remainder * 7) / 8;
1219
1220    (valid_after + lowbound, uncertainty)
1221}
1222
1223/// If `err` is some, return `Err(err)`.  Otherwise return Ok(()).
1224fn opt_err_to_result(e: Option<Error>) -> Result<()> {
1225    match e {
1226        Some(e) => Err(e),
1227        None => Ok(()),
1228    }
1229}
1230
/// A dummy state implementation, used when we need to temporarily write a
/// placeholder into a box.
///
/// Calling any method on this state will panic.
#[derive(Clone, Debug)]
pub(crate) struct PoisonedState;

impl DirState for PoisonedState {
    // Every method panics on purpose: this value only exists transiently
    // while a real state is being swapped out, and must never be used.
    fn describe(&self) -> String {
        unimplemented!()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        unimplemented!()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        unimplemented!()
    }
    fn can_advance(&self) -> bool {
        unimplemented!()
    }
    fn add_from_cache(
        &mut self,
        _docs: HashMap<DocId, DocumentText>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn add_from_download(
        &mut self,
        _text: &str,
        _request: &ClientRequest,
        _source: DocSource,
        _storage: Option<&Mutex<DynStore>>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn bootstrap_progress(&self) -> event::DirProgress {
        unimplemented!()
    }
    fn dl_config(&self) -> DownloadSchedule {
        unimplemented!()
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
    fn reset_time(&self) -> Option<SystemTime> {
        unimplemented!()
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
}
1284
1285#[cfg(test)]
1286mod test {
1287    // @@ begin test lint list maintained by maint/add_warning @@
1288    #![allow(clippy::bool_assert_comparison)]
1289    #![allow(clippy::clone_on_copy)]
1290    #![allow(clippy::dbg_macro)]
1291    #![allow(clippy::mixed_attributes_style)]
1292    #![allow(clippy::print_stderr)]
1293    #![allow(clippy::print_stdout)]
1294    #![allow(clippy::single_char_pattern)]
1295    #![allow(clippy::unwrap_used)]
1296    #![allow(clippy::unchecked_duration_subtraction)]
1297    #![allow(clippy::useless_vec)]
1298    #![allow(clippy::needless_pass_by_value)]
1299    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1300    #![allow(clippy::cognitive_complexity)]
1301    use super::*;
1302    use crate::{Authority, AuthorityBuilder, DownloadScheduleConfig};
1303    use std::convert::TryInto;
1304    use std::sync::Arc;
1305    use tempfile::TempDir;
1306    use time::macros::datetime;
1307    use tor_netdoc::doc::authcert::AuthCertKeyIds;
1308    use tor_rtcompat::RuntimeSubstExt as _;
1309    #[allow(deprecated)] // TODO #1885
1310    use tor_rtmock::time::MockSleepProvider;
1311
1312    #[test]
1313    fn download_schedule() {
1314        let va = datetime!(2008-08-02 20:00 UTC).into();
1315        let fu = datetime!(2008-08-02 21:00 UTC).into();
1316        let vu = datetime!(2008-08-02 23:00 UTC).into();
1317        let lifetime = Lifetime::new(va, fu, vu).unwrap();
1318
1319        let expected_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
1320        let expected_range = Duration::from_millis((75 * 60 * 1000) * 7 / 8);
1321
1322        let (start, range) = client_download_range(&lifetime);
1323        assert_eq!(start, expected_start);
1324        assert_eq!(range, expected_range);
1325
1326        for _ in 0..100 {
1327            let when = pick_download_time(&lifetime);
1328            assert!(when > va);
1329            assert!(when >= expected_start);
1330            assert!(when < vu);
1331            assert!(when <= expected_start + range);
1332        }
1333    }
1334
    /// Makes a SQLite storage backed by a fresh temporary directory.
    ///
    /// Returns the `TempDir` as well so the caller can keep it alive for the
    /// lifetime of the store.
    fn temp_store() -> (TempDir, Mutex<DynStore>) {
        let tempdir = TempDir::new().unwrap();

        let store = crate::storage::SqliteStore::from_path_and_mistrust(
            tempdir.path(),
            &fs_mistrust::Mistrust::new_dangerously_trust_everyone(),
            false,
        )
        .unwrap();

        (tempdir, Mutex::new(Box::new(store)))
    }
1348
    /// Wrap `rt` so that its sleep and coarse-time providers report `now`
    /// as the current time (via a mock sleep provider).
    fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
        #[allow(deprecated)] // TODO #1885
        let msp = MockSleepProvider::new(now);
        rt.with_sleep_provider(msp.clone())
            .with_coarse_time_provider(msp)
    }
1355
    /// Build a test `DirMgrConfig` with no fallback caches and (optionally)
    /// a custom set of directory authorities.
    fn make_dirmgr_config(authorities: Option<Vec<AuthorityBuilder>>) -> Arc<DirMgrConfig> {
        let mut netcfg = crate::NetworkConfig::builder();
        netcfg.set_fallback_caches(vec![]);
        if let Some(a) = authorities {
            netcfg.set_authorities(a);
        }
        let cfg = DirMgrConfig {
            // This path is never used: the tests build their own stores.
            cache_dir: "/we_will_never_use_this/".into(),
            network: netcfg.build().unwrap(),
            ..Default::default()
        };
        Arc::new(cfg)
    }
1369
    // Test data: two microdesc consensuses and the authority certificates
    // needed to validate them.
    const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
    const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
    const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
    const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
    #[allow(unused)]
    const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
1377    fn test_time() -> SystemTime {
1378        datetime!(2020-08-07 12:42:45 UTC).into()
1379    }
    /// Parse a hex string into an `RsaIdentity`; panics on bad input (test-only).
    fn rsa(s: &str) -> RsaIdentity {
        RsaIdentity::from_hex(s).unwrap()
    }
    /// The set of authorities we recognize in these tests: two of the three
    /// that signed the test consensus.
    fn test_authorities() -> Vec<AuthorityBuilder> {
        /// Build an authority with the given v3 identity fingerprint.
        fn a(s: &str) -> AuthorityBuilder {
            Authority::builder().name("ignore").v3ident(rsa(s)).clone()
        }
        vec![
            a("5696AB38CB3852AFA476A5C07B2D4788963D5567"),
            a("5A23BA701776C9C1AB1C06E734E92AB3D5350D64"),
            // This is an authority according to the consensus, but we'll
            // pretend we don't recognize it, to make sure that we
            // don't fetch or accept it.
            // a("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
        ]
    }
    /// Key IDs for the certificate in `AUTHCERT_5696`.
    fn authcert_id_5696() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
            sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
        }
    }
    /// Key IDs for the certificate in `AUTHCERT_5A23`.
    fn authcert_id_5a23() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
            sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
        }
    }
    // remember, we're saying that we don't recognize this one as an authority.
    fn authcert_id_7c47() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
            sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
        }
    }
    /// Parse the test microdescriptor file into a digest -> text map.
    fn microdescs() -> HashMap<MdDigest, String> {
        const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");
        let text = MICRODESCS;
        MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
            .unwrap()
            .map(|res| {
                let anno = res.unwrap();
                let text = anno.within(text).unwrap();
                let md = anno.into_microdesc();
                (*md.digest(), text.to_owned())
            })
            .collect()
    }
1428
    #[test]
    fn get_consensus_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt = make_time_shifted_runtime(test_time(), rt);
            let cfg = make_dirmgr_config(None);

            let (_tempdir, store) = temp_store();

            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );

            // Is description okay?
            assert_eq!(&state.describe(), "Looking for a consensus.");

            // Basic properties: without a consensus it is not ready to advance.
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));

            // Basic properties: it doesn't want to reset.
            assert!(state.reset_time().is_none());

            // Its starting DirStatus is "fetching a consensus".
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching a consensus"
            );

            // Download configuration is simple: only 1 request can be done in
            // parallel.  It uses a consensus retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);

            // Do we know what we want?
            let docs = state.missing_docs();
            assert_eq!(docs.len(), 1);
            let docid = docs[0];

            assert!(matches!(
                docid,
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                }
            ));
            let source = DocSource::DirServer { source: None };

            // Now suppose that we get some complete junk from a download.
            // It should be rejected and never hit the store.
            let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
            let req = crate::docid::ClientRequest::Consensus(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                "this isn't a consensus",
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::NetDocError { .. })));
            assert!(!changed);
            // make sure it wasn't stored...
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_none());

            // Now try again, with a real consensus... but the wrong authorities.
            let mut changed = false;
            let outcome = state.add_from_download(
                CONSENSUS,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
            assert!(!changed);
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_none());

            // Great. Change the receiver to use a configuration where these test
            // authorities are recognized.
            let cfg = make_dirmgr_config(Some(test_authorities()));

            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let mut changed = false;
            let outcome =
                state.add_from_download(CONSENSUS, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            // This time the consensus should have been written to the store.
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_some());

            // And with that, we should be asking for certificates
            assert!(state.can_advance());
            assert_eq!(&state.describe(), "About to fetch certificates.");
            assert_eq!(state.missing_docs(), Vec::new());
            let next = Box::new(state).advance();
            assert_eq!(
                &next.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );

            // Try again, but this time get the state from the cache.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt,
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let text: crate::storage::InputString = CONSENSUS.to_owned().into();
            let map = vec![(docid, text.into())].into_iter().collect();
            let mut changed = false;
            let outcome = state.add_from_cache(map, &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            assert!(state.can_advance());
        });
    }
1574
1575    #[test]
1576    fn get_certs_state() {
1577        tor_rtcompat::test_with_one_runtime!(|rt| async move {
1578            /// Construct a GetCertsState with our test data
1579            fn new_getcerts_state(rt: impl Runtime) -> Box<dyn DirState> {
1580                let rt = make_time_shifted_runtime(test_time(), rt);
1581                let cfg = make_dirmgr_config(Some(test_authorities()));
1582                let mut state = GetConsensusState::new(
1583                    rt,
1584                    cfg,
1585                    CacheUsage::CacheOkay,
1586                    None,
1587                    #[cfg(feature = "dirfilter")]
1588                    Arc::new(crate::filter::NilFilter),
1589                );
1590                let source = DocSource::DirServer { source: None };
1591                let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
1592                let req = crate::docid::ClientRequest::Consensus(req);
1593                let mut changed = false;
1594                let outcome = state.add_from_download(CONSENSUS, &req, source, None, &mut changed);
1595                assert!(outcome.is_ok());
1596                Box::new(state).advance()
1597            }
1598
1599            let (_tempdir, store) = temp_store();
1600            let mut state = new_getcerts_state(rt.clone());
1601            // Basic properties: description, status, reset time.
1602            assert_eq!(
1603                &state.describe(),
1604                "Downloading certificates for consensus (we are missing 2/2)."
1605            );
1606            assert!(!state.can_advance());
1607            assert!(!state.is_ready(Readiness::Complete));
1608            assert!(!state.is_ready(Readiness::Usable));
1609            let consensus_expires: SystemTime = datetime!(2020-08-07 12:43:20 UTC).into();
1610            let post_valid_tolerance = crate::DirTolerance::default().post_valid_tolerance;
1611            assert_eq!(
1612                state.reset_time(),
1613                Some(consensus_expires + post_valid_tolerance)
1614            );
1615            let retry = state.dl_config();
1616            assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
1617
1618            // Bootstrap status okay?
1619            assert_eq!(
1620                state.bootstrap_progress().to_string(),
1621                "fetching authority certificates (0/2)"
1622            );
1623
1624            // Check that we get the right list of missing docs.
1625            let missing = state.missing_docs();
1626            assert_eq!(missing.len(), 2); // We are missing two certificates.
1627            assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
1628            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
1629            // we don't ask for this one because we don't recognize its authority
1630            assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));
1631
1632            // Add one from the cache; make sure the list is still right
1633            let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
1634            // let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into();
1635            let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
1636                .into_iter()
1637                .collect();
1638            let mut changed = false;
1639            let outcome = state.add_from_cache(docs, &mut changed);
1640            assert!(changed);
1641            assert!(outcome.is_ok()); // no error, and something changed.
1642            assert!(!state.can_advance()); // But we aren't done yet.
1643            let missing = state.missing_docs();
1644            assert_eq!(missing.len(), 1); // Now we're only missing one!
1645            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
1646            assert_eq!(
1647                state.bootstrap_progress().to_string(),
1648                "fetching authority certificates (1/2)"
1649            );
1650
1651            // Now try to add the other from a download ... but fail
1652            // because we didn't ask for it.
1653            let source = DocSource::DirServer { source: None };
1654            let mut req = tor_dirclient::request::AuthCertRequest::new();
1655            req.push(authcert_id_5696()); // it's the wrong id.
1656            let req = ClientRequest::AuthCert(req);
1657            let mut changed = false;
1658            let outcome = state.add_from_download(
1659                AUTHCERT_5A23,
1660                &req,
1661                source.clone(),
1662                Some(&store),
1663                &mut changed,
1664            );
1665            assert!(matches!(outcome, Err(Error::Unwanted(_))));
1666            assert!(!changed);
1667            let missing2 = state.missing_docs();
1668            assert_eq!(missing, missing2); // No change.
1669            assert!(store
1670                .lock()
1671                .unwrap()
1672                .authcerts(&[authcert_id_5a23()])
1673                .unwrap()
1674                .is_empty());
1675
1676            // Now try to add the other from a download ... for real!
1677            let mut req = tor_dirclient::request::AuthCertRequest::new();
1678            req.push(authcert_id_5a23()); // Right idea this time!
1679            let req = ClientRequest::AuthCert(req);
1680            let mut changed = false;
1681            let outcome =
1682                state.add_from_download(AUTHCERT_5A23, &req, source, Some(&store), &mut changed);
1683            assert!(outcome.is_ok()); // No error, _and_ something changed!
1684            assert!(changed);
1685            let missing3 = state.missing_docs();
1686            assert!(missing3.is_empty());
1687            assert!(state.can_advance());
1688            assert!(!store
1689                .lock()
1690                .unwrap()
1691                .authcerts(&[authcert_id_5a23()])
1692                .unwrap()
1693                .is_empty());
1694
1695            let next = state.advance();
1696            assert_eq!(
1697                &next.describe(),
1698                "Downloading microdescriptors (we are missing 6)."
1699            );
1700
1701            // If we start from scratch and reset, we're back in GetConsensus.
1702            let state = new_getcerts_state(rt);
1703            let state = state.reset();
1704            assert_eq!(&state.describe(), "Downloading a consensus.");
1705
1706            // TODO: I'd like even more tests to make sure that we never
1707            // accept a certificate for an authority we don't believe in.
1708        });
1709    }
1710
    #[test]
    fn get_microdescs_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a GetMicrodescsState directly from our parsed test
            /// consensus (CONSENSUS2), skipping the earlier download states.
            fn new_getmicrodescs_state(rt: impl Runtime) -> GetMicrodescsState<impl Runtime> {
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
                // Test-only shortcut: skip timeliness and signature validation.
                let consensus = consensus
                    .dangerously_assume_timely()
                    .dangerously_assume_wellsigned();
                let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
                GetMicrodescsState::new(
                    CacheUsage::CacheOkay,
                    consensus,
                    meta,
                    rt,
                    cfg,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                )
            }
            /// Decode an unpadded-base64 string into a microdescriptor digest.
            fn d64(s: &str) -> MdDigest {
                use base64ct::{Base64Unpadded, Encoding as _};
                Base64Unpadded::decode_vec(s).unwrap().try_into().unwrap()
            }

            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getmicrodescs_state(rt.clone());
            let state = Box::new(state).reset();
            assert_eq!(&state.describe(), "Looking for a consensus.");

            // Check the basics.
            let mut state = new_getmicrodescs_state(rt.clone());
            assert_eq!(
                &state.describe(),
                "Downloading microdescriptors (we are missing 4)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            {
                // The reset time should fall between the consensus's
                // fresh-until time and its valid-until plus tolerance.
                let reset_time = state.reset_time().unwrap();
                let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
                let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
                assert!(reset_time >= fresh_until);
                assert!(reset_time <= valid_until + state.config.tolerance.post_valid_tolerance);
            }
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs);
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (0/4)"
            );

            // Now check whether we're missing all the right microdescs.
            let missing = state.missing_docs();
            let md_text = microdescs();
            assert_eq!(missing.len(), 4);
            assert_eq!(md_text.len(), 4);
            let md1 = d64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
            let md2 = d64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
            let md3 = d64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
            let md4 = d64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
            for md_digest in [md1, md2, md3, md4] {
                assert!(missing.contains(&DocId::Microdesc(md_digest)));
                assert!(md_text.contains_key(&md_digest));
            }

            // Try adding a microdesc from the cache.
            let (_tempdir, store) = temp_store();
            let doc1: crate::storage::InputString = md_text.get(&md1).unwrap().clone().into();
            let docs = vec![(DocId::Microdesc(md1), doc1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            let outcome = state.add_from_cache(docs, &mut changed);
            assert!(outcome.is_ok()); // successfully loaded one MD.
            assert!(changed);
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));

            // Now we should be missing 3.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 3);
            assert!(!missing.contains(&DocId::Microdesc(md1)));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (1/4)"
            );

            // Try adding the rest as if from a download.
            let mut req = tor_dirclient::request::MicrodescRequest::new();
            let mut response = "".to_owned();
            for md_digest in [md2, md3, md4] {
                response.push_str(md_text.get(&md_digest).unwrap());
                req.push(md_digest);
            }
            let req = ClientRequest::Microdescs(req);
            let source = DocSource::DirServer { source: None };
            let mut changed = false;
            let outcome = state.add_from_download(
                response.as_str(),
                &req,
                source,
                Some(&store),
                &mut changed,
            );
            assert!(outcome.is_ok()); // successfully loaded MDs
            assert!(changed);
            // With every microdescriptor present, the state should offer a
            // complete NetDir as a replacement.
            match state.get_netdir_change().unwrap() {
                NetDirChange::AttemptReplace { netdir, .. } => {
                    assert!(netdir.take().is_some());
                }
                x => panic!("wrong netdir change: {:?}", x),
            }
            assert!(state.is_ready(Readiness::Complete));
            assert!(state.is_ready(Readiness::Usable));
            // The downloaded microdescriptors should have been persisted.
            assert_eq!(
                store
                    .lock()
                    .unwrap()
                    .microdescs(&[md2, md3, md4])
                    .unwrap()
                    .len(),
                3
            );

            let missing = state.missing_docs();
            assert!(missing.is_empty());
        });
    }
1845}