1
//! Code implementing version 1 proof-of-work for onion service hosts.
2
//!
3
//! Spec links:
4
//! * <https://spec.torproject.org/hspow-spec/common-protocol.html>
5
//! * <https://spec.torproject.org/hspow-spec/v1-equix.html>
6

            
7
use std::{
8
    collections::{BinaryHeap, HashMap},
9
    sync::{Arc, Mutex, RwLock},
10
    task::Waker,
11
    time::{Duration, SystemTime},
12
};
13

            
14
use arrayvec::ArrayVec;
15
use futures::task::SpawnExt;
16
use futures::{channel::mpsc, Stream};
17
use futures::{SinkExt, StreamExt};
18
use rand::{CryptoRng, RngCore};
19
use serde::{Deserialize, Serialize};
20
use tor_basic_utils::RngExt as _;
21
use tor_cell::relaycell::hs::pow::{v1::ProofOfWorkV1, ProofOfWork};
22
use tor_checkable::timed::TimerangeBound;
23
use tor_hscrypto::{
24
    pk::HsBlindIdKey,
25
    pow::v1::{Effort, Instance, Seed, SeedHead, Solution, SolutionErrorV1, Verifier},
26
    time::TimePeriod,
27
};
28
use tor_keymgr::KeyMgr;
29
use tor_netdoc::doc::hsdesc::pow::{v1::PowParamsV1, PowParams};
30
use tor_persist::{
31
    hsnickname::HsNickname,
32
    state_dir::{InstanceRawSubdir, StorageHandle},
33
};
34
use tor_rtcompat::Runtime;
35

            
36
use crate::{
37
    rend_handshake, replay::PowNonceReplayLog, BlindIdPublicKeySpecifier, CreateIptError,
38
    RendRequest, ReplayError, StartupError,
39
};
40

            
41
use super::NewPowManager;
42

            
43
/// This is responsible for rotating Proof-of-Work seeds and doing verification of PoW solves.
44
pub(crate) struct PowManager<R>(RwLock<State<R>>);
45

            
46
/// Internal state for [`PowManager`].
47
struct State<R> {
48
    /// The [`Seed`]s for a given [`TimePeriod`]
49
    ///
50
    /// The [`ArrayVec`] contains the current and previous seed, and the [`SystemTime`] is when the
51
    /// current seed will expire.
52
    seeds: HashMap<TimePeriod, SeedsForTimePeriod>,
53

            
54
    /// Verifiers for all the seeds that exist in `seeds`.
55
    verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,
56

            
57
    /// The nickname for this hidden service.
58
    ///
59
    /// We need this so we can get the blinded keys from the [`KeyMgr`].
60
    nickname: HsNickname,
61

            
62
    /// Directory used to store nonce replay log.
63
    instance_dir: InstanceRawSubdir,
64

            
65
    /// Key manager.
66
    keymgr: Arc<KeyMgr>,
67

            
68
    /// Current suggested effort that we publish in the pow-params line.
69
    suggested_effort: Effort,
70

            
71
    /// Runtime
72
    runtime: R,
73

            
74
    /// Handle for storing state we need to persist to disk.
75
    storage_handle: StorageHandle<PowManagerStateRecord>,
76

            
77
    /// Queue to tell the publisher to re-upload a descriptor for a given TP, since we've rotated
78
    /// that seed.
79
    publisher_update_tx: mpsc::Sender<TimePeriod>,
80
}
81

            
82
#[derive(Serialize, Deserialize, Debug, Clone)]
83
/// Information about the current and previous [`Seed`] for a given [`TimePeriod`].
84
struct SeedsForTimePeriod {
85
    /// The previous and current [`Seed`].
86
    ///
87
    /// The last element in this array is the current seed.
88
    seeds: ArrayVec<Seed, 2>,
89

            
90
    /// When the current seed will expire.
91
    next_expiration_time: SystemTime,
92
}
93

            
94
#[derive(Debug)]
95
#[allow(unused)]
96
/// A PoW solve was invalid.
97
///
98
/// While this contains the reason for the failure, we probably just want to use that for
99
/// debugging, we shouldn't make any logical decisions based on what the particular error was.
100
pub(crate) enum PowSolveError {
101
    /// Seed head was not recognized, it may be expired.
102
    InvalidSeedHead,
103
    /// We have already seen a solve with this nonce
104
    NonceReplay(ReplayError),
105
    /// The bytes given as a solution do not form a valid Equi-X puzzle
106
    InvalidEquixSolution(SolutionErrorV1),
107
    /// The solution given was invalid.
108
    InvalidSolve(tor_hscrypto::pow::Error),
109
}
110

            
111
/// On-disk record of [`PowManager`] state.
112
#[derive(Serialize, Deserialize, Debug)]
113
pub(crate) struct PowManagerStateRecord {
114
    /// Seeds for each time period.
115
    ///
116
    /// Conceptually, this is a map between TimePeriod and SeedsForTimePeriod, but since TimePeriod
117
    /// can't be serialized to a string, it's not very simple to use serde to serialize it like
118
    /// that, so we instead store it as a list of tuples, and convert it to/from the map when
119
    /// saving/loading.
120
    seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
121
    // TODO POW: suggested_effort / etc should be serialized
122
}
123

            
124
impl<R: Runtime> State<R> {
125
    /// Make a [`PowManagerStateRecord`] for this state.
126
    pub(crate) fn to_record(&self) -> PowManagerStateRecord {
127
        PowManagerStateRecord {
128
            seeds: self.seeds.clone().into_iter().collect(),
129
        }
130
    }
131
}
132

            
133
/// How soon before a seed's expiration time we should rotate it and publish a new seed.
134
const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);
135

            
136
/// Minimum seed expiration time in minutes. See:
137
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
138
const EXPIRATION_TIME_MINS_MIN: u64 = 105;
139

            
140
/// Maximum seed expiration time in minutes. See:
141
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
142
const EXPIRATION_TIME_MINS_MAX: u64 = 120;
143

            
144
/// Enforce that early rotation time is less than or equal to min expiration time.
145
const _: () = assert!(
146
    SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
147
    "Early rotation time must be less than minimum expiration time"
148
);
149

            
150
/// Enforce that min expiration time is less than or equal to max.
151
const _: () = assert!(
152
    EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
153
    "Minimum expiration time must be less than or equal to max"
154
);
155

            
156
/// Depth of the queue used to signal the publisher that it needs to update a given time period.
157
///
158
/// 32 is likely way larger than we need but the messages are tiny so we might as well.
159
const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;
160

            
161
#[derive(Debug)]
162
#[allow(dead_code)] // We want to show fields in Debug even if we don't use them.
163
/// Internal error within the PoW subsystem.
164
pub(crate) enum InternalPowError {
165
    /// We don't have a key that is needed.
166
    MissingKey,
167
    /// Error in the underlying storage layer.
168
    StorageError,
169
    /// Error from the ReplayLog.
170
    CreateIptError(CreateIptError),
171
}
172

            
173
impl<R: Runtime> PowManager<R> {
174
    /// Create a new [`PowManager`].
175
    #[allow(clippy::new_ret_no_self)]
176
    pub(crate) fn new(
177
        runtime: R,
178
        nickname: HsNickname,
179
        instance_dir: InstanceRawSubdir,
180
        keymgr: Arc<KeyMgr>,
181
        storage_handle: StorageHandle<PowManagerStateRecord>,
182
    ) -> Result<NewPowManager<R>, StartupError> {
183
        let on_disk_state = storage_handle.load().map_err(StartupError::LoadState)?;
184
        let seeds = on_disk_state.map_or(vec![], |on_disk_state| on_disk_state.seeds);
185
        let seeds = seeds.into_iter().collect();
186

            
187
        // This queue is extremely small, and we only make one of it per onion service, so it's
188
        // fine to not use memquota tracking.
189
        let (publisher_update_tx, publisher_update_rx) =
190
            crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);
191

            
192
        let state = State {
193
            seeds,
194
            nickname,
195
            instance_dir,
196
            keymgr,
197
            publisher_update_tx,
198
            verifiers: HashMap::new(),
199
            suggested_effort: Effort::zero(),
200
            runtime: runtime.clone(),
201
            storage_handle,
202
        };
203
        let pow_manager = Arc::new(PowManager(RwLock::new(state)));
204

            
205
        let (rend_req_tx, rend_req_rx) = super::make_rend_queue();
206
        let rend_req_rx = RendRequestReceiver::new(runtime, pow_manager.clone(), rend_req_rx);
207

            
208
        Ok(NewPowManager {
209
            pow_manager,
210
            rend_req_tx,
211
            rend_req_rx: Box::pin(rend_req_rx),
212
            publisher_update_rx,
213
        })
214
    }
