tor_dirmgr/bootstrap.rs

//! Functions to download or load directory objects, using the
//! state machines in the `states` module.

use std::num::NonZeroUsize;
use std::ops::Deref;
use std::{
    collections::HashMap,
    sync::{Arc, Weak},
    time::{Duration, SystemTime},
};

use crate::err::BootstrapAction;
use crate::state::{DirState, PoisonedState};
use crate::DirMgrConfig;
use crate::DocSource;
use crate::{
    docid::{self, ClientRequest},
    upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
};

use futures::FutureExt;
use futures::StreamExt;
use oneshot_fused_workaround as oneshot;
use tor_dirclient::DirResponse;
use tor_error::{info_report, warn_report};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};

use crate::storage::Store;
#[cfg(test)]
use once_cell::sync::Lazy;
#[cfg(test)]
use std::sync::Mutex;
use tor_circmgr::{CircMgr, DirInfo};
use tor_netdir::{NetDir, NetDirProvider as _};
use tor_netdoc::doc::netstatus::ConsensusFlavor;

/// Given a `Result<()>`, return from the current function unless the value
/// is `Ok(())` or a nonfatal error.
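///
/// Illustrative sketch of the intended usage, where a hypothetical
/// `some_fallible_step()` stands in for any operation returning `Result<()>`:
///
/// ```ignore
/// let outcome: Result<()> = some_fallible_step();
/// // Return early from the enclosing function on fatal errors only.
/// propagate_fatal_errors!(outcome);
/// ```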
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        let v: Result<()> = $e;
        if let Err(e) = v {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {}
                _ => return Err(e),
            }
        }
    };
}

/// Identifier for an attempt to bootstrap a directory.
///
/// Every time that we decide to download a new directory, _despite already
/// having one_, counts as a new attempt.
///
/// These are used to track the progress of each attempt independently.
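///
/// Illustrative sketch: because these identifiers come from a monotonically
/// increasing counter and derive `Ord`, later attempts compare as greater.
///
/// ```ignore
/// let first = AttemptId::next();
/// let second = AttemptId::next();
/// assert!(second > first);
/// ```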
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// Which attempt at downloading a directory is this?
    id: NonZeroUsize,
}

impl AttemptId {
    /// Return a new unused AttemptId that will be greater than any previous
    /// one.
    ///
    /// # Panics
    ///
    /// Panics if we have exhausted the possible space of AttemptIds.
    pub(crate) fn next() -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};
        /// atomic used to generate the next attempt.
        static NEXT: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT.fetch_add(1, Ordering::Relaxed);
        let id = id.try_into().expect("Allocated too many AttemptIds");
        Self { id }
    }
}

/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
fn note_request_outcome<R: Runtime>(
    circmgr: &CircMgr<R>,
    outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
) {
    use tor_dirclient::{Error::RequestFailed, RequestFailedError};
    // Extract an error and a source from this outcome, if there is one.
    //
    // This is complicated because DirResponse can encapsulate the notion of
    // a response that failed part way through a download: in that case, it
    // has some data, and also an error.
    let (err, source) = match outcome {
        Ok(req) => {
            if let (Some(e), Some(source)) = (req.error(), req.source()) {
                (
                    RequestFailed(RequestFailedError {
                        error: e.clone(),
                        source: Some(source.clone()),
                    }),
                    source,
                )
            } else {
                return;
            }
        }
        Err(
            error @ RequestFailed(RequestFailedError {
                source: Some(source),
                ..
            }),
        ) => (error.clone(), source),
        _ => return,
    };

    note_cache_error(circmgr, source, &err.into());
}

/// Record that a problem has occurred because of a failure in an answer from `source`.
fn note_cache_error<R: Runtime>(
    circmgr: &CircMgr<R>,
    source: &tor_dirclient::SourceInfo,
    problem: &Error,
) {
    use tor_circmgr::ExternalActivity;

    if !problem.indicates_cache_failure() {
        return;
    }

    // Does the error here tell us whom to really blame?  If so, blame them
    // instead.
    //
    // (This can happen if we notice a problem while downloading a certificate,
    // but the real problem is that the consensus was no good.)
    let real_source = match problem {
        Error::NetDocError {
            source: DocSource::DirServer { source: Some(info) },
            ..
        } => info,
        _ => source,
    };

    info_report!(problem, "Marking {:?} as failed", real_source);
    circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
    circmgr.retire_circ(source.unique_circ_id());
}

/// Record that `source` has successfully given us some directory info.
fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
    use tor_circmgr::ExternalActivity;

    trace!("Marking {:?} as successful", source);
    circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
}

/// Load every document in `missing` and try to apply it to `state`.
fn load_and_apply_documents<R: Runtime>(
    missing: &[DocId],
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    changed: &mut bool,
) -> Result<()> {
    /// How many documents will we try to load at once?  We try to keep this from being too large,
    /// to avoid excessive RAM usage.
    ///
    /// TODO: we may well want to tune this.
    const CHUNK_SIZE: usize = 256;
    for chunk in missing.chunks(CHUNK_SIZE) {
        let documents = {
            let store = dirmgr.store.lock().expect("store lock poisoned");
            load_documents_from_store(chunk, &**store)?
        };

        state.add_from_cache(documents, changed)?;
    }

    Ok(())
}

/// Load a set of documents from a `Store`, returning all documents found in the store.
/// Note that this may be fewer than the number of documents in `missing`.
fn load_documents_from_store(
    missing: &[DocId],
    store: &dyn Store,
) -> Result<HashMap<DocId, DocumentText>> {
    let mut loaded = HashMap::new();
    for query in docid::partition_by_type(missing.iter().copied()).values() {
        query.load_from_store_into(&mut loaded, store)?;
    }
    Ok(loaded)
}

/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
pub(crate) fn make_consensus_request(
    now: SystemTime,
    flavor: ConsensusFlavor,
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<ClientRequest> {
    let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

    let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;

    match store.latest_consensus_meta(flavor) {
        Ok(Some(meta)) => {
            let valid_after = meta.lifetime().valid_after();
            request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
            request.push_old_consensus_digest(*meta.sha3_256_of_signed());
        }
        latest => {
            if let Err(e) = latest {
                warn_report!(e, "Error loading directory metadata");
            }
            // If we don't have a consensus, then request one that's
            // "reasonably new".  That way, if our clock is set far in the
            // future, we won't download stuff we can't use.
            request.set_last_consensus_date(default_cutoff);
        }
    }

    request.set_skew_limit(
        // If we are _fast_ by at least this much, then any valid directory will
        // seem to be at least this far in the past.
        config.tolerance.post_valid_tolerance,
        // If we are _slow_ by this much, then any valid directory will seem to
        // be at least this far in the future.
        config.tolerance.pre_valid_tolerance,
    );

    Ok(ClientRequest::Consensus(request))
}

/// Construct a set of `ClientRequest`s in order to fetch the documents in `docs`.
pub(crate) fn make_requests_for_documents<R: Runtime>(
    rt: &R,
    docs: &[DocId],
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<Vec<ClientRequest>> {
    let mut res = Vec::new();
    for q in docid::partition_by_type(docs.iter().copied())
        .into_iter()
        .flat_map(|(_, x)| x.split_for_download().into_iter())
    {
        match q {
            DocQuery::LatestConsensus { flavor, .. } => {
                res.push(make_consensus_request(
                    rt.wallclock(),
                    flavor,
                    store,
                    config,
                )?);
            }
            DocQuery::AuthCert(ids) => {
                res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
            }
            DocQuery::Microdesc(ids) => {
                res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
            }
            #[cfg(feature = "routerdesc")]
            DocQuery::RouterDesc(ids) => {
                res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
            }
        }
    }
    Ok(res)
}

/// Launch a single client request and get an associated response.
async fn fetch_single<R: Runtime>(
    rt: &R,
    request: ClientRequest,
    current_netdir: Option<&NetDir>,
    circmgr: Arc<CircMgr<R>>,
) -> Result<(ClientRequest, DirResponse)> {
    let dirinfo: DirInfo = match current_netdir {
        Some(netdir) => netdir.into(),
        None => tor_circmgr::DirInfo::Nothing,
    };
    let outcome =
        tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;

    note_request_outcome(&circmgr, &outcome);

    let resource = outcome?;
    Ok((request, resource))
}

/// Testing helper: if this is nonempty, then we return its contents in place
/// of any real responses in fetch_multiple.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(vec![]));

/// Launch a set of download requests for a set of missing objects in
/// `missing`, and return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
#[allow(clippy::cognitive_complexity)] // TODO: maybe refactor?
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    attempt_id: AttemptId,
    missing: &[DocId],
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    let requests = {
        let store = dirmgr.store.lock().expect("store lock poisoned");
        make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
    };

    trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
           requests.len(), missing.len());

    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if !m.is_empty() {
            return Ok(requests
                .into_iter()
                .zip(m.iter().map(DirResponse::from_body))
                .collect());
        }
    }

    let circmgr = dirmgr.circmgr()?;
    // Only use a timely netdir for bootstrapping; otherwise, we'll use the fallback directories.
    let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    let mut useful_responses = Vec::new();
    for r in responses {
        // TODO: on some error cases we might want to stop using this source.
        match r {
            Ok((request, response)) => {
                if response.status_code() == 200 {
                    useful_responses.push((request, response));
                } else {
                    trace!(
                        "cache declined request; reported status {:?}",
                        response.status_code()
                    );
                }
            }
            Err(e) => warn_report!(e, "error while downloading"),
        }
    }

    trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());

    Ok(useful_responses)
}

/// Try to update `state` by loading cached information from `dirmgr`.
async fn load_once<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    attempt_id: AttemptId,
    changed_out: &mut bool,
) -> Result<()> {
    let missing = state.missing_docs();
    let mut changed = false;
    let outcome: Result<()> = if missing.is_empty() {
        trace!("Found no missing documents; can't advance current state");
        Ok(())
    } else {
        trace!(
            "Found {} missing documents; trying to load them",
            missing.len()
        );

        load_and_apply_documents(&missing, dirmgr, state, &mut changed)
    };

    // We have to update the status here regardless of the outcome, if we got
    // any information: even if there was an error, we might have received
    // partial information that changed our status.
    if changed {
        dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        *changed_out = true;
    }

    outcome
}

/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor? Somewhat due to tracing.
pub(crate) async fn load<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
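    // Track consecutive iterations that load something new but still cannot
    // advance the state; the assert below treats 100 such iterations as a bug.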
    let mut safety_counter = 0_usize;
    loop {
        trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
        let mut changed = false;
        let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
        {
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");

        if let Err(e) = outcome {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {
                    debug!("Recoverable error loading from cache: {}", e);
                }
                BootstrapAction::Fatal | BootstrapAction::Reset => {
                    return Err(e);
                }
            }
        }

        if state.can_advance() {
            state = state.advance();
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            safety_counter = 0;
        } else {
            if !changed {
                // TODO: Are there more nonfatal errors that mean we should
                // break?
                trace!(attempt=%attempt_id, state=%state.describe(), "No state advancement after load; nothing more to find in the cache.");
                break;
            }
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}

/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        let source = dir_response.source().cloned();
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

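                // A successful add_from_download is expected to report a
                // change; if nothing changed, the outcome should be an error.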
                if !changed {
                    debug_assert!(outcome.is_err());
                }

                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}

/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable), notify
/// the sender in `on_usable`.
#[allow(clippy::cognitive_complexity)] // TODO: Refactor!
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // In theory this could be inside the loop below maybe?  If we
        // want to drop the restriction that the missing() members of a
        // state must never grow, then we'll need to move it inside.
        let mut now = {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(), "Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // If the load failed but the error can be blamed on a directory
                // cache, do so.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        // Apply any netdir changes that the state gives us.
        // TODO(eta): Consider deprecating state.is_ready().
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        // Skip the downloads if we can...
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

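        // Decide when to give up on this state and reset it: use the state's
        // own reset time, but never more than a week from now.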
        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        let mut delay = None;

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            // We wait at the start of this loop, on all attempts but the first.
            // This ensures that we always wait between attempts, but not after
            // the final attempt.
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            // TODO: get warn_report! to support `attempt=%attempt_id`?
                            warn_report!(e, "Error while downloading (attempt {})", attempt_id);
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes that the state gives us.
            // TODO(eta): Consider deprecating state.is_ready().
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}

/// Replace `state` with `state.reset()`.
fn reset(state: &mut Box<dyn DirState>) {
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
    *state = cur_state.reset();
}

/// Replace `state` with `state.advance()`.
fn advance(state: &mut Box<dyn DirState>) {
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
    *state = cur_state.advance();
}

/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
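///
/// Illustrative sketch (see also the `week` test below):
///
/// ```ignore
/// let now = SystemTime::now();
/// let one_week = Duration::from_secs(86400 * 7);
/// assert_eq!(no_more_than_a_week_from(now, None), now + one_week);
/// assert_eq!(no_more_than_a_week_from(now, Some(now + 2 * one_week)), now + one_week);
/// ```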
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let one_week_later = now + Duration::new(86400 * 7, 0);
    match v {
        Some(t) => std::cmp::min(t, one_week_later),
        None => one_week_later,
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use crate::DownloadSchedule;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;

    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        second_time_around: bool,
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state, attempt_id)
                .await
                .unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5.
                *resp = vec![
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
}