1
//! Implementation for the primary directory state machine.
2
//!
3
//! There are three (active) states that a download can be in: looking
4
//! for a consensus ([`GetConsensusState`]), looking for certificates
5
//! to validate that consensus ([`GetCertsState`]), and looking for
6
//! microdescriptors ([`GetMicrodescsState`]).
7
//!
8
//! These states have no contact with the network, and are purely
9
//! reactive to other code that drives them.  See the
10
//! [`bootstrap`](crate::bootstrap) module for functions that actually
11
//! load or download directory information.
12

            
13
use std::collections::{HashMap, HashSet};
14
use std::fmt::Debug;
15
use std::mem;
16
use std::sync::{Arc, Mutex};
17
use std::time::{Duration, SystemTime};
18
use time::OffsetDateTime;
19
use tor_basic_utils::RngExt as _;
20
use tor_error::{internal, warn_report};
21
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
22
use tor_netdoc::doc::authcert::UncheckedAuthCert;
23
use tor_netdoc::doc::netstatus::{Lifetime, ProtoStatuses};
24
use tracing::{debug, warn};
25

            
26
use crate::event::DirProgress;
27

            
28
use crate::storage::DynStore;
29
use crate::{
30
    docmeta::{AuthCertMeta, ConsensusMeta},
31
    event,
32
    retry::DownloadSchedule,
33
    CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
34
};
35
use crate::{DocSource, SharedMutArc};
36
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
37
#[cfg(feature = "geoip")]
38
use tor_geoip::GeoipDb;
39
use tor_llcrypto::pk::rsa::RsaIdentity;
40
use tor_netdoc::doc::{
41
    microdesc::{MdDigest, Microdesc},
42
    netstatus::MdConsensus,
43
};
44
use tor_netdoc::{
45
    doc::{
46
        authcert::{AuthCert, AuthCertKeyIds},
47
        microdesc::MicrodescReader,
48
        netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
49
    },
50
    AllowAnnotations,
51
};
52
use tor_rtcompat::Runtime;
53

            
54
/// A change to the currently running `NetDir`, returned by the state machines in this module.
#[derive(Debug)]
pub(crate) enum NetDirChange<'a> {
    /// If the provided `NetDir` is suitable for use (i.e. the caller determines it can build
    /// circuits with it), replace the current `NetDir` with it.
    ///
    /// The caller must call `DirState::on_netdir_replaced` if the replace was successful.
    AttemptReplace {
        /// The netdir to replace the current one with, if it's usable.
        ///
        /// The `Option` is always `Some` when returned from the state machine; it's there
        /// so that the caller can call `.take()` to avoid cloning the netdir.
        netdir: &'a mut Option<NetDir>,
        /// The consensus metadata for this netdir.
        consensus_meta: &'a ConsensusMeta,
    },
    /// Add the provided microdescriptors to the current `NetDir`.
    AddMicrodescs(&'a mut Vec<Microdesc>),
    /// Replace the recommended set of subprotocols.
    SetRequiredProtocol {
        /// The time at which the protocol statuses were recommended
        /// (the consensus's valid-after time).
        timestamp: SystemTime,
        /// The recommended set of protocols.
        protos: Arc<ProtoStatuses>,
    },
}
80

            
81
/// A "state" object used to represent our progress in downloading a
/// directory.
///
/// These state objects are not meant to know about the network, or
/// how to fetch documents at all.  Instead, they keep track of what
/// information they are missing, and what to do when they get that
/// information.
///
/// Every state object has two possible transitions: "resetting", and
/// "advancing".  Advancing happens when a state has no more work to
/// do, and needs to transform into a different kind of object.
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
pub(crate) trait DirState: Send {
    /// Return a human-readable description of this state.
    fn describe(&self) -> String;
    /// Return a list of the documents we're missing.
    ///
    /// If every document on this list were to be loaded or downloaded, then
    /// the state should either become "ready to advance", or "complete."
    ///
    /// This list should never _grow_ on a given state; only advancing
    /// or resetting the state should add new DocIds that weren't
    /// there before.
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// If the state object wants to make changes to the currently running `NetDir`,
    /// return the proposed changes.
    ///
    /// By default, states propose no changes.
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        None
    }
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;
    /// Add one or more documents from our cache; returns 'true' if there
    /// was any change in this state.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()>;

    /// Add information that we have just downloaded to this state.
    ///
    /// This method receives a copy of the original request, and should reject
    /// any documents that do not pertain to it.
    ///
    /// If `storage` is provided, then we should write any accepted documents
    /// into `storage` so they can be saved in a cache.
    ///
    /// Set `changed` to true if any semantic changes in this state were made.
    ///
    /// An error return does not necessarily mean that no data was added;
    /// partial successes are possible.
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()>;
    /// Return a summary of this state as a [`DirProgress`].
    fn bootstrap_progress(&self) -> event::DirProgress;
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> DownloadSchedule;
    /// If possible, advance to the next state.
    fn advance(self: Box<Self>) -> Box<dyn DirState>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Box<dyn DirState>;
}
162

            
163
/// An object that can provide a previous netdir for the bootstrapping state machines to use.
pub(crate) trait PreviousNetDir: Send + Sync + 'static + Debug {
    /// Get the previous netdir, if there still is one.
    fn get_netdir(&self) -> Option<Arc<NetDir>>;
}
168

            
169
impl PreviousNetDir for SharedMutArc<NetDir> {
    // Simply delegate to the shared cell's accessor.
    fn get_netdir(&self) -> Option<Arc<NetDir>> {
        self.get()
    }
}
174

            
175
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,

    /// If present, a time after which we want our consensus to have
    /// been published.
    ///
    /// (Derived from the previous netdir's valid-after time, when we have one.)
    //
    // TODO: This is not yet used everywhere it could be.  In the future maybe
    // it should be inserted into the DocId::LatestConsensus  alternative rather
    // than being recalculated in make_consensus_request,
    after: Option<SystemTime>,

    /// If present, our next state.
    ///
    /// (This is present once we have a consensus.)
    next: Option<GetCertsState<R>>,

    /// A list of RsaIdentity for the authorities that we believe in.
    ///
    /// No consensus can be valid unless it purports to be signed by
    /// more than half of these authorities.
    authority_ids: Vec<RsaIdentity>,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