215

            
216
    /// Launch background task to rotate seeds.
217
    pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
218
        let pow_manager = self.clone();
219
        let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();
220

            
221
        runtime
222
            .spawn(pow_manager.main_loop_task())
223
            .map_err(|cause| StartupError::Spawn {
224
                spawning: "pow manager",
225
                cause: cause.into(),
226
            })?;
227
        Ok(())
228
    }
229

            
230
    /// Main loop for rotating seeds.
231
    async fn main_loop_task(self: Arc<Self>) {
232
        let runtime = self.0.write().expect("Lock poisoned").runtime.clone();
233

            
234
        loop {
235
            let next_update_time = self.rotate_seeds_if_expiring().await;
236

            
237
            // A new TimePeriod that we don't know about (and thus that isn't in next_update_time)
238
            // might get added at any point. Making sure that our maximum delay is the minimum
239
            // amount of time that it might take for a seed to expire means that we can be sure
240
            // that we will rotate newly-added seeds properly.
241
            let max_delay =
242
                Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60) - SEED_EARLY_ROTATION_TIME;
243
            let delay = next_update_time
244
                .map(|x| x.duration_since(SystemTime::now()).unwrap_or(max_delay))
245
                .unwrap_or(max_delay)
246
                .min(max_delay);
247

            
248
            tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");
249

            
250
            runtime.sleep(delay).await;
251
        }
