1use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::{
7 collections::HashMap,
8 sync::{Arc, Weak},
9 time::{Duration, SystemTime},
10};
11
12use crate::err::BootstrapAction;
13use crate::state::{DirState, PoisonedState};
14use crate::DirMgrConfig;
15use crate::DocSource;
16use crate::{
17 docid::{self, ClientRequest},
18 upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
19};
20
21use futures::FutureExt;
22use futures::StreamExt;
23use oneshot_fused_workaround as oneshot;
24use tor_dirclient::DirResponse;
25use tor_error::{info_report, warn_report};
26use tor_rtcompat::scheduler::TaskSchedule;
27use tor_rtcompat::Runtime;
28use tracing::{debug, info, trace, warn};
29
30use crate::storage::Store;
31#[cfg(test)]
32use once_cell::sync::Lazy;
33#[cfg(test)]
34use std::sync::Mutex;
35use tor_circmgr::{CircMgr, DirInfo};
36use tor_netdir::{NetDir, NetDirProvider as _};
37use tor_netdoc::doc::netstatus::ConsensusFlavor;
38
/// Return early from the enclosing function if `$e` is an error that is not
/// merely nonfatal.
///
/// `$e` must evaluate to a `Result<()>`.  `Ok(())`, and any error whose
/// `bootstrap_action()` is [`BootstrapAction::Nonfatal`], falls through and
/// execution continues.  Every other error is returned as `Err(..)` from the
/// function in which the macro is invoked.
macro_rules! propagate_fatal_errors {
    ( $e:expr ) => {
        // The annotation pins the `Ok` type, so callers may pass a bare
        // `Err(e)` without spelling the type out.
        let result: Result<()> = $e;
        match result {
            Err(err) if !matches!(err.bootstrap_action(), BootstrapAction::Nonfatal) => {
                return Err(err);
            }
            _ => {}
        }
    };
}
52
53#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
60#[display("{0}", id)]
61pub(crate) struct AttemptId {
62 id: NonZeroUsize,
64}
65
66impl AttemptId {
67 pub(crate) fn next() -> Self {
74 use std::sync::atomic::{AtomicUsize, Ordering};
75 static NEXT: AtomicUsize = AtomicUsize::new(1);
77 let id = NEXT.fetch_add(1, Ordering::Relaxed);
78 let id = id.try_into().expect("Allocated too many AttemptIds");
79 Self { id }
80 }
81}
82
83fn note_request_outcome<R: Runtime>(
87 circmgr: &CircMgr<R>,
88 outcome: &tor_dirclient::Result<tor_dirclient::DirResponse>,
89) {
90 use tor_dirclient::{Error::RequestFailed, RequestFailedError};
91 let (err, source) = match outcome {
97 Ok(req) => {
98 if let (Some(e), Some(source)) = (req.error(), req.source()) {
99 (
100 RequestFailed(RequestFailedError {
101 error: e.clone(),
102 source: Some(source.clone()),
103 }),
104 source,
105 )
106 } else {
107 return;
108 }
109 }
110 Err(
111 error @ RequestFailed(RequestFailedError {
112 source: Some(source),
113 ..
114 }),
115 ) => (error.clone(), source),
116 _ => return,
117 };
118
119 note_cache_error(circmgr, source, &err.into());
120}
121
122fn note_cache_error<R: Runtime>(
124 circmgr: &CircMgr<R>,
125 source: &tor_dirclient::SourceInfo,
126 problem: &Error,
127) {
128 use tor_circmgr::ExternalActivity;
129
130 if !problem.indicates_cache_failure() {
131 return;
132 }
133
134 let real_source = match problem {
140 Error::NetDocError {
141 source: DocSource::DirServer { source: Some(info) },
142 ..
143 } => info,
144 _ => source,
145 };
146
147 info_report!(problem, "Marking {:?} as failed", real_source);
148 circmgr.note_external_failure(real_source.cache_id(), ExternalActivity::DirCache);
149 circmgr.retire_circ(source.unique_circ_id());
150}
151
152fn note_cache_success<R: Runtime>(circmgr: &CircMgr<R>, source: &tor_dirclient::SourceInfo) {
154 use tor_circmgr::ExternalActivity;
155
156 trace!("Marking {:?} as successful", source);
157 circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache);
158}
159
160fn load_and_apply_documents<R: Runtime>(
162 missing: &[DocId],
163 dirmgr: &Arc<DirMgr<R>>,
164 state: &mut Box<dyn DirState>,
165 changed: &mut bool,
166) -> Result<()> {
167 const CHUNK_SIZE: usize = 256;
172 for chunk in missing.chunks(CHUNK_SIZE) {
173 let documents = {
174 let store = dirmgr.store.lock().expect("store lock poisoned");
175 load_documents_from_store(chunk, &**store)?
176 };
177
178 state.add_from_cache(documents, changed)?;
179 }
180
181 Ok(())
182}
183
184fn load_documents_from_store(
187 missing: &[DocId],
188 store: &dyn Store,
189) -> Result<HashMap<DocId, DocumentText>> {
190 let mut loaded = HashMap::new();
191 for query in docid::partition_by_type(missing.iter().copied()).values() {
192 query.load_from_store_into(&mut loaded, store)?;
193 }
194 Ok(loaded)
195}
196
197pub(crate) fn make_consensus_request(
200 now: SystemTime,
201 flavor: ConsensusFlavor,
202 store: &dyn Store,
203 config: &DirMgrConfig,
204) -> Result<ClientRequest> {
205 let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);
206
207 let default_cutoff = crate::default_consensus_cutoff(now, &config.tolerance)?;
208
209 match store.latest_consensus_meta(flavor) {
210 Ok(Some(meta)) => {
211 let valid_after = meta.lifetime().valid_after();
212 request.set_last_consensus_date(std::cmp::max(valid_after, default_cutoff));
213 request.push_old_consensus_digest(*meta.sha3_256_of_signed());
214 }
215 latest => {
216 if let Err(e) = latest {
217 warn_report!(e, "Error loading directory metadata");
218 }
219 request.set_last_consensus_date(default_cutoff);
223 }
224 }
225
226 request.set_skew_limit(
227 config.tolerance.post_valid_tolerance,
230 config.tolerance.pre_valid_tolerance,
233 );
234
235 Ok(ClientRequest::Consensus(request))
236}
237
238pub(crate) fn make_requests_for_documents<R: Runtime>(
240 rt: &R,
241 docs: &[DocId],
242 store: &dyn Store,
243 config: &DirMgrConfig,
244) -> Result<Vec<ClientRequest>> {
245 let mut res = Vec::new();
246 for q in docid::partition_by_type(docs.iter().copied())
247 .into_iter()
248 .flat_map(|(_, x)| x.split_for_download().into_iter())
249 {
250 match q {
251 DocQuery::LatestConsensus { flavor, .. } => {
252 res.push(make_consensus_request(
253 rt.wallclock(),
254 flavor,
255 store,
256 config,
257 )?);
258 }
259 DocQuery::AuthCert(ids) => {
260 res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
261 }
262 DocQuery::Microdesc(ids) => {
263 res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
264 }
265 #[cfg(feature = "routerdesc")]
266 DocQuery::RouterDesc(ids) => {
267 res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
268 }
269 }
270 }
271 Ok(res)
272}
273
274async fn fetch_single<R: Runtime>(
276 rt: &R,
277 request: ClientRequest,
278 current_netdir: Option<&NetDir>,
279 circmgr: Arc<CircMgr<R>>,
280) -> Result<(ClientRequest, DirResponse)> {
281 let dirinfo: DirInfo = match current_netdir {
282 Some(netdir) => netdir.into(),
283 None => tor_circmgr::DirInfo::Nothing,
284 };
285 let outcome =
286 tor_dirclient::get_resource(request.as_requestable(), dirinfo, rt, circmgr.clone()).await;
287
288 note_request_outcome(&circmgr, &outcome);
289
290 let resource = outcome?;
291 Ok((request, resource))
292}
293
/// Test-only list of response bodies.
///
/// While this list is non-empty, `fetch_multiple` pairs its requests with
/// these strings instead of performing any download.
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(Vec::new()));
301
302#[allow(clippy::cognitive_complexity)] async fn fetch_multiple<R: Runtime>(
308 dirmgr: Arc<DirMgr<R>>,
309 attempt_id: AttemptId,
310 missing: &[DocId],
311 parallelism: usize,
312) -> Result<Vec<(ClientRequest, DirResponse)>> {
313 let requests = {
314 let store = dirmgr.store.lock().expect("store lock poisoned");
315 make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
316 };
317
318 trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
319 requests.len(), missing.len());
320
321 #[cfg(test)]
322 {
323 let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
324 if !m.is_empty() {
325 return Ok(requests
326 .into_iter()
327 .zip(m.iter().map(DirResponse::from_body))
328 .collect());
329 }
330 }
331
332 let circmgr = dirmgr.circmgr()?;
333 let netdir = dirmgr.netdir(tor_netdir::Timeliness::Timely).ok();
335
336 let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
339 .map(|query| fetch_single(&dirmgr.runtime, query, netdir.as_deref(), circmgr.clone()))
340 .buffer_unordered(parallelism)
341 .collect()
342 .await;
343
344 let mut useful_responses = Vec::new();
345 for r in responses {
346 match r {
348 Ok((request, response)) => {
349 if response.status_code() == 200 {
350 useful_responses.push((request, response));
351 } else {
352 trace!(
353 "cache declined request; reported status {:?}",
354 response.status_code()
355 );
356 }
357 }
358 Err(e) => warn_report!(e, "error while downloading"),
359 }
360 }
361
362 trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());
363
364 Ok(useful_responses)
365}
366
367async fn load_once<R: Runtime>(
369 dirmgr: &Arc<DirMgr<R>>,
370 state: &mut Box<dyn DirState>,
371 attempt_id: AttemptId,
372 changed_out: &mut bool,
373) -> Result<()> {
374 let missing = state.missing_docs();
375 let mut changed = false;
376 let outcome: Result<()> = if missing.is_empty() {
377 trace!("Found no missing documents; can't advance current state");
378 Ok(())
379 } else {
380 trace!(
381 "Found {} missing documents; trying to load them",
382 missing.len()
383 );
384
385 load_and_apply_documents(&missing, dirmgr, state, &mut changed)
386 };
387
388 if changed {
392 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
393 *changed_out = true;
394 }
395
396 outcome
397}
398
399#[allow(clippy::cognitive_complexity)] pub(crate) async fn load<R: Runtime>(
405 dirmgr: Arc<DirMgr<R>>,
406 mut state: Box<dyn DirState>,
407 attempt_id: AttemptId,
408) -> Result<Box<dyn DirState>> {
409 let mut safety_counter = 0_usize;
410 loop {
411 trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
412 let mut changed = false;
413 let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
414 {
415 let mut store = dirmgr.store.lock().expect("store lock poisoned");
416 dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
417 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
418 }
419 trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");
420
421 if let Err(e) = outcome {
422 match e.bootstrap_action() {
423 BootstrapAction::Nonfatal => {
424 debug!("Recoverable error loading from cache: {}", e);
425 }
426 BootstrapAction::Fatal | BootstrapAction::Reset => {
427 return Err(e);
428 }
429 }
430 }
431
432 if state.can_advance() {
433 state = state.advance();
434 trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
435 safety_counter = 0;
436 } else {
437 if !changed {
438 trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
441 break;
442 }
443 safety_counter += 1;
444 assert!(
445 safety_counter < 100,
446 "Spent 100 iterations in the same state: this is a bug"
447 );
448 }
449 }
450
451 Ok(state)
452}
453
454#[allow(clippy::cognitive_complexity)] async fn download_attempt<R: Runtime>(
461 dirmgr: &Arc<DirMgr<R>>,
462 state: &mut Box<dyn DirState>,
463 parallelism: usize,
464 attempt_id: AttemptId,
465) -> Result<()> {
466 let missing = state.missing_docs();
467 let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
468 let mut n_errors = 0;
469 for (client_req, dir_response) in fetched {
470 let source = dir_response.source().cloned();
471 let text = match String::from_utf8(dir_response.into_output_unchecked())
472 .map_err(Error::BadUtf8FromDirectory)
473 {
474 Ok(t) => t,
475 Err(e) => {
476 if let Some(source) = source {
477 n_errors += 1;
478 note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
479 }
480 continue;
481 }
482 };
483 match dirmgr.expand_response_text(&client_req, text) {
484 Ok(text) => {
485 let doc_source = DocSource::DirServer {
486 source: source.clone(),
487 };
488 let mut changed = false;
489 let outcome = state.add_from_download(
490 &text,
491 &client_req,
492 doc_source,
493 Some(&dirmgr.store),
494 &mut changed,
495 );
496
497 if !changed {
498 debug_assert!(outcome.is_err());
499 }
500
501 if let Some(source) = source {
502 if let Err(e) = &outcome {
503 n_errors += 1;
504 note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
505 } else {
506 note_cache_success(dirmgr.circmgr()?.deref(), &source);
507 }
508 }
509
510 if let Err(e) = &outcome {
511 dirmgr.note_errors(attempt_id, 1);
512 warn_report!(e, "error while adding directory info");
513 }
514 propagate_fatal_errors!(outcome);
515 }
516 Err(e) => {
517 warn_report!(e, "Error when expanding directory text");
518 if let Some(source) = source {
519 n_errors += 1;
520 note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
521 }
522 propagate_fatal_errors!(Err(e));
523 }
524 }
525 }
526 if n_errors != 0 {
527 dirmgr.note_errors(attempt_id, n_errors);
528 }
529 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
530
531 Ok(())
532}
533
534#[allow(clippy::cognitive_complexity)] pub(crate) async fn download<R: Runtime>(
545 dirmgr: Weak<DirMgr<R>>,
546 state: &mut Box<dyn DirState>,
547 schedule: &mut TaskSchedule<R>,
548 attempt_id: AttemptId,
549 on_usable: &mut Option<oneshot::Sender<()>>,
550) -> Result<()> {
551 let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
552
553 trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");
554
555 'next_state: loop {
556 let retry_config = state.dl_config();
557 let parallelism = retry_config.parallelism();
558
559 let mut now = {
563 let dirmgr = upgrade_weak_ref(&dirmgr)?;
564 let mut changed = false;
565 trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
566 let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
567 trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
568 if let Err(e) = &load_result {
569 if let Some(source) = e.responsible_cache() {
572 dirmgr.note_errors(attempt_id, 1);
573 note_cache_error(dirmgr.circmgr()?.deref(), source, e);
574 }
575 }
576 propagate_fatal_errors!(load_result);
577 dirmgr.runtime.wallclock()
578 };
579
580 {
583 let dirmgr = upgrade_weak_ref(&dirmgr)?;
584 let mut store = dirmgr.store.lock().expect("store lock poisoned");
585 dirmgr.apply_netdir_changes(state, &mut **store)?;
586 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
587 }
588 if state.can_advance() {
590 advance(state);
591 trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
592 continue 'next_state;
593 }
594 if state.is_ready(Readiness::Complete) {
595 trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
596 return Ok(());
597 }
598
599 let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
600
601 let mut retry = retry_config.schedule();
602 let mut delay = None;
603
604 'next_attempt: for attempt in retry_config.attempts() {
608 let next_delay = retry.next_delay(&mut rand::rng());
612 if let Some(delay) = delay.replace(next_delay) {
613 let time_until_reset = {
614 reset_time
615 .duration_since(now)
616 .unwrap_or(Duration::from_secs(0))
617 };
618 let real_delay = delay.min(time_until_reset);
619 debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
620 schedule.sleep(real_delay).await?;
621
622 now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
623 if now >= reset_time {
624 info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
625 reset(state);
626 continue 'next_state;
627 }
628 }
629
630 info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
631 let reset_time = no_more_than_a_week_from(now, state.reset_time());
632
633 now = {
634 let dirmgr = upgrade_weak_ref(&dirmgr)?;
635 futures::select_biased! {
636 outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
637 if let Err(e) = outcome {
638 warn_report!(e, "Error while downloading (attempt {})", attempt_id);
640 propagate_fatal_errors!(Err(e));
641 continue 'next_attempt;
642 } else {
643 trace!(attempt=%attempt_id, "Successfully downloaded some information.");
644 }
645 }
646 _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
647 info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
652 reset(state);
653 continue 'next_state;
654 },
655 };
656 dirmgr.runtime.wallclock()
657 };
658
659 {
662 let dirmgr = upgrade_weak_ref(&dirmgr)?;
663 let mut store = dirmgr.store.lock().expect("store lock poisoned");
664 let outcome = dirmgr.apply_netdir_changes(state, &mut **store);
665 dirmgr.update_progress(attempt_id, state.bootstrap_progress());
666 propagate_fatal_errors!(outcome);
667 }
668
669 if state.is_ready(Readiness::Complete) {
671 trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
672 return Ok(());
673 }
674
675 if on_usable.is_some() && state.is_ready(Readiness::Usable) {
677 trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
678 #[allow(clippy::unwrap_used)]
680 let _ = on_usable.take().unwrap().send(());
681 }
682
683 if state.can_advance() {
684 advance(state);
686 trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
687 continue 'next_state;
688 }
689 }
690
691 warn!(n_attempts=retry_config.n_attempts(),
693 state=%state.describe(),
694 "Unable to advance downloading state");
695 return Err(Error::CantAdvanceState);
696 }
697}
698
699fn reset(state: &mut Box<dyn DirState>) {
701 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
702 *state = cur_state.reset();
703}
704
705fn advance(state: &mut Box<dyn DirState>) {
707 let cur_state = std::mem::replace(state, Box::new(PoisonedState));
708 *state = cur_state.advance();
709}
710
/// Return `v`, limited to at most one week after `now`.
///
/// If `v` is `None`, the result is exactly one week after `now`.  A `v`
/// earlier than that limit, including one in the past, is returned
/// unchanged.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    let limit = now + Duration::from_secs(7 * 86400);
    v.map_or(limit, |requested| requested.min(limit))
}
724
// Unit tests for the bootstrap helpers: the one-week cap, and the `load` /
// `download` drivers exercised with a small fake `DirState`.
#[cfg(test)]
mod test {
    // Lints relaxed for test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use crate::DownloadSchedule;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;
    use tor_rtcompat::SleepProvider;