212

            
213
impl<R: Runtime> GetConsensusState<R> {
214
    /// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as
215
    /// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that
216
    /// directory may be used to complete the next one.
217
14
    pub(crate) fn new(
218
14
        rt: R,
219
14
        config: Arc<DirMgrConfig>,
220
14
        cache_usage: CacheUsage,
221
14
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
222
14
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
223
14
    ) -> Self {
224
14
        let authority_ids = config
225
14
            .authorities()
226
14
            .iter()
227
42
            .map(|auth| auth.v3ident)
228
14
            .collect();
229
14
        let after = prev_netdir
230
14
            .as_ref()
231
14
            .and_then(|x| x.get_netdir())
232
14
            .map(|nd| nd.lifetime().valid_after());
233
14

            
234
14
        GetConsensusState {
235
14
            cache_usage,
236
14
            after,
237
14
            next: None,
238
14
            authority_ids,
239
14
            rt,
240
14
            config,
241
14
            prev_netdir,
242
14
            #[cfg(feature = "dirfilter")]
243
14
            filter,
244
14
        }
245
14
    }
246
}
247

            
248
impl<R: Runtime> DirState for GetConsensusState<R> {
249
8
    fn describe(&self) -> String {
250
8
        if self.next.is_some() {
251
2
            "About to fetch certificates."
252
        } else {
253
6
            match self.cache_usage {
254
                CacheUsage::CacheOnly => "Looking for a cached consensus.",
255
4
                CacheUsage::CacheOkay => "Looking for a consensus.",
256
2
                CacheUsage::MustDownload => "Downloading a consensus.",
257
            }
258
        }
259
8
        .to_string()
260
8
    }
261
4
    fn missing_docs(&self) -> Vec<DocId> {
262
4
        if self.can_advance() {
263
2
            return Vec::new();
264
2
        }
265
2
        let flavor = ConsensusFlavor::Microdesc;
266
2
        vec![DocId::LatestConsensus {
267
2
            flavor,
268
2
            cache_usage: self.cache_usage,
269
2
        }]
270
4
    }
271
4
    fn is_ready(&self, _ready: Readiness) -> bool {
272
4
        false
273
4
    }
274
10
    fn can_advance(&self) -> bool {
275
10
        self.next.is_some()
276
10
    }
277
2
    fn bootstrap_progress(&self) -> DirProgress {
278
2
        if let Some(next) = &self.next {
279
            next.bootstrap_progress()
280
        } else {
281
2
            DirProgress::NoConsensus { after: self.after }
282
        }
283
2
    }
284
2
    fn dl_config(&self) -> DownloadSchedule {
285
2
        self.config.schedule.retry_consensus
286
2
    }
287
2
    fn add_from_cache(
288
2
        &mut self,
289
2
        docs: HashMap<DocId, DocumentText>,
290
2
        changed: &mut bool,
291
2
    ) -> Result<()> {
292
2
        let text = match docs.into_iter().next() {
293
            None => return Ok(()),
294
            Some((
295
                DocId::LatestConsensus {
296
                    flavor: ConsensusFlavor::Microdesc,
297
                    ..
298
                },
299
2
                text,
300
2
            )) => text,
301
            _ => return Err(Error::CacheCorruption("Not an md consensus")),
302
        };
303

            
304
2
        let source = DocSource::LocalCache;
305
2

            
306
2
        self.add_consensus_text(
307
2
            source,
308
2
            text.as_str().map_err(Error::BadUtf8InCache)?,
309
2
            None,
310
2
            changed,
311
        )?;
312
2
        Ok(())
313
2
    }
314
10
    fn add_from_download(
315
10
        &mut self,
316
10
        text: &str,
317
10
        request: &ClientRequest,
318
10
        source: DocSource,
319
10
        storage: Option<&Mutex<DynStore>>,
320
10
        changed: &mut bool,
321
10
    ) -> Result<()> {
322
10
        let requested_newer_than = match request {
323
10
            ClientRequest::Consensus(r) => r.last_consensus_date(),
324
            _ => None,
325
        };
326
10
        let meta = self.add_consensus_text(source, text, requested_newer_than, changed)?;
327

            
328
6
        if let Some(store) = storage {
329
2
            let mut w = store.lock().expect("Directory storage lock poisoned");
330
2
            w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
331
4
        }
332
6
        Ok(())
333
10
    }
334
6
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
335
6
        match self.next {
336
6
            Some(next) => Box::new(next),
337
            None => self,
338
        }
339
6
    }
340
2
    fn reset_time(&self) -> Option<SystemTime> {
341
2
        None
342
2
    }
343
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
344
        self
345
    }
346
}
347

            
348
impl<R: Runtime> GetConsensusState<R> {
    /// Helper: try to set the current consensus text from an input string
    /// `text`.  Refuse it if the authorities could never be correct, or if it
    /// is ill-formed.
    ///
    /// If `cutoff` is provided, treat any consensus older than `cutoff` as
    /// older-than-requested.
    ///
    /// On success, stores the next state (`GetCertsState`) in `self.next` and
    /// returns the metadata of the accepted consensus.
    ///
    /// Errors from this method are not fatal to the download process.
    fn add_consensus_text(
        &mut self,
        source: DocSource,
        text: &str,
        cutoff: Option<SystemTime>,
        changed: &mut bool,
    ) -> Result<&ConsensusMeta> {
        // Try to parse it and get its metadata.
        let (consensus_meta, unvalidated) = {
            let (signedval, remainder, parsed) =
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
            #[cfg(feature = "dirfilter")]
            let parsed = self.filter.filter_consensus(parsed)?;
            // Widen the validity interval per our configured tolerance, then
            // insist that the consensus is timely right now.
            let parsed = self.config.tolerance.extend_tolerance(parsed);
            let now = self.rt.wallclock();
            let timely = parsed.check_valid_at(&now)?;
            if let Some(cutoff) = cutoff {
                if timely.peek_lifetime().valid_after() < cutoff {
                    return Err(Error::Unwanted("consensus was older than requested"));
                }
            }
            let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
            (meta, timely)
        };

        // Check out what authorities we believe in, and see if enough
        // of them are purported to have signed this consensus.
        let n_authorities = self.authority_ids.len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);