252
    }
253

            
254
    /// Make a randomized seed expiration time.
255
    fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
256
        SystemTime::now()
257
            + Duration::from_secs(
258
                60 * rng
259
                    .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
260
                    .expect("Can't generate expiration_time"),
261
            )
262
    }
263

            
264
    /// Make a ner [`Verifier`] for a given [`TimePeriod`] and [`Seed`].
265
    ///
266
    /// If a key is not available for this TP, returns None.
267
    ///
268
    /// This takes individual arguments instead of `&self` to avoid getting into any trouble with
269
    /// locking.
270
    fn make_verifier(
271
        keymgr: &Arc<KeyMgr>,
272
        nickname: HsNickname,
273
        time_period: TimePeriod,
274
        seed: Seed,
275
    ) -> Option<Verifier> {
276
        let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
277
        let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
278
            Ok(blind_id_key) => blind_id_key,
279
            Err(err) => {
280
                tracing::warn!(?err, "KeyMgr error when getting blinded ID key for PoW");
281
                None
282
            }
283
        };
284
        let instance = Instance::new(blind_id_key?.id(), seed);
285
        Some(Verifier::new(instance))
286
    }
287

            
288
    /// Calculate a time when we want to rotate a seed, slightly before it expires, in order to
289
    /// ensure that clients don't ever download a seed that is already out of date.
290
    fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
291
        // Underflow cannot happen because:
292
        //
293
        // * We set the expiration time to the current time plus at least the minimum
294
        //   expiration time
295
        // * We know (backed up by a compile-time assertion) that SEED_EARLY_ROTATION_TIME is
296
        //   less than the minimum expiration time.
297
        //
298
        // Thus, the only way this subtraction can underflow is if the system time at the
299
        // moment we set the expiration time was before the epoch, which is not possible on
300
        // reasonable platforms.
301
        expiration_time
302
            .checked_sub(SEED_EARLY_ROTATION_TIME)
303
            .expect("PoW seed expiration underflow")
304
    }
305

            
306
    /// Rotate any seeds that will expire soon.
307
    ///
308
    /// This also pokes the publisher when needed to cause rotated seeds to be published.
309
    ///
310
    /// Returns the next time this function should be called again.
311
    #[allow(clippy::cognitive_complexity)]
312
    async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
313
        let mut expired_verifiers = vec![];
314
        let mut new_verifiers = vec![];
315

            
316
        let mut update_times = vec![];
317
        let mut updated_tps = vec![];
