tor_hsservice/pow/v1.rs

1//! Code implementing version 1 proof-of-work for onion service hosts.
2//!
3//! Spec links:
4//! * <https://spec.torproject.org/hspow-spec/common-protocol.html>
5//! * <https://spec.torproject.org/hspow-spec/v1-equix.html>
6
7use std::{
8    collections::{BTreeSet, HashMap, VecDeque},
9    sync::{Arc, Mutex, RwLock},
10    task::Waker,
11    time::{Duration, Instant, SystemTime},
12};
13
14use arrayvec::ArrayVec;
15use equix::EquiXBuilder;
16use futures::{SinkExt, StreamExt};
17use futures::{Stream, channel::mpsc};
18use num_traits::FromPrimitive;
19use rand::{CryptoRng, RngCore};
20use serde::{Deserialize, Serialize};
21use thiserror::Error;
22use tor_basic_utils::RngExt as _;
23use tor_cell::relaycell::hs::pow::{ProofOfWork, v1::ProofOfWorkV1};
24use tor_checkable::timed::TimerangeBound;
25use tor_error::warn_report;
26use tor_hscrypto::{
27    pk::HsBlindIdKey,
28    pow::v1::{
29        Effort, Instance, RuntimeOption, Seed, SeedHead, Solution, SolutionErrorV1, Verifier,
30    },
31    time::TimePeriod,
32};
33use tor_keymgr::KeyMgr;
34use tor_netdir::{NetDirProvider, NetdirProviderShutdown, params::NetParameters};
35use tor_netdoc::doc::hsdesc::pow::{PowParams, v1::PowParamsV1};
36use tor_persist::{
37    hsnickname::HsNickname,
38    state_dir::{InstanceRawSubdir, StorageHandle},
39};
40use tor_rtcompat::Runtime;
41use tor_rtcompat::SpawnExt;
42
43use crate::{
44    BlindIdPublicKeySpecifier, OnionServiceConfig, RendRequest, ReplayError, StartupError,
45    rend_handshake,
46    replay::{OpenReplayLogError, PowNonceReplayLog},
47    status::{PowManagerStatusSender, Problem, State as PowManagerState},
48};
49
50use super::NewPowManager;
51
52/// Proof-of-Work manager type alias for production, using concrete [`RendRequest`].
53pub(crate) type PowManager<R> = PowManagerGeneric<R, RendRequest>;
54
55/// Responsible for rotating Proof-of-Work seeds and verifying PoW solves.
56pub(crate) struct PowManagerGeneric<R, Q>(RwLock<State<R, Q>>);
57
58/// Internal state for [`PowManagerGeneric`].
59struct State<R, Q> {
60    /// The [`Seed`]s for a given [`TimePeriod`]
61    ///
62    /// The [`ArrayVec`] contains the current and previous seed, and the [`SystemTime`] is when the
63    /// current seed will expire.
64    seeds: HashMap<TimePeriod, SeedsForTimePeriod>,
65
66    /// Verifiers for all the seeds that exist in `seeds`.
67    verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,
68
69    /// The nickname for this hidden service.
70    ///
71    /// We need this so we can get the blinded keys from the [`KeyMgr`].
72    nickname: HsNickname,
73
74    /// Directory used to store nonce replay log.
75    instance_dir: InstanceRawSubdir,
76
77    /// Key manager.
78    keymgr: Arc<KeyMgr>,
79
80    /// Current suggested effort that we publish in the pow-params line.
81    ///
82    /// This is only read by the PowManagerGeneric, and is written to by the [`RendRequestReceiver`].
83    suggested_effort: Arc<Mutex<Effort>>,
84
85    /// Runtime
86    runtime: R,
87
88    /// Handle for storing state we need to persist to disk.
89    storage_handle: StorageHandle<PowManagerStateRecord>,
90
91    /// Queue to tell the publisher to re-upload a descriptor for a given TP, since we've rotated
92    /// that seed.
93    publisher_update_tx: mpsc::Sender<TimePeriod>,
94
95    /// The [`RendRequestReceiver`], which contains the queue of [`RendRequest`]s.
96    ///
97    /// We need a reference to this in order to tell it when to update the suggested_effort value.
98    rend_request_rx: RendRequestReceiver<R, Q>,
99
100    /// [`NetDirProvider`], used for getting configuration values from consensus parameters.
101    netdir_provider: Arc<dyn NetDirProvider>,
102
103    /// Sender for reporting back onion service status.
104    status_tx: PowManagerStatusSender,
105
106    /// Receiver for the current configuration.
107    config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
108}
109
110#[derive(Serialize, Deserialize, Debug, Clone)]
111/// Information about the current and previous [`Seed`] for a given [`TimePeriod`].
112struct SeedsForTimePeriod {
113    /// The previous and current [`Seed`].
114    ///
115    /// The last element in this array is the current seed.
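    ///
    /// A minimal sketch of the rotation pattern used in `rotate_seeds_if_expiring`, with
    /// `u8` standing in for [`Seed`] (illustrative only): when the array is full, the oldest
    /// seed is dropped before the new one is pushed.
    ///
    /// ```
    /// use arrayvec::ArrayVec;
    ///
    /// let mut seeds: ArrayVec<u8, 2> = ArrayVec::new();
    /// seeds.push(1);
    /// seeds.push(2);
    /// // Make room for the new seed by dropping the oldest one.
    /// let expired = if seeds.is_full() { seeds.pop_at(0) } else { None };
    /// seeds.push(3);
    /// assert_eq!(expired, Some(1));
    /// assert_eq!(seeds.last(), Some(&3)); // the last element is the current seed
    /// ```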
116    seeds: ArrayVec<Seed, 2>,
117
118    /// When the current seed will expire.
119    next_expiration_time: SystemTime,
120}
121
122#[derive(Debug)]
123#[allow(unused)]
124/// A PoW solve was invalid.
125///
126/// While this contains the reason for the failure, we probably just want to use that for
127    /// debugging; we shouldn't make any logical decisions based on what the particular error was.
128pub(crate) enum PowSolveError {
129    /// Seed head was not recognized; it may be expired.
130    InvalidSeedHead,
131    /// We have already seen a solve with this nonce.
132    NonceReplay(ReplayError),
133    /// The bytes given as a solution do not form a valid Equi-X puzzle.
134    InvalidEquixSolution(SolutionErrorV1),
135    /// The solution given was invalid.
136    InvalidSolve(tor_hscrypto::pow::Error),
137}
138
139/// On-disk record of [`PowManagerGeneric`] state.
140#[derive(Serialize, Deserialize, Debug, Default)]
141pub(crate) struct PowManagerStateRecord {
142    /// Seeds for each time period.
143    ///
144    /// Conceptually, this is a map from TimePeriod to SeedsForTimePeriod, but since TimePeriod
145    /// can't be serialized to a string, serde can't easily serialize it as a map directly.
146    /// We therefore store it as a list of tuples, and convert it to/from the map when
147    /// saving/loading.
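    ///
    /// A small sketch of the round trip between the stored list and the in-memory map,
    /// with `u32` and `&str` standing in for the real types (illustrative only):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// let stored = vec![(1u32, "seed-a"), (2u32, "seed-b")];
    /// let map: HashMap<u32, &str> = stored.into_iter().collect();
    /// let back: Vec<(u32, &str)> = map.into_iter().collect();
    /// assert_eq!(back.len(), 2);
    /// ```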
148    seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
149
150    /// Most recently published suggested_effort value.
151    #[serde(default)]
152    suggested_effort: Effort,
153    // We don't persist any per-period state. While it might be sort of nice to, it's complex to
154    // decide when to write the state out to disk. The disadvantage to not storing it is that when
155    // we restart the process, we may be up to 5 minutes slower to update the suggested effort to a
156    // new value, which isn't particularly bad. The only case it would be bad is if an attacker has
157    // a way to cause the Arti process to restart (in which case they could do that just before the
158    // update period to pin the suggested effort value at a specific value), but if they have that,
159    // they have a much more valuable attack (including as a DoS vector) than just a PoW bypass.
160}
161
162impl<R: Runtime, Q> State<R, Q> {
163    /// Make a [`PowManagerStateRecord`] for this state.
164    pub(crate) fn to_record(&self) -> PowManagerStateRecord {
165        PowManagerStateRecord {
166            seeds: self.seeds.clone().into_iter().collect(),
167            suggested_effort: *self.suggested_effort.lock().expect("Lock poisoned"),
168        }
169    }
170}
171
172/// How frequently the suggested effort should be recalculated.
173const HS_UPDATE_PERIOD: Duration = Duration::from_secs(300);
174
175/// When the suggested effort has changed by less than this much, we don't republish it.
176///
177/// Specified as "15 percent" in <https://spec.torproject.org/hspow-spec/common-protocol.html>
178///
179/// However, we may want to make this configurable in the future.
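///
/// A rough illustration of the republish check in the main loop, with hypothetical numbers
/// (not taken from any real service):
///
/// ```
/// let last_published: f64 = 100.0;
/// let new_effort: f64 = 110.0;
/// let percent_change = (new_effort - last_published) / last_published;
/// assert!(percent_change.abs() < 0.15); // within the deadzone: keep the old value
/// ```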
180const SUGGESTED_EFFORT_DEADZONE: f64 = 0.15;
181
182/// How soon before a seed's expiration time we should rotate it and publish a new seed.
183const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);
184
185/// Minimum seed expiration time in minutes. See:
186/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
187const EXPIRATION_TIME_MINS_MIN: u64 = 105;
188
189/// Maximum seed expiration time in minutes. See:
190/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
191const EXPIRATION_TIME_MINS_MAX: u64 = 120;
192
193/// Enforce that early rotation time is less than or equal to min expiration time.
194const _: () = assert!(
195    SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
196    "Early rotation time must be less than minimum expiration time"
197);
198
199/// Enforce that min expiration time is less than or equal to max.
200const _: () = assert!(
201    EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
202    "Minimum expiration time must be less than or equal to max"
203);
204
205/// Depth of the queue used to signal the publisher that it needs to update a given time period.
206///
207/// 32 is likely way larger than we need but the messages are tiny so we might as well.
208const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
209
210#[derive(Error, Debug, Clone)]
211#[allow(dead_code)] // We want to show fields in Debug even if we don't use them.
212#[non_exhaustive]
213/// Error within the PoW subsystem.
214pub enum PowError {
215    /// We don't have a key that is needed.
216    #[error("Missing required key.")]
217    MissingKey,
218    /// Error in the underlying storage layer.
219    #[error(transparent)]
220    StorageError(#[from] tor_persist::Error),
221    /// Error from the ReplayLog.
222    #[error(transparent)]
223    OpenReplayLog(#[from] OpenReplayLogError),
224    /// NetDirProvider has shut down.
225    #[error(transparent)]
226    NetdirProviderShutdown(#[from] NetdirProviderShutdown),
227}
228
229impl<R: Runtime, Q: MockableRendRequest + Send + 'static> PowManagerGeneric<R, Q> {
230    /// Create a new [`PowManagerGeneric`].
231    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
232    pub(crate) fn new(
233        runtime: R,
234        nickname: HsNickname,
235        instance_dir: InstanceRawSubdir,
236        keymgr: Arc<KeyMgr>,
237        storage_handle: StorageHandle<PowManagerStateRecord>,
238        netdir_provider: Arc<dyn NetDirProvider>,
239        status_tx: PowManagerStatusSender,
240        config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
241    ) -> Result<NewPowManager<R>, StartupError> {
242        let on_disk_state = storage_handle
243            .load()
244            .map_err(StartupError::LoadState)?
245            .unwrap_or(PowManagerStateRecord::default());
246
247        let seeds: HashMap<TimePeriod, SeedsForTimePeriod> =
248            on_disk_state.seeds.into_iter().collect();
249        let suggested_effort = Arc::new(Mutex::new(on_disk_state.suggested_effort));
250
251        let mut verifiers = HashMap::new();
252        for (tp, seeds_for_tp) in seeds.clone().into_iter() {
253            for seed in seeds_for_tp.seeds {
254                let verifier = match Self::make_verifier(
255                    &keymgr,
256                    nickname.clone(),
257                    tp,
258                    seed.clone(),
259                    &config_rx.borrow(),
260                ) {
261                    Some(verifier) => verifier,
262                    None => {
263                        tracing::warn!(
264                            "Couldn't construct verifier (key not available?). We will continue without this key, but this may prevent clients from connecting..."
265                        );
266                        continue;
267                    }
268                };
269                let replay_log = match PowNonceReplayLog::new_logged(&instance_dir, &seed) {
270                    Ok(replay_log) => replay_log,
271                    Err(err) => {
272                        warn_report!(
273                            err,
274                            "Error constructing replay log. We will continue without the log, but be aware that this may allow attackers to bypass PoW defenses..."
275                        );
276                        continue;
277                    }
278                };
279                verifiers.insert(seed.head(), (verifier, Mutex::new(replay_log)));
280            }
281        }
282
283        // This queue is extremely small, and we only make one per onion service, so it's
284        // fine to not use memquota tracking.
285        let (publisher_update_tx, publisher_update_rx) =
286            crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);
287
288        let (rend_req_tx, rend_req_rx_channel) = super::make_rend_queue();
289        let rend_req_rx = RendRequestReceiver::new(
290            runtime.clone(),
291            nickname.clone(),
292            suggested_effort.clone(),
293            netdir_provider.clone(),
294            status_tx.clone(),
295            config_rx.clone(),
296        );
297
298        let state = State {
299            seeds,
300            nickname,
301            instance_dir,
302            keymgr,
303            publisher_update_tx,
304            verifiers,
305            suggested_effort: suggested_effort.clone(),
306            runtime: runtime.clone(),
307            storage_handle,
308            rend_request_rx: rend_req_rx.clone(),
309            netdir_provider,
310            status_tx,
311            config_rx,
312        };
313        let pow_manager = Arc::new(PowManagerGeneric(RwLock::new(state)));
314
315        rend_req_rx.start_accept_thread(runtime, pow_manager.clone(), rend_req_rx_channel);
316
317        Ok(NewPowManager {
318            pow_manager,
319            rend_req_tx,
320            rend_req_rx: Box::pin(rend_req_rx),
321            publisher_update_rx,
322        })
323    }
324
325    /// Launch background task to rotate seeds.
326    pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
327        let pow_manager = self.clone();
328        let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
329
330        runtime
331            .spawn(pow_manager.main_loop_error_wrapper())
332            .map_err(|cause| StartupError::Spawn {
333                spawning: "pow manager",
334                cause: cause.into(),
335            })?;
336
337        self.0
338            .write()
339            .expect("Lock poisoned")
340            .status_tx
341            .send(PowManagerState::Running, None);
342        Ok(())
343    }
344
345    /// Run [`Self::main_loop_task`], reporting any errors.
346    async fn main_loop_error_wrapper(self: Arc<Self>) {
347        if let Err(err) = self.clone().main_loop_task().await {
348            self.0
349                .write()
350                .expect("Lock poisoned")
351                .status_tx
352                .send_broken(Problem::Pow(err));
353        }
354    }
355
356    /// Main loop for rotating seeds.
357    async fn main_loop_task(self: Arc<Self>) -> Result<(), PowError> {
358        let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
359
360        let mut last_suggested_effort_update = runtime.now();
361        let mut last_published_suggested_effort: u32 = (*self
362            .0
363            .read()
364            .expect("Lock poisoned")
365            .suggested_effort
366            .lock()
367            .expect("Lock poisoned"))
368        .into();
369
370        let netdir_provider = self
371            .0
372            .read()
373            .expect("Lock poisoned")
374            .netdir_provider
375            .clone();
376        let net_params = netdir_provider
377            .wait_for_netdir(tor_netdir::Timeliness::Timely)
378            .await?
379            .params()
380            .clone();
381
382        loop {
383            let next_update_time = self.rotate_seeds_if_expiring().await;
384
385            // Update the suggested effort, if needed
386            if runtime.now() - last_suggested_effort_update >= HS_UPDATE_PERIOD {
387                let (tps_to_update, mut publisher_update_tx) = {
388                    let mut tps_to_update = vec![];
389
390                    let inner = self.0.read().expect("Lock poisoned");
391
392                    inner.rend_request_rx.update_suggested_effort(&net_params);
393                    last_suggested_effort_update = runtime.now();
394                    let new_suggested_effort: u32 =
395                        (*inner.suggested_effort.lock().expect("Lock poisoned")).into();
396
397                    let percent_change = (f64::from(new_suggested_effort)
398                        - f64::from(last_published_suggested_effort))
399                        / f64::from(last_published_suggested_effort);
400                    if percent_change.abs() >= SUGGESTED_EFFORT_DEADZONE {
401                        last_published_suggested_effort = new_suggested_effort;
402
403                        tps_to_update = inner.seeds.iter().map(|x| *x.0).collect();
404                    }
405
406                    let publisher_update_tx = inner.publisher_update_tx.clone();
407                    (tps_to_update, publisher_update_tx)
408                };
409
410                for time_period in tps_to_update {
411                    let _ = publisher_update_tx.send(time_period).await;
412                }
413            }
414
415            let suggested_effort_update_delay = HS_UPDATE_PERIOD.saturating_sub(
416                runtime
417                    .now()
418                    .saturating_duration_since(last_suggested_effort_update),
419            );
420
421            // A new TimePeriod that we don't know about (and thus that isn't in next_update_time)
422            // might get added at any point. Making sure that our maximum delay is the minimum
423            // amount of time that it might take for a seed to expire means that we can be sure
424            // that we will rotate newly-added seeds properly.
425            const MAX_DELAY: Duration = Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60)
426                .checked_sub(SEED_EARLY_ROTATION_TIME)
427                .expect("SEED_EARLY_ROTATION_TIME too high, or EXPIRATION_TIME_MINS_MIN too low.");
428            let delay = next_update_time
429                .map(|x| x.duration_since(SystemTime::now()).unwrap_or(MAX_DELAY))
430                .unwrap_or(MAX_DELAY)
431                .min(MAX_DELAY)
432                .min(suggested_effort_update_delay);
433
434            tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
435
436            runtime.sleep(delay).await;
437        }
438    }
439
440    /// Make a randomized seed expiration time.
441    fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
442        SystemTime::now()
443            + Duration::from_secs(
444                60 * rng
445                    .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
446                    .expect("Can't generate expiration_time"),
447            )
448    }
449
450    /// Make a new [`Verifier`] for a given [`TimePeriod`] and [`Seed`].
451    ///
452    /// If a key is not available for this TP, returns None.
453    ///
454    /// This takes individual arguments instead of `&self` to avoid getting into any trouble with
455    /// locking.
456    fn make_verifier(
457        keymgr: &Arc<KeyMgr>,
458        nickname: HsNickname,
459        time_period: TimePeriod,
460        seed: Seed,
461        config: &OnionServiceConfig,
462    ) -> Option<Verifier> {
463        let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
464        let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
465            Ok(blind_id_key) => blind_id_key,
466            Err(err) => {
467                warn_report!(err, "KeyMgr error when getting blinded ID key for PoW");
468                None
469            }
470        };
471        let instance = Instance::new(blind_id_key?.id(), seed);
472        let mut equix = EquiXBuilder::default();
473        if *config.disable_pow_compilation() {
474            equix.runtime(RuntimeOption::InterpretOnly);
475        }
476        Some(Verifier::new_with_equix(instance, equix))
477    }
478
479    /// Calculate a time when we want to rotate a seed, slightly before it expires, in order to
480    /// ensure that clients don't ever download a seed that is already out of date.
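    ///
    /// A minimal sketch of the intent (illustrative only), mirroring the `checked_sub` below:
    ///
    /// ```
    /// use std::time::{Duration, SystemTime};
    ///
    /// let expiration = SystemTime::now() + Duration::from_secs(105 * 60);
    /// let rotation = expiration.checked_sub(Duration::from_secs(5 * 60));
    /// assert!(rotation.expect("rotation time underflow") < expiration);
    /// ```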
481    fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
482        // Underflow cannot happen because:
483        //
484        // * We set the expiration time to the current time plus at least the minimum
485        //   expiration time
486        // * We know (backed up by a compile-time assertion) that SEED_EARLY_ROTATION_TIME is
487        //   less than the minimum expiration time.
488        //
489        // Thus, the only way this subtraction can underflow is if the system time at the
490        // moment we set the expiration time was before the epoch, which is not possible on
491        // reasonable platforms.
492        expiration_time
493            .checked_sub(SEED_EARLY_ROTATION_TIME)
494            .expect("PoW seed expiration underflow")
495    }
496
497    /// Rotate any seeds that will expire soon.
498    ///
499    /// This also pokes the publisher when needed to cause rotated seeds to be published.
500    ///
501    /// Returns the next time this function should be called again.
502    #[allow(clippy::cognitive_complexity)]
503    async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
504        let mut expired_verifiers = vec![];
505        let mut new_verifiers = vec![];
506
507        let mut update_times = vec![];
508        let mut updated_tps = vec![];
509        let mut expired_tps = vec![];
510
511        let mut publisher_update_tx = {
512            let mut state = self.0.write().expect("Lock poisoned");
513
514            let config = state.config_rx.borrow().clone();
515            let keymgr = state.keymgr.clone();
516            let nickname = state.nickname.clone();
517
518            for (time_period, info) in state.seeds.iter_mut() {
519                let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
520                update_times.push(rotation_time);
521
522                if rotation_time <= SystemTime::now() {
523                    // This does not allow for easy testing, but because we're in an async function, it's
524                    // non-trivial to pass in an Rng from the outside world. If we end up writing tests that
525                    // require that, we can take a function to generate an Rng, but for now, just using the
526                    // thread rng is fine.
527                    let mut rng = rand::rng();
528
529                    let seed = Seed::new(&mut rng, None);
530                    let verifier = match Self::make_verifier(
531                        &keymgr,
532                        nickname.clone(),
533                        *time_period,
534                        seed.clone(),
535                        &config,
536                    ) {
537                        Some(verifier) => verifier,
538                        None => {
539                            // We use not having a key for a given TP as the signal that we should
540                            // stop keeping track of seeds for that TP.
541                            expired_tps.push(*time_period);
542                            continue;
543                        }
544                    };
545
546                    let expired_seed = if info.seeds.is_full() {
547                        info.seeds.pop_at(0)
548                    } else {
549                        None
550                    };
551                    // .push() is safe, since we just made space above.
552                    info.seeds.push(seed.clone());
553                    info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
554                    update_times.push(info.next_expiration_time);
555
556                    // Make a note to add the new verifier and remove the old one.
557                    new_verifiers.push((seed, verifier));
558                    if let Some(expired_seed) = expired_seed {
559                        expired_verifiers.push(expired_seed.head());
560                    }
561
562                    // Tell the publisher to update this TP
563                    updated_tps.push(*time_period);
564
565                    tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
566                }
567            }
568
569            for time_period in expired_tps {
570                if let Some(seeds) = state.seeds.remove(&time_period) {
571                    for seed in seeds.seeds {
572                        state.verifiers.remove(&seed.head());
573                    }
574                }
575            }
576
577            for (seed, verifier) in new_verifiers {
578                let replay_log = Mutex::new(
579                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
580                        .expect("Couldn't make ReplayLog."),
581                );
582                state.verifiers.insert(seed.head(), (verifier, replay_log));
583            }
584
585            for seed_head in expired_verifiers {
586                state.verifiers.remove(&seed_head);
587            }
588
589            let record = state.to_record();
590            if let Err(err) = state.storage_handle.store(&record) {
591                warn_report!(err, "Error saving PoW state");
592            }
593
594            state.publisher_update_tx.clone()
595        };
596
597        for time_period in updated_tps {
598            if let Err(err) = publisher_update_tx.send(time_period).await {
599                warn_report!(err, "Couldn't send update message to publisher");
600            }
601        }
602
603        update_times.iter().min().cloned()
604    }
605
606    /// Get [`PowParams`] for a given [`TimePeriod`].
607    ///
608    /// If we don't have any [`Seed`]s for the requested period, generate them. This is the only
609    /// way that [`PowManagerGeneric`] learns about new [`TimePeriod`]s.
610    pub(crate) fn get_pow_params<Rng: RngCore + CryptoRng>(
611        self: &Arc<Self>,
612        time_period: TimePeriod,
613        rng: &mut Rng,
614    ) -> Result<PowParams, PowError> {
615        let (seed_and_expiration, suggested_effort) = {
616            let state = self.0.read().expect("Lock poisoned");
617            let seed = state
618                .seeds
619                .get(&time_period)
620                .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
621            let suggested_effort = *state.suggested_effort.lock().expect("Lock poisoned");
622            (seed, suggested_effort)
623        };
624
625        let (seed, expiration) = match seed_and_expiration {
626            Some(seed) => seed,
627            None => {
628                // We don't have a seed for this time period, so we need to generate one.
629
630                let seed = Seed::new(rng, None);
631                let next_expiration_time = Self::make_next_expiration_time(rng);
632
633                let mut seeds = ArrayVec::new();
634                seeds.push(seed.clone());
635
636                let mut state = self.0.write().expect("Lock poisoned");
637
638                state.seeds.insert(
639                    time_period,
640                    SeedsForTimePeriod {
641                        seeds,
642                        next_expiration_time,
643                    },
644                );
645
646                let verifier = Self::make_verifier(
647                    &state.keymgr,
648                    state.nickname.clone(),
649                    time_period,
650                    seed.clone(),
651                    &state.config_rx.borrow(),
652                )
653                .ok_or(PowError::MissingKey)?;
654
655                let replay_log =
656                    Mutex::new(PowNonceReplayLog::new_logged(&state.instance_dir, &seed)?);
657                state.verifiers.insert(seed.head(), (verifier, replay_log));
658
659                let record = state.to_record();
660                state.storage_handle.store(&record)?;
661
662                (seed, next_expiration_time)
663            }
664        };
665
666        Ok(PowParams::V1(PowParamsV1::new(
667            TimerangeBound::new(seed, ..expiration),
668            suggested_effort,
669        )))
670    }
671
672    /// Verify a PoW solve.
673    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
674        // Note that we put the nonce into the replay log before we check the solve. While this
675        // might not be ideal, it's not a problem and is probably the most reasonable thing to do.
676        // See commit bc5b313028 for a fuller explanation.
677        {
678            let state = self.0.write().expect("Lock poisoned");
679            let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
680                Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
681                None => return Err(PowSolveError::InvalidSeedHead),
682            };
683            replay_log
684                .check_for_replay(solve.nonce())
685                .map_err(PowSolveError::NonceReplay)?;
686        }
687
688        // TODO: Once RwLock::downgrade is stabilized, it would make sense to use it here...
689
690        let state = self.0.read().expect("Lock poisoned");
691        let verifier = match state.verifiers.get(&solve.seed_head()) {
692            Some((verifier, _)) => verifier,
693            None => return Err(PowSolveError::InvalidSeedHead),
694        };
695
696        let solution = match Solution::try_from_bytes(
697            solve.nonce().clone(),
698            solve.effort(),
699            solve.seed_head(),
700            solve.solution(),
701        ) {
702            Ok(solution) => solution,
703            Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
704        };
705
706        match verifier.check(&solution) {
707            Ok(()) => Ok(()),
708            Err(err) => Err(PowSolveError::InvalidSolve(err)),
709        }
710    }
711}
712
713/// Trait to allow mocking PowManagerGeneric in tests.
714trait MockablePowManager {
715    /// Verify a PoW solve.
716    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError>;
717}
718
719impl<R: Runtime> MockablePowManager for PowManager<R> {
720    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
721        PowManager::check_solve(self, solve)
722    }
723}
724
725/// Trait to allow mocking RendRequest in tests.
726pub(crate) trait MockableRendRequest {
727    /// Get the proof-of-work extension associated with this request.
728    fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError>;
729}
730
731impl MockableRendRequest for RendRequest {
732    fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
733        Ok(self
734            .intro_request()?
735            .intro_payload()
736            .proof_of_work_extension())
737    }
738}
739
740/// Wrapper around [`RendRequest`] that implements [`std::cmp::Ord`] to sort by [`Effort`] and time.
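///
/// A sketch of the intended ordering with plain integers standing in for the real fields
/// (illustrative only): effort is compared first, and the arrival order is compared in
/// reverse so that, for equal effort, older requests sort later and are dequeued first
/// by `pop_last()`.
///
/// ```
/// use std::cmp::Reverse;
///
/// let a = (16u32, Reverse(5u64)); // effort 16, arrived 5th
/// let b = (16u32, Reverse(2u64)); // effort 16, arrived 2nd (older)
/// let c = (4u32, Reverse(1u64));  // effort 4, arrived 1st
///
/// assert!(b > a); // equal effort: the older request sorts later
/// assert!(a > c); // higher effort sorts later, so it is dequeued first
/// ```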
741#[derive(Debug)]
742struct RendRequestOrdByEffort<Q> {
743    /// The underlying request.
744    request: Q,
745    /// The proof-of-work options, if given.
746    pow: Option<ProofOfWorkV1>,
747    /// The maximum effort allowed. If the effort of this request is higher than this, it will be
748    /// treated as though it is this value.
749    max_effort: Effort,
750    /// When this request was received, used for ordering if the effort values are the same.
751    recv_time: Instant,
752    /// Unique number for this request, which is used for ordering among requests with the same
753    /// timestamp.
754    ///
755    /// This is intended to be monotonically increasing, although it may overflow. Overflows are
756    /// not handled in any special way, given that they are an edge case of an edge case, and
757    /// ordering among requests that came in at the same instant is not important.
758    request_num: u64,
759}
760
761impl<Q: MockableRendRequest> RendRequestOrdByEffort<Q> {
762    /// Create a new [`RendRequestOrdByEffort`].
763    fn new(
764        request: Q,
765        max_effort: Effort,
766        request_num: u64,
767    ) -> Result<Self, rend_handshake::IntroRequestError> {
768        let pow = match request.proof_of_work()?.cloned() {
769            Some(ProofOfWork::V1(pow)) => Some(pow),
770            None | Some(_) => None,
771        };
772
773        Ok(Self {
774            request,
775            pow,
776            max_effort,
777            recv_time: Instant::now(),
778            request_num,
779        })
780    }
781}
782
783impl<Q: MockableRendRequest> Ord for RendRequestOrdByEffort<Q> {
784    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
785        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| {
786            Effort::min(pow.effort(), self.max_effort)
787        });
788        let other_effort = other.pow.as_ref().map_or(Effort::zero(), |pow| {
789            Effort::min(pow.effort(), other.max_effort)
790        });
791        match self_effort.cmp(&other_effort) {
792            std::cmp::Ordering::Equal => {
793                // Flip ordering, since we want the oldest ones to be handled first.
794                match other.recv_time.cmp(&self.recv_time) {
795                    // Use request_num as a final tiebreaker, also flipping ordering (since
796                    // lower-numbered requests should be older and thus come first)
797                    std::cmp::Ordering::Equal => other.request_num.cmp(&self.request_num),
798                    not_equal => not_equal,
799                }
800            }
801            not_equal => not_equal,
802        }
803    }
804}
805
806impl<Q: MockableRendRequest> PartialOrd for RendRequestOrdByEffort<Q> {
807    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
808        Some(self.cmp(other))
809    }
810}
811
812impl<Q: MockableRendRequest> PartialEq for RendRequestOrdByEffort<Q> {
813    fn eq(&self, other: &Self) -> bool {
814        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| {
815            Effort::min(pow.effort(), self.max_effort)
816        });
817        let other_effort = other.pow.as_ref().map_or(Effort::zero(), |pow| {
818            Effort::min(pow.effort(), other.max_effort)
819        });
820        self_effort == other_effort && self.recv_time == other.recv_time && self.request_num == other.request_num
821    }
822}
823
824impl<Q: MockableRendRequest> Eq for RendRequestOrdByEffort<Q> {}
825
826/// Implements [`Stream`] for incoming [`RendRequest`]s, using a priority queue system to dequeue
827/// high-[`Effort`] requests first.
828///
829/// This is implemented on top of a [`mpsc::Receiver`]. There is a thread that dequeues from the
830/// [`mpsc::Receiver`], checks the PoW solve, and if it is correct, adds it to a [`BTreeSet`],
831/// which the [`Stream`] implementation reads from.
832///
833/// This is not particularly optimized — queueing and dequeuing use a [`Mutex`], so there may be
834/// some contention there. It's possible there may be some fancy lockless (or more optimized)
835/// priority queue that we could use, but we should properly benchmark things before trying to make
836/// an optimization like that.
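///
/// A minimal sketch of the queueing idea with plain `(effort, tiebreak)` tuples standing in
/// for the real request type (illustrative only): `pop_last()` yields the highest-effort
/// entry, and `pop_first()` is what drops the lowest-effort entry on overflow.
///
/// ```
/// use std::collections::BTreeSet;
///
/// let mut queue = BTreeSet::new();
/// queue.insert((0u32, 2u64));
/// queue.insert((16u32, 1u64));
/// queue.insert((4u32, 0u64));
///
/// assert_eq!(queue.pop_last(), Some((16, 1)));  // dequeue the highest effort first
/// assert_eq!(queue.pop_first(), Some((0, 2)));  // overflow drops the lowest effort
/// ```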
837pub(crate) struct RendRequestReceiver<R, Q>(Arc<Mutex<RendRequestReceiverInner<R, Q>>>);
838
839impl<R, Q> Clone for RendRequestReceiver<R, Q> {
840    fn clone(&self) -> Self {
841        Self(self.0.clone())
842    }
843}
844
845/// Inner implementation for [`RendRequestReceiver`].
846struct RendRequestReceiverInner<R, Q> {
847    /// Internal priority queue of requests.
848    queue: BTreeSet<RendRequestOrdByEffort<Q>>,
849
850    /// Internal FIFO queue of requests used when PoW is disabled.
851    ///
852    /// We have this here to support switching back and forth between PoW enabled and disabled at
853    /// runtime, although that isn't currently supported.
854    queue_pow_disabled: VecDeque<Q>,
855
856    /// Waker to inform async readers when there is a new message on the queue.
857    waker: Option<Waker>,
858
859    /// Runtime, used to get current time in a testable way.
860    runtime: R,
861
862    /// Nickname, used when reporting metrics.
863    nickname: HsNickname,
864
865    /// [`NetDirProvider`], for getting configuration values from consensus parameters.
866    netdir_provider: Arc<dyn NetDirProvider>,
867
868    /// Current configuration, used to see whether PoW is enabled or not.
869    config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
870
871    /// When the current update period started.
872    update_period_start: Instant,
873    /// Number of requests that were enqueued during the current update period, and had an effort
874    /// greater than or equal to the suggested effort.
875    num_enqueued_gte_suggested: usize,
876    /// Number of requests that were dequeued during the current update period.
877    num_dequeued: u32,
878    /// Amount of time during the current update period that we spent with no requests in the
879    /// queue.
880    idle_time: Duration,
881    /// Time that the queue last went from having items in it to not having items in it, or vice
882    /// versa. This is used to update idle_time.
883    last_transition: Instant,
884    /// Sum of all effort values that were validated and enqueued during the current update period.
885    total_effort: u64,
886
887    /// Most recently published suggested effort value.
888    ///
889    /// We write to this, which is then published in the pow-params line by [`PowManagerGeneric`].
890    suggested_effort: Arc<Mutex<Effort>>,
891
892    /// Sender for reporting back onion service status.
893    status_tx: PowManagerStatusSender,
894}
895
896impl<R: Runtime, Q: MockableRendRequest + Send + 'static> RendRequestReceiver<R, Q> {
897    /// Create a new [`RendRequestReceiver`].
898    fn new(
899        runtime: R,
900        nickname: HsNickname,
901        suggested_effort: Arc<Mutex<Effort>>,
902        netdir_provider: Arc<dyn NetDirProvider>,
903        status_tx: PowManagerStatusSender,
904        config_rx: postage::watch::Receiver<Arc<OnionServiceConfig>>,
905    ) -> Self {
906        let now = runtime.now();
907        RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
908            queue: BTreeSet::new(),
909            queue_pow_disabled: VecDeque::new(),
910            waker: None,
911            runtime,
912            nickname,
913            netdir_provider,
914            config_rx,
915            update_period_start: now,
916            num_enqueued_gte_suggested: 0,
917            num_dequeued: 0,
918            idle_time: Duration::new(0, 0),
919            last_transition: now,
920            total_effort: 0,
921            suggested_effort,
922            status_tx,
923        })))
924    }
925
926    // spawn_blocking executes immediately, but some of our abstractions make clippy not
927    // realize this.
928    #[allow(clippy::let_underscore_future)]
929    /// Start helper thread to accept and validate [`RendRequest`]s.
930    fn start_accept_thread<P: MockablePowManager + Send + Sync + 'static>(
931        &self,
932        runtime: R,
933        pow_manager: Arc<P>,
934        inner_receiver: mpsc::Receiver<Q>,
935    ) {
936        let receiver = self.clone();
937        let runtime_clone = runtime.clone();
938        let _ = runtime.clone().spawn_blocking(move || {
939            if let Err(err) =
940                receiver
941                    .clone()
942                    .accept_loop(&runtime_clone, &pow_manager, inner_receiver)
943            {
944                warn_report!(err, "PoW accept loop error!");
945                receiver
946                    .0
947                    .lock()
948                    .expect("Lock poisoned")
949                    .status_tx
950                    .send_broken(Problem::Pow(err));
951            }
952        });
953
954        let receiver = self.clone();
955        let _ = runtime.clone().spawn_blocking(move || {
956            if let Err(err) = receiver.clone().expire_old_requests_loop(&runtime) {
957                warn_report!(err, "PoW request expiration loop error!");
958                receiver
959                    .0
960                    .lock()
961                    .expect("Lock poisoned")
962                    .status_tx
963                    .send_broken(Problem::Pow(err));
964            }
965        });
966    }
967
968    /// Update the suggested effort value, as per the algorithm in prop362.
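    ///
    /// A rough sketch of the shape of the aggregation below, with purely hypothetical
    /// numbers: if we were busy for half of the period and dequeued 10 requests, we could
    /// in theory have served 20; if fewer requests than that arrived at or above the
    /// current suggestion, the suggestion decays (softened by the consensus-provided decay
    /// adjustment fraction), otherwise it is raised toward the average validated effort.
    ///
    /// ```
    /// let busy_fraction: f64 = 0.5;
    /// let theoretical_num_dequeued = 10.0 * (1.0 / busy_fraction);
    /// let decay = 5.0 / theoretical_num_dequeued; // 5 requests at/above the suggestion
    /// let decay_adjustment_fraction = 0.5; // hypothetical consensus value
    /// let adjusted_decay = decay + ((1.0 - decay) * decay_adjustment_fraction);
    /// assert_eq!(adjusted_decay, 0.625);
    /// ```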
969    fn update_suggested_effort(&self, net_params: &NetParameters) {
970        let mut inner = self.0.lock().expect("Lock poisoned");
971
972        let decay_adjustment_fraction = net_params.hs_pow_v1_default_decay_adjustment.as_fraction();
973
974        if inner.num_dequeued != 0 {
975            let update_period_duration = inner.runtime.now() - inner.update_period_start;
976            let avg_request_duration = update_period_duration / inner.num_dequeued;
977            if inner.queue.is_empty() {
978                let now = inner.runtime.now();
979                let last_transition = inner.last_transition;
980                inner.idle_time += now - last_transition;
981            }
982            let adjusted_idle_time = Duration::saturating_sub(
983                inner.idle_time,
984                avg_request_duration * inner.queue.len().try_into().expect("Queue too large."),
985            );
986            // TODO: use as_millis_f64 when stable
987            let idle_fraction = f64::from_u128(adjusted_idle_time.as_millis())
988                .expect("Conversion error")
989                / f64::from_u128(update_period_duration.as_millis()).expect("Conversion error");
990            let busy_fraction = 1.0 - idle_fraction;
991
992            let mut suggested_effort = inner.suggested_effort.lock().expect("Lock poisoned");
993            let suggested_effort_inner: u32 = (*suggested_effort).into();
994
995            if busy_fraction == 0.0 {
996                let new_suggested_effort =
997                    u32::from_f64(f64::from(suggested_effort_inner) * decay_adjustment_fraction)
998                        .expect("Conversion error");
999                *suggested_effort = Effort::from(new_suggested_effort);
1000            } else {
1001                let theoretical_num_dequeued =
1002                    f64::from(inner.num_dequeued) * (1.0 / busy_fraction);
1003                let num_enqueued_gte_suggested_f64 =
1004                    f64::from_usize(inner.num_enqueued_gte_suggested).expect("Conversion error");
1005
1006                if num_enqueued_gte_suggested_f64 >= theoretical_num_dequeued {
1007                    let effort_per_dequeued = u32::from_f64(
1008                        f64::from_u64(inner.total_effort).expect("Conversion error")
1009                            / f64::from(inner.num_dequeued),
1010                    )
1011                    .expect("Conversion error");
1012                    *suggested_effort = Effort::from(std::cmp::max(
1013                        effort_per_dequeued,
1014                        suggested_effort_inner + 1,
1015                    ));
1016                } else {
1017                    let decay = num_enqueued_gte_suggested_f64 / theoretical_num_dequeued;
1018                    let adjusted_decay = decay + ((1.0 - decay) * decay_adjustment_fraction);
1019                    let new_suggested_effort =
1020                        u32::from_f64(f64::from(suggested_effort_inner) * adjusted_decay)
1021                            .expect("Conversion error");
1022                    *suggested_effort = Effort::from(new_suggested_effort);
1023                }
1024            }
1025
1026            drop(suggested_effort);
1027        }
1028
1029        let now = inner.runtime.now();
1030
1031        inner.update_period_start = now;
1032        inner.num_enqueued_gte_suggested = 0;
1033        inner.num_dequeued = 0;
1034        inner.idle_time = Duration::new(0, 0);
1035        inner.last_transition = now;
1036        inner.total_effort = 0;
1037    }
1038
1039    /// Loop to accept messages from the wrapped [`mpsc::Receiver`], validate PoW solves, and
1040    /// enqueue onto the priority queue.
1041    #[allow(clippy::cognitive_complexity)]
1042    fn accept_loop<P: MockablePowManager>(
1043        self,
1044        runtime: &R,
1045        pow_manager: &Arc<P>,
1046        mut receiver: mpsc::Receiver<Q>,
1047    ) -> Result<(), PowError> {
1048        let mut request_num = 0;
1049
1050        let netdir_provider = self
1051            .0
1052            .lock()
1053            .expect("Lock poisoned")
1054            .netdir_provider
1055            .clone();
1056        let net_params = runtime
1057            .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
1058            .params()
1059            .clone();
1060
1061        let max_effort: u32 = net_params
1062            .hs_pow_v1_max_effort
1063            .get()
1064            .try_into()
1065            .expect("Bounded i32 not in range of u32?!");
1066        let max_effort = Effort::from(max_effort);
1067
1068        let config_rx = self.0.lock().expect("Lock poisoned").config_rx.clone();
1069
1070        let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
1071
1072        cfg_if::cfg_if! {
1073            if #[cfg(feature = "metrics")] {
1074                let counter_rendrequest_error_total = metrics::counter!("arti_hss_pow_rendrequest_error_total", "nickname" => nickname.clone());
1075                let counter_rendrequest_verification_failure = metrics::counter!("arti_hss_pow_rendrequest_verification_failure_total", "nickname" => nickname.clone());
1076                let counter_rend_queue_overflow = metrics::counter!("arti_hss_pow_rend_queue_overflow_total", "nickname" => nickname.clone());
1077                let counter_rendrequest_enqueued = metrics::counter!("arti_hss_pow_rendrequest_enqueued_total", "nickname" => nickname.clone());
1078                let histogram_rendrequest_effort = metrics::histogram!("arti_hss_pow_rendrequest_effort_hist", "nickname" => nickname.clone());
1079            }
1080        }
1081
1082        loop {
1083            let rend_request = if let Some(rend_request) = runtime.reenter_block_on(receiver.next())
1084            {
1085                rend_request
1086            } else {
1087                self.0
1088                    .lock()
1089                    .expect("Lock poisoned")
1090                    .status_tx
1091                    .send_shutdown();
1092                return Ok(());
1093            };
1094
1095            if config_rx.borrow().enable_pow {
1096                let rend_request =
1097                    match RendRequestOrdByEffort::new(rend_request, max_effort, request_num) {
1098                        Ok(rend_request) => rend_request,
1099                        Err(err) => {
1100                            #[cfg(feature = "metrics")]
1101                            counter_rendrequest_error_total.increment(1);
1102                            tracing::trace!(?err, "Error processing RendRequest");
1103                            continue;
1104                        }
1105                    };
1106
1107                request_num = request_num.wrapping_add(1);
1108
1109                if let Some(ref pow) = rend_request.pow {
1110                    if let Err(err) = pow_manager.check_solve(pow) {
1111                        tracing::debug!(?err, "PoW verification failed");
1112                        #[cfg(feature = "metrics")]
1113                        counter_rendrequest_verification_failure.increment(1);
1114                        continue;
1115                    } else {
1116                        #[cfg(feature = "metrics")]
1117                        {
1118                            let effort: u32 = pow.effort().into();
1119                            histogram_rendrequest_effort.record(effort);
1120                        }
1121                    }
1122                }
1123
1124                let mut inner = self.0.lock().expect("Lock poisoned");
1125                if inner.queue.is_empty() {
1126                    let now = runtime.now();
1127                    let last_transition = inner.last_transition;
1128                    inner.idle_time += now - last_transition;
1129                    inner.last_transition = now;
1130                }
1131                if let Some(ref request_pow) = rend_request.pow {
1132                    if request_pow.effort()
1133                        >= *inner.suggested_effort.lock().expect("Lock poisoned")
1134                    {
1135                        inner.num_enqueued_gte_suggested += 1;
1136                        let effort: u32 = request_pow.effort().into();
1137                        if let Some(total_effort) = inner.total_effort.checked_add(effort.into()) {
1138                            inner.total_effort = total_effort;
1139                        } else {
1140                            tracing::warn!(
1141                                "PoW total_effort would overflow. The total effort has been capped, but this is not expected to happen - please file a bug report with logs and information about the circumstances under which this occured."
1142                            );
1143                            inner.total_effort = u64::MAX;
1144                        }
1145                    }
1146                }
1147                if inner.queue.len() >= config_rx.borrow().pow_rend_queue_depth {
1148                    let dropped_request = inner.queue.pop_first();
1149                    #[cfg(feature = "metrics")]
1150                    counter_rend_queue_overflow.increment(1);
1151                    tracing::debug!(
1152                        dropped_effort = ?dropped_request.map(|x| x.pow.map(|x| x.effort())),
1153                        "RendRequest queue full, dropping request."
1154                    );
1155                }
1156                inner.queue.insert(rend_request);
1157                #[cfg(feature = "metrics")]
1158                counter_rendrequest_enqueued.increment(1);
1159                if let Some(waker) = &inner.waker {
1160                    waker.wake_by_ref();
1161                }
1162            } else {
1163                // TODO (#2082): when allowing enable_pow to be toggled at runtime, we will need to
1164                // do bookkeeping here, as above. Perhaps it can be refactored nicely so the
1165                // bookkeeping code can be the same in both cases.
1166                let mut inner = self.0.lock().expect("Lock poisoned");
1167                inner.queue_pow_disabled.push_back(rend_request);
1168                #[cfg(feature = "metrics")]
1169                counter_rendrequest_enqueued.increment(1);
1170                if let Some(waker) = &inner.waker {
1171                    waker.wake_by_ref();
1172                }
1173            }
1174        }
1175    }
1176
1177    /// Loop to check for messages that are older than our timeout and remove them from the queue.
1178    fn expire_old_requests_loop(self, runtime: &R) -> Result<(), PowError> {
1179        let netdir_provider = self
1180            .0
1181            .lock()
1182            .expect("Lock poisoned")
1183            .netdir_provider
1184            .clone();
1185        let net_params = runtime
1186            .reenter_block_on(netdir_provider.wait_for_netdir(tor_netdir::Timeliness::Timely))?
1187            .params()
1188            .clone();
1189
1190        let max_age: Duration = net_params
1191            .hs_pow_v1_service_intro_timeout
1192            .try_into()
1193            .expect(
1194                "Couldn't convert HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds to Duration",
1195            );
1196
1197        let nickname = self.0.lock().expect("Lock poisoned").nickname.to_string();
1198        #[cfg(feature = "metrics")]
1199        let counter_rendrequest_expired = metrics::counter!("arti_hss_pow_rendrequest_expired_total", "nickname" => nickname.clone());
1200
1201        loop {
1202            let inner = self.0.lock().expect("Lock poisoned");
1203            // Wake up when the oldest request will reach the expiration age, or, if there are no
1204            // items currently in the queue, wait for the maximum age.
1205            let wait_time = inner
1206                .queue
1207                .first()
1208                .map(|r| {
1209                    max_age.saturating_sub(runtime.now().saturating_duration_since(r.recv_time))
1210                })
1211                .unwrap_or(max_age);
1212            drop(inner);
1213
1214            runtime.reenter_block_on(runtime.sleep(wait_time));
1215
1216            let mut inner = self.0.lock().expect("Lock poisoned");
1217            let now = runtime.now();
1218            let prev_len = inner.queue.len();
1219            inner.queue.retain(|r| now - r.recv_time < max_age);
1220            let dropped = prev_len - inner.queue.len();
1221            tracing::trace!(dropped, "Expired timed out RendRequests");
1222            #[cfg(feature = "metrics")]
1223            counter_rendrequest_expired
1224                .increment(dropped.try_into().expect("usize overflowed u64!"));
1225        }
1226    }
1227}
1228
1229impl<R: Runtime, Q: MockableRendRequest> Stream for RendRequestReceiver<R, Q> {
1230    type Item = Q;
1231
1232    fn poll_next(
1233        self: std::pin::Pin<&mut Self>,
1234        cx: &mut std::task::Context<'_>,
1235    ) -> std::task::Poll<Option<Self::Item>> {
1236        let mut inner = self.get_mut().0.lock().expect("Lock poisoned");
1237        if inner.config_rx.borrow().enable_pow {
1238            match inner.queue.pop_last() {
1239                Some(item) => {
1240                    inner.num_dequeued += 1;
1241                    if inner.queue.is_empty() {
1242                        inner.last_transition = inner.runtime.now();
1243                    }
1244                    std::task::Poll::Ready(Some(item.request))
1245                }
1246                None => {
1247                    inner.waker = Some(cx.waker().clone());
1248                    std::task::Poll::Pending
1249                }
1250            }
1251        } else if let Some(request) = inner.queue_pow_disabled.pop_front() {
1252            // TODO (#2082): when we allow changing enable_pow at runtime, we will need to do
1253            // bookkeeping here.
1254            std::task::Poll::Ready(Some(request))
1255        } else {
1256            inner.waker = Some(cx.waker().clone());
1257            std::task::Poll::Pending
1258        }
1259    }
1260}
1261
1262#[cfg(test)]
1263mod test {
1264    #![allow(clippy::unwrap_used)]
1265    use crate::config::OnionServiceConfigBuilder;
1266    use crate::status::{OnionServiceStatus, StatusSender};
1267
1268    use super::*;
1269    use futures::FutureExt;
1270    use tor_hscrypto::pow::v1::{Nonce, SolutionByteArray};
1271    use tor_netdir::{testnet, testprovider::TestNetDirProvider};
1272    use tor_rtmock::MockRuntime;
1273
1274    struct MockPowManager;
1275
1276    #[derive(Debug)]
1277    struct MockRendRequest {
1278        id: usize,
1279        pow: Option<ProofOfWork>,
1280    }
1281
1282    impl MockablePowManager for MockPowManager {
1283        fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
1284            // For testing, treat all zeros as the only valid solve. Error is chosen arbitrarily.
1285            if solve.solution() == &[0; 16] {
1286                Ok(())
1287            } else {
1288                Err(PowSolveError::InvalidSeedHead)
1289            }
1290        }
1291    }
1292
1293    impl MockableRendRequest for MockRendRequest {
1294        fn proof_of_work(&self) -> Result<Option<&ProofOfWork>, rend_handshake::IntroRequestError> {
1295            Ok(self.pow.as_ref())
1296        }
1297    }
1298
1299    fn make_req(id: usize, effort: Option<u32>) -> MockRendRequest {
1300        MockRendRequest {
1301            id,
1302            pow: effort.map(|e| {
1303                ProofOfWork::V1(ProofOfWorkV1::new(
1304                    Nonce::from([0; 16]),
1305                    Effort::from(e),
1306                    SeedHead::from([0; 4]),
1307                    SolutionByteArray::from([0; 16]),
1308                ))
1309            }),
1310        }
1311    }
1312
1313    fn make_req_invalid(id: usize, effort: u32) -> MockRendRequest {
1314        MockRendRequest {
1315            id,
1316            pow: Some(ProofOfWork::V1(ProofOfWorkV1::new(
1317                Nonce::from([0; 16]),
1318                Effort::from(effort),
1319                SeedHead::from([0; 4]),
1320                SolutionByteArray::from([1; 16]),
1321            ))),
1322        }
1323    }
1324
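    /// Build a [`RendRequestReceiver`] for testing, along with the channels and shared
    /// state the tests need to drive it.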
1325    #[allow(clippy::type_complexity)]
1326    fn make_test_receiver(
1327        runtime: &MockRuntime,
1328        netdir_params: Vec<(String, i32)>,
1329        config: Option<OnionServiceConfig>,
1330    ) -> (
1331        RendRequestReceiver<MockRuntime, MockRendRequest>,
1332        mpsc::Sender<MockRendRequest>,
1333        Arc<Mutex<Effort>>,
1334        NetParameters,
1335        postage::watch::Sender<Arc<OnionServiceConfig>>,
1336    ) {
1337        let pow_manager = Arc::new(MockPowManager);
1338        let suggested_effort = Arc::new(Mutex::new(Effort::zero()));
1339        let netdir = testnet::construct_custom_netdir_with_params(
1340            testnet::simple_net_func,
1341            netdir_params,
1342            None,
1343        )
1344        .unwrap()
1345        .unwrap_if_sufficient()
1346        .unwrap();
1347        let net_params = netdir.params().clone();
1348        let netdir_provider: Arc<TestNetDirProvider> = Arc::new(netdir.into());
1349        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
1350        let nickname = HsNickname::new("test-hs".to_string()).unwrap();
1351        let (config_tx, config_rx) = postage::watch::channel_with(Arc::new(
1352            config.unwrap_or(
1353                OnionServiceConfigBuilder::default()
1354                    .nickname(nickname.clone())
1355                    .enable_pow(true)
1356                    .build()
1357                    .unwrap(),
1358            ),
1359        ));
1360        let receiver: RendRequestReceiver<_, MockRendRequest> = RendRequestReceiver::new(
1361            runtime.clone(),
1362            nickname.clone(),
1363            suggested_effort.clone(),
1364            netdir_provider,
1365            status_tx,
1366            config_rx,
1367        );
1368        let (tx, rx) = mpsc::channel(32);
1369        receiver.start_accept_thread(runtime.clone(), pow_manager, rx);
1370
1371        (receiver, tx, suggested_effort, net_params, config_tx)
1372    }
1373
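    /// Requests should be dequeued in order of claimed effort, and invalid solves dropped.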
1374    #[test]
1375    fn test_basic_pow_ordering() {
1376        MockRuntime::test_with_various(|runtime| async move {
1377            let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
1378                make_test_receiver(&runtime, vec![], None);
1379
1380            // Request with no PoW
1381            tx.send(make_req(0, None)).await.unwrap();
1382            assert_eq!(receiver.next().await.unwrap().id, 0);
1383
1384            // Request with PoW
1385            tx.send(make_req(1, Some(0))).await.unwrap();
1386            assert_eq!(receiver.next().await.unwrap().id, 1);
1387
1388            // A request with nonzero effort is dequeued before a request with zero effort
1389            tx.send(make_req(2, Some(0))).await.unwrap();
1390            tx.send(make_req(3, Some(16))).await.unwrap();
1391            runtime.progress_until_stalled().await;
1392            assert_eq!(receiver.next().await.unwrap().id, 3);
1393            assert_eq!(receiver.next().await.unwrap().id, 2);
1394
1395            // Invalid solves are dropped
1396            tx.send(make_req_invalid(4, 32)).await.unwrap();
1397            tx.send(make_req(5, Some(16))).await.unwrap();
1398            runtime.progress_until_stalled().await;
1399            assert_eq!(receiver.next().await.unwrap().id, 5);
1400            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1401        });
1402    }
1403
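    /// The suggested effort should rise while the queue is backlogged, and fall back to
    /// zero once we catch up.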
1404    #[test]
1405    fn test_suggested_effort_increase() {
1406        MockRuntime::test_with_various(|runtime| async move {
1407            let (mut receiver, mut tx, suggested_effort, net_params, _config_tx) =
1408                make_test_receiver(
1409                    &runtime,
1410                    vec![(
1411                        "HiddenServiceProofOfWorkV1ServiceIntroTimeoutSeconds".to_string(),
1412                        60000,
1413                    )],
1414                    None,
1415                );
1416
1417            // Get through all the requests with plenty of time to spare; the suggested effort should not increase
1418
1419            for n in 0..128 {
1420                tx.send(make_req(n, Some(0))).await.unwrap();
1421            }
1422
1423            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1424
1425            for _ in 0..128 {
1426                receiver.next().await.unwrap();
1427            }
1428
1429            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1430            receiver.update_suggested_effort(&net_params);
1431
1432            assert_eq!(suggested_effort.lock().unwrap().clone(), Effort::zero());
1433
1434            // Requests are left in the queue while the suggested effort is zero, so the
1435            // suggested effort should increase
1436
1437            for n in 0..128 {
1438                tx.send(make_req(n, Some(0))).await.unwrap();
1439            }
1440
1441            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1442
1443            for _ in 0..64 {
1444                receiver.next().await.unwrap();
1445            }
1446
1447            runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1448            receiver.update_suggested_effort(&net_params);
1449
1450            let mut new_suggested_effort = *suggested_effort.lock().unwrap();
1451            assert!(new_suggested_effort > Effort::zero());
1452
1453            // We keep falling behind, so the effort should increase again.
1454
1455            for n in 0..64 {
1456                tx.send(make_req(n, Some(new_suggested_effort.into())))
1457                    .await
1458                    .unwrap();
1459            }
1460
1461            receiver.next().await.unwrap();
1462            runtime.advance_by(HS_UPDATE_PERIOD).await;
1463            receiver.update_suggested_effort(&net_params);
1464
1465            let mut old_suggested_effort = new_suggested_effort;
1466            new_suggested_effort = *suggested_effort.lock().unwrap();
1467            assert!(new_suggested_effort > old_suggested_effort);
1468
1469            // We catch up now, so the effort should start dropping, but it should not reach zero immediately.
1470
1471            for n in 0..32 {
1472                tx.send(make_req(n, Some(new_suggested_effort.into())))
1473                    .await
1474                    .unwrap();
1475            }
1476
1477            runtime.advance_by(HS_UPDATE_PERIOD / 16 * 15).await;
1478
1479            while receiver.next().now_or_never().is_some() {
1480                // Keep going...
1481            }
1482
1483            runtime.advance_by(HS_UPDATE_PERIOD / 16).await;
1484            receiver.update_suggested_effort(&net_params);
1485
1486            old_suggested_effort = new_suggested_effort;
1487            new_suggested_effort = *suggested_effort.lock().unwrap();
1488            assert!(new_suggested_effort < old_suggested_effort);
1489            assert!(new_suggested_effort > Effort::zero());
1490
1491            // The effort should eventually drop all the way back to zero
1492
1493            let mut num_loops = 0;
1494            loop {
1495                tx.send(make_req(0, Some(new_suggested_effort.into())))
1496                    .await
1497                    .unwrap();
1498                runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1499
1500                while receiver.next().now_or_never().is_some() {
1501                    // Keep going...
1502                }
1503
1504                runtime.advance_by(HS_UPDATE_PERIOD / 2).await;
1505                receiver.update_suggested_effort(&net_params);
1506
1507                old_suggested_effort = new_suggested_effort;
1508                new_suggested_effort = *suggested_effort.lock().unwrap();
1509
1510                assert!(new_suggested_effort < old_suggested_effort);
1511
1512                if new_suggested_effort == Effort::zero() {
1513                    break;
1514                }
1515
1516                num_loops += 1;
1517
1518                if num_loops > 5 {
1519                    panic!("Took too long for suggested effort to fall!");
1520                }
1521            }
1522        });
1523    }
1524
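    /// Requests that sit in the queue past the intro timeout should be dropped.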
1525    #[test]
1526    fn test_rendrequest_timeout() {
1527        MockRuntime::test_with_various(|runtime| async move {
1528            let (receiver, mut tx, _suggested_effort, net_params, _config_tx) =
1529                make_test_receiver(&runtime, vec![], None);
1530
1531            let r0 = MockRendRequest { id: 0, pow: None };
1532            tx.send(r0).await.unwrap();
1533
1534            let max_age: Duration = net_params
1535                .hs_pow_v1_service_intro_timeout
1536                .try_into()
1537                .unwrap();
1538            runtime.advance_by(max_age * 2).await;
1539
1540            // The request waited too long and has been dropped
1541            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1542        });
1543    }
1544
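    /// With PoW disabled, requests should be served in FIFO order regardless of any
    /// attached solve.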
1545    #[test]
1546    fn test_pow_disabled() {
1547        MockRuntime::test_with_various(|runtime| async move {
1548            let (mut receiver, mut tx, _suggested_effort, _net_params, _config_tx) =
1549                make_test_receiver(
1550                    &runtime,
1551                    vec![],
1552                    Some(
1553                        OnionServiceConfigBuilder::default()
1554                            .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1555                            .enable_pow(false)
1556                            .build()
1557                            .unwrap(),
1558                    ),
1559                );
1560
1561            // Send a mix of requests with and without PoW, at varying efforts
1562            tx.send(make_req(0, None)).await.unwrap();
1563            tx.send(make_req(1, Some(0))).await.unwrap();
1564            tx.send(make_req(2, Some(20))).await.unwrap();
1565            tx.send(make_req(3, Some(10))).await.unwrap();
1566
1567            runtime.progress_until_stalled().await;
1568
1569            // Requests are FIFO, since PoW is disabled
1570            assert_eq!(receiver.next().await.unwrap().id, 0);
1571            assert_eq!(receiver.next().await.unwrap().id, 1);
1572            assert_eq!(receiver.next().await.unwrap().id, 2);
1573            assert_eq!(receiver.next().await.unwrap().id, 3);
1574        });
1575    }
1576
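    /// The rend queue should respect `pow_rend_queue_depth`, including changes made at runtime.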
1577    #[test]
1578    fn test_rend_queue_max_depth() {
1579        MockRuntime::test_with_various(|runtime| async move {
1580            let (mut receiver, mut tx, _suggested_effort, _net_params, mut config_tx) =
1581                make_test_receiver(
1582                    &runtime,
1583                    vec![],
1584                    Some(
1585                        OnionServiceConfigBuilder::default()
1586                            .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1587                            .enable_pow(true)
1588                            .pow_rend_queue_depth(2)
1589                            .build()
1590                            .unwrap(),
1591                    ),
1592                );
1593
1594            tx.send(make_req(0, None)).await.unwrap();
1595            tx.send(make_req(1, None)).await.unwrap();
1596            tx.send(make_req(2, None)).await.unwrap();
1597
1598            runtime.progress_until_stalled().await;
1599
1600            assert!(receiver.next().await.is_some());
1601            assert!(receiver.next().await.is_some());
1602            assert_eq!(receiver.0.lock().unwrap().queue.len(), 0);
1603
1604            // Check that increasing the queue depth at runtime works...
1605
1606            config_tx
1607                .send(Arc::new(
1608                    OnionServiceConfigBuilder::default()
1609                        .nickname(HsNickname::new("test-hs".to_string()).unwrap())
1610                        .enable_pow(true)
1611                        .pow_rend_queue_depth(8)
1612                        .build()
1613                        .unwrap(),
1614                ))
1615                .await
1616                .unwrap();
1617
1618            tx.send(make_req(0, None)).await.unwrap();
1619            tx.send(make_req(1, None)).await.unwrap();
1620            tx.send(make_req(2, None)).await.unwrap();
1621
1622            runtime.progress_until_stalled().await;
1623
1624            assert!(receiver.next().await.is_some());
1625            assert!(receiver.next().await.is_some());
1626            assert!(receiver.next().await.is_some());
1627        });
1628    }
1629}