1use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::{
7 collections::HashMap,
8 sync::{Arc, Weak},
9 time::{Duration, SystemTime},
10};
11
12use crate::err::BootstrapAction;
13use crate::state::{DirState, PoisonedState};
14use crate::DirMgrConfig;
15use crate::DocSource;
16use crate::{
17 docid::{self, ClientRequest},
18 upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
19};
20
21use futures::FutureExt;
22use futures::StreamExt;
23use oneshot_fused_workaround as oneshot;
24use tor_dirclient::DirResponse;
25use tor_error::{info_report, warn_report};
26use tor_rtcompat::scheduler::TaskSchedule;
27use tor_rtcompat::Runtime;
28use tracing::{debug, info, trace, warn};
29
30use crate::storage::Store;
31#[cfg(test)]
32use once_cell::sync::Lazy;
33#[cfg(test)]
34use std::sync::Mutex;
35use tor_circmgr::{CircMgr, DirInfo};
36use tor_netdir::{NetDir, NetDirProvider as _};
37use tor_netdoc::doc::netstatus::ConsensusFlavor;
38
/// Evaluate `$e` (a `Result<()>`) and return its error from the enclosing
/// function unless that error's bootstrap action is `Nonfatal`.
///
/// `Ok(())` and nonfatal errors are discarded, and execution continues with
/// the statement after the macro invocation.
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        let checked: Result<()> = $e;
        if let Err(err) = checked {
            // Anything other than `Nonfatal` has to reach our caller.
            if !matches!(err.bootstrap_action(), BootstrapAction::Nonfatal) {
                return Err(err);
            }
        }
    };
}
52
/// Identifier used to label one bootstrap attempt in log messages and
/// progress/error reports.
///
/// It is displayed as its bare numeric value, and identifiers are ordered by
/// that value.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display("{0}", id)]
pub(crate) struct AttemptId {
    /// The numeric value of this identifier; never zero.
    id: NonZeroUsize,
}
65
66impl AttemptId {
67 pub(crate) fn next() -> Self {
74 use std::sync::atomic::{AtomicUsize, Ordering};
75 static NEXT: AtomicUsize = AtomicUsize::new(1);
77 let id = NEXT.fetch_add(1, Ordering::Relaxed);
78 let id = id.try_into().expect("Allocated too many AttemptIds");
79 Self { id }
80 }
81}
82
83fn note_request_outcome<R: Runtime>(
87 circmgr: &CircMgr<R>,
88 outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
89) {
90 use tor_dirclient::{Error::RequestFailed, RequestFailedError};
91 let (err, source) = match outcome {
97 Ok(req) => {
98 if let (Some(e), Some(source)) = (req.error(), req.source()) {
99 (
100 RequestFailed(RequestFailedError {
101 error: e.clone(),
102 source: Some(source.clone()),
103 }),
104 source,
105 )
106 } else {
107 return;
108 }
109 }
110 Err(
111 error @ RequestFailed(RequestFailedError {
112 source: Some(source),
113 ..
114 }),
115 ) => (error.clone(), source),
116 _ => return,
117 };
118
119 note_cache_error(circmgr, source, &err.into());
120}
121
122fn note_cache_error<R: Runtime>(
124 circmgr: &CircMgr<R>,
125 source: &tor_dirclient::SourceInfo,
126 problem: &Error,
127) {
128 use tor_circmgr::ExternalActivity;
129
130 if !problem.indicates_cache_failure() {
131 return;
132 }
133
134 let real_source = match problem {
140 Error::NetDocError {
141 source: DocSource::DirServer { source: Some(info) },
142 ..
143 } => info,
144 _ => source,
145 };
146
147 info_report!(problem, "Marking {:?} as failed", real_source);
148 circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
149 circmgr.retire_circ(source.unique_circ_id());
150}
151
152fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
154 use tor_circmgr::ExternalActivity;
155
156 trace!("Marking {:?} as successful", source);
157 circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
158}
159
160fn load_and_apply_documents<R: Runtime>(
162 missing: &[DocId],
163 dirmgr: &Arc<DirMgr<R>>,
164 state: &mut Box<dyn DirState>,
165 changed: &mut bool,
166) -> Result<()> {
167 const CHUNK_SIZE: usize = 256;
172 for chunk in missing.chunks(CHUNK_SIZE) {
173 let documents = {
174 let store = dirmgr.store.lock().expect("store lock poisoned");
175 load_documents_from_store(chunk, &**store)?
176 };
177
178 state.add_from_cache(documents, changed)?;
179 }
180
181 Ok(())
182}
183
184fn load_documents_from_store(
187 missing: &[DocId],
188 store: &dyn Store,
189) -> Result<HashMap<DocId, DocumentText>> {
190 let mut loaded = HashMap::new();
191 for query in docid::partition_by_type(missing.iter().copied()).values() {
192 query.load_from_store_into(&mut loaded, store)?;
193 }
194 Ok(loaded)
195}
196
197pub(crate) fn make_consensus_request(
200 now: SystemTime,
201 flavor: ConsensusFlavor,
202 store: &dyn Store,
203 config: &DirMgrConfig,
204) -> Result<ClientRequest> {
205 let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);
206
207 let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;
208
209 match store.latest_consensus_meta(flavor) {
210 Ok(Some(meta)) => {
211 let valid_after = meta.lifetime().valid_after();
212 request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
213 request.push_old_consensus_digest(*meta.sha3_256_of_signed());
214 }
215 latest => {
216 if let Err(e) = latest {
217 warn_report!(e, "Error loading directory metadata");
218 }
219 request.set_last_consensus_date(default_cutoff);
223 }
224 }
225
226 request.set_skew_limit(
227 config.tolerance.post_valid_tolerance,
230 config.tolerance.pre_valid_tolerance,
233 );
234
235 Ok(ClientRequest::Consensus(request))
236}
237
238pub(crate) fn make_requests_for_documents<R: Runtime>(
240 rt: &R,
241 docs: &[DocId],
242 store: &dyn Store,
243 config: &DirMgrConfig,
244) -> Result<Vec<ClientRequest>> {
245 let mut res = Vec::new();
246 for q in docid::partition_by_type(docs.iter().copied())
247 .into_iter()
248 .flat_map(|(_, x)| x.split_for_download().into_iter())
249 {
250 match q {
251 DocQuery::LatestConsensus { flavor, .. } => {
252 res.push(make_consensus_request(
253 rt.wallclock(),
254 flavor,
255 store,
256 config,
257 )?);
258 }
259 DocQuery::AuthCert(ids) => {
260 res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
261 }
262 DocQuery::Microdesc(ids) => {
263 res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
264 }
265 #[cfg(feature = "routerdesc")]
266 DocQuery::RouterDesc(ids) => {
267 res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
268 }
269 }
270 }
271 Ok(res)
272}
273
274async fn fetch_single<R: Runtime>(
276 rt: &R,
277 request: ClientRequest,
278 current_netdir: Option<&NetDir>,
279 circmgr: Arc<CircMgr<R>>,
280) -> Result<(ClientRequest, DirResponse)> {
281 let dirinfo: DirInfo = match current_netdir {
282 Some(netdir) => netdir.into(),
283 None => tor_circmgr::DirInfo::Nothing,
284 };
285 let outcome =
286 tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;
287
288 note_request_outcome(&circmgr, &outcome);
289
290 let resource = outcome?;
291 Ok((request, resource))
292}
293
/// Test-only list of canned response bodies.
///
/// While this list is non-empty, `fetch_multiple` pairs its requests with
/// these bodies and returns them instead of contacting the network.
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(vec![]));
301
302async fn fetch_multiple<R: Runtime>(
307 dirmgr: Arc<DirMgr<R>>,
308 attempt_id: AttemptId,
309 missing: &[DocId],
310 parallelism: usize,
311) -> Result<Vec<(ClientRequest, DirResponse)>> {
312 let requests = {
313 let store = dirmgr.store.lock().expect("store lock poisoned");
314 make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
315 };
316
317 trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
318 requests.len(), missing.len());
319
320 #[cfg(test)]
321 {
322 let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
323 if !m.is_empty() {
324 return Ok(requests
325 .into_iter()
326 .zip(m.iter().map(DirResponse::from_body))
327 .collect());
328 }
329 }
330
331 let circmgr = dirmgr.circmgr()?;
332 let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();
334
335 let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
338 .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
339 .buffer_unordered(parallelism)
340 .collect()
341 .await;
342
343 let mut useful_responses = Vec::new();
344 for r in responses {
345 match r {
347 Ok((request, response)) => {
348 if response.status_code() == 200 {
349 useful_responses.push((request, response));
350 } else {
351 trace!(
352 "cache declined request; reported status {:?}",
353 response.status_code()
354 );
355 }
356 }
357 Err(e) => warn_report!(e, "error while downloading"),
358 }
359 }
360
361 trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());
362
363 Ok(useful_responses)
364}
365
366async fn load_once<R: Runtime>(
368 dirmgr: &Arc<DirMgr<R>>,
369 state: &mut Box<dyn DirState>,
370 attempt_id: AttemptId,
371 changed_out: &mut bool,
372) -> Result<()> {
373 let missing = state.missing_docs();
374 let mut changed = false;
375 let outcome: Result<()> = if missing.is_empty() {
376 trace!("Found no missing documents; can't advance current state");
377 Ok(())
378 } else {
379 trace!(
380 "Found {} missing documents; trying to load them",
381 missing.len()
382 );
383
384 load_and_apply_documents(&missing, dirmgr, state, &mut changed)
385 };
386
387 if changed {
391 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
392 *changed_out = true;
393 }
394
395 outcome
396}
397
398pub(crate) async fn load<R: Runtime>(
403 dirmgr: Arc<DirMgr<R>>,
404 mut state: Box<dyn DirState>,
405 attempt_id: AttemptId,
406) -> Result<Box<dyn DirState>> {
407 let mut safety_counter = 0_usize;
408 loop {
409 trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
410 let mut changed = false;
411 let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
412 {
413 let mut store = dirmgr.store.lock().expect("store lock poisoned");
414 dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
415 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
416 }
417 trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");
418
419 if let Err(e) = outcome {
420 match e.bootstrap_action() {
421 BootstrapAction::Nonfatal => {
422 debug!("Recoverable error loading from cache: {}", e);
423 }
424 BootstrapAction::Fatal | BootstrapAction::Reset => {
425 return Err(e);
426 }
427 }
428 }
429
430 if state.can_advance() {
431 state = state.advance();
432 trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
433 safety_counter = 0;
434 } else {
435 if !changed {
436 trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
439 break;
440 }
441 safety_counter += 1;
442 assert!(
443 safety_counter < 100,
444 "Spent 100 iterations in the same state: this is a bug"
445 );
446 }
447 }
448
449 Ok(state)
450}
451
/// Make a single attempt to download whatever `state` reports as missing,
/// and feed every usable response into `state`.
///
/// For each response this function:
///  * converts the body to UTF-8 (a failure here is blamed on the source, if
///    known, and the response is skipped);
///  * expands the text with `DirMgr::expand_response_text`;
///  * hands the result to `DirState::add_from_download`;
///  * reports success or failure of the source to the circuit manager.
///
/// Errors whose bootstrap action is not `Nonfatal` abort the attempt
/// immediately; other errors are logged and counted. On normal completion the
/// bootstrap progress of `dirmgr` is updated.
async fn download_attempt<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    parallelism: usize,
    attempt_id: AttemptId,
) -> Result<()> {
    let missing = state.missing_docs();
    let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
    // Number of errors that we were able to attribute to a known source.
    let mut n_errors = 0;
    for (client_req, dir_response) in fetched {
        // Remember the source now: extracting the body consumes the response.
        let source = dir_response.source().cloned();
        let text = match String::from_utf8(dir_response.into_output_unchecked())
            .map_err(Error::BadUtf8FromDirectory)
        {
            Ok(t) => t,
            Err(e) => {
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                continue;
            }
        };
        match dirmgr.expand_response_text(&client_req, text) {
            Ok(text) => {
                let doc_source = DocSource::DirServer {
                    source: source.clone(),
                };
                let mut changed = false;
                let outcome = state.add_from_download(
                    &text,
                    &client_req,
                    doc_source,
                    Some(&dirmgr.store),
                    &mut changed,
                );

                // A download that changed nothing is expected to have
                // reported an error (checked in debug builds only).
                if !changed {
                    debug_assert!(outcome.is_err());
                }

                if let Some(source) = source {
                    if let Err(e) = &outcome {
                        n_errors += 1;
                        note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
                    } else {
                        note_cache_success(dirmgr.circmgr()?.deref(), &source);
                    }
                }

                // NOTE(review): when `source` is known, this error was
                // already counted in `n_errors` above, which is also passed
                // to `note_errors` at the end of this function. Confirm
                // whether counting it twice is intended.
                if let Err(e) = &outcome {
                    dirmgr.note_errors(attempt_id, 1);
                    warn_report!(e, "error while adding directory info");
                }
                propagate_fatal_errors!(outcome);
            }
            Err(e) => {
                warn_report!(e, "Error when expanding directory text");
                if let Some(source) = source {
                    n_errors += 1;
                    note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
                }
                propagate_fatal_errors!(Err(e));
            }
        }
    }
    // Only reached when no error was propagated out of the loop above.
    if n_errors != 0 {
        dirmgr.note_errors(attempt_id, n_errors);
    }
    dirmgr.update_progress(attempt_id, state.bootstrap_progress());

    Ok(())
}
530
/// Drive `state` forward, downloading as needed, until it is
/// `Readiness::Complete` or an error stops us.
///
/// For each state we first try the cache, then make up to
/// `retry_config.attempts()` download attempts, waiting between attempts
/// according to the state's retry schedule. Whenever the state's reset time
/// (capped at one week ahead) passes, the state is reset and we start over.
///
/// `dirmgr` is held weakly and re-upgraded whenever it is needed; a failed
/// upgrade is returned as an error.
///
/// The first time the state becomes `Readiness::Usable`, the sender in
/// `on_usable` (if any) is taken and notified.
///
/// # Errors
///
/// Returns `Error::CantAdvanceState` if all attempts for a state are used up
/// without the state advancing, and propagates any error whose bootstrap
/// action is not `Nonfatal`.
pub(crate) async fn download<R: Runtime>(
    dirmgr: Weak<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    schedule: &mut TaskSchedule<R>,
    attempt_id: AttemptId,
    on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();

    trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");

    // Each iteration of this loop handles one state.
    'next_state: loop {
        let retry_config = state.dl_config();
        let parallelism = retry_config.parallelism();

        // Before downloading anything, see what the cache can provide.
        let mut now = {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut changed = false;
            trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
            let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
            trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
            if let Err(e) = &load_result {
                // Only errors that name a responsible cache are counted and
                // reported to the circuit manager.
                if let Some(source) = e.responsible_cache() {
                    dirmgr.note_errors(attempt_id, 1);
                    note_cache_error(dirmgr.circmgr()?.deref(), source, e);
                }
            }
            propagate_fatal_errors!(load_result);
            dirmgr.runtime.wallclock()
        };

        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            dirmgr.apply_netdir_changes(state, &mut **store)?;
            dirmgr.update_progress(attempt_id, state.bootstrap_progress());
        }
        // The cache alone may have been enough to move on or to finish.
        if state.can_advance() {
            advance(state);
            trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
            continue 'next_state;
        }
        if state.is_ready(Readiness::Complete) {
            trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
            return Ok(());
        }

        let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());

        let mut retry = retry_config.schedule();
        // Delay to wait before the next attempt; `None` until the first
        // attempt has been started.
        let mut delay = None;

        'next_attempt: for attempt in retry_config.attempts() {
            // `replace` returns the previous delay, which is `None` on the
            // first attempt: so we wait before every attempt except the
            // first, and never after the last one.
            let next_delay = retry.next_delay(&mut rand::rng());
            if let Some(delay) = delay.replace(next_delay) {
                let time_until_reset = {
                    reset_time
                        .duration_since(now)
                        .unwrap_or(Duration::from_secs(0))
                };
                // Never sleep past the reset time.
                let real_delay = delay.min(time_until_reset);
                debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
                schedule.sleep(real_delay).await?;

                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
                if now >= reset_time {
                    info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                    reset(state);
                    continue 'next_state;
                }
            }

            info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
            // Shadows the outer `reset_time` for the rest of this iteration,
            // recomputed from the current `now`.
            let reset_time = no_more_than_a_week_from(now, state.reset_time());

            now = {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                // Race the download against the reset deadline; the download
                // branch is listed first and so is preferred when both are
                // ready.
                futures::select_biased! {
                    outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
                        if let Err(e) = outcome {
                            warn_report!(e, "Error while downloading (attempt {})", attempt_id);
                            propagate_fatal_errors!(Err(e));
                            continue 'next_attempt;
                        } else {
                            trace!(attempt=%attempt_id, "Successfully downloaded some information.");
                        }
                    }
                    _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                        info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
                        reset(state);
                        continue 'next_state;
                    },
                };
                dirmgr.runtime.wallclock()
            };

            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
                // Progress is reported before the outcome is checked, so it
                // is updated even if applying the changes failed.
                dirmgr.update_progress(attempt_id, state.bootstrap_progress());
                propagate_fatal_errors!(outcome);
            }

            if state.is_ready(Readiness::Complete) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
                return Ok(());
            }

            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
                trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
                // The unwrap cannot fail: `is_some()` was checked just above.
                // A send error is ignored on purpose (`let _ =`).
                #[allow(clippy::unwrap_used)]
                let _ = on_usable.take().unwrap().send(());
            }

            if state.can_advance() {
                advance(state);
                trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
                continue 'next_state;
            }
        }

        // Every attempt for this state was used without advancing it.
        warn!(n_attempts=retry_config.n_attempts(),
              state=%state.describe(),
              "Unable to advance downloading state");
        return Err(Error::CantAdvanceState);
    }
}
694
695fn reset(state: &mut Box<dyn DirState>) {
697 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
698 *state = cur_state.reset();
699}
700
701fn advance(state: &mut Box<dyn DirState>) {
703 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
704 *state = cur_state.advance();
705}
706
/// Return `v`, limited to be no later than one week after `now`.
///
/// If `v` is `None`, the result is exactly one week after `now`. Times in
/// the past are returned unchanged.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    /// Length of one week.
    const ONE_WEEK: Duration = Duration::from_secs(7 * 86400);

    let latest_allowed = now + ONE_WEEK;
    v.map_or(latest_allowed, |t| t.min(latest_allowed))
}
720
/// Unit tests for the bootstrap helpers, built around a small fake
/// `DirState` implementation.
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use crate::DownloadSchedule;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;

    /// `no_more_than_a_week_from` caps far-future times, and leaves nearer
    /// (or past) times alone.
    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake `DirState` with two phases.
    ///
    /// The first phase wants H1 and H2 and can advance once it has both; the
    /// second phase wants H3, H4 and H5 and can become ready.
    #[derive(Debug, Clone)]
    struct DemoState {
        /// False in the first phase, true in the second.
        second_time_around: bool,
        /// For each wanted digest, whether we have received it yet.
        got_items: HashMap<MdDigest, bool>,
    }

    // Five arbitrary 32-byte values, used as fake microdescriptor digests.
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        /// Construct the first-phase state, which wants H1 and H2.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Construct the second-phase state, which wants H3, H4 and H5.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Return the number of wanted items that have been received.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        fn is_ready(&self, ready: Readiness) -> bool {
            // The first phase is never ready. The second phase is Complete
            // with every item, and Usable with all but one.
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        fn can_advance(&self) -> bool {
            // Only the first phase can advance, once it has every item.
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            // Only the keys matter: the document text is never inspected.
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            // The "download" is a whitespace-separated list of hex-encoded
            // digests; tokens that do not decode to a digest are ignored.
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }

    /// With every digest already in the store, both `load` and `download`
    /// reach `Readiness::Complete`.
    #[test]
    fn all_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state, attempt_id)
                .await
                .unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a download from a fresh state.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }

    /// With H1..H3 in the store and H4 and H5 supplied as a canned response,
    /// `download` reaches `Readiness::Complete`.
    #[test]
    fn partly_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                *resp = vec![
                    // H4 and H5, hex-encoded and separated by whitespace.
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
 545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
}