1use std::{
8 collections::{BTreeSet, HashMap, VecDeque},
9 sync::{Arc, Mutex, RwLock},
10 task::Waker,
11 time::{Duration, Instant, SystemTime},
12};
13
14use arrayvec::ArrayVec;
15use equix::EquiXBuilder;
16use futures::{SinkExt, StreamExt};
17use futures::{Stream, channel::mpsc};
18use num_traits::FromPrimitive;
19use rand::{CryptoRng, RngCore};
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22use tor_basic_utils::RngExt as _;
23use tor_cell::relaycell::hs::pow::{ProofOfWork, v1::ProofOfWorkV1};
24use tor_checkable::timed::TimerangeBound;
25use tor_error::warn_report;
26use tor_hscrypto::{
27 pk::HsBlindIdKey,
28 pow::v1::{
29 Effort, Instance, RuntimeOption, Seed, SeedHead, Solution, SolutionErrorV1, Verifier,
30 },
31 time::TimePeriod,
32};
33use tor_keymgr::KeyMgr;
34use tor_netdir::{NetDirProvider, NetdirProviderShutdown, params::NetParameters};
35use tor_netdoc::doc::hsdesc::pow::{PowParams, v1::PowParamsV1};
36use tor_persist::{
37 hsnickname::HsNickname,
38 state_dir::{InstanceRawSubdir, StorageHandle},
39};
40use tor_rtcompat::Runtime;
41use tor_rtcompat::SpawnExt;
42
43use crate::{
44 BlindIdPublicKeySpecifier, OnionServiceConfig, RendRequest, ReplayError, StartupError,
45 rend_handshake,
46 replay::{OpenReplayLogError, PowNonceReplayLog},
47 status::{PowManagerStatusSender, Problem, State as PowManagerState},
48};
49
50use super::NewPowManager;
51
52pub(crate) type PowManager<R> = PowManagerGeneric<R, RendRequest>;
54
55pub(crate) struct PowManagerGeneric<R, Q>(RwLock<State<R, Q>>);
57
58struct State<R, Q> {
60 seeds: HashMap<TimePeriod, SeedsForTimePeriod>,
65
66 verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,
68
69 nickname: HsNickname,
73
74 instance_dir: InstanceRawSubdir,
76
77 keymgr: Arc<KeyMgr>,
79
80 suggested_effort: Arc<Mutex<Effort>>,
84
85 runtime: R,
87
88 storage_handle: StorageHandle<PowManagerStateRecord>,
90
91 publisher_update_tx: mpsc::Sender<TimePeriod>,
94
95 rend_request_rx: RendRequestReceiver<R, Q>,
99
100 netdir_provider: Arc<dyn NetDirProvider>,
102
103 status_tx: PowManagerStatusSender,
105
106 config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
108}
109
110#[derive(Serialize, Deserialize, Debug, Clone)]
111struct SeedsForTimePeriod {
113 seeds: ArrayVec<Seed, 2>,
117
118 next_expiration_time: SystemTime,
120}
121
122#[derive(Debug)]
123#[allow(unused)]
124pub(crate) enum PowSolveError {
129 InvalidSeedHead,
131 NonceReplay(ReplayError),
133 InvalidEquixSolution(SolutionErrorV1),
135 InvalidSolve(tor_hscrypto::pow::Error),
137}
138
139#[derive(Serialize, Deserialize, Debug, Default)]
141pub(crate) struct PowManagerStateRecord {
142 seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
149
150 #[serde(default)]
152 suggested_effort: Effort,
153 }
161
162impl<R: Runtime, Q> State<R, Q> {
163 pub(crate) fn to_record(&self) -> PowManagerStateRecord {
165 PowManagerStateRecord {
166 seeds: self.seeds.clone().into_iter().collect(),
167 suggested_effort: *self.suggested_effort.lock().expect("Lock poisoned"),
168 }
169 }
170}
171
172const HS_UPDATE_PERIOD: Duration = Duration::from_secs(300);
174
175const SUGGESTED_EFFORT_DEADZONE: f64 = 0.15;
181
182const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);
184
185const EXPIRATION_TIME_MINS_MIN: u64 = 105;
188
189const EXPIRATION_TIME_MINS_MAX: u64 = 120;
192
193const _: () = assert!(
195 SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
196 "Early rotation time must be less than minimum expiration time"
197);
198
199const _: () = assert!(
201 EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
202 "Minimum expiration time must be less than or equal to max"
203);
204
205const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
209
210#[derive(Error, Debug, Clone)]
211#[allow(dead_code)] #[non_exhaustive]
213pub enum PowError {
215 #[error("Missing required key.")]
217 MissingKey,
218 #[error(transparent)]
220 StorageError(#[from] tor_persist::Error),
221 #[error(transparent)]
223 OpenReplayLog(#[from] OpenReplayLogError),
224 #[error(transparent)]
226 NetdirProviderShutdown(#[from] NetdirProviderShutdown),
227}
228
229impl<R: Runtime, Q: MockableRendRequest + Send + 'static> PowManagerGeneric<R, Q> {
230 #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
232 pub(crate) fn new(
233 runtime: R,
234 nickname: HsNickname,
235 instance_dir: InstanceRawSubdir,
236 keymgr: Arc<KeyMgr>,
237 storage_handle: StorageHandle<PowManagerStateRecord>,
238 netdir_provider: Arc<dyn NetDirProvider>,
239 status_tx: PowManagerStatusSender,
240 config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
241 ) -> Result<NewPowManager<R>, StartupError> {
242 let on_disk_state = storage_handle
243 .load()
244 .map_err(StartupError::LoadState)?
245 .unwrap_or(PowManagerStateRecord::default());
246
247 let seeds: HashMap<TimePeriod, SeedsForTimePeriod> =
248 on_disk_state.seeds.into_iter().collect();
249 let suggested_effort = Arc::new(Mutex::new(on_disk_state.suggested_effort));
250
251 let mut verifiers = HashMap::new();
252 for (tp, seeds_for_tp) in seeds.clone().into_iter() {
253 for seed in seeds_for_tp.seeds {
254 let verifier = match Self::make_verifier(
255 &keymgr,
256 nickname.clone(),
257 tp,
258 seed.clone(),
259 &config_rx.borrow(),
260 ) {
261 Some(verifier) => verifier,
262 None => {
263 tracing::warn!(
264 "Couldn't construct verifier (key not available?). We will continue without this key, but this may prevent clients from connecting..."
265 );
266 continue;
267 }
268 };
269 let replay_log = match PowNonceReplayLog::new_logged(&instance_dir, &seed) {
270 Ok(replay_log) => replay_log,
271 Err(err) => {
272 warn_report!(
273 err,
274 "Error constructing replay log. We will continue without the log, but be aware that this may allow attackers to bypass PoW defenses..."
275 );
276 continue;
277 }
278 };
279 verifiers.insert(seed.head(), (verifier, Mutex::new(replay_log)));
280 }
281 }
282
283 let (publisher_update_tx, publisher_update_rx) =
286 crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);
287
288 let (rend_req_tx, rend_req_rx_channel) = super::make_rend_queue();
289 let rend_req_rx = RendRequestReceiver::new(
290 runtime.clone(),
291 nickname.clone(),
292 suggested_effort.clone(),
293 netdir_provider.clone(),
294 status_tx.clone(),
295 config_rx.clone(),
296 );
297
298 let state = State {
299 seeds,
300 nickname,
301 instance_dir,
302 keymgr,
303 publisher_update_tx,
304 verifiers,
305 suggested_effort: suggested_effort.clone(),
306 runtime: runtime.clone(),
307 storage_handle,
308 rend_request_rx: rend_req_rx.clone(),
309 netdir_provider,
310 status_tx,
311 config_rx,
312 };
313 let pow_manager = Arc::new(PowManagerGeneric(RwLock::new(state)));
314
315 rend_req_rx.start_accept_thread(runtime, pow_manager.clone(), rend_req_rx_channel);
316
317 Ok(NewPowManager {
318 pow_manager,
319 rend_req_tx,
320 rend_req_rx: Box::pin(rend_req_rx),
321 publisher_update_rx,
322 })
323 }
324
325 pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
327 let pow_manager = self.clone();
328 let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
329
330 runtime
331 .spawn(pow_manager.main_loop_error_wrapper())
332 .map_err(|cause| StartupError::Spawn {
333 spawning: "pow manager",
334 cause: cause.into(),
335 })?;
336
337 self.0
338 .write()
339 .expect("Lock poisoned")
340 .status_tx
341 .send(PowManagerState::Running, None);
342 Ok(())
343 }
344
345 async fn main_loop_error_wrapper(self: Arc<Self>) {
347 if let Err(err) = self.clone().main_loop_task().await {
348 self.0
349 .write()
350 .expect("Lock poisoned")
351 .status_tx
352 .send_broken(Problem::Pow(err));
353 }
354 }
355
356 async fn main_loop_task(self: Arc<Self>) -> Result<(), PowError> {
358 let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
359
360 let mut last_suggested_effort_update = runtime.now();
361 let mut last_published_suggested_effort: u32 = (*self
362 .0
363 .read()
364 .expect("Lock poisoned")
365 .suggested_effort
366 .lock()
367 .expect("Lock poisoned"))
368 .into();
369
370 let netdir_provider = self
371 .0
372 .read()
373 .expect("Lock poisoned")
374 .netdir_provider
375 .clone();
376 let net_params = netdir_provider
377 .wait_for_netdir(tor_netdir::Timeliness::Timely)
378 .await?
379 .params()
380 .clone();
381
382 loop {
383 let next_update_time = self.rotate_seeds_if_expiring().await;
384
385 if runtime.now() - last_suggested_effort_update >= HS_UPDATE_PERIOD {
387 let (tps_to_update, mut publisher_update_tx) = {
388 let mut tps_to_update = vec![];
389
390 let inner = self.0.read().expect("Lock poisoned");
391
392 inner.rend_request_rx.update_suggested_effort(&net_params);
393 last_suggested_effort_update = runtime.now();
394 let new_suggested_effort: u32 =
395 (*inner.suggested_effort.lock().expect("Lock poisoned")).into();
396
397 let percent_change =
398 f64::from(new_suggested_effort - last_published_suggested_effort)
399 / f64::from(last_published_suggested_effort);
400 if percent_change.abs() >= SUGGESTED_EFFORT_DEADZONE {
401 last_published_suggested_effort = new_suggested_effort;
402
403 tps_to_update = inner.seeds.iter().map(|x| *x.0).collect();
404 }
405
406 let publisher_update_tx = inner.publisher_update_tx.clone();
407 (tps_to_update, publisher_update_tx)
408 };
409
410 for time_period in tps_to_update {
411 let _ = publisher_update_tx.send(time_period).await;
412 }
413 }
414
415 let suggested_effort_update_delay = HS_UPDATE_PERIOD.saturating_sub(
416 runtime
417 .now()
418 .saturating_duration_since(last_suggested_effort_update),
419 );
420
421 const MAX_DELAY: Duration = Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60)
426 .checked_sub(SEED_EARLY_ROTATION_TIME)
427 .expect("SEED_EARLY_ROTATION_TIME too high, or EXPIRATION_TIME_MINS_MIN too low.");
428 let delay = next_update_time
429 .map(|x| x.duration_since(SystemTime::now()).unwrap_or(MAX_DELAY))
430 .unwrap_or(MAX_DELAY)
431 .min(MAX_DELAY)
432 .min(suggested_effort_update_delay);
433
434 tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
435
436 runtime.sleep(delay).await;
437 }
438 }
439
440 fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
442 SystemTime::now()
443 + Duration::from_secs(
444 60 * rng
445 .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
446 .expect("Can't generate expiration_time"),
447 )
448 }
449
450 fn make_verifier(
457 keymgr: &Arc<KeyMgr>,
458 nickname: HsNickname,
459 time_period: TimePeriod,
460 seed: Seed,
461 config: &OnionServiceConfig,
462 ) -> Option<Verifier> {
463 let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
464 let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
465 Ok(blind_id_key) => blind_id_key,
466 Err(err) => {
467 warn_report!(err, "KeyMgr error when getting blinded ID key for PoW");
468 None
469 }
470 };
471 let instance = Instance::new(blind_id_key?.id(), seed);
472 let mut equix = EquiXBuilder::default();
473 if *config.disable_pow_compilation() {
474 equix.runtime(RuntimeOption::InterpretOnly);
475 }
476 Some(Verifier::new_with_equix(instance, equix))
477 }
478
479 fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
482 expiration_time
493 .checked_sub(SEED_EARLY_ROTATION_TIME)
494 .expect("PoW seed expiration underflow")
495 }
496
497 #[allow(clippy::cognitive_complexity)]
503 async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
504 let mut expired_verifiers = vec![];
505 let mut new_verifiers = vec![];
506
507 let mut update_times = vec![];
508 let mut updated_tps = vec![];
509 let mut expired_tps = vec![];
510
511 let mut publisher_update_tx = {
512 let mut state = self.0.write().expect("Lock poisoned");
513
514 let config = state.config_rx.borrow().clone();
515 let keymgr = state.keymgr.clone();
516 let nickname = state.nickname.clone();
517
518 for (time_period, info) in state.seeds.iter_mut() {
519 let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
520 update_times.push(rotation_time);
521
522 if rotation_time <= SystemTime::now() {
523 let mut rng = rand::rng();
528
529 let seed = Seed::new(&mut rng, None);
530 let verifier = match Self::make_verifier(
531 &keymgr,
532 nickname.clone(),
533 *time_period,
534 seed.clone(),
535 &config,
536 ) {
537 Some(verifier) => verifier,
538 None => {
539 expired_tps.push(*time_period);
542 continue;
543 }
544 };
545
546 let expired_seed = if info.seeds.is_full() {
547 info.seeds.pop_at(0)
548 } else {
549 None
550 };
551 info.seeds.push(seed.clone());
553 info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
554 update_times.push(info.next_expiration_time);
555
556 new_verifiers.push((seed, verifier));
558 if let Some(expired_seed) = expired_seed {
559 expired_verifiers.push(expired_seed.head());
560 }
561
562 updated_tps.push(*time_period);
564
565 tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
566 }
567 }
568
569 for time_period in expired_tps {
570 if let Some(seeds) = state.seeds.remove(&time_period) {
571 for seed in seeds.seeds {
572 state.verifiers.remove(&seed.head());
573 }
574 }
575 }
576
577 for (seed, verifier) in new_verifiers {
578 let replay_log = Mutex::new(
579 PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
580 .expect("Couldn't make ReplayLog."),
581 );
582 state.verifiers.insert(seed.head(), (verifier, replay_log));
583 }
584
585 for seed_head in expired_verifiers {
586 state.verifiers.remove(&seed_head);
587 }
588
589 let record = state.to_record();
590 if let Err(err) = state.storage_handle.store(&record) {
591 warn_report!(err, "Error saving PoW state");
592 }
593
594 state.publisher_update_tx.clone()
595 };
596
597 for time_period in updated_tps {
598 if let Err(err) = publisher_update_tx.send(time_period).await {
599 warn_report!(err, "Couldn't send update message to publisher");
600 }
601 }
602
603 update_times.iter().min().cloned()
604 }
605
606 pub(crate) fn get_pow_params<Rng: RngCore + CryptoRng>(
611 self: &Arc<Self>,
612 time_period: TimePeriod,
613 rng: &mut Rng,
614 ) -> Result<PowParams, PowError> {
615 let (seed_and_expiration, suggested_effort) = {
616 let state = self.0.read().expect("Lock poisoned");
617 let seed = state
618 .seeds
619 .get(&time_period)
620 .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
621 let suggested_effort = *state.suggested_effort.lock().expect("Lock poisoned");
622 (seed, suggested_effort)
623 };
624
625 let (seed, expiration) = match seed_and_expiration {
626 Some(seed) => seed,
627 None => {
628 let seed = Seed::new(rng, None);
631 let next_expiration_time = Self::make_next_expiration_time(rng);
632
633 let mut seeds = ArrayVec::new();
634 seeds.push(seed.clone());
635
636 let mut state = self.0.write().expect("Lock poisoned");
637
638 state.seeds.insert(
639 time_period,
640 SeedsForTimePeriod {
641 seeds,
642 next_expiration_time,
643 },
644 );
645
646 let verifier = Self::make_verifier(
647 &state.keymgr,
648 state.nickname.clone(),
649 time_period,
650 seed.clone(),
651 &state.config_rx.borrow(),
652 )
653 .ok_or(PowError::MissingKey)?;
654
655 let replay_log =
656 Mutex::new(PowNonceReplayLog::new_logged(&state.instance_dir, &seed)?);
657 state.verifiers.insert(seed.head(), (verifier, replay_log));
658
659 let record = state.to_record();
660 state.storage_handle.store(&record)?;
661
662 (seed, next_expiration_time)
663 }
664 };
665
666 Ok(PowParams::V1(PowParamsV1::new(
667 TimerangeBound::new(seed, ..expiration),
668 suggested_effort,
669 )))
670 }
671
672 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
674 {
678 let state = self.0.write().expect("Lock poisoned");
679 let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
680 Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
681 None => return Err(PowSolveError::InvalidSeedHead),
682 };
683 replay_log
684 .check_for_replay(solve.nonce())
685 .map_err(PowSolveError::NonceReplay)?;
686 }
687
688 let state = self.0.read().expect("Lock poisoned");
691 let verifier = match state.verifiers.get(&solve.seed_head()) {
692 Some((verifier, _)) => verifier,
693 None => return Err(PowSolveError::InvalidSeedHead),
694 };
695
696 let solution = match Solution::try_from_bytes(
697 solve.nonce().clone(),
698 solve.effort(),
699 solve.seed_head(),
700 solve.solution(),
701 ) {
702 Ok(solution) => solution,
703 Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
704 };
705
706 match verifier.check(&solution) {
707 Ok(()) => Ok(()),
708 Err(err) => Err(PowSolveError::InvalidSolve(err)),
709 }
710 }
711}
712
713trait MockablePowManager {
715 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError>;
717}
718
719impl<R: Runtime> MockablePowManager for PowManager<R> {
720 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
721 PowManager::check_solve(self, solve)
722 }
723}
724
725pub(crate) trait MockableRendRequest {
727 fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError>;
729}
730
731impl MockableRendRequest for RendRequest {
732 fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
733 Ok(self
734 .intro_request()?
735 .intro_payload()
736 .proof_of_work_extension())
737 }
738}
739
740#[derive(Debug)]
742struct RendRequestOrdByEffort<Q> {
743 request: Q,
745 pow: Option<ProofOfWorkV1>,
747 max_effort: Effort,
750 recv_time: Instant,
752 request_num: u64,
759}
760
761impl<Q: MockableRendRequest> RendRequestOrdByEffort<Q> {
762 fn new(
764 request: Q,
765 max_effort: Effort,
766 request_num: u64,
767 ) -> Result<Self, rend_handshake::IntroRequestError> {
768 let pow = match request.proof_of_work()?.cloned() {
769 Some(ProofOfWork::V1(pow)) => Some(pow),
770 None | Some(_) => None,
771 };
772
773 Ok(Self {
774 request,
775 pow,
776 max_effort,
777 recv_time: Instant::now(),
778 request_num,
779 })
780 }
781}
782
783impl<Q: MockableRendRequest> Ord for RendRequestOrdByEffort<Q> {
784 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
785 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| {
786 Effort::min(pow.effort(), self.max_effort)
787 });
788 let other_effort = other.pow.as_ref().map_or(Effort::zero(), |pow| {
789 Effort::min(pow.effort(), other.max_effort)
790 });
791 match self_effort.cmp(&other_effort) {
792 std::cmp::Ordering::Equal => {
793 match other.recv_time.cmp(&self.recv_time) {
795 std::cmp::Ordering::Equal => other.request_num.cmp(&self.request_num),
798 not_equal => not_equal,
799 }
800 }
801 not_equal => not_equal,
802 }
803 }
804}
805
806impl<Q: MockableRendRequest> PartialOrd for RendRequestOrdByEffort<Q> {
807 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
808 Some(self.cmp(other))
809 }
810}
811
812impl<Q: MockableRendRequest> PartialEq for RendRequestOrdByEffort<Q> {
813 fn eq(&self, other: &Self) -> bool {
814 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| {
815 Effort::min(pow.effort(), self.max_effort)
816 });
817 let other_effort = other.pow.as_ref().map_or(Effort::zero(), |pow| {
818 Effort::min(pow.effort(), other.max_effort)
819 });
820 self_effort == other_effort && self.recv_time == other.recv_time
821 }
822}
823
824impl<Q: MockableRendRequest> Eq for RendRequestOrdByEffort<Q> {}
825
826pub(crate) struct RendRequestReceiver<R, Q>(Arc<Mutex<RendRequestReceiverInner<R, Q>>>);
838
839impl<R, Q> Clone for RendRequestReceiver<R, Q> {
840 fn clone(&self) -> Self {
841 Self(self.0.clone())
842 }
843}
844
845struct RendRequestReceiverInner<R, Q> {
847 queue: BTreeSet<RendRequestOrdByEffort<Q>>,
849
850 queue_pow_disabled: VecDeque<Q>,
855
856 waker: Option<Waker>,
858
859 runtime: R,
861
862 nickname: HsNickname,
864
865 netdir_provider: Arc<dyn NetDirProvider>,
867
868 config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
870
871 update_period_start: Instant,
873 num_enqueued_gte_suggested: usize,
876 num_dequeued: u32,
878 idle_time: Duration,
881 last_transition: Instant,
884 total_effort: u64,
886
887 suggested_effort: Arc<Mutex<Effort>>,
891
892 status_tx: PowManagerStatusSender,
894}
895
896impl<R: Runtime, Q: MockableRendRequest + Send + 'static> RendRequestReceiver<R, Q> {
897 fn new(
899 runtime: R,
900 nickname: HsNickname,
901 suggested_effort: Arc<Mutex<Effort>>,
902 netdir_provider: Arc<dyn NetDirProvider>,
903 status_tx: PowManagerStatusSender,
904 config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
905 ) -> Self {
906 let now = runtime.now();
907 RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
908 queue: BTreeSet::new(),
909 queue_pow_disabled: VecDeque::new(),
910 waker: None,
911 runtime,
912 nickname,
913 netdir_provider,
914 config_rx,
915 update_period_start: now,
916 num_enqueued_gte_suggested: 0,
917 num_dequeued: 0,
918 idle_time: Duration::new(0, 0),
919 last_transition: now,
920 total_effort: 0,
921 suggested_effort,
922 status_tx,
923 })))
924 }
925
926 #[allow(clippy::let_underscore_future)]
929 fn start_accept_thread<P: MockablePowManager + Send + Sync + 'static>(
931 &self,
932 runtime: R,
933 pow_manager: Arc<P>,
934 inner_receiver: mpsc::Receiver<Q>,
935 ) {
936 let receiver = self.clone();
937 let runtime_clone = runtime.clone();
938 let _ = runtime.clone().spawn_blocking(move || {
939 if let Err(err) =
940 receiver
941 .clone()
942 .accept_loop(&runtime_clone, &pow_manager, inner_receiver)
943 {
944 warn_report!(err, "PoW accept loop error!");
945 receiver
946 .0
947 .lock()
948 .expect("Lock poisoned")
949 .status_tx
950 .send_broken(Problem::Pow(err));
951 }
952 });
953
954 let receiver = self.clone();
955 let _ = runtime.clone().spawn_blocking(move || {
956 if let Err(err) = receiver.clone().expire_old_requests_loop(&runtime) {
957 warn_report!(err, "PoW request expiration loop error!");
958 receiver
959 .0
960 .lock()
961 .expect("Lock poisoned")
962 .status_tx
963 .send_broken(Problem::Pow(err));
964 }
965 });
966 }
967
968 fn update_suggested_effort(&self, net_params: &NetParameters) {
970 let mut inner = self.0.lock().expect("Lock poisoned");
971
972 let decay_adjustment_fraction = net_params.hs_pow_v1_default_decay_adjustment.as_fraction();
973
974 if inner.num_dequeued != 0 {
975 let update_period_duration = inner.runtime.now() - inner.update_period_start;
976 let avg_request_duration = update_period_duration / inner.num_dequeued;
977 if inner.queue.is_empty() {
978 let now = inner.runtime.now();
979 let last_transition = inner.last_transition;
980 inner.idle_time += now - last_transition;
981 }
982 let adjusted_idle_time = Duration::saturating_sub(
983 inner.idle_time,
984 avg_request_duration * inner.queue.len().try_into().expect("Queue too large."),
985 );
986 let idle_fraction = f64::from_u128(adjusted_idle_time.as_millis())
988 .expect("Conversion error")
989 / f64::from_u128(update_period_duration.as_millis()).expect("Conversion error");
990 let busy_fraction = 1.0 - idle_fraction;
991
992 let mut suggested_effort = inner.suggested_effort.lock().expect("Lock poisoned");
993 let suggested_effort_inner: u32 = (*suggested_effort).into();
994
995 if busy_fraction == 0.0 {
996 let new_suggested_effort =
997 u32::from_f64(f64::from(suggested_effort_inner) * decay_adjustment_fraction)
998 .expect("Conversion error");
999 *suggested_effort = Effort::from(new_suggested_effort);
1000 } else {
1001 let theoretical_num_dequeued =
1002 f64::from(inner.num_dequeued) * (1.0 / busy_fraction);
1003 let num_enqueued_gte_suggested_f64 =
1004 f64::from_usize(inner.num_enqueued_gte_suggested).expect("Conversion error");
1005
1006 if num_enqueued_gte_suggested_f64 >= theoretical_num_dequeued {
1007 let effort_per_dequeued = u32::from_f64(
1008 f64::from_u64(inner.total_effort).expect("Conversion error")
1009 / f64::from(inner.num_dequeued),
1010 )
1011 .expect("Conversion error");
1012 *suggested_effort = Effort::from(std::cmp::max(
1013 effort_per_dequeued,
1014 suggested_effort_inner + 1,
1015 ));
1016 } else {
1017 let decay = num_enqueued_gte_suggested_f64 / theoretical_num_dequeued;
1018 let adjusted_decay = decay + ((1.0 - decay) * decay_adjustment_fraction);
1019 let new_suggested_effort =
1020 u32::from_f64(f64::from(suggested_effort_inner) * adjusted_decay)
1021 .expect("Conversion error");
1022 *suggested_effort = Effort::from(new_suggested_effort);
1023 }
1024 }
1025
1026 drop(suggested_effort);
1027 }
1028
1029 let now = inner.runtime.now();
1030
1031 inner.update_period_start = now;
1032 inner.num_enqueued_gte_suggested = 0;
1033 inner.num_dequeued = 0;
1034 inner.idle_time = Duration::new(0, 0);
1035 inner.last_transition = now;
1036 inner.total_effort = 0;
1037 }
1038
1039 #[allow(clippy::cognitive_complexity)]
1042 fn accept_loop<P: MockablePowManager>(
1043 self,
1044 runtime: &R,
1045 pow_manager: &Arc<P>,
1046 mut receiver: mpsc::Receiver<Q>,
1047 ) -> Result<(), PowError> {
1048 let mut request_num = 0;
1049
1050 let netdir_provider = self
1051 .0
1052 .lock()
1053 .expect("Lock poisoned")
1054 .netdir_provider
1055 .clone();
1056 let net_params = runtime
1057 .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
1058 .params()
1059 .clone();
1060
1061 let max_effort: u32 = net_params
1062 .hs_pow_v1_max_effort
1063 .get()
1064 .try_into()
1065 .expect("Bounded i32 not in range of u32?!");
1066 let max_effort = Effort::from(max_effort);
1067
1068 let config_rx = self.0.lock().expect("Lock poisoned").config_rx.clone();
1069
1070 let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
1071
1072 cfg_if::cfg_if! {
1073 if #[cfg(feature = "metrics")] {
1074 let counter_rendrequest_error_total = metrics::counter!("arti_hss_pow_rendrequest_error_total", "nickname" => nickname.clone());
1075 let counter_rendrequest_verification_failure = metrics::counter!("arti_hss_pow_rendrequest_verification_failure_total", "nickname" => nickname.clone());
1076 let counter_rend_queue_overflow = metrics::counter!("arti_hss_pow_rend_queue_overflow_total", "nickname" => nickname.clone());
1077 let counter_rendrequest_enqueued = metrics::counter!("arti_hss_pow_rendrequest_enqueued_total", "nickname" => nickname.clone());
1078 let histogram_rendrequest_effort = metrics::histogram!("arti_hss_pow_rendrequest_effort_hist", "nickname" => nickname.clone());
1079 }
1080 }
1081
1082 loop {
1083 let rend_request = if let Some(rend_request) = runtime.reenter_block_on(receiver.next())
1084 {
1085 rend_request
1086 } else {
1087 self.0
1088 .lock()
1089 .expect("Lock poisoned")
1090 .status_tx
1091 .send_shutdown();
1092 return Ok(());
1093 };
1094
1095 if config_rx.borrow().enable_pow {
1096 let rend_request =
1097 match RendRequestOrdByEffort::new(rend_request, max_effort, request_num) {
1098 Ok(rend_request) => rend_request,
1099 Err(err) => {
1100 #[cfg(feature = "metrics")]
1101 counter_rendrequest_error_total.increment(1);
1102 tracing::trace!(?err, "Error processing RendRequest");
1103 continue;
1104 }
1105 };
1106
1107 request_num = request_num.wrapping_add(1);
1108
1109 if let Some(ref pow) = rend_request.pow {
1110 if let Err(err) = pow_manager.check_solve(pow) {
1111 tracing::debug!(?err, "PoW verification failed");
1112 #[cfg(feature = "metrics")]
1113 counter_rendrequest_verification_failure.increment(1);
1114 continue;
1115 } else {
1116 #[cfg(feature = "metrics")]
1117 {
1118 let effort: u32 = pow.effort().into();
1119 histogram_rendrequest_effort.record(effort);
1120 }
1121 }
1122 }
1123
1124 let mut inner = self.0.lock().expect("Lock poisoned");
1125 if inner.queue.is_empty() {
1126 let now = runtime.now();
1127 let last_transition = inner.last_transition;
1128 inner.idle_time += now - last_transition;
1129 inner.last_transition = now;
1130 }
1131 if let Some(ref request_pow) = rend_request.pow {
1132 if request_pow.effort()
1133 >= *inner.suggested_effort.lock().expect("Lock poisoned")
1134 {
1135 inner.num_enqueued_gte_suggested += 1;
1136 let effort: u32 = request_pow.effort().into();
1137 if let Some(total_effort) = inner.total_effort.checked_add(effort.into()) {
1138 inner.total_effort = total_effort;
1139 } else {
1140 tracing::warn!(
1141 "PoW total_effort would overflow. The total effort has been capped, but this is not expected to happen - please file a bug report with logs and information about the circumstances under which this occured."
1142 );
1143 inner.total_effort = u64::MAX;
1144 }
1145 }
1146 }
1147 if inner.queue.len() >= config_rx.borrow().pow_rend_queue_depth {
1148 let dropped_request = inner.queue.pop_first();
1149 #[cfg(feature = "metrics")]
1150 counter_rend_queue_overflow.increment(1);
1151 tracing::debug!(
1152 dropped_effort = ?dropped_request.map(|x| x.pow.map(|x| x.effort())),
1153 "RendRequest queue full, dropping request."
1154 );
1155 }
1156 inner.queue.insert(rend_request);
1157 #[cfg(feature = "metrics")]
1158 counter_rendrequest_enqueued.increment(1);
1159 if let Some(waker) = &inner.waker {
1160 waker.wake_by_ref();
1161 }
1162 } else {
1163 let mut inner = self.0.lock().expect("Lock poisoned");
1167 inner.queue_pow_disabled.push_back(rend_request);
1168 #[cfg(feature = "metrics")]
1169 counter_rendrequest_enqueued.increment(1);
1170 if let Some(waker) = &inner.waker {
1171 waker.wake_by_ref();
1172 }
1173 }
1174 }
1175 }
1176
1177 fn expire_old_requests_loop(self, runtime: &R) -> Result<(), PowError> {
1179 let netdir_provider = self
1180 .0
1181 .lock()
1182 .expect("Lock poisoned")
1183 .netdir_provider
1184 .clone();
1185 let net_params = runtime
1186 .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
1187 .params()
1188 .clone();
1189
1190 let max_age: Duration = net_params
1191 .hs_pow_v1_service_intro_timeout
1192 .try_into()
1193 .expect(
1194 "Couldn't convert HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds to Duration",
1195 );
1196
1197 let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
1198 #[cfg(feature = "metrics")]
1199 let counter_rendrequest_expired = metrics::counter!("arti_hss_pow_rendrequest_expired_total", "nickname" => nickname.clone());
1200
1201 loop {
1202 let inner = self.0.lock().expect("Lock poisoned");
1203 let wait_time = inner
1206 .queue
1207 .first()
1208 .map(|r| {
1209 max_age.saturating_sub(runtime.now().saturating_duration_since(r.recv_time))
1210 })
1211 .unwrap_or(max_age);
1212 drop(inner);
1213
1214 runtime.reenter_block_on(runtime.sleep(wait_time));
1215
1216 let mut inner = self.0.lock().expect("Lock poisoned");
1217 let now = runtime.now();
1218 let prev_len = inner.queue.len();
1219 inner.queue.retain(|r| now - r.recv_time < max_age);
1220 let dropped = prev_len - inner.queue.len();
1221 tracing::trace!(dropped, "Expired timed out RendRequests");
1222 #[cfg(feature = "metrics")]
1223 counter_rendrequest_expired
1224 .increment(dropped.try_into().expect("usize overflowed u64!"));
1225 }
1226 }
1227}
1228
1229impl<R: Runtime, Q: MockableRendRequest> Stream for RendRequestReceiver<R, Q> {
1230 type Item = Q;
1231
1232 fn poll_next(
1233 self: std::pin::Pin<&mut Self>,
1234 cx: &mut std::task::Context<'_>,
1235 ) -> std::task::Poll<Option<Self::Item>> {
1236 let mut inner = self.get_mut().0.lock().expect("Lock poisoned");
1237 if inner.config_rx.borrow().enable_pow {
1238 match inner.queue.pop_last() {
1239 Some(item) => {
1240 inner.num_dequeued += 1;
1241 if inner.queue.is_empty() {
1242 inner.last_transition = inner.runtime.now();
1243 }
1244 std::task::Poll::Ready(Some(item.request))
1245 }
1246 None => {
1247 inner.waker = Some(cx.waker().clone());
1248 std::task::Poll::Pending
1249 }
1250 }
1251 } else if let Some(request) = inner.queue_pow_disabled.pop_front() {
1252 std::task::Poll::Ready(Some(request))
1255 } else {
1256 inner.waker = Some(cx.waker().clone());
1257 std::task::Poll::Pending
1258 }
1259 }
1260}
1261
1262#[cfg(test)]
1263mod test {
1264 #![allow(clippy::unwrap_used)]
1265 use crate::config::OnionServiceConfigBuilder;
1266 use crate::status::{OnionServiceStatus, StatusSender};
1267
1268 use super::*;
1269 use futures::FutureExt;
1270 use tor_hscrypto::pow::v1::{Nonce, SolutionByteArray};
1271 use tor_netdir::{testnet, testprovider::TestNetDirProvider};
1272 use tor_rtmock::MockRuntime;
1273
1274 struct MockPowManager;
1275
1276 #[derive(Debug)]
1277 struct MockRendRequest {
1278 id: usize,
1279 pow: Option<ProofOfWork>,
1280 }
1281
1282 impl MockablePowManager for MockPowManager {
1283 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
1284 if solve.solution() == &[0; 16] {
1286 Ok(())
1287 } else {
1288 Err(PowSolveError::InvalidSeedHead)
1289 }
1290 }
1291 }
1292
1293 impl MockableRendRequest for MockRendRequest {
1294 fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
1295 Ok(self.pow.as_ref())
1296 }
1297 }
1298
1299 fn make_req(id: usize, effort: Option<u32>) -> MockRendRequest {
1300 MockRendRequest {
1301 id,
1302 pow: effort.map(|e| {
1303 ProofOfWork::V1(ProofOfWorkV1::new(
1304 Nonce::from([0; 16]),
1305 Effort::from(e),
1306 SeedHead::from([0; 4]),
1307 SolutionByteArray::from([0; 16]),
1308 ))
1309 }),
1310 }
1311 }
1312
1313 fn make_req_invalid(id: usize, effort: u32) -> MockRendRequest {
1314 MockRendRequest {
1315 id,
1316 pow: Some(ProofOfWork::V1(ProofOfWorkV1::new(
1317 Nonce::from([0; 16]),
1318 Effort::from(effort),
1319 SeedHead::from([0; 4]),
1320 SolutionByteArray::from([1; 16]),
1321 ))),
1322 }
1323 }
1324
1325 #[allow(clippy::type_complexity)]
1326 fn make_test_receiver(
1327 runtime: &MockRuntime,
1328 netdir_params: Vec<(String, i32)>,
1329 config: Option<OnionServiceConfig>,
1330 ) -> (
1331 RendRequestReceiver<MockRuntime, MockRendRequest>,
1332 mpsc::Sender<MockRendRequest>,
1333 Arc<Mutex<Effort>>,
1334 NetParameters,
1335 postage::watch::Sender<Arc<OnionServiceConfig>>,
1336 ) {
1337 let pow_manager = Arc::new(MockPowManager);
1338 let suggested_effort = Arc::new(Mutex::new(Effort::zero()));
1339 let netdir = testnet::construct_custom_netdir_with_params(
1340 testnet::simple_net_func,
1341 netdir_params,
1342 None,
1343 )
1344 .unwrap()
1345 .unwrap_if_sufficient()
1346 .unwrap();
1347 let net_params = netdir.params().clone();
1348 let netdir_provider: Arc<TestNetDirProvider> = Arc::new(netdir.into());
1349 let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
1350 let nickname = HsNickname::new("test-hs".to_string()).unwrap();
1351 let (config_tx, config_rx) = postage::watch::channel_with(Arc::new(
1352 config.unwrap_or(
1353 OnionServiceConfigBuilder::default()
1354 .nickname(nickname.clone())
1355 .enable_pow(true)
1356 .build()
1357 .unwrap(),
1358 ),
1359 ));
1360 let receiver: RendRequestReceiver<_, MockRendRequest> = RendRequestReceiver::new(
1361 runtime.clone(),
1362 nickname.clone(),
1363 suggested_effort.clone(),
1364 netdir_provider,
1365 status_tx,
1366 config_rx,
1367 );
1368 let (tx, rx) = mpsc::channel(32);
1369 receiver.start_accept_thread(runtime.clone(), pow_manager, rx);
1370
1371 (receiver, tx, suggested_effort, net_params, config_tx)
1372 }
1373
1374 #[test]
1375 fn test_basic_pow_ordering() {
1376 MockRuntime::test_with_various(|runtime| async move {
1377 let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
1378 make_test_receiver(&runtime, vec![], None);
1379
1380 tx.send(make_req(0, None)).await.unwrap();
1382 assert_eq!(receiver.next().await.unwrap().id, 0);
1383
1384 tx.send(make_req(1, Some(0))).await.unwrap();
1386 assert_eq!(receiver.next().await.unwrap().id, 1);
1387
1388 tx.send(make_req(2, Some(0))).await.unwrap();
1390 tx.send(make_req(3, Some(16))).await.unwrap();
1391 runtime.progress_until_stalled().await;
1392 assert_eq!(receiver.next().await.unwrap().id, 3);
1393 assert_eq!(receiver.next().await.unwrap().id, 2);
1394
1395 tx.send(make_req_invalid(4, 32)).await.unwrap();
1397 tx.send(make_req(5, Some(16))).await.unwrap();
1398 runtime.progress_until_stalled().await;
1399 assert_eq!(receiver.next().await.unwrap().id, 5);
1400 assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1401 });
1402 }
1403
1404 #[test]
1405 fn test_suggested_effort_increase() {
1406 MockRuntime::test_with_various(|runtime| async move {
1407 let (mut receiver, mut tx, suggested_effort, net_params, _config_tx) =
1408 make_test_receiver(
1409 &runtime,
1410 vec![(
1411 "HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds".to_string(),
1412 60000,
1413 )],
1414 None,
1415 );
1416
1417 for n in 0..128 {
1420 tx.send(make_req(n, Some(0))).await.unwrap();
1421 }
1422
1423 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1424
1425 for _ in 0..128 {
1426 receiver.next().await.unwrap();
1427 }
1428
1429 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1430 receiver.update_suggested_effort(&net_params);
1431
1432 assert_eq!(suggested_effort.lock().unwrap().clone(), Effort::zero());
1433
1434 for n in 0..128 {
1438 tx.send(make_req(n, Some(0))).await.unwrap();
1439 }
1440
1441 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1442
1443 for _ in 0..64 {
1444 receiver.next().await.unwrap();
1445 }
1446
1447 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1448 receiver.update_suggested_effort(&net_params);
1449
1450 let mut new_suggested_effort = *suggested_effort.lock().unwrap();
1451 assert!(new_suggested_effort > Effort::zero());
1452
1453 for n in 0..64 {
1456 tx.send(make_req(n, Some(new_suggested_effort.into())))
1457 .await
1458 .unwrap();
1459 }
1460
1461 receiver.next().await.unwrap();
1462 runtime.advance_by(HS_UPDATE_PERIOD).await;
1463 receiver.update_suggested_effort(&net_params);
1464
1465 let mut old_suggested_effort = new_suggested_effort;
1466 new_suggested_effort = *suggested_effort.lock().unwrap();
1467 assert!(new_suggested_effort > old_suggested_effort);
1468
1469 for n in 0..32 {
1472 tx.send(make_req(n, Some(new_suggested_effort.into())))
1473 .await
1474 .unwrap();
1475 }
1476
1477 runtime.advance_by(HS_UPDATE_PERIOD / 16 * 15).await;
1478
1479 while receiver.next().now_or_never().is_some() {
1480 }
1482
1483 runtime.advance_by(HS_UPDATE_PERIOD / 16).await;
1484 receiver.update_suggested_effort(&net_params);
1485
1486 old_suggested_effort = new_suggested_effort;
1487 new_suggested_effort = *suggested_effort.lock().unwrap();
1488 assert!(new_suggested_effort < old_suggested_effort);
1489 assert!(new_suggested_effort > Effort::zero());
1490
1491 let mut num_loops = 0;
1494 loop {
1495 tx.send(make_req(0, Some(new_suggested_effort.into())))
1496 .await
1497 .unwrap();
1498 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1499
1500 while receiver.next().now_or_never().is_some() {
1501 }
1503
1504 runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1505 receiver.update_suggested_effort(&net_params);
1506
1507 old_suggested_effort = new_suggested_effort;
1508 new_suggested_effort = *suggested_effort.lock().unwrap();
1509
1510 assert!(new_suggested_effort < old_suggested_effort);
1511
1512 if new_suggested_effort == Effort::zero() {
1513 break;
1514 }
1515
1516 num_loops += 1;
1517
1518 if num_loops > 5 {
1519 panic!("Took too long for suggested effort to fall!");
1520 }
1521 }
1522 });
1523 }
1524
1525 #[test]
1526 fn test_rendrequest_timeout() {
1527 MockRuntime::test_with_various(|runtime| async move {
1528 let (receiver, mut tx, _suggested_effort, net_params, _config_tx) =
1529 make_test_receiver(&runtime, vec![], None);
1530
1531 let r0 = MockRendRequest { id: 0, pow: None };
1532 tx.send(r0).await.unwrap();
1533
1534 let max_age: Duration = net_params
1535 .hs_pow_v1_service_intro_timeout
1536 .try_into()
1537 .unwrap();
1538 runtime.advance_by(max_age * 2).await;
1539
1540 assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1542 });
1543 }
1544
1545 #[test]
1546 fn test_pow_disabled() {
1547 MockRuntime::test_with_various(|runtime| async move {
1548 let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
1549 make_test_receiver(
1550 &runtime,
1551 vec![],
1552 Some(
1553 OnionServiceConfigBuilder::default()
1554 .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1555 .enable_pow(false)
1556 .build()
1557 .unwrap(),
1558 ),
1559 );
1560
1561 tx.send(make_req(0, None)).await.unwrap();
1563 tx.send(make_req(1, Some(0))).await.unwrap();
1564 tx.send(make_req(2, Some(20))).await.unwrap();
1565 tx.send(make_req(3, Some(10))).await.unwrap();
1566
1567 runtime.progress_until_stalled().await;
1568
1569 assert_eq!(receiver.next().await.unwrap().id, 0);
1571 assert_eq!(receiver.next().await.unwrap().id, 1);
1572 assert_eq!(receiver.next().await.unwrap().id, 2);
1573 assert_eq!(receiver.next().await.unwrap().id, 3);
1574 });
1575 }
1576
1577 #[test]
1578 fn test_rend_queue_max_depth() {
1579 MockRuntime::test_with_various(|runtime| async move {
1580 let (mut receiver, mut tx, _suggested_effort, _net_params, mut config_tx) =
1581 make_test_receiver(
1582 &runtime,
1583 vec![],
1584 Some(
1585 OnionServiceConfigBuilder::default()
1586 .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1587 .enable_pow(true)
1588 .pow_rend_queue_depth(2)
1589 .build()
1590 .unwrap(),
1591 ),
1592 );
1593
1594 tx.send(make_req(0, None)).await.unwrap();
1595 tx.send(make_req(1, None)).await.unwrap();
1596 tx.send(make_req(2, None)).await.unwrap();
1597
1598 runtime.progress_until_stalled().await;
1599
1600 assert!(receiver.next().await.is_some());
1601 assert!(receiver.next().await.is_some());
1602 assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1603
1604 config_tx
1607 .send(Arc::new(
1608 OnionServiceConfigBuilder::default()
1609 .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1610 .enable_pow(true)
1611 .pow_rend_queue_depth(8)
1612 .build()
1613 .unwrap(),
1614 ))
1615 .await
1616 .unwrap();
1617
1618 tx.send(make_req(0, None)).await.unwrap();
1619 tx.send(make_req(1, None)).await.unwrap();
1620 tx.send(make_req(2, None)).await.unwrap();
1621
1622 runtime.progress_until_stalled().await;
1623
1624 assert!(receiver.next().await.is_some());
1625 assert!(receiver.next().await.is_some());
1626 assert!(receiver.next().await.is_some());
1627 });
1628 }
1629}