        let id_refs: Vec<_> = self.authority_ids.iter().collect();
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
            return Err(Error::UnrecognizedAuthorities);
        }
        // Yes, we've added the consensus.  That's a change.
        *changed = true;

        // Make a set of all the certificates we want -- the subset of
        // those listed on the consensus that we would indeed accept as
        // authoritative.
        let desired_certs = unvalidated
            .signing_cert_ids()
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
            .collect();

        self.next = Some(GetCertsState {
            cache_usage: self.cache_usage,
            consensus_source: source,
            consensus: GetCertsConsensus::Unvalidated(unvalidated),
            consensus_meta,
            missing_certs: desired_certs,
            certs: Vec::new(),
            rt: self.rt.clone(),
            config: self.config.clone(),
            prev_netdir: self.prev_netdir.take(),
            protocol_statuses: None,
            #[cfg(feature = "dirfilter")]
            filter: self.filter.clone(),
        });

        // Unwrap should be safe because `next` was just assigned
        #[allow(clippy::unwrap_used)]
        Ok(&self.next.as_ref().unwrap().consensus_meta)
    }

    /// Return true if `id` is an authority identity we recognize
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
        self.authority_ids.iter().any(|auth| auth == id)
    }
}
427

            
428
/// One of two possible internal states for the consensus in a GetCertsState.
///
/// This inner object is advanced by `try_checking_sigs`.
#[derive(Clone, Debug)]
enum GetCertsConsensus {
    /// We have an unvalidated consensus; we haven't checked its signatures.
    Unvalidated(UnvalidatedMdConsensus),
    /// A validated consensus: the signatures are fine and we can advance.
    Validated(MdConsensus),
    /// We failed to validate the consensus, even after getting enough certificates.
    Failed,
}
440

            
441
/// Second state: fetching or loading authority certificates.
442
///
443
/// TODO: we should probably do what C tor does, and try to use the
444
/// same directory that gave us the consensus.
445
///
446
/// TODO SECURITY: This needs better handling for the DOS attack where
447
/// we are given a bad consensus signed with fictional certificates
448
/// that we can never find.
449
#[derive(Clone, Debug)]
450
struct GetCertsState<R: Runtime> {
451
    /// The cache usage we had in mind when we began.  Used to reset.
452
    cache_usage: CacheUsage,
453
    /// Where did we get our consensus?
454
    consensus_source: DocSource,
455
    /// The consensus that we are trying to validate, or an error if we've given
456
    /// up on validating it.
457
    consensus: GetCertsConsensus,
458
    /// Metadata for the consensus.
459
    consensus_meta: ConsensusMeta,
460
    /// A set of the certificate keypairs for the certificates we don't
461
    /// have yet.
462
    missing_certs: HashSet<AuthCertKeyIds>,
463
    /// A list of the certificates we've been able to load or download.
464
    certs: Vec<AuthCert>,
465

            
466
    /// A `Runtime` implementation.
467
    rt: R,
468
    /// The configuration of the directory manager. Used for download configuration
469
    /// purposes.
470
    config: Arc<DirMgrConfig>,
471
    /// If one exists, the netdir we're trying to update.
472
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,
473

            
474
    /// If present a set of protocols to install as our latest recommended set.
475
    protocol_statuses: Option<(SystemTime, Arc<ProtoStatuses>)>,
476

            
477
    /// A filter that gets applied to directory objects before we use them.
478
    #[cfg(feature = "dirfilter")]
479
    filter: Arc<dyn crate::filter::DirFilter>,
480
}
481

            
482
impl<R: Runtime> GetCertsState<R> {
    /// Handle a certificate result returned by `tor_netdoc`: checking it for timeliness
    /// and well-signedness.
    ///
    /// On success return the `AuthCert` and the string that represents it within the string `within`.
    /// On failure, return an error.
    fn check_parsed_certificate<'s>(
        &self,
        parsed: tor_netdoc::Result<UncheckedAuthCert>,
        source: &DocSource,
        within: &'s str,
    ) -> Result<(AuthCert, &'s str)> {
        let parsed = parsed.map_err(|e| Error::from_netdoc(source.clone(), e))?;
        // Recover the exact substring of `within` holding this cert, so that
        // callers can store the cert's original text.
        let cert_text = parsed
            .within(within)
            .expect("Certificate was not in input as expected");
        // Check the signature first, then timeliness (with our configured tolerance).
        let wellsigned = parsed.check_signature()?;
        let now = self.rt.wallclock();
        let timely_cert = self
            .config
            .tolerance
            .extend_tolerance(wellsigned)
            .check_valid_at(&now)?;
        Ok((timely_cert, cert_text))
    }

    /// If we have enough certificates, and we have not yet checked the
    /// signatures on the consensus, try checking them.
    ///
    /// If the consensus is valid, remove the unvalidated consensus from `self`
    /// and put the validated consensus there instead.
    ///
    /// If the consensus is invalid, throw it out set a blocking error.
    fn try_checking_sigs(&mut self) -> Result<()> {
        use GetCertsConsensus as C;
        // Temporary value; we'll replace the consensus field with something
        // better before the method returns.
        let mut consensus = C::Failed;
        std::mem::swap(&mut consensus, &mut self.consensus);

        let unvalidated = match consensus {
            C::Unvalidated(uv) if uv.key_is_correct(&self.certs[..]).is_ok() => uv,
            _ => {
                // nothing to check at this point.  Either we already checked the consensus, or we don't yet have enough certificates.
                self.consensus = consensus;
                return Ok(());
            }
        };

        let (new_consensus, outcome) = match unvalidated.check_signature(&self.certs[..]) {
            Ok(validated) => (C::Validated(validated), Ok(())),
            Err(cause) => (
                C::Failed,
                Err(Error::ConsensusInvalid {
                    source: self.consensus_source.clone(),
                    cause,
                }),
            ),
        };
        self.consensus = new_consensus;

        // Update our protocol recommendations if we have a validated consensus,
        // and if we haven't already updated our recommendations.
        if let GetCertsConsensus::Validated(v) = &self.consensus {
            if self.protocol_statuses.is_none() {
                let protoset: &Arc<ProtoStatuses> = v.protocol_statuses();
                self.protocol_statuses = Some((
                    self.consensus_meta.lifetime().valid_after(),
                    Arc::clone(protoset),
                ));
            }
        }

        outcome
    }
}
558

            
559
impl<R: Runtime> DirState for GetCertsState<R> {
    fn describe(&self) -> String {
        use GetCertsConsensus as C;
        match &self.consensus {
            C::Unvalidated(_) => {
                let total = self.certs.len() + self.missing_certs.len();
                format!(
                    "Downloading certificates for consensus (we are missing {}/{}).",
                    self.missing_certs.len(),
                    total
                )
            }
            C::Validated(_) => "Validated consensus; about to get microdescriptors".to_string(),
            C::Failed => "Failed to validate consensus".to_string(),
        }
    }
    fn missing_docs(&self) -> Vec<DocId> {
        // One DocId per authority certificate we still need.
        self.missing_certs
            .iter()
            .map(|id| DocId::AuthCert(*id))
            .collect()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // The directory can't be used until microdescriptors are fetched.
        false
    }
    fn can_advance(&self) -> bool {
        matches!(self.consensus, GetCertsConsensus::Validated(_))
    }
    fn bootstrap_progress(&self) -> DirProgress {
        let n_certs = self.certs.len();
        let n_missing_certs = self.missing_certs.len();
        let total_certs = n_missing_certs + n_certs;
        DirProgress::FetchingCerts {
            lifetime: self.consensus_meta.lifetime().clone(),
            usable_lifetime: self
                .config
                .tolerance
                .extend_lifetime(self.consensus_meta.lifetime()),

            n_certs: (n_certs as u16, total_certs as u16),
        }
    }
    fn dl_config(&self) -> DownloadSchedule {
        self.config.schedule.retry_certs
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        changed: &mut bool,
    ) -> Result<()> {
        // Here we iterate over the documents we want, taking them from
        // our input and remembering them.
        let source = DocSource::LocalCache;
        let mut nonfatal_error = None;
        for id in &self.missing_docs() {
            if let Some(cert) = docs.get(id) {
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
                let parsed = AuthCert::parse(text);
                match self.check_parsed_certificate(parsed, &source, text) {
                    Ok((cert, _text)) => {
                        self.missing_certs.remove(cert.key_ids());
                        self.certs.push(cert);
                        *changed = true;
                    }
                    Err(e) => {
                        // Remember only the first nonfatal error; keep going.
                        nonfatal_error.get_or_insert(e);
                    }
                }
            }
        }
        if *changed {
            // We may now have enough certificates to validate the consensus.
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        source: DocSource,
        storage: Option<&Mutex<DynStore>>,
        changed: &mut bool,
    ) -> Result<()> {
        // Only an AuthCert request can legitimately produce certificates.
        let asked_for: HashSet<_> = match request {
            ClientRequest::AuthCert(a) => a.keys().collect(),
            _ => return Err(internal!("expected an AuthCert request").into()),
        };

        let mut nonfatal_error = None;
        let mut newcerts = Vec::new();
        for cert in
            AuthCert::parse_multiple(text).map_err(|e| Error::from_netdoc(source.clone(), e))?
        {
            match self.check_parsed_certificate(cert, &source, text) {
                Ok((cert, cert_text)) => {
                    newcerts.push((cert, cert_text));
                }
                Err(e) => {
                    warn_report!(e, "Problem with certificate received from {}", &source);
                    nonfatal_error.get_or_insert(e);
                }
            }
        }

        // Now discard any certs we didn't ask for.
        let len_orig = newcerts.len();
        newcerts.retain(|(cert, _)| asked_for.contains(cert.key_ids()));
        if newcerts.len() != len_orig {
            warn!(
                "Discarding certificates from {} that we didn't ask for.",
                source
            );
            nonfatal_error.get_or_insert(Error::Unwanted("Certificate we didn't request"));
        }

        // We want to exit early if we aren't saving any certificates.
        if newcerts.is_empty() {
            return opt_err_to_result(nonfatal_error);
        }

        if let Some(store) = storage {
            // Write the certificates to the store.
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().expect("Directory storage lock poisoned");
            w.store_authcerts(&v[..])?;
        }

        // Remember the certificates in this state, and remove them
        // from our list of missing certs.
        for (cert, _) in newcerts {
            let ids = cert.key_ids();
            if self.missing_certs.contains(ids) {
                self.missing_certs.remove(ids);
                self.certs.push(cert);
                *changed = true;
            }
        }

        if *changed {
            // Re-attempt consensus validation now that we have more certs.
            self.try_checking_sigs()?;
        }
        opt_err_to_result(nonfatal_error)
    }

    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        use GetCertsConsensus::*;
        match self.consensus {
            Validated(validated) => Box::new(GetMicrodescsState::new(
                self.cache_usage,
                validated,
                self.consensus_meta,
                self.rt,
                self.config,
                self.prev_netdir,
                #[cfg(feature = "dirfilter")]
                self.filter,
            )),
            // Not validated yet (or failed): stay in this state.
            _ => self,
        }
    }

    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        // Propose installing the consensus's recommended protocols, if we
        // recorded any while validating.
        self.protocol_statuses.as_ref().map(|(timestamp, protos)| {
            NetDirChange::SetRequiredProtocol {
                timestamp: *timestamp,
                protos: Arc::clone(protos),
            }
        })
    }

    fn reset_time(&self) -> Option<SystemTime> {
        Some(
            self.consensus_meta.lifetime().valid_until()
                + self.config.tolerance.post_valid_tolerance,
        )
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
        } else {
            // If we reset in this state, we should always go to "must
            // download": Either we've failed to get the certs we needed, or we
            // have found that the consensus wasn't valid.  Either case calls
            // for a fresh consensus download attempt.
            CacheUsage::MustDownload
        };

        Box::new(GetConsensusState::new(
            self.rt,
            self.config,
            cache_usage,
            self.prev_netdir,
            #[cfg(feature = "dirfilter")]
            self.filter,
        ))
    }
}
760

            
761
/// Final state: we're fetching or loading microdescriptors.
///
/// This state drives the netdir in `partial` toward usability by feeding it
/// microdescriptors from the cache or from downloads.
#[derive(Debug, Clone)]
struct GetMicrodescsState<R: Runtime> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,
    /// Total number of microdescriptors listed in the consensus.
    n_microdescs: usize,
    /// The current status of our netdir.
    partial: PendingNetDir,
    /// Metadata for the current consensus.
    meta: ConsensusMeta,
    /// A pending list of microdescriptor digests whose
    /// "last-listed-at" times we should update in storage.
    newly_listed: Vec<MdDigest>,
    /// A time after which we should try to replace this directory and
    /// find a new one.  Since this is randomized, we only compute it
    /// once.
    reset_time: SystemTime,

    /// A `Runtime` implementation.
    rt: R,
    /// The configuration of the directory manager. Used for download configuration
    /// purposes.
    config: Arc<DirMgrConfig>,
    /// If one exists, the netdir we're trying to update.
    prev_netdir: Option<Arc<dyn PreviousNetDir>>,

    /// A filter that gets applied to directory objects before we use them.
    #[cfg(feature = "dirfilter")]
    filter: Arc<dyn crate::filter::DirFilter>,
}
792

            
793
/// Information about a network directory that might not be ready to become _the_ current network
/// directory.
#[derive(Debug, Clone)]
enum PendingNetDir {
    /// A NetDir for which we have a consensus, but not enough microdescriptors.
    Partial(PartialNetDir),
    /// A NetDir we're either trying to get our caller to replace, or that the caller
    /// has already taken from us.
    ///
    /// After the netdir gets taken, the `collected_microdescs` and `missing_microdescs`
    /// fields get used. Before then, we just do operations on the netdir.
    Yielding {
        /// The actual netdir. This starts out as `Some`, but our caller can `take()` it
        /// from us.
        netdir: Option<NetDir>,
        /// Microdescs we have collected in order to yield to our caller.
        collected_microdescs: Vec<Microdesc>,
        /// Which microdescs we need for the netdir that either is or used to be in `netdir`.
        ///
        /// NOTE(eta): This MUST always match the netdir's own idea of which microdescs we need.
        ///            We do this by copying the netdir's missing microdescs into here when we
        ///            instantiate it.
        ///            (This code assumes that it doesn't add more needed microdescriptors later!)
        missing_microdescs: HashSet<MdDigest>,
        /// The time at which we should renew this netdir, assuming we have
        /// driven it to a "usable" state.
        replace_dir_time: SystemTime,
    },
    /// A dummy value, so we can use `mem::replace`.
    ///
    /// This variant must never be observable outside of
    /// `upgrade_if_necessary`; the other methods treat it as unreachable.
    Dummy,
}
824

            
825
impl MdReceiver for PendingNetDir {
826
6
    fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
827
6
        match self {
828
4
            PendingNetDir::Partial(partial) => partial.missing_microdescs(),
829
            PendingNetDir::Yielding {
830
2
                netdir,
831
2
                missing_microdescs,
832
                ..
833
            } => {
834
2
                if let Some(nd) = netdir.as_ref() {
835
                    nd.missing_microdescs()
836
                } else {
837
2
                    Box::new(missing_microdescs.iter())
838
                }
839
            }
840
            PendingNetDir::Dummy => unreachable!(),
841
        }
842
6
    }
