1use std::{
8 collections::{BinaryHeap, HashMap},
9 sync::{Arc, Mutex, RwLock},
10 task::Waker,
11 time::{Duration, SystemTime},
12};
13
14use arrayvec::ArrayVec;
15use futures::task::SpawnExt;
16use futures::{channel::mpsc, Stream};
17use futures::{SinkExt, StreamExt};
18use rand::{CryptoRng, RngCore};
19use serde::{Deserialize, Serialize};
20use tor_basic_utils::RngExt as _;
21use tor_cell::relaycell::hs::pow::{v1::ProofOfWorkV1, ProofOfWork};
22use tor_checkable::timed::TimerangeBound;
23use tor_hscrypto::{
24 pk::HsBlindIdKey,
25 pow::v1::{Effort, Instance, Seed, SeedHead, Solution, SolutionErrorV1, Verifier},
26 time::TimePeriod,
27};
28use tor_keymgr::KeyMgr;
29use tor_netdoc::doc::hsdesc::pow::{v1::PowParamsV1, PowParams};
30use tor_persist::{
31 hsnickname::HsNickname,
32 state_dir::{InstanceRawSubdir, StorageHandle},
33};
34use tor_rtcompat::Runtime;
35
36use crate::{
37 rend_handshake, replay::PowNonceReplayLog, BlindIdPublicKeySpecifier, CreateIptError,
38 RendRequest, ReplayError, StartupError,
39};
40
41use super::NewPowManager;
42
43pub(crate) struct PowManager<R>(RwLock<State<R>>);
45
46struct State<R> {
48 seeds: HashMap<TimePeriod, SeedsForTimePeriod>,
53
54 verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,
56
57 nickname: HsNickname,
61
62 instance_dir: InstanceRawSubdir,
64
65 keymgr: Arc<KeyMgr>,
67
68 suggested_effort: Effort,
70
71 runtime: R,
73
74 storage_handle: StorageHandle<PowManagerStateRecord>,
76
77 publisher_update_tx: mpsc::Sender<TimePeriod>,
80}
81
82#[derive(Serialize, Deserialize, Debug, Clone)]
83struct SeedsForTimePeriod {
85 seeds: ArrayVec<Seed, 2>,
89
90 next_expiration_time: SystemTime,
92}
93
94#[derive(Debug)]
95#[allow(unused)]
96pub(crate) enum PowSolveError {
101 InvalidSeedHead,
103 NonceReplay(ReplayError),
105 InvalidEquixSolution(SolutionErrorV1),
107 InvalidSolve(tor_hscrypto::pow::Error),
109}
110
111#[derive(Serialize, Deserialize, Debug)]
113pub(crate) struct PowManagerStateRecord {
114 seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
121 }
123
124impl<R: Runtime> State<R> {
125 pub(crate) fn to_record(&self) -> PowManagerStateRecord {
127 PowManagerStateRecord {
128 seeds: self.seeds.clone().into_iter().collect(),
129 }
130 }
131}
132
133const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);
135
136const EXPIRATION_TIME_MINS_MIN: u64 = 105;
139
140const EXPIRATION_TIME_MINS_MAX: u64 = 120;
143
144const _: () = assert!(
146 SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
147 "Early rotation time must be less than minimum expiration time"
148);
149
150const _: () = assert!(
152 EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
153 "Minimum expiration time must be less than or equal to max"
154);
155
156const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
160
161#[derive(Debug)]
162#[allow(dead_code)] pub(crate) enum InternalPowError {
165 MissingKey,
167 StorageError,
169 CreateIptError(CreateIptError),
171}
172
173impl<R: Runtime> PowManager<R> {
174 #[allow(clippy::new_ret_no_self)]
176 pub(crate) fn new(
177 runtime: R,
178 nickname: HsNickname,
179 instance_dir: InstanceRawSubdir,
180 keymgr: Arc<KeyMgr>,
181 storage_handle: StorageHandle<PowManagerStateRecord>,
182 ) -> Result<NewPowManager<R>, StartupError> {
183 let on_disk_state = storage_handle.load().map_err(StartupError::LoadState)?;
184 let seeds = on_disk_state.map_or(vec![], |on_disk_state| on_disk_state.seeds);
185 let seeds = seeds.into_iter().collect();
186
187 let (publisher_update_tx, publisher_update_rx) =
190 crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);
191
192 let state = State {
193 seeds,
194 nickname,
195 instance_dir,
196 keymgr,
197 publisher_update_tx,
198 verifiers: HashMap::new(),
199 suggested_effort: Effort::zero(),
200 runtime: runtime.clone(),
201 storage_handle,
202 };
203 let pow_manager = Arc::new(PowManager(RwLock::new(state)));
204
205 let (rend_req_tx, rend_req_rx) = super::make_rend_queue();
206 let rend_req_rx = RendRequestReceiver::new(runtime, pow_manager.clone(), rend_req_rx);
207
208 Ok(NewPowManager {
209 pow_manager,
210 rend_req_tx,
211 rend_req_rx: Box::pin(rend_req_rx),
212 publisher_update_rx,
213 })
214 }
215
216 pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
218 let pow_manager = self.clone();
219 let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
220
221 runtime
222 .spawn(pow_manager.main_loop_task())
223 .map_err(|cause| StartupError::Spawn {
224 spawning: "pow manager",
225 cause: cause.into(),
226 })?;
227 Ok(())
228 }
229
230 async fn main_loop_task(self: Arc<Self>) {
232 let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
233
234 loop {
235 let next_update_time = self.rotate_seeds_if_expiring().await;
236
237 let max_delay =
242 Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60) - SEED_EARLY_ROTATION_TIME;
243 let delay = next_update_time
244 .map(|x| x.duration_since(SystemTime::now()).unwrap_or(max_delay))
245 .unwrap_or(max_delay)
246 .min(max_delay);
247
248 tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
249
250 runtime.sleep(delay).await;
251 }
252 }
253
254 fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
256 SystemTime::now()
257 + Duration::from_secs(
258 60 * rng
259 .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
260 .expect("Can't generate expiration_time"),
261 )
262 }
263
264 fn make_verifier(
271 keymgr: &Arc<KeyMgr>,
272 nickname: HsNickname,
273 time_period: TimePeriod,
274 seed: Seed,
275 ) -> Option<Verifier> {
276 let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
277 let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
278 Ok(blind_id_key) => blind_id_key,
279 Err(err) => {
280 tracing::warn!(?err, "KeyMgr error when getting blinded ID key for PoW");
281 None
282 }
283 };
284 let instance = Instance::new(blind_id_key?.id(), seed);
285 Some(Verifier::new(instance))
286 }
287
288 fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
291 expiration_time
302 .checked_sub(SEED_EARLY_ROTATION_TIME)
303 .expect("PoW seed expiration underflow")
304 }
305
306 async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
312 let mut expired_verifiers = vec![];
313 let mut new_verifiers = vec![];
314
315 let mut update_times = vec![];
316 let mut updated_tps = vec![];
317 let mut expired_tps = vec![];
318
319 let mut publisher_update_tx = {
320 let mut rng = rand::rng();
322
323 let mut state = self.0.write().expect("Lock poisoned");
324
325 let keymgr = state.keymgr.clone();
326 let nickname = state.nickname.clone();
327
328 for (time_period, info) in state.seeds.iter_mut() {
329 let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
330 update_times.push(rotation_time);
331
332 if rotation_time <= SystemTime::now() {
333 let seed = Seed::new(&mut rng, None);
334 let verifier = match Self::make_verifier(
335 &keymgr,
336 nickname.clone(),
337 *time_period,
338 seed.clone(),
339 ) {
340 Some(verifier) => verifier,
341 None => {
342 expired_tps.push(*time_period);
345 continue;
346 }
347 };
348
349 let expired_seed = if info.seeds.is_full() {
350 info.seeds.pop_at(0)
351 } else {
352 None
353 };
354 info.seeds.push(seed.clone());
356 info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
357 update_times.push(info.next_expiration_time);
358
359 new_verifiers.push((seed, verifier));
361 if let Some(expired_seed) = expired_seed {
362 expired_verifiers.push(expired_seed.head());
363 }
364
365 updated_tps.push(*time_period);
367
368 tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
369 }
370 }
371
372 for time_period in expired_tps {
373 if let Some(seeds) = state.seeds.remove(&time_period) {
374 for seed in seeds.seeds {
375 state.verifiers.remove(&seed.head());
376 }
377 }
378 }
379
380 for (seed, verifier) in new_verifiers {
381 let replay_log = Mutex::new(
382 PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
383 .expect("Couldn't make ReplayLog."),
384 );
385 state.verifiers.insert(seed.head(), (verifier, replay_log));
386 }
387
388 for seed_head in expired_verifiers {
389 state.verifiers.remove(&seed_head);
390 }
391
392 let record = state.to_record();
393 if let Err(err) = state.storage_handle.store(&record) {
394 tracing::warn!(?err, "Error saving PoW state");
395 }
396
397 state.publisher_update_tx.clone()
398 };
399
400 for time_period in updated_tps {
401 if let Err(err) = publisher_update_tx.send(time_period).await {
402 tracing::warn!(?err, "Couldn't send update message to publisher");
403 }
404 }
405
406 update_times.iter().min().cloned()
407 }
408
409 pub(crate) fn get_pow_params(
414 self: &Arc<Self>,
415 time_period: TimePeriod,
416 ) -> Result<PowParams, InternalPowError> {
417 let (seed_and_expiration, suggested_effort) = {
418 let state = self.0.read().expect("Lock poisoned");
419 let seed = state
420 .seeds
421 .get(&time_period)
422 .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
423 (seed, state.suggested_effort)
424 };
425
426 let (seed, expiration) = match seed_and_expiration {
427 Some(seed) => seed,
428 None => {
429 let mut rng = rand::rng();
433
434 let seed = Seed::new(&mut rng, None);
435 let next_expiration_time = Self::make_next_expiration_time(&mut rng);
436
437 let mut seeds = ArrayVec::new();
438 seeds.push(seed.clone());
439
440 let mut state = self.0.write().expect("Lock poisoned");
441
442 state.seeds.insert(
443 time_period,
444 SeedsForTimePeriod {
445 seeds,
446 next_expiration_time,
447 },
448 );
449
450 let verifier = Self::make_verifier(
451 &state.keymgr,
452 state.nickname.clone(),
453 time_period,
454 seed.clone(),
455 )
456 .ok_or(InternalPowError::MissingKey)?;
457
458 let replay_log = Mutex::new(
459 PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
460 .map_err(InternalPowError::CreateIptError)?,
461 );
462 state.verifiers.insert(seed.head(), (verifier, replay_log));
463
464 let record = state.to_record();
465 state
466 .storage_handle
467 .store(&record)
468 .map_err(|_| InternalPowError::StorageError)?;
469
470 (seed, next_expiration_time)
471 }
472 };
473
474 Ok(PowParams::V1(PowParamsV1::new(
475 TimerangeBound::new(seed, ..expiration),
476 suggested_effort,
477 )))
478 }
479
480 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
482 {
487 let state = self.0.write().expect("Lock poisoned");
488 let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
489 Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
490 None => return Err(PowSolveError::InvalidSeedHead),
491 };
492 replay_log
493 .check_for_replay(solve.nonce())
494 .map_err(PowSolveError::NonceReplay)?;
495 }
496
497 let state = self.0.read().expect("Lock poisoned");
500 let verifier = match state.verifiers.get(&solve.seed_head()) {
501 Some((verifier, _)) => verifier,
502 None => return Err(PowSolveError::InvalidSeedHead),
503 };
504
505 let solution = match Solution::try_from_bytes(
506 solve.nonce().clone(),
507 solve.effort(),
508 solve.seed_head(),
509 solve.solution(),
510 ) {
511 Ok(solution) => solution,
512 Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
513 };
514
515 match verifier.check(&solution) {
516 Ok(()) => Ok(()),
517 Err(err) => Err(PowSolveError::InvalidSolve(err)),
518 }
519 }
520}
521
522#[derive(Debug)]
524struct RendRequestOrdByEffort {
525 request: RendRequest,
527 pow: Option<ProofOfWorkV1>,
529}
530
531impl RendRequestOrdByEffort {
532 fn new(request: RendRequest) -> Result<Self, rend_handshake::IntroRequestError> {
534 let pow = match request
535 .intro_request()?
536 .intro_payload()
537 .proof_of_work_extension()
538 .cloned()
539 {
540 Some(ProofOfWork::V1(pow)) => Some(pow),
541 None | Some(_) => None,
542 };
543
544 Ok(Self { request, pow })
545 }
546}
547
548impl Ord for RendRequestOrdByEffort {
549 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
550 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
551 let other_effort = other
552 .pow
553 .as_ref()
554 .map_or(Effort::zero(), |pow| pow.effort());
555 self_effort.cmp(&other_effort)
556 }
557}
558
559impl PartialOrd for RendRequestOrdByEffort {
560 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
561 Some(self.cmp(other))
562 }
563}
564
565impl PartialEq for RendRequestOrdByEffort {
566 fn eq(&self, other: &Self) -> bool {
567 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
568 let other_effort = other
569 .pow
570 .as_ref()
571 .map_or(Effort::zero(), |pow| pow.effort());
572 self_effort == other_effort
573 }
574}
575
576impl Eq for RendRequestOrdByEffort {}
577
578#[derive(Clone)]
590pub(crate) struct RendRequestReceiver(Arc<Mutex<RendRequestReceiverInner>>);
591
592struct RendRequestReceiverInner {
594 queue: BinaryHeap<RendRequestOrdByEffort>,
596
597 waker: Option<Waker>,
599}
600
601impl RendRequestReceiver {
602 fn new<R: Runtime>(
604 runtime: R,
605 pow_manager: Arc<PowManager<R>>,
606 inner_receiver: mpsc::Receiver<RendRequest>,
607 ) -> Self {
608 let receiver = RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
609 queue: BinaryHeap::new(),
610 waker: None,
611 })));
612 let receiver_clone = receiver.clone();
613 let accept_thread = runtime.clone().spawn_blocking(move || {
614 receiver_clone.accept_loop(&runtime, &pow_manager, inner_receiver);
615 });
616 drop(accept_thread);
617 receiver
618 }
619
620 fn accept_loop<R: Runtime>(
623 self,
624 runtime: &R,
625 pow_manager: &Arc<PowManager<R>>,
626 mut receiver: mpsc::Receiver<RendRequest>,
627 ) {
628 loop {
629 let rend_request = runtime
630 .reenter_block_on(receiver.next())
631 .expect("Other side of RendRequest queue hung up");
632 let rend_request = match RendRequestOrdByEffort::new(rend_request) {
633 Ok(rend_request) => rend_request,
634 Err(err) => {
635 tracing::trace!(?err, "Error processing RendRequest");
636 continue;
637 }
638 };
639
640 if let Some(ref pow) = rend_request.pow {
641 if let Err(err) = pow_manager.check_solve(pow) {
642 tracing::debug!(?err, "PoW verification failed");
643 continue;
644 }
645 }
646
647 let mut inner = self.0.lock().expect("Lock poisened");
648 inner.queue.push(rend_request);
649 if let Some(waker) = &inner.waker {
650 waker.wake_by_ref();
651 }
652 }
653 }
654}
655
656impl Stream for RendRequestReceiver {
657 type Item = RendRequest;
658
659 fn poll_next(
660 self: std::pin::Pin<&mut Self>,
661 cx: &mut std::task::Context<'_>,
662 ) -> std::task::Poll<Option<Self::Item>> {
663 let mut inner = self.get_mut().0.lock().expect("Lock poisened");
664 match inner.queue.pop() {
665 Some(item) => std::task::Poll::Ready(Some(item.request)),
666 None => {
667 inner.waker = Some(cx.waker().clone());
668 std::task::Poll::Pending
669 }
670 }
671 }
672}