1use std::{
8 collections::{BinaryHeap, HashMap},
9 sync::{Arc, Mutex, RwLock},
10 task::Waker,
11 time::{Duration, SystemTime},
12};
13
14use arrayvec::ArrayVec;
15use futures::task::SpawnExt;
16use futures::{channel::mpsc, Stream};
17use futures::{SinkExt, StreamExt};
18use rand::{CryptoRng, RngCore};
19use serde::{Deserialize, Serialize};
20use tor_basic_utils::RngExt as _;
21use tor_cell::relaycell::hs::pow::{v1::ProofOfWorkV1, ProofOfWork};
22use tor_checkable::timed::TimerangeBound;
23use tor_hscrypto::{
24 pk::HsBlindIdKey,
25 pow::v1::{Effort, Instance, Seed, SeedHead, Solution, SolutionErrorV1, Verifier},
26 time::TimePeriod,
27};
28use tor_keymgr::KeyMgr;
29use tor_netdoc::doc::hsdesc::pow::{v1::PowParamsV1, PowParams};
30use tor_persist::{
31 hsnickname::HsNickname,
32 state_dir::{InstanceRawSubdir, StorageHandle},
33};
34use tor_rtcompat::Runtime;
35
36use crate::{
37 rend_handshake, replay::PowNonceReplayLog, BlindIdPublicKeySpecifier, CreateIptError,
38 RendRequest, ReplayError, StartupError,
39};
40
41use super::NewPowManager;
42
43pub(crate) struct PowManager<R>(RwLock<State<R>>);
45
46struct State<R> {
48 seeds: HashMap<TimePeriod, SeedsForTimePeriod>,
53
54 verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,
56
57 nickname: HsNickname,
61
62 instance_dir: InstanceRawSubdir,
64
65 keymgr: Arc<KeyMgr>,
67
68 suggested_effort: Effort,
70
71 runtime: R,
73
74 storage_handle: StorageHandle<PowManagerStateRecord>,
76
77 publisher_update_tx: mpsc::Sender<TimePeriod>,
80}
81
82#[derive(Serialize, Deserialize, Debug, Clone)]
83struct SeedsForTimePeriod {
85 seeds: ArrayVec<Seed, 2>,
89
90 next_expiration_time: SystemTime,
92}
93
94#[derive(Debug)]
95#[allow(unused)]
96pub(crate) enum PowSolveError {
101 InvalidSeedHead,
103 NonceReplay(ReplayError),
105 InvalidEquixSolution(SolutionErrorV1),
107 InvalidSolve(tor_hscrypto::pow::Error),
109}
110
111#[derive(Serialize, Deserialize, Debug)]
113pub(crate) struct PowManagerStateRecord {
114 seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
121 }
123
124impl<R: Runtime> State<R> {
125 pub(crate) fn to_record(&self) -> PowManagerStateRecord {
127 PowManagerStateRecord {
128 seeds: self.seeds.clone().into_iter().collect(),
129 }
130 }
131}
132
133const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);
135
136const EXPIRATION_TIME_MINS_MIN: u64 = 105;
139
140const EXPIRATION_TIME_MINS_MAX: u64 = 120;
143
144const _: () = assert!(
146 SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
147 "Early rotation time must be less than minimum expiration time"
148);
149
150const _: () = assert!(
152 EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
153 "Minimum expiration time must be less than or equal to max"
154);
155
156const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
160
161#[derive(Debug)]
162#[allow(dead_code)] pub(crate) enum InternalPowError {
165 MissingKey,
167 StorageError,
169 CreateIptError(CreateIptError),
171}
172
173impl<R: Runtime> PowManager<R> {
174 #[allow(clippy::new_ret_no_self)]
176 pub(crate) fn new(
177 runtime: R,
178 nickname: HsNickname,
179 instance_dir: InstanceRawSubdir,
180 keymgr: Arc<KeyMgr>,
181 storage_handle: StorageHandle<PowManagerStateRecord>,
182 ) -> Result<NewPowManager<R>, StartupError> {
183 let on_disk_state = storage_handle.load().map_err(StartupError::LoadState)?;
184 let seeds = on_disk_state.map_or(vec![], |on_disk_state| on_disk_state.seeds);
185 let seeds = seeds.into_iter().collect();
186
187 let (publisher_update_tx, publisher_update_rx) =
190 crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);
191
192 let state = State {
193 seeds,
194 nickname,
195 instance_dir,
196 keymgr,
197 publisher_update_tx,
198 verifiers: HashMap::new(),
199 suggested_effort: Effort::zero(),
200 runtime: runtime.clone(),
201 storage_handle,
202 };
203 let pow_manager = Arc::new(PowManager(RwLock::new(state)));
204
205 let (rend_req_tx, rend_req_rx) = super::make_rend_queue();
206 let rend_req_rx = RendRequestReceiver::new(runtime, pow_manager.clone(), rend_req_rx);
207
208 Ok(NewPowManager {
209 pow_manager,
210 rend_req_tx,
211 rend_req_rx: Box::pin(rend_req_rx),
212 publisher_update_rx,
213 })
214 }
215
216 pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
218 let pow_manager = self.clone();
219 let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
220
221 runtime
222 .spawn(pow_manager.main_loop_task())
223 .map_err(|cause| StartupError::Spawn {
224 spawning: "pow manager",
225 cause: cause.into(),
226 })?;
227 Ok(())
228 }
229
230 async fn main_loop_task(self: Arc<Self>) {
232 let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
233
234 loop {
235 let next_update_time = self.rotate_seeds_if_expiring().await;
236
237 let max_delay =
242 Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60) - SEED_EARLY_ROTATION_TIME;
243 let delay = next_update_time
244 .map(|x| x.duration_since(SystemTime::now()).unwrap_or(max_delay))
245 .unwrap_or(max_delay)
246 .min(max_delay);
247
248 tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
249
250 runtime.sleep(delay).await;
251 }
252 }
253
254 fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
256 SystemTime::now()
257 + Duration::from_secs(
258 60 * rng
259 .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
260 .expect("Can't generate expiration_time"),
261 )
262 }
263
264 fn make_verifier(
271 keymgr: &Arc<KeyMgr>,
272 nickname: HsNickname,
273 time_period: TimePeriod,
274 seed: Seed,
275 ) -> Option<Verifier> {
276 let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
277 let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
278 Ok(blind_id_key) => blind_id_key,
279 Err(err) => {
280 tracing::warn!(?err, "KeyMgr error when getting blinded ID key for PoW");
281 None
282 }
283 };
284 let instance = Instance::new(blind_id_key?.id(), seed);
285 Some(Verifier::new(instance))
286 }
287
288 fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
291 expiration_time
302 .checked_sub(SEED_EARLY_ROTATION_TIME)
303 .expect("PoW seed expiration underflow")
304 }
305
306 #[allow(clippy::cognitive_complexity)]
312 async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
313 let mut expired_verifiers = vec![];
314 let mut new_verifiers = vec![];
315
316 let mut update_times = vec![];
317 let mut updated_tps = vec![];
318 let mut expired_tps = vec![];
319
320 let mut publisher_update_tx = {
321 let mut rng = rand::rng();
323
324 let mut state = self.0.write().expect("Lock poisoned");
325
326 let keymgr = state.keymgr.clone();
327 let nickname = state.nickname.clone();
328
329 for (time_period, info) in state.seeds.iter_mut() {
330 let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
331 update_times.push(rotation_time);
332
333 if rotation_time <= SystemTime::now() {
334 let seed = Seed::new(&mut rng, None);
335 let verifier = match Self::make_verifier(
336 &keymgr,
337 nickname.clone(),
338 *time_period,
339 seed.clone(),
340 ) {
341 Some(verifier) => verifier,
342 None => {
343 expired_tps.push(*time_period);
346 continue;
347 }
348 };
349
350 let expired_seed = if info.seeds.is_full() {
351 info.seeds.pop_at(0)
352 } else {
353 None
354 };
355 info.seeds.push(seed.clone());
357 info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
358 update_times.push(info.next_expiration_time);
359
360 new_verifiers.push((seed, verifier));
362 if let Some(expired_seed) = expired_seed {
363 expired_verifiers.push(expired_seed.head());
364 }
365
366 updated_tps.push(*time_period);
368
369 tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
370 }
371 }
372
373 for time_period in expired_tps {
374 if let Some(seeds) = state.seeds.remove(&time_period) {
375 for seed in seeds.seeds {
376 state.verifiers.remove(&seed.head());
377 }
378 }
379 }
380
381 for (seed, verifier) in new_verifiers {
382 let replay_log = Mutex::new(
383 PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
384 .expect("Couldn't make ReplayLog."),
385 );
386 state.verifiers.insert(seed.head(), (verifier, replay_log));
387 }
388
389 for seed_head in expired_verifiers {
390 state.verifiers.remove(&seed_head);
391 }
392
393 let record = state.to_record();
394 if let Err(err) = state.storage_handle.store(&record) {
395 tracing::warn!(?err, "Error saving PoW state");
396 }
397
398 state.publisher_update_tx.clone()
399 };
400
401 for time_period in updated_tps {
402 if let Err(err) = publisher_update_tx.send(time_period).await {
403 tracing::warn!(?err, "Couldn't send update message to publisher");
404 }
405 }
406
407 update_times.iter().min().cloned()
408 }
409
410 pub(crate) fn get_pow_params(
415 self: &Arc<Self>,
416 time_period: TimePeriod,
417 ) -> Result<PowParams, InternalPowError> {
418 let (seed_and_expiration, suggested_effort) = {
419 let state = self.0.read().expect("Lock poisoned");
420 let seed = state
421 .seeds
422 .get(&time_period)
423 .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
424 (seed, state.suggested_effort)
425 };
426
427 let (seed, expiration) = match seed_and_expiration {
428 Some(seed) => seed,
429 None => {
430 let mut rng = rand::rng();
434
435 let seed = Seed::new(&mut rng, None);
436 let next_expiration_time = Self::make_next_expiration_time(&mut rng);
437
438 let mut seeds = ArrayVec::new();
439 seeds.push(seed.clone());
440
441 let mut state = self.0.write().expect("Lock poisoned");
442
443 state.seeds.insert(
444 time_period,
445 SeedsForTimePeriod {
446 seeds,
447 next_expiration_time,
448 },
449 );
450
451 let verifier = Self::make_verifier(
452 &state.keymgr,
453 state.nickname.clone(),
454 time_period,
455 seed.clone(),
456 )
457 .ok_or(InternalPowError::MissingKey)?;
458
459 let replay_log = Mutex::new(
460 PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
461 .map_err(InternalPowError::CreateIptError)?,
462 );
463 state.verifiers.insert(seed.head(), (verifier, replay_log));
464
465 let record = state.to_record();
466 state
467 .storage_handle
468 .store(&record)
469 .map_err(|_| InternalPowError::StorageError)?;
470
471 (seed, next_expiration_time)
472 }
473 };
474
475 Ok(PowParams::V1(PowParamsV1::new(
476 TimerangeBound::new(seed, ..expiration),
477 suggested_effort,
478 )))
479 }
480
481 fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
483 {
488 let state = self.0.write().expect("Lock poisoned");
489 let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
490 Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
491 None => return Err(PowSolveError::InvalidSeedHead),
492 };
493 replay_log
494 .check_for_replay(solve.nonce())
495 .map_err(PowSolveError::NonceReplay)?;
496 }
497
498 let state = self.0.read().expect("Lock poisoned");
501 let verifier = match state.verifiers.get(&solve.seed_head()) {
502 Some((verifier, _)) => verifier,
503 None => return Err(PowSolveError::InvalidSeedHead),
504 };
505
506 let solution = match Solution::try_from_bytes(
507 solve.nonce().clone(),
508 solve.effort(),
509 solve.seed_head(),
510 solve.solution(),
511 ) {
512 Ok(solution) => solution,
513 Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
514 };
515
516 match verifier.check(&solution) {
517 Ok(()) => Ok(()),
518 Err(err) => Err(PowSolveError::InvalidSolve(err)),
519 }
520 }
521}
522
523#[derive(Debug)]
525struct RendRequestOrdByEffort {
526 request: RendRequest,
528 pow: Option<ProofOfWorkV1>,
530}
531
532impl RendRequestOrdByEffort {
533 fn new(request: RendRequest) -> Result<Self, rend_handshake::IntroRequestError> {
535 let pow = match request
536 .intro_request()?
537 .intro_payload()
538 .proof_of_work_extension()
539 .cloned()
540 {
541 Some(ProofOfWork::V1(pow)) => Some(pow),
542 None | Some(_) => None,
543 };
544
545 Ok(Self { request, pow })
546 }
547}
548
549impl Ord for RendRequestOrdByEffort {
550 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
551 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
552 let other_effort = other
553 .pow
554 .as_ref()
555 .map_or(Effort::zero(), |pow| pow.effort());
556 self_effort.cmp(&other_effort)
557 }
558}
559
560impl PartialOrd for RendRequestOrdByEffort {
561 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
562 Some(self.cmp(other))
563 }
564}
565
566impl PartialEq for RendRequestOrdByEffort {
567 fn eq(&self, other: &Self) -> bool {
568 let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
569 let other_effort = other
570 .pow
571 .as_ref()
572 .map_or(Effort::zero(), |pow| pow.effort());
573 self_effort == other_effort
574 }
575}
576
577impl Eq for RendRequestOrdByEffort {}
578
579#[derive(Clone)]
591pub(crate) struct RendRequestReceiver(Arc<Mutex<RendRequestReceiverInner>>);
592
593struct RendRequestReceiverInner {
595 queue: BinaryHeap<RendRequestOrdByEffort>,
597
598 waker: Option<Waker>,
600}
601
602impl RendRequestReceiver {
603 fn new<R: Runtime>(
605 runtime: R,
606 pow_manager: Arc<PowManager<R>>,
607 inner_receiver: mpsc::Receiver<RendRequest>,
608 ) -> Self {
609 let receiver = RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
610 queue: BinaryHeap::new(),
611 waker: None,
612 })));
613 let receiver_clone = receiver.clone();
614 let accept_thread = runtime.clone().spawn_blocking(move || {
615 receiver_clone.accept_loop(&runtime, &pow_manager, inner_receiver);
616 });
617 drop(accept_thread);
618 receiver
619 }
620
621 fn accept_loop<R: Runtime>(
624 self,
625 runtime: &R,
626 pow_manager: &Arc<PowManager<R>>,
627 mut receiver: mpsc::Receiver<RendRequest>,
628 ) {
629 loop {
630 let rend_request = runtime
631 .reenter_block_on(receiver.next())
632 .expect("Other side of RendRequest queue hung up");
633 let rend_request = match RendRequestOrdByEffort::new(rend_request) {
634 Ok(rend_request) => rend_request,
635 Err(err) => {
636 tracing::trace!(?err, "Error processing RendRequest");
637 continue;
638 }
639 };
640
641 if let Some(ref pow) = rend_request.pow {
642 if let Err(err) = pow_manager.check_solve(pow) {
643 tracing::debug!(?err, "PoW verification failed");
644 continue;
645 }
646 }
647
648 let mut inner = self.0.lock().expect("Lock poisened");
649 inner.queue.push(rend_request);
650 if let Some(waker) = &inner.waker {
651 waker.wake_by_ref();
652 }
653 }
654 }
655}
656
657impl Stream for RendRequestReceiver {
658 type Item = RendRequest;
659
660 fn poll_next(
661 self: std::pin::Pin<&mut Self>,
662 cx: &mut std::task::Context<'_>,
663 ) -> std::task::Poll<Option<Self::Item>> {
664 let mut inner = self.get_mut().0.lock().expect("Lock poisened");
665 match inner.queue.pop() {
666 Some(item) => std::task::Poll::Ready(Some(item.request)),
667 None => {
668 inner.waker = Some(cx.waker().clone());
669 std::task::Poll::Pending
670 }
671 }
672 }
673}