843

            
844
8
    fn add_microdesc(&mut self, md: Microdesc) -> bool {
845
8
        match self {
846
8
            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
847
            PendingNetDir::Yielding {
848
                netdir,
849
                missing_microdescs,
850
                collected_microdescs,
851
                ..
852
            } => {
853
                let wanted = missing_microdescs.remove(md.digest());
854
                if let Some(nd) = netdir.as_mut() {
855
                    let nd_wanted = nd.add_microdesc(md);
856
                    // This shouldn't ever happen; if it does, our invariants are violated.
857
                    debug_assert_eq!(wanted, nd_wanted);
858
                    nd_wanted
859
                } else {
860
                    collected_microdescs.push(md);
861
                    wanted
862
                }
863
            }
864
            PendingNetDir::Dummy => unreachable!(),
865
        }
866
8
    }
867

            
868
14
    fn n_missing(&self) -> usize {
869
14
        match self {
870
12
            PendingNetDir::Partial(partial) => partial.n_missing(),
871
            PendingNetDir::Yielding {
872
2
                netdir,
873
2
                missing_microdescs,
874
                ..
875
            } => {
876
2
                if let Some(nd) = netdir.as_ref() {
877
                    // This shouldn't ever happen; if it does, our invariants are violated.
878
                    debug_assert_eq!(nd.n_missing(), missing_microdescs.len());
879
                    nd.n_missing()
880
                } else {
881
2
                    missing_microdescs.len()
882
                }
883
            }
884
            PendingNetDir::Dummy => unreachable!(),
885
        }
886
14
    }