318
        let mut expired_tps = vec![];
319

            
320
        let mut publisher_update_tx = {
321
            // TODO POW: get rng from the right place...
322
            let mut rng = rand::rng();
323

            
324
            let mut state = self.0.write().expect("Lock poisoned");
325

            
326
            let keymgr = state.keymgr.clone();
327
            let nickname = state.nickname.clone();
328

            
329
            for (time_period, info) in state.seeds.iter_mut() {
330
                let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
331
                update_times.push(rotation_time);
332

            
333
                if rotation_time <= SystemTime::now() {
334
                    let seed = Seed::new(&mut rng, None);
335
                    let verifier = match Self::make_verifier(
336
                        &keymgr,
337
                        nickname.clone(),
338
                        *time_period,
339
                        seed.clone(),
340
                    ) {
341
                        Some(verifier) => verifier,
342
                        None => {
343
                            // We use not having a key for a given TP as the signal that we should
344
                            // stop keeping track of seeds for that TP.
345
                            expired_tps.push(*time_period);
346
                            continue;
347
                        }
348
                    };
349

            
350
                    let expired_seed = if info.seeds.is_full() {
351
                        info.seeds.pop_at(0)
352
                    } else {
353
                        None
354
                    };
355
                    // .push() is safe, since we just made space above.
356
                    info.seeds.push(seed.clone());
357
                    info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
358
                    update_times.push(info.next_expiration_time);
359

            
360
                    // Make a note to add the new verifier and remove the old one.
361
                    new_verifiers.push((seed, verifier));
362
                    if let Some(expired_seed) = expired_seed {
363
                        expired_verifiers.push(expired_seed.head());
364
                    }
365

            
366
                    // Tell the publisher to update this TP
367
                    updated_tps.push(*time_period);
368

            
369
                    tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
370
                }
371
            }
372

            
373
            for time_period in expired_tps {
374
                if let Some(seeds) = state.seeds.remove(&time_period) {
375
                    for seed in seeds.seeds {
376
                        state.verifiers.remove(&seed.head());
377
                    }
378
                }
379
            }
380

            
381
            for (seed, verifier) in new_verifiers {
382
                let replay_log = Mutex::new(
383
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
384
                        .expect("Couldn't make ReplayLog."),
385
                );
386
                state.verifiers.insert(seed.head(), (verifier, replay_log));
387
            }
388

            
389
            for seed_head in expired_verifiers {
390
                state.verifiers.remove(&seed_head);
391
            }
392

            
393
            let record = state.to_record();
394
            if let Err(err) = state.storage_handle.store(&record) {
395
                tracing::warn!(?err, "Error saving PoW state");
396
            }
397

            
398
            state.publisher_update_tx.clone()
399
        };
400

            
401
        for time_period in updated_tps {
402
            if let Err(err) = publisher_update_tx.send(time_period).await {
403
                tracing::warn!(?err, "Couldn't send update message to publisher");
404
            }
405
        }
406

            
407
        update_times.iter().min().cloned()
408
    }
409

            
410
    /// Get [`PowParams`] for a given [`TimePeriod`].
411
    ///
412
    /// If we don't have any [`Seed`]s for the requested period, generate them. This is the only
413
    /// way that [`PowManager`] learns about new [`TimePeriod`]s.
