//! Functions to download or load directory objects, using the
//! state machines in the `states` module.

use std::num::NonZeroUsize;
use std::ops::Deref;
use std::{
    collections::HashMap,
    sync::{Arc, Weak},
    time::{Duration, SystemTime},
};

use crate::err::BootstrapAction;
use crate::state::{DirState, PoisonedState};
use crate::DirMgrConfig;
use crate::DocSource;
use crate::{
    docid::{self, ClientRequest},
    upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
};

use futures::FutureExt;
use futures::StreamExt;
use oneshot_fused_workaround as oneshot;
use tor_dirclient::DirResponse;
use tor_error::{info_report, warn_report};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};

use crate::storage::Store;
#[cfg(test)]
use once_cell::sync::Lazy;
#[cfg(test)]
use std::sync::Mutex;
use tor_circmgr::{CircMgr, DirInfo};
use tor_netdir::{NetDir, NetDirProvider as _};
use tor_netdoc::doc::netstatus::ConsensusFlavor;

/// Given a Result<()>, exit the current function if it is anything other than
/// Ok(()) or a nonfatal error.
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        let v: Result<()> = $e;
        if let Err(e) = v {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {}
                _ => return Err(e),
            }
        }
    };
}
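
// Illustrative usage of `propagate_fatal_errors!` (a minimal sketch; `handle_step`
// is a hypothetical caller, not code used elsewhere in this crate): within a
// function that returns `Result<()>`, the macro returns early with the error
// unless its `bootstrap_action()` is `BootstrapAction::Nonfatal`; `Ok(())` and
// nonfatal errors fall through.
//
//     fn handle_step(outcome: Result<()>) -> Result<()> {
//         propagate_fatal_errors!(outcome);
//         // Reached for Ok(()) and for nonfatal errors.
//         Ok(())
//     }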

/// Identifier for an attempt to bootstrap a directory.
///
/// Every time that we decide to download a new directory, _despite already
/// having one_, counts as a new attempt.
///
/// These are used to track the progress of each attempt independently.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// Which attempt at downloading a directory is this?
    id: NonZeroUsize,
}

impl AttemptId {
    /// Return a new unused AttemptId that will be greater than any previous
    /// one.
    ///
    /// # Panics
    ///
    /// Panics if we have exhausted the possible space of AttemptIds.
    pub(crate) fn next() -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};
        /// atomic used to generate the next attempt.
        static NEXT: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT.fetch_add(1, Ordering::Relaxed);
        let id = id.try_into().expect("Allocated too many AttemptIds");
        Self { id }
    }
}
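
// Illustrative sketch (an assumed usage, not part of the crate's API surface):
// since `NEXT` only ever increases and `AttemptId` derives `Ord`, a later
// bootstrap attempt always compares greater than an earlier one.
//
//     let first = AttemptId::next();
//     let second = AttemptId::next();
//     assert!(second > first);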

/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
fn note_request_outcome<R: Runtime>(
    circmgr: &CircMgr<R>,
    outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
) {
    use tor_dirclient::{Error::RequestFailed, RequestFailedError};
    // Extract an error and a source from this outcome, if there is one.
    //
    // This is complicated because DirResponse can encapsulate the notion of
    // a response that failed part way through a download: in that case, it
    // has some data, and also an error.
    let (err, source) = match outcome {
        Ok(req) => {
            if let (Some(e), Some(source)) = (req.error(), req.source()) {
                (
                    RequestFailed(RequestFailedError {
                        error: e.clone(),
                        source: Some(source.clone()),
                    }),
                    source,
                )
            } else {
                return;
            }
        }
        Err(
            error @ RequestFailed(RequestFailedError {
                source: Some(source),
                ..
            }),
        ) => (error.clone(), source),
        _ => return,
    };

    note_cache_error(circmgr, source, &err.into());
}

/// Record that a problem has occurred because of a failure in an answer from `source`.
fn note_cache_error<R: Runtime>(
    circmgr: &CircMgr<R>,
    source: &tor_dirclient::SourceInfo,
    problem: &Error,
) {
    use tor_circmgr::ExternalActivity;

    if !problem.indicates_cache_failure() {
        return;
    }

    // Does the error here tell us whom to really blame?  If so, blame them
    // instead.
    //
    // (This can happen if we notice a problem while downloading a certificate,
    // but the real problem is that the consensus was no good.)
    let real_source = match problem {
        Error::NetDocError {
            source: DocSource::DirServer { source: Some(info) },
            ..
        } => info,
        _ => source,
    };

    info_report!(problem, "Marking {:?} as failed", real_source);
    circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
    circmgr.retire_circ(source.unique_circ_id());
}

/// Record that `source` has successfully given us some directory info.
fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
    use tor_circmgr::ExternalActivity;

    trace!("Marking {:?} as successful", source);
    circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
}

/// Load every document in `missing` and try to apply it to `state`.
fn load_and_apply_documents<R: Runtime>(
    missing: &[DocId],
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    changed: &mut bool,
) -> Result<()> {
    /// How many documents will we try to load at once?  We try to keep this from being too large,
    /// to avoid excessive RAM usage.
    ///
    /// TODO: we may well want to tune this.
    const CHUNK_SIZE: usize = 256;
    for chunk in missing.chunks(CHUNK_SIZE) {
        let documents = {
            let store = dirmgr.store.lock().expect("store lock poisoned");
            load_documents_from_store(chunk, &**store)?
        };

        state.add_from_cache(documents, changed)?;
    }

    Ok(())
}

/// Load a set of documents from a `Store`, returning all documents found in the store.
/// Note that this may be less than the number of documents in `missing`.
fn load_documents_from_store(
    missing: &[DocId],
    store: &dyn Store,
) -> Result<HashMap<DocId, DocumentText>> {
    let mut loaded = HashMap::new();
    for query in docid::partition_by_type(missing.iter().copied()).values() {
        query.load_from_store_into(&mut loaded, store)?;
    }
    Ok(loaded)
}

/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
pub(crate) fn make_consensus_request(
    now: SystemTime,
    flavor: ConsensusFlavor,
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<ClientRequest> {
    let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

    let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;

    match store.latest_consensus_meta(flavor) {
        Ok(Some(meta)) => {
            let valid_after = meta.lifetime().valid_after();
            request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
            request.push_old_consensus_digest(*meta.sha3_256_of_signed());
        }
        latest => {
            if let Err(e) = latest {
                warn_report!(e, "Error loading directory metadata");
            }
            // If we don't have a consensus, then request one that's
            // "reasonably new".  That way, if our clock is set far in the
            // future, we won't download stuff we can't use.
            request.set_last_consensus_date(default_cutoff);
        }
    }

    request.set_skew_limit(
        // If we are _fast_ by at least this much, then any valid directory will
        // seem to be at least this far in the past.
        config.tolerance.post_valid_tolerance,
        // If we are _slow_ by this much, then any valid directory will seem to
        // be at least this far in the future.
        config.tolerance.pre_valid_tolerance,
    );

    Ok(ClientRequest::Consensus(request))
}

/// Construct a set of `ClientRequest`s in order to fetch the documents in `docs`.
pub(crate) fn make_requests_for_documents<R: Runtime>(
    rt: &R,
    docs: &[DocId],
    store: &dyn Store,
    config: &DirMgrConfig,
) -> Result<Vec<ClientRequest>> {
    let mut res = Vec::new();
    for q in docid::partition_by_type(docs.iter().copied())
        .into_iter()
        .flat_map(|(_, x)| x.split_for_download().into_iter())
    {
        match q {
            DocQuery::LatestConsensus { flavor, .. } => {
                res.push(make_consensus_request(
                    rt.wallclock(),
                    flavor,
                    store,
                    config,
                )?);
            }
            DocQuery::AuthCert(ids) => {
                res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
            }
            DocQuery::Microdesc(ids) => {
                res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
            }
            #[cfg(feature = "routerdesc")]
            DocQuery::RouterDesc(ids) => {
                res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
            }
        }
    }
    Ok(res)
}

/// Launch a single client request and get an associated response.
async fn fetch_single<R: Runtime>(
    rt: &R,
    request: ClientRequest,
    current_netdir: Option<&NetDir>,
    circmgr: Arc<CircMgr<R>>,
) -> Result<(ClientRequest, DirResponse)> {
    let dirinfo: DirInfo = match current_netdir {
        Some(netdir) => netdir.into(),
        None => tor_circmgr::DirInfo::Nothing,
    };
    let outcome =
        tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;

    note_request_outcome(&circmgr, &outcome);

    let resource = outcome?;
    Ok((request, resource))
}

/// Testing helper: if this is nonempty, then we return its contents in place
/// of any responses from fetch_multiple.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition. :p
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(vec![]));

/// Launch a set of download requests for a set of missing objects in
/// `missing`, and return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
async fn fetch_multiple<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    attempt_id: AttemptId,
    missing: &[DocId],
    parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
    let requests = {
        let store = dirmgr.store.lock().expect("store lock poisoned");
        make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
    };

    trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
           requests.len(), missing.len());

    #[cfg(test)]
    {
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
        if !m.is_empty() {
            return Ok(requests
                .into_iter()
                .zip(m.iter().map(DirResponse::from_body))
                .collect());
        }
    }

    let circmgr = dirmgr.circmgr()?;
    // Only use timely directories for bootstrapping directories; otherwise, we'll try fallbacks.
    let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
        .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
        .buffer_unordered(parallelism)
        .collect()
        .await;

    let mut useful_responses = Vec::new();
    for r in responses {
        // TODO: on some error cases we might want to stop using this source.
        match r {
            Ok((request, response)) => {
                if response.status_code() == 200 {
                    useful_responses.push((request, response));
                } else {
                    trace!(
                        "cache declined request; reported status {:?}",
                        response.status_code()
                    );
                }
            }
            Err(e) => warn_report!(e, "error while downloading"),
        }
    }

    trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());

    Ok(useful_responses)
}

/// Try to update `state` by loading cached information from `dirmgr`.
async fn load_once<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    attempt_id: AttemptId,
    changed_out: &mut bool,
) -> Result<()> {
    let missing = state.missing_docs();
    let mut changed = false;
    let outcome: Result<()> = if missing.is_empty() {
        trace!("Found no missing documents; can't advance current state");
        Ok(())
    } else {
        trace!(
            "Found {} missing documents; trying to load them",
            missing.len()
        );

        load_and_apply_documents(&missing, dirmgr, state, &mut changed)
    };

    // We have to update the status here regardless of the outcome, if we got
    // any information: even if there was an error, we might have received
    // partial information that changed our status.
    if changed {
        dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        *changed_out = true;
    }

    outcome
}

/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
pub(crate) async fn load<R: Runtime>(
    dirmgr: Arc<DirMgr<R>>,
    mut state: Box<dyn DirState>,
    attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
    let mut safety_counter = 0_usize;
    loop {
        trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
        let mut changed = false;
        let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
        {
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");

        if let Err(e) = outcome {
            match e.bootstrap_action() {
                BootstrapAction::Nonfatal => {
                    debug!("Recoverable error loading from cache: {}", e);
                }
                BootstrapAction::Fatal | BootstrapAction::Reset => {
                    return Err(e);
                }
            }
        }

        if state.can_advance() {
            state = state.advance();
            trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
            safety_counter = 0;
        } else {
            if !changed {
                // TODO: Are there more nonfatal errors that mean we should
                // break?
                trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
                break;
            }
            safety_counter += 1;
            assert!(
                safety_counter < 100,
                "Spent 100 iterations in the same state: this is a bug"
            );
        }
    }

    Ok(state)
}

/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        let source = dir_response.source().cloned();
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

                if !changed {
                    debug_assert!(outcome.is_err());
                }

                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}

/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable), notify
/// the sender in `on_usable`.
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // In theory this could be inside the loop below maybe?  If we
        // want to drop the restriction that the missing() members of a
        // state must never grow, then we'll need to move it inside.
        let mut now = {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // If the load failed but the error can be blamed on a directory
                // cache, do so.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        // Apply any netdir changes that the state gives us.
        // TODO(eta): Consider deprecating state.is_ready().
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        // Skip the downloads if we can...
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        let mut delay = None;

        // Make several attempts to fetch whatever we're missing,
        // until either we can advance, or we've got a complete
        // document, or we run out of tries, or we run out of time.
        'next_attempt: for attempt in retry_config.attempts() {
            // We wait at the start of this loop, on all attempts but the first.
            // This ensures that we always wait between attempts, but not after
            // the final attempt.
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            // TODO: get warn_report! to support `attempt=%attempt_id`?
                            warn_report!(e, "Error while downloading (attempt {})", attempt_id);
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        // We need to reset. This can happen if (for
                        // example) we're downloading the last few
                        // microdescriptors on a consensus that now
                        // we're ready to replace.
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes that the state gives us.
            // TODO(eta): Consider deprecating state.is_ready().
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            // Report usable-ness if appropriate.
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // Unwrap should be safe due to parent `.is_some()` check
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                // We have enough info to advance to another state.
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // We didn't advance the state, after all the retries.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}

/// Replace `state` with `state.reset()`.
fn reset(state: &mut Box<dyn DirState>) {
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
    *state = cur_state.reset();
}

/// Replace `state` with `state.advance()`.
fn advance(state: &mut Box<dyn DirState>) {
    let cur_state = std::mem::replace(state, Box::new(PoisonedState));
    *state = cur_state.advance();
}

/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let one_week_later = now + Duration::new(86400 * 7, 0);
    match v {
        Some(t) => std::cmp::min(t, one_week_later),
        None => one_week_later,
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use crate::DownloadSchedule;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;

    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        second_time_around: bool,
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state, attempt_id)
                .await
                .unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5.
                *resp = vec![
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
}