887
}
888

            
889
impl PendingNetDir {
    /// If this `PendingNetDir` is `Partial` and no longer needs to be partial
    /// (it has enough microdescriptors to become a real netdir), upgrade it
    /// to `Yielding`.
    fn upgrade_if_necessary(&mut self) {
        if matches!(self, PendingNetDir::Partial(..)) {
            // Swap in `Dummy` temporarily so we can take ownership of the
            // `PartialNetDir` and try to convert it.
            match mem::replace(self, PendingNetDir::Dummy) {
                PendingNetDir::Partial(p) => match p.unwrap_if_sufficient() {
                    Ok(nd) => {
                        // Snapshot which microdescs are still missing, so we
                        // can keep answering `MdReceiver` queries after the
                        // caller takes the netdir away from us.
                        let missing: HashSet<_> = nd.missing_microdescs().copied().collect();
                        let replace_dir_time = pick_download_time(nd.lifetime());
                        debug!(
                            "Consensus now usable, with {} microdescriptors missing. \
                                The current consensus is fresh until {}, and valid until {}. \
                                I've picked {} as the earliest time to replace it.",
                            missing.len(),
                            OffsetDateTime::from(nd.lifetime().fresh_until()),
                            OffsetDateTime::from(nd.lifetime().valid_until()),
                            OffsetDateTime::from(replace_dir_time)
                        );
                        *self = PendingNetDir::Yielding {
                            netdir: Some(nd),
                            collected_microdescs: vec![],
                            missing_microdescs: missing,
                            replace_dir_time,
                        };
                    }
                    Err(p) => {
                        // Not sufficient yet: put the partial netdir back.
                        *self = PendingNetDir::Partial(p);
                    }
                },
                // We just checked that `self` was `Partial`.
                _ => unreachable!(),
            }
        }
        // The `Dummy` placeholder must never escape this function.
        assert!(!matches!(self, PendingNetDir::Dummy));
    }
}
924

            
925
impl<R: Runtime> GetMicrodescsState<R> {
    /// Create a new [`GetMicrodescsState`] from a provided
    /// microdescriptor consensus.
    fn new(
        cache_usage: CacheUsage,
        consensus: MdConsensus,
        meta: ConsensusMeta,
        rt: R,
        config: Arc<DirMgrConfig>,
        prev_netdir: Option<Arc<dyn PreviousNetDir>>,
        #[cfg(feature = "dirfilter")] filter: Arc<dyn crate::filter::DirFilter>,
    ) -> Self {
        // Keep trying to fill in this consensus until it has been invalid
        // for longer than our configured tolerance.
        let reset_time = consensus.lifetime().valid_until() + config.tolerance.post_valid_tolerance;
        let n_microdescs = consensus.relays().len();

        let params = &config.override_net_params;
        #[cfg(not(feature = "geoip"))]
        let mut partial_dir = PartialNetDir::new(consensus, Some(params));
        // TODO(eta): Make this embedded database configurable using the `DirMgrConfig`.
        #[cfg(feature = "geoip")]
        let mut partial_dir =
            PartialNetDir::new_with_geoip(consensus, Some(params), &GeoipDb::new_embedded());

        // Carry over information from the previous netdir, if we have one.
        if let Some(old_dir) = prev_netdir.as_ref().and_then(|x| x.get_netdir()) {
            partial_dir.fill_from_previous_netdir(old_dir);
        }

        // Always upgrade at least once: otherwise, we won't notice we're ready unless we
        // add a microdescriptor.
        let mut partial = PendingNetDir::Partial(partial_dir);
        partial.upgrade_if_necessary();

        GetMicrodescsState {
            cache_usage,
            n_microdescs,
            partial,
            meta,
            newly_listed: Vec::new(),
            reset_time,
            rt,
            config,
            prev_netdir,

            #[cfg(feature = "dirfilter")]
            filter,
        }
    }

    /// Add a bunch of microdescriptors to the in-progress netdir.
    ///
    /// Sets `*changed` to true for every microdescriptor actually added.
    fn register_microdescs<I>(&mut self, mds: I, _source: &DocSource, changed: &mut bool)
    where
        I: IntoIterator<Item = Microdesc>,
    {
        // With the "dirfilter" feature, give the filter a chance to drop or
        // modify each microdescriptor before we use it.
        #[cfg(feature = "dirfilter")]
        let mds: Vec<Microdesc> = mds
            .into_iter()
            .filter_map(|m| self.filter.filter_md(m).ok())
            .collect();
        let is_partial = matches!(self.partial, PendingNetDir::Partial(..));
        for md in mds {
            // Record digests whose "last-listed" time we should update in
            // storage; we only do this while the netdir is still partial.
            if is_partial {
                self.newly_listed.push(*md.digest());
            }
            self.partial.add_microdesc(md);
            *changed = true;
        }
        // We may now have enough microdescriptors to become usable.
        self.partial.upgrade_if_necessary();
    }
}
994

            
995
impl<R: Runtime> DirState for GetMicrodescsState<R> {
996
4
    fn describe(&self) -> String {
997
4
        format!(
998
4
            "Downloading microdescriptors (we are missing {}).",
999
4
            self.partial.n_missing()
4
        )
4
    }
6
    fn missing_docs(&self) -> Vec<DocId> {
6
        self.partial
6
            .missing_microdescs()
14
            .map(|d| DocId::Microdesc(*d))
6
            .collect()
6
    }
2
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
2
        match self.partial {
            PendingNetDir::Yielding {
2
                ref mut netdir,
2
                ref mut collected_microdescs,
2
                ..
2
            } => {
2
                if netdir.is_some() {
2
                    Some(NetDirChange::AttemptReplace {
2
                        netdir,
2
                        consensus_meta: &self.meta,
2
                    })
                } else {
                    collected_microdescs
                        .is_empty()
                        .then_some(NetDirChange::AddMicrodescs(collected_microdescs))
                }
            }
            _ => None,
        }
2
    }
18
    fn is_ready(&self, ready: Readiness) -> bool {
18
        match ready {
6
            Readiness::Complete => self.partial.n_missing() == 0,
            Readiness::Usable => {
                // We're "usable" if the calling code thought our netdir was usable enough to
                // steal it.
2
                matches!(self.partial, PendingNetDir::Yielding { ref netdir, .. } if netdir.is_none())
            }
        }
18
    }
4
    fn can_advance(&self) -> bool {
4
        false
4
    }
4
    fn bootstrap_progress(&self) -> DirProgress {
4
        let n_present = self.n_microdescs - self.partial.n_missing();
4
        DirProgress::Validated {
4
            lifetime: self.meta.lifetime().clone(),
4
            usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
4
            n_mds: (n_present as u32, self.n_microdescs as u32),
4
            usable: self.is_ready(Readiness::Usable),
4
        }
4
    }
2
    fn dl_config(&self) -> DownloadSchedule {
2
        self.config.schedule.retry_microdescs
2
    }
2
    fn add_from_cache(
2
        &mut self,
2
        docs: HashMap<DocId, DocumentText>,
2
        changed: &mut bool,
2
    ) -> Result<()> {
2
        let mut microdescs = Vec::new();
4
        for (id, text) in docs {
2
            if let DocId::Microdesc(digest) = id {
2
                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
2
                    if md.digest() == &digest {
2
                        microdescs.push(md);
2
                        continue;
                    }
                }
                warn!("Found a mismatched microdescriptor in cache; ignoring");
            }
        }