414
    pub(crate) fn get_pow_params(
415
        self: &Arc<Self>,
416
        time_period: TimePeriod,
417
    ) -> Result<PowParams, InternalPowError> {
418
        let (seed_and_expiration, suggested_effort) = {
419
            let state = self.0.read().expect("Lock poisoned");
420
            let seed = state
421
                .seeds
422
                .get(&time_period)
423
                .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
424
            (seed, state.suggested_effort)
425
        };
426

            
427
        let (seed, expiration) = match seed_and_expiration {
428
            Some(seed) => seed,
429
            None => {
430
                // We don't have a seed for this time period, so we need to generate one.
431

            
432
                // TODO POW: get rng from the right place...
433
                let mut rng = rand::rng();
434

            
435
                let seed = Seed::new(&mut rng, None);
436
                let next_expiration_time = Self::make_next_expiration_time(&mut rng);
437

            
438
                let mut seeds = ArrayVec::new();
439
                seeds.push(seed.clone());
440

            
441
                let mut state = self.0.write().expect("Lock poisoned");
442

            
443
                state.seeds.insert(
444
                    time_period,
445
                    SeedsForTimePeriod {
446
                        seeds,
447
                        next_expiration_time,
448
                    },
449
                );
450

            
451
                let verifier = Self::make_verifier(
452
                    &state.keymgr,
453
                    state.nickname.clone(),
454
                    time_period,
455
                    seed.clone(),
456
                )
457
                .ok_or(InternalPowError::MissingKey)?;
458

            
459
                let replay_log = Mutex::new(
460
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
461
                        .map_err(InternalPowError::CreateIptError)?,
462
                );
463
                state.verifiers.insert(seed.head(), (verifier, replay_log));
464

            
465
                let record = state.to_record();
466
                state
467
                    .storage_handle
468
                    .store(&record)
469
                    .map_err(|_| InternalPowError::StorageError)?;
470

            
471
                (seed, next_expiration_time)
472
            }
473
        };
474

            
475
        Ok(PowParams::V1(PowParamsV1::new(
476
            TimerangeBound::new(seed, ..expiration),
477
            suggested_effort,
478
        )))
479
    }
480

            
481
    /// Verify a PoW solve.
482
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
483
        // TODO POW: This puts the nonce in the replay structure before we check if the solve is
484
        // valid, which could be a problem — a potential attack would be to send a large number of
485
        // invalid solves with the hope of causing collisions with valid requests. This is probably
486
        // highly impractical, but we should think through it before stabilizing PoW.
487
        {
488
            let state = self.0.write().expect("Lock poisoned");
489
            let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
490
                Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
491
                None => return Err(PowSolveError::InvalidSeedHead),
492
            };
493
            replay_log
494
                .check_for_replay(solve.nonce())
495
                .map_err(PowSolveError::NonceReplay)?;
496
        }
497

            
498
        // TODO: Once RwLock::downgrade is stabilized, it would make sense to use it here...
499

            
500
        let state = self.0.read().expect("Lock poisoned");
501
        let verifier = match state.verifiers.get(&solve.seed_head()) {
502
            Some((verifier, _)) => verifier,
503
            None => return Err(PowSolveError::InvalidSeedHead),
504
        };
505

            
506
        let solution = match Solution::try_from_bytes(
507
            solve.nonce().clone(),
508
            solve.effort(),
509
            solve.seed_head(),
510
            solve.solution(),
511
        ) {
512
            Ok(solution) => solution,
513
            Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
514
        };
515

            
516
        match verifier.check(&solution) {
517
            Ok(()) => Ok(()),
518
            Err(err) => Err(PowSolveError::InvalidSolve(err)),
519
        }
520
    }
521
}
522

            
523
/// Wrapper around [`RendRequest`] that implements [`std::cmp::Ord`] to sort by [`Effort`].
524
#[derive(Debug)]
525
struct RendRequestOrdByEffort {
526
    /// The underlying request.
527
    request: RendRequest,
528
    /// The proof-of-work options, if given.
529
    pow: Option<ProofOfWorkV1>,
530
}
531

            
532
impl RendRequestOrdByEffort {
533
    /// Create a new [`RendRequestOrdByEffort`].
534
    fn new(request: RendRequest) -> Result<Self, rend_handshake::IntroRequestError> {
535
        let pow = match request
536
            .intro_request()?
537
            .intro_payload()
538
            .proof_of_work_extension()
539
            .cloned()
540
        {
541
            Some(ProofOfWork::V1(pow)) => Some(pow),
542
            None | Some(_) => None,
543
        };
544

            
545
        Ok(Self { request, pow })
546
    }
547
}
548

            
549
impl Ord for RendRequestOrdByEffort {
550
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
551
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
552
        let other_effort = other
553
            .pow
554
            .as_ref()
555
            .map_or(Effort::zero(), |pow| pow.effort());
556
        self_effort.cmp(&other_effort)
557
    }
558
}
559

            
560
impl PartialOrd for RendRequestOrdByEffort {
561
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
562
        Some(self.cmp(other))
563
    }