    /// `no_more_than_a_week_from` defaults to one week, passes earlier times
    /// through (even past ones), and caps later times at one week.
    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake two-stage `DirState` used to drive `load` and `download`.
    ///
    /// The first stage (`new1`) wants H1 and H2; once it has both it can
    /// advance to the second stage (`new2`), which wants H3, H4 and H5.
    #[derive(Debug, Clone)]
    struct DemoState {
        /// False in the first stage, true in the second.
        second_time_around: bool,
        /// For each wanted digest, whether we have received it yet.
        got_items: HashMap<MdDigest, bool>,
    }

    // Five arbitrary 32-byte values used as microdescriptor digests.
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        /// First stage: wants H1 and H2, has neither.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Second stage: wants H3, H4 and H5, has none of them.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Number of wanted items that have been received.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_progress(&self) -> crate::event::DirProgress {
            crate::event::DirProgress::default()
        }
        // The first stage is never ready.  The second stage is Complete with
        // every item present, and Usable with at most one item missing.
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        // Only the first stage can advance, and only once it has everything.
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        // Every wanted digest not yet received, as a microdescriptor id.
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        // Marks each wanted microdescriptor id found in `docs` as received;
        // the document text itself is ignored.
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            changed: &mut bool,
        ) -> Result<()> {
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        *changed = true;
                    }
                }
            }
            Ok(())
        }
        // Treats `text` as whitespace-separated hex digests and marks each
        // wanted one as received.  Tokens that do not decode are skipped.
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _source: DocSource,
            _storage: Option<&Mutex<DynStore>>,
            changed: &mut bool,
        ) -> Result<()> {
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            *changed = true;
                        }
                    }
                }
            }
            Ok(())
        }
        fn dl_config(&self) -> DownloadSchedule {
            DownloadSchedule::default()
        }
        // Moves to the second stage when allowed; otherwise stays put.
        fn advance(self: Box<Self>) -> Box<dyn DirState> {
            if self.can_advance() {
                Box::new(Self::new2())
            } else {
                self
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        // Resetting always goes back to an empty first stage.
        fn reset(self: Box<Self>) -> Box<dyn DirState> {
            Box::new(Self::new1())
        }
    }

    /// With all five digests already in the store, both `load` and
    /// `download` reach a Complete state.
    #[test]
    fn all_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                // Put every digest that either stage wants into the store.
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            let mgr = Arc::new(mgr);
            let attempt_id = AttemptId::next();

            // `load` alone should get all the way to Complete.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state, attempt_id)
                .await
                .unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // So should `download`, starting again from a fresh first stage.
            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());

            let mut on_usable = None;
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }

    /// With only H1..H3 in the store, `download` gets the rest from the
    /// canned response and reaches a Complete state.
    #[test]
    fn partly_in_cache() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let now = rt.wallclock();
            let (_tempdir, mgr) = new_mgr(rt.clone());
            let (mut schedule, _handle) = TaskSchedule::new(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store.store_microdescs(&[("ignore", &h)], now).unwrap();
                }
            }
            {
                // NOTE(review): CANNED_RESPONSE is a process-wide static and
                // is not cleared afterwards; confirm this cannot affect
                // other tests that reach `fetch_multiple`.
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                *resp = vec![
                    // Hex encodings of H4 and H5, separated by whitespace.
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                ];
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;
            let attempt_id = AttemptId::next();

            let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
            super::download(
                Arc::downgrade(&mgr),
                &mut state,
                &mut schedule,
                attempt_id,
                &mut on_usable,
            )
            .await
            .unwrap();
            assert!(state.is_ready(Readiness::Complete));
        });
    }
}