2
        self.register_microdescs(microdescs, &DocSource::LocalCache, changed);
2
        Ok(())
2
    }
2
    fn add_from_download(
2
        &mut self,
2
        text: &str,
2
        request: &ClientRequest,
2
        source: DocSource,
2
        storage: Option<&Mutex<DynStore>>,
2
        changed: &mut bool,
2
    ) -> Result<()> {
2
        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
2
            req.digests().collect()
        } else {
            return Err(internal!("expected a microdesc request").into());
        };
2
        let mut new_mds = Vec::new();
2
        let mut nonfatal_err = None;
6
        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
2
            .map_err(|e| Error::from_netdoc(source.clone(), e))?
        {
6
            let anno = match anno {
                Err(e) => {
                    nonfatal_err.get_or_insert_with(|| Error::from_netdoc(source.clone(), e));
                    continue;
                }
6
                Ok(a) => a,
6
            };
6
            let txt = anno
6
                .within(text)
6
                .expect("microdesc not from within text as expected");
6
            let md = anno.into_microdesc();
6
            if !requested.contains(md.digest()) {
                warn!(
                    "Received microdescriptor from {} we did not ask for: {:?}",
                    source,
                    md.digest()
                );
                nonfatal_err.get_or_insert(Error::Unwanted("un-requested microdescriptor"));
                continue;
6
            }
6
            new_mds.push((txt, md));
        }
2
        let mark_listed = self.meta.lifetime().valid_after();
2
        if let Some(store) = storage {
2
            let mut s = store
2
                .lock()
2
                //.get_mut()
2
                .expect("Directory storage lock poisoned");
2
            if !self.newly_listed.is_empty() {
2
                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
2
                self.newly_listed.clear();
            }
2
            if !new_mds.is_empty() {
2
                s.store_microdescs(
2
                    &new_mds
2
                        .iter()
6
                        .map(|(text, md)| (*text, md.digest()))
2
                        .collect::<Vec<_>>(),
2
                    mark_listed,
2
                )?;
            }
        }
6
        self.register_microdescs(new_mds.into_iter().map(|(_, md)| md), &source, changed);
2

            
2
        opt_err_to_result(nonfatal_err)
2
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        self
    }
2
    fn reset_time(&self) -> Option<SystemTime> {
        // TODO(nickm): The reset logic is a little wonky here: we don't truly
        // want to _reset_ this state at `replace_dir_time`.  In fact, we ought
        // to be able to have multiple states running in parallel: one filling
        // in the mds for an old consensus, and one trying to fetch a better
        // one.  That's likely to require some amount of refactoring of the
        // bootstrap code.
        Some(match self.partial {
            // If the client has taken a completed netdir, the netdir is now
            // usable: We can reset our download attempt when we choose to try
            // to replace this directory.
            PendingNetDir::Yielding {
                replace_dir_time,
                netdir: None,
                ..
            } => replace_dir_time,
            // We don't have a completed netdir: Keep trying to fill this one in
            // until it is _definitely_ unusable.  (Our clock might be skewed;
            // there might be no up-to-date consensus.)
2
            _ => self.reset_time,
        })
2
    }
2
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
2
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
2
        } else if self.is_ready(Readiness::Usable) {
            // If we managed to bootstrap a usable consensus, then we won't
            // accept our next consensus from the cache.
            CacheUsage::MustDownload
        } else {
            // If we didn't manage to bootstrap a usable consensus, then we can
            // indeed try again with the one in the cache.
            // TODO(nickm) is this right?
2
            CacheUsage::CacheOkay
        };
2
        Box::new(GetConsensusState::new(
2
            self.rt,
2
            self.config,
2
            cache_usage,
2
            self.prev_netdir,
2
            #[cfg(feature = "dirfilter")]
2
            self.filter,
2
        ))
2
    }
}
/// Choose a random download time to replace a consensus whose lifetime
/// is `lifetime`.
202
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
202
    let (lowbound, uncertainty) = client_download_range(lifetime);
202
    lowbound + rand::rng().gen_range_infallible(..=uncertainty)
202
}
/// Based on the lifetime for a consensus, return the time range during which
/// clients should fetch the next one.
204
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
204
    let valid_after = lt.valid_after();
204
    let valid_until = lt.valid_until();
204
    let voting_interval = lt.voting_period();
204
    let whole_lifetime = valid_until
204
        .duration_since(valid_after)
204
        .expect("valid-after must precede valid-until");
204

            
204
    // From dir-spec:
204
    // "This time is chosen uniformly at random from the interval
204
    // between the time 3/4 into the first interval after the
204
    // consensus is no longer fresh, and 7/8 of the time remaining
204
    // after that before the consensus is invalid."
204
    let lowbound = voting_interval + (voting_interval * 3) / 4;
204
    let remainder = whole_lifetime - lowbound;
204
    let uncertainty = (remainder * 7) / 8;
204

            
204
    (valid_after + lowbound, uncertainty)