564
}
565

            
566
impl PartialEq for RendRequestOrdByEffort {
567
    fn eq(&self, other: &Self) -> bool {
568
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
569
        let other_effort = other
570
            .pow
571
            .as_ref()
572
            .map_or(Effort::zero(), |pow| pow.effort());
573
        self_effort == other_effort
574
    }
575
}
576

            
577
impl Eq for RendRequestOrdByEffort {}
578

            
579
/// Implements [`Stream`] for incoming [`RendRequest`]s, using a priority queue system to dequeue
580
/// high-[`Effort`] requests first.
581
///
582
/// This is implemented on top of a [`mpsc::Receiver`]. There is a thread that dequeues from the
583
/// [`mpsc::Receiver`], checks the PoW solve, and if it is correct, adds it to a [`BinaryHeap`],
584
/// which the [`Stream`] implementation reads from.
585
///
586
/// This is not particularly optimized — queueing and dequeuing use a [`Mutex`], so there may be
587
/// some contention there. It's possible there may be some fancy lockless (or more optimized)
588
/// priority queue that we could use, but we should properly benchmark things before trying to make
589
/// a optimization like that.
590
#[derive(Clone)]
591
pub(crate) struct RendRequestReceiver(Arc<Mutex<RendRequestReceiverInner>>);
592

            
593
/// Inner implementation for [`RendRequestReceiver`].
594
struct RendRequestReceiverInner {
595
    /// Internal priority queue of requests.
596
    queue: BinaryHeap<RendRequestOrdByEffort>,
597

            
598
    /// Waker to inform async readers when there is a new message on the queue.
599
    waker: Option<Waker>,
600
}
601

            
602
impl RendRequestReceiver {
603
    /// Create a new [`RendRequestReceiver`].
604
    fn new<R: Runtime>(
605
        runtime: R,
606
        pow_manager: Arc<PowManager<R>>,
607
        inner_receiver: mpsc::Receiver<RendRequest>,
608
    ) -> Self {
609
        let receiver = RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
610
            queue: BinaryHeap::new(),
611
            waker: None,
612
        })));
613
        let receiver_clone = receiver.clone();
614
        let accept_thread = runtime.clone().spawn_blocking(move || {
615
            receiver_clone.accept_loop(&runtime, &pow_manager, inner_receiver);
616
        });
617
        drop(accept_thread);
618
        receiver
619
    }
620

            
621
    /// Loop to accept message from the wrapped [`mpsc::Receiver`], validate PoW sovles, and
622
    /// enqueue onto the priority queue.
623
    fn accept_loop<R: Runtime>(
624
        self,
625
        runtime: &R,
626
        pow_manager: &Arc<PowManager<R>>,
627
        mut receiver: mpsc::Receiver<RendRequest>,
628
    ) {
629
        loop {
630
            let rend_request = runtime
631
                .reenter_block_on(receiver.next())
632
                .expect("Other side of RendRequest queue hung up");
633
            let rend_request = match RendRequestOrdByEffort::new(rend_request) {
634
                Ok(rend_request) => rend_request,
635
                Err(err) => {
636
                    tracing::trace!(?err, "Error processing RendRequest");
637
                    continue;
638
                }
639
            };
640

            
641
            if let Some(ref pow) = rend_request.pow {
642
                if let Err(err) = pow_manager.check_solve(pow) {
643
                    tracing::debug!(?err, "PoW verification failed");
644
                    continue;
645
                }
646
            }
647

            
648
            let mut inner = self.0.lock().expect("Lock poisened");
649
            inner.queue.push(rend_request);
650
            if let Some(waker) = &inner.waker {
651
                waker.wake_by_ref();
652
            }
653
        }
654
    }
655
}
656

            
657
impl Stream for RendRequestReceiver {
658
    type Item = RendRequest;
659

            
660
    fn poll_next(
661
        self: std::pin::Pin<&mut Self>,
662
        cx: &mut std::task::Context<'_>,
663
    ) -> std::task::Poll<Option<Self::Item>> {
664
        let mut inner = self.get_mut().0.lock().expect("Lock poisened");
665
        match inner.queue.pop() {
666
            Some(item) => std::task::Poll::Ready(Some(item.request)),
667
            None => {
668
                inner.waker = Some(cx.waker().clone());
669
                std::task::Poll::Pending
670
            }
671
        }
672
    }
673
}