204
}
/// If `err` is some, return `Err(err)`.  Otherwise return Ok(()).
8
fn opt_err_to_result(e: Option<Error>) -> Result<()> {
8
    match e {
2
        Some(e) => Err(e),
6
        None => Ok(()),
    }
8
}
/// A dummy state implementation, used when we need to temporarily write a
/// placeholder into a box.
///
/// Calling any method on this state will panic; it must always be replaced
/// with a real state before the box is used again.
#[derive(Clone, Debug)]
pub(crate) struct PoisonedState;
// Every method panics by design: `PoisonedState` exists only as a temporary
// placeholder and must never actually be driven as a state.
impl DirState for PoisonedState {
    fn describe(&self) -> String {
        unimplemented!()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        unimplemented!()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        unimplemented!()
    }
    fn can_advance(&self) -> bool {
        unimplemented!()
    }
    fn add_from_cache(
        &mut self,
        _docs: HashMap<DocId, DocumentText>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn add_from_download(
        &mut self,
        _text: &str,
        _request: &ClientRequest,
        _source: DocSource,
        _storage: Option<&Mutex<DynStore>>,
        _changed: &mut bool,
    ) -> Result<()> {
        unimplemented!()
    }
    fn bootstrap_progress(&self) -> event::DirProgress {
        unimplemented!()
    }
    fn dl_config(&self) -> DownloadSchedule {
        unimplemented!()
    }
    fn advance(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
    fn reset_time(&self) -> Option<SystemTime> {
        unimplemented!()
    }
    fn reset(self: Box<Self>) -> Box<dyn DirState> {
        unimplemented!()
    }
}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::cognitive_complexity)]
    use super::*;
    use crate::{Authority, AuthorityBuilder, DownloadScheduleConfig};
    use std::convert::TryInto;
    use std::sync::Arc;
    use tempfile::TempDir;
    use time::macros::datetime;
    use tor_netdoc::doc::authcert::AuthCertKeyIds;
    use tor_rtcompat::RuntimeSubstExt as _;
    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::time::MockSleepProvider;
    #[test]
    fn download_schedule() {
        let va = datetime!(2008-08-02 20:00 UTC).into();
        let fu = datetime!(2008-08-02 21:00 UTC).into();
        let vu = datetime!(2008-08-02 23:00 UTC).into();
        let lifetime = Lifetime::new(va, fu, vu).unwrap();
        let expected_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
        let expected_range = Duration::from_millis((75 * 60 * 1000) * 7 / 8);
        let (start, range) = client_download_range(&lifetime);
        assert_eq!(start, expected_start);
        assert_eq!(range, expected_range);
        for _ in 0..100 {
            let when = pick_download_time(&lifetime);
            assert!(when > va);
            assert!(when >= expected_start);
            assert!(when < vu);
            assert!(when <= expected_start + range);
        }
    }
    /// Makes a memory-backed storage.
    fn temp_store() -> (TempDir, Mutex<DynStore>) {
        let tempdir = TempDir::new().unwrap();
        let store = crate::storage::SqliteStore::from_path_and_mistrust(
            tempdir.path(),
            &fs_mistrust::Mistrust::new_dangerously_trust_everyone(),
            false,
        )
        .unwrap();
        (tempdir, Mutex::new(Box::new(store)))
    }
    fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
        #[allow(deprecated)] // TODO #1885
        let msp = MockSleepProvider::new(now);
        rt.with_sleep_provider(msp.clone())
            .with_coarse_time_provider(msp)
    }
    fn make_dirmgr_config(authorities: Option<Vec<AuthorityBuilder>>) -> Arc<DirMgrConfig> {
        let mut netcfg = crate::NetworkConfig::builder();
        netcfg.set_fallback_caches(vec![]);
        if let Some(a) = authorities {
            netcfg.set_authorities(a);
        }
        let cfg = DirMgrConfig {
            cache_dir: "/we_will_never_use_this/".into(),
            network: netcfg.build().unwrap(),
            ..Default::default()
        };
        Arc::new(cfg)
    }
    // Test data
    const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
    const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
    const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
    const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
    #[allow(unused)]
    const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
    fn test_time() -> SystemTime {
        datetime!(2020-08-07 12:42:45 UTC).into()
    }
    fn rsa(s: &str) -> RsaIdentity {
        RsaIdentity::from_hex(s).unwrap()
    }
    fn test_authorities() -> Vec<AuthorityBuilder> {
        fn a(s: &str) -> AuthorityBuilder {
            Authority::builder().name("ignore").v3ident(rsa(s)).clone()
        }
        vec![
            a("5696AB38CB3852AFA476A5C07B2D4788963D5567"),
            a("5A23BA701776C9C1AB1C06E734E92AB3D5350D64"),
            // This is an authority according to the consensus, but we'll
            // pretend we don't recognize it, to make sure that we
            // don't fetch or accept it.
            // a("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
        ]
    }
    fn authcert_id_5696() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
            sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
        }
    }
    fn authcert_id_5a23() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
            sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
        }
    }
    // remember, we're saying that we don't recognize this one as an authority.
    fn authcert_id_7c47() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
            sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
        }
    }
    fn microdescs() -> HashMap<MdDigest, String> {
        const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");
        let text = MICRODESCS;
        MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
            .unwrap()
            .map(|res| {
                let anno = res.unwrap();
                let text = anno.within(text).unwrap();
                let md = anno.into_microdesc();
                (*md.digest(), text.to_owned())
            })
            .collect()
    }
    #[test]
    fn get_consensus_state() {
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt = make_time_shifted_runtime(test_time(), rt);
            let cfg = make_dirmgr_config(None);
            let (_tempdir, store) = temp_store();
            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            // Is description okay?
            assert_eq!(&state.describe(), "Looking for a consensus.");
            // Basic properties: without a consensus it is not ready to advance.
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            // Basic properties: it doesn't want to reset.
            assert!(state.reset_time().is_none());
            // Its starting DirStatus is "fetching a consensus".
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching a consensus"
            );
            // Download configuration is simple: only 1 request can be done in
            // parallel.  It uses a consensus retry schedule.
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
            // Do we know what we want?
            let docs = state.missing_docs();
            assert_eq!(docs.len(), 1);
            let docid = docs[0];
            assert!(matches!(
                docid,
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                }
            ));
            let source = DocSource::DirServer { source: None };
            // Now suppose that we get some complete junk from a download.
            let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
            let req = crate::docid::ClientRequest::Consensus(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                "this isn't a consensus",
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::NetDocError { .. })));
            assert!(!changed);
            // make sure it wasn't stored...
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_none());
            // Now try again, with a real consensus... but the wrong authorities.
            let mut changed = false;
            let outcome = state.add_from_download(
                CONSENSUS,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
            assert!(!changed);
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_none());
            // Great. Change the receiver to use a configuration where these test
            // authorities are recognized.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt.clone(),
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let mut changed = false;
            let outcome =
                state.add_from_download(CONSENSUS, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            assert!(store
                .lock()
                .unwrap()
                .latest_consensus(ConsensusFlavor::Microdesc, None)
                .unwrap()
                .is_some());
            // And with that, we should be asking for certificates
            assert!(state.can_advance());
            assert_eq!(&state.describe(), "About to fetch certificates.");
            assert_eq!(state.missing_docs(), Vec::new());
            let next = Box::new(state).advance();
            assert_eq!(
                &next.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );
            // Try again, but this time get the state from the cache.
            let cfg = make_dirmgr_config(Some(test_authorities()));
            let mut state = GetConsensusState::new(
                rt,
                cfg,
                CacheUsage::CacheOkay,
                None,
                #[cfg(feature = "dirfilter")]
                Arc::new(crate::filter::NilFilter),
            );
            let text: crate::storage::InputString = CONSENSUS.to_owned().into();
            let map = vec![(docid, text.into())].into_iter().collect();
            let mut changed = false;
            let outcome = state.add_from_cache(map, &mut changed);
            assert!(outcome.is_ok());
            assert!(changed);
            assert!(state.can_advance());
        });
    }
    #[test]
    fn get_certs_state() {
        // Exercise GetCertsState: reach it by feeding a consensus into a fresh
        // GetConsensusState and advancing, then supply the two missing
        // authority certificates -- one from the cache and one from a
        // simulated download -- and confirm the state becomes ready to
        // advance to microdescriptor fetching.
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a GetCertsState with our test data
            ///
            /// (Builds a GetConsensusState, feeds it CONSENSUS as if freshly
            /// downloaded, and advances it; the returned trait object is the
            /// resulting GetCertsState.)
            fn new_getcerts_state(rt: impl Runtime) -> Box<dyn DirState> {
                // Shift the runtime clock so the static test consensus is timely.
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let mut state = GetConsensusState::new(
                    rt,
                    cfg,
                    CacheUsage::CacheOkay,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                );
                let source = DocSource::DirServer { source: None };
                let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
                let req = crate::docid::ClientRequest::Consensus(req);
                let mut changed = false;
                // `store: None` -- we don't need to persist the consensus here.
                let outcome = state.add_from_download(CONSENSUS, &req, source, None, &mut changed);
                assert!(outcome.is_ok());
                Box::new(state).advance()
            }
            let (_tempdir, store) = temp_store();
            let mut state = new_getcerts_state(rt.clone());
            // Basic properties: description, status, reset time.
            assert_eq!(
                &state.describe(),
                "Downloading certificates for consensus (we are missing 2/2)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            // The reset time should be the consensus expiry plus the
            // configured post-validity tolerance.
            let consensus_expires: SystemTime = datetime!(2020-08-07 12:43:20 UTC).into();
            let post_valid_tolerance = crate::DirTolerance::default().post_valid_tolerance;
            assert_eq!(
                state.reset_time(),
                Some(consensus_expires + post_valid_tolerance)
            );
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
            // Bootstrap status okay?
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (0/2)"
            );
            // Check that we get the right list of missing docs.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 2); // We are missing two certificates.
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            // we don't ask for this one because we don't recognize its authority
            assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));
            // Add one from the cache; make sure the list is still right
            let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
            let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            let outcome = state.add_from_cache(docs, &mut changed);
            assert!(changed);
            assert!(outcome.is_ok()); // no error, and something changed.
            assert!(!state.can_advance()); // But we aren't done yet.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 1); // Now we're only missing one!
            assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching authority certificates (1/2)"
            );
            // Now try to add the other from a download ... but fail
            // because we didn't ask for it.  (The request below lists the
            // cert id we already have, not the one in the response body.)
            let source = DocSource::DirServer { source: None };
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5696()); // it's the wrong id.
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            let outcome = state.add_from_download(
                AUTHCERT_5A23,
                &req,
                source.clone(),
                Some(&store),
                &mut changed,
            );
            assert!(matches!(outcome, Err(Error::Unwanted(_))));
            assert!(!changed);
            let missing2 = state.missing_docs();
            assert_eq!(missing, missing2); // No change.
            // The rejected certificate must not have been written to the store.
            assert!(store
                .lock()
                .unwrap()
                .authcerts(&[authcert_id_5a23()])
                .unwrap()
                .is_empty());
            // Now try to add the other from a download ... for real!
            let mut req = tor_dirclient::request::AuthCertRequest::new();
            req.push(authcert_id_5a23()); // Right idea this time!
            let req = ClientRequest::AuthCert(req);
            let mut changed = false;
            let outcome =
                state.add_from_download(AUTHCERT_5A23, &req, source, Some(&store), &mut changed);
            assert!(outcome.is_ok()); // No error, _and_ something changed!
            assert!(changed);
            let missing3 = state.missing_docs();
            assert!(missing3.is_empty());
            assert!(state.can_advance());
            // This time the certificate should have been persisted.
            assert!(!store
                .lock()
                .unwrap()
                .authcerts(&[authcert_id_5a23()])
                .unwrap()
                .is_empty());
            let next = state.advance();
            assert_eq!(
                &next.describe(),
                "Downloading microdescriptors (we are missing 6)."
            );
            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getcerts_state(rt);
            let state = state.reset();
            assert_eq!(&state.describe(), "Downloading a consensus.");
            // TODO: I'd like even more tests to make sure that we never
            // accept a certificate for an authority we don't believe in.
        });
    }
    #[test]
    fn get_microdescs_state() {
        // Exercise GetMicrodescsState: construct it directly from a parsed
        // test consensus, feed it microdescriptors from the cache and from a
        // simulated download, and check progress reporting, readiness, and
        // the resulting NetDirChange.
        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            /// Construct a GetMicrodescsState with our test data
            fn new_getmicrodescs_state(rt: impl Runtime) -> GetMicrodescsState<impl Runtime> {
                // Shift the runtime clock so the static test consensus is timely.
                let rt = make_time_shifted_runtime(test_time(), rt);
                let cfg = make_dirmgr_config(Some(test_authorities()));
                let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
                // Skip timeliness/signature validation: this is fixed test data.
                let consensus = consensus
                    .dangerously_assume_timely()
                    .dangerously_assume_wellsigned();
                let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
                GetMicrodescsState::new(
                    CacheUsage::CacheOkay,
                    consensus,
                    meta,
                    rt,
                    cfg,
                    None,
                    #[cfg(feature = "dirfilter")]
                    Arc::new(crate::filter::NilFilter),
                )
            }
            /// Decode an unpadded-base64 string into a microdescriptor digest.
            fn d64(s: &str) -> MdDigest {
                use base64ct::{Base64Unpadded, Encoding as _};
                Base64Unpadded::decode_vec(s).unwrap().try_into().unwrap()
            }
            // If we start from scratch and reset, we're back in GetConsensus.
            let state = new_getmicrodescs_state(rt.clone());
            let state = Box::new(state).reset();
            assert_eq!(&state.describe(), "Looking for a consensus.");
            // Check the basics.
            let mut state = new_getmicrodescs_state(rt.clone());
            assert_eq!(
                &state.describe(),
                "Downloading microdescriptors (we are missing 4)."
            );
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            {
                // The reset time is randomized, but must fall between the
                // consensus's fresh-until and its valid-until plus tolerance.
                let reset_time = state.reset_time().unwrap();
                let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
                let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
                assert!(reset_time >= fresh_until);
                assert!(reset_time <= valid_until + state.config.tolerance.post_valid_tolerance);
            }
            let retry = state.dl_config();
            assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs);
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (0/4)"
            );
            // Now check whether we're missing all the right microdescs.
            let missing = state.missing_docs();
            let md_text = microdescs();
            assert_eq!(missing.len(), 4);
            assert_eq!(md_text.len(), 4);
            // The four digests listed in CONSENSUS2.
            let md1 = d64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
            let md2 = d64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
            let md3 = d64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
            let md4 = d64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
            for md_digest in [md1, md2, md3, md4] {
                assert!(missing.contains(&DocId::Microdesc(md_digest)));
                assert!(md_text.contains_key(&md_digest));
            }
            // Try adding a microdesc from the cache.
            let (_tempdir, store) = temp_store();
            let doc1: crate::storage::InputString = md_text.get(&md1).unwrap().clone().into();
            let docs = vec![(DocId::Microdesc(md1), doc1.into())]
                .into_iter()
                .collect();
            let mut changed = false;
            let outcome = state.add_from_cache(docs, &mut changed);
            assert!(outcome.is_ok()); // successfully loaded one MD.
            assert!(changed);
            assert!(!state.can_advance());
            assert!(!state.is_ready(Readiness::Complete));
            assert!(!state.is_ready(Readiness::Usable));
            // Now we should be missing 3.
            let missing = state.missing_docs();
            assert_eq!(missing.len(), 3);
            assert!(!missing.contains(&DocId::Microdesc(md1)));
            assert_eq!(
                state.bootstrap_progress().to_string(),
                "fetching microdescriptors (1/4)"
            );
            // Try adding the rest as if from a download: concatenate the
            // remaining three microdescriptor documents into one response.
            let mut req = tor_dirclient::request::MicrodescRequest::new();
            let mut response = "".to_owned();
            for md_digest in [md2, md3, md4] {
                response.push_str(md_text.get(&md_digest).unwrap());
                req.push(md_digest);
            }
            let req = ClientRequest::Microdescs(req);
            let source = DocSource::DirServer { source: None };
            let mut changed = false;
            let outcome = state.add_from_download(
                response.as_str(),
                &req,
                source,
                Some(&store),
                &mut changed,
            );
            assert!(outcome.is_ok()); // successfully loaded MDs
            assert!(changed);
            // With all MDs present, the state should offer a full NetDir.
            match state.get_netdir_change().unwrap() {
                NetDirChange::AttemptReplace { netdir, .. } => {
                    assert!(netdir.take().is_some());
                }
                x => panic!("wrong netdir change: {:?}", x),
            }
            assert!(state.is_ready(Readiness::Complete));
            assert!(state.is_ready(Readiness::Usable));
            // The downloaded MDs should have been written to the store.
            assert_eq!(
                store
                    .lock()
                    .unwrap()
                    .microdescs(&[md2, md3, md4])
                    .unwrap()
                    .len(),
                3
            );
            let missing = state.missing_docs();
            assert!(missing.is_empty());
        });
    }
}