tor_hsservice/pow/v1.rs

//! Code implementing version 1 proof-of-work for onion service hosts.
//!
//! Spec links:
//! * <https://spec.torproject.org/hspow-spec/common-protocol.html>
//! * <https://spec.torproject.org/hspow-spec/v1-equix.html>

use std::{
    collections::{BinaryHeap, HashMap},
    sync::{Arc, Mutex, RwLock},
    task::Waker,
    time::{Duration, SystemTime},
};

use arrayvec::ArrayVec;
use futures::task::SpawnExt;
use futures::{channel::mpsc, Stream};
use futures::{SinkExt, StreamExt};
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use tor_basic_utils::RngExt as _;
use tor_cell::relaycell::hs::pow::{v1::ProofOfWorkV1, ProofOfWork};
use tor_checkable::timed::TimerangeBound;
use tor_hscrypto::{
    pk::HsBlindIdKey,
    pow::v1::{Effort, Instance, Seed, SeedHead, Solution, SolutionErrorV1, Verifier},
    time::TimePeriod,
};
use tor_keymgr::KeyMgr;
use tor_netdoc::doc::hsdesc::pow::{v1::PowParamsV1, PowParams};
use tor_persist::{
    hsnickname::HsNickname,
    state_dir::{InstanceRawSubdir, StorageHandle},
};
use tor_rtcompat::Runtime;

use crate::{
    rend_handshake, replay::PowNonceReplayLog, BlindIdPublicKeySpecifier, CreateIptError,
    RendRequest, ReplayError, StartupError,
};

use super::NewPowManager;

/// This is responsible for rotating Proof-of-Work seeds and doing verification of PoW solves.
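///
/// A rough sketch of how the surrounding onion-service code is expected to
/// drive it (illustrative only; the runtime, key manager, storage handle, and
/// instance directory are owned by the caller, and the local variable names
/// here are placeholders):
///
/// ```ignore
/// let NewPowManager {
///     pow_manager,
///     rend_req_tx,
///     rend_req_rx,
///     publisher_update_rx,
/// } = PowManager::new(runtime, nickname, instance_dir, keymgr, storage_handle)?;
/// // Start the background task that rotates seeds shortly before they expire.
/// pow_manager.launch()?;
/// // When building a descriptor for a time period, ask for its pow-params.
/// let pow_params = pow_manager.get_pow_params(time_period)?;
/// ```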
pub(crate) struct PowManager<R>(RwLock<State<R>>);

/// Internal state for [`PowManager`].
struct State<R> {
    /// The [`Seed`]s for a given [`TimePeriod`]
    ///
    /// The [`ArrayVec`] contains the current and previous seed, and the [`SystemTime`] is when the
    /// current seed will expire.
    seeds: HashMap<TimePeriod, SeedsForTimePeriod>,

    /// Verifiers for all the seeds that exist in `seeds`.
    verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,

    /// The nickname for this hidden service.
    ///
    /// We need this so we can get the blinded keys from the [`KeyMgr`].
    nickname: HsNickname,

    /// Directory used to store nonce replay log.
    instance_dir: InstanceRawSubdir,

    /// Key manager.
    keymgr: Arc<KeyMgr>,

    /// Current suggested effort that we publish in the pow-params line.
    suggested_effort: Effort,

    /// Runtime
    runtime: R,

    /// Handle for storing state we need to persist to disk.
    storage_handle: StorageHandle<PowManagerStateRecord>,

    /// Queue to tell the publisher to re-upload a descriptor for a given TP, since we've rotated
    /// that seed.
    publisher_update_tx: mpsc::Sender<TimePeriod>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
/// Information about the current and previous [`Seed`] for a given [`TimePeriod`].
struct SeedsForTimePeriod {
    /// The previous and current [`Seed`].
    ///
    /// The last element in this array is the current seed.
    seeds: ArrayVec<Seed, 2>,

    /// When the current seed will expire.
    next_expiration_time: SystemTime,
}

#[derive(Debug)]
#[allow(unused)]
/// A PoW solve was invalid.
///
/// While this contains the reason for the failure, we probably just want to use that for
/// debugging; we shouldn't make any logical decisions based on what the particular error was.
pub(crate) enum PowSolveError {
    /// Seed head was not recognized; it may be expired.
    InvalidSeedHead,
    /// We have already seen a solve with this nonce
    NonceReplay(ReplayError),
    /// The bytes given as a solution do not form a valid Equi-X puzzle
    InvalidEquixSolution(SolutionErrorV1),
    /// The solution given was invalid.
    InvalidSolve(tor_hscrypto::pow::Error),
}

/// On-disk record of [`PowManager`] state.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct PowManagerStateRecord {
    /// Seeds for each time period.
    ///
    /// Conceptually, this is a map from TimePeriod to SeedsForTimePeriod, but since TimePeriod
    /// can't be serialized to a string, serde can't easily represent it as a map, so we instead
    /// store it as a list of tuples and convert it to/from the map when saving/loading.
    seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
    // TODO POW: suggested_effort / etc should be serialized
}

impl<R: Runtime> State<R> {
    /// Make a [`PowManagerStateRecord`] for this state.
    pub(crate) fn to_record(&self) -> PowManagerStateRecord {
        PowManagerStateRecord {
            seeds: self.seeds.clone().into_iter().collect(),
        }
    }
}

/// How soon before a seed's expiration time we should rotate it and publish a new seed.
const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);

/// Minimum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MIN: u64 = 105;

/// Maximum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MAX: u64 = 120;

/// Enforce that early rotation time is less than or equal to min expiration time.
const _: () = assert!(
    SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
    "Early rotation time must be less than minimum expiration time"
);

/// Enforce that min expiration time is less than or equal to max.
const _: () = assert!(
    EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
    "Minimum expiration time must be less than or equal to max"
);

/// Depth of the queue used to signal the publisher that it needs to update a given time period.
///
/// 32 is likely way larger than we need but the messages are tiny so we might as well.
const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;

#[derive(Debug)]
#[allow(dead_code)] // We want to show fields in Debug even if we don't use them.
/// Internal error within the PoW subsystem.
pub(crate) enum InternalPowError {
    /// We don't have a key that is needed.
    MissingKey,
    /// Error in the underlying storage layer.
    StorageError,
    /// Error from the ReplayLog.
    CreateIptError(CreateIptError),
}

impl<R: Runtime> PowManager<R> {
    /// Create a new [`PowManager`].
    #[allow(clippy::new_ret_no_self)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        instance_dir: InstanceRawSubdir,
        keymgr: Arc<KeyMgr>,
        storage_handle: StorageHandle<PowManagerStateRecord>,
    ) -> Result<NewPowManager<R>, StartupError> {
        let on_disk_state = storage_handle.load().map_err(StartupError::LoadState)?;
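        // The on-disk record stores seeds as a list of (TimePeriod, SeedsForTimePeriod)
        // tuples; rebuild the in-memory map from it.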
        let seeds = on_disk_state.map_or(vec![], |on_disk_state| on_disk_state.seeds);
        let seeds = seeds.into_iter().collect();

        // This queue is extremely small, and we only make one of it per onion service, so it's
        // fine to not use memquota tracking.
        let (publisher_update_tx, publisher_update_rx) =
            crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);

        let state = State {
            seeds,
            nickname,
            instance_dir,
            keymgr,
            publisher_update_tx,
            verifiers: HashMap::new(),
            suggested_effort: Effort::zero(),
            runtime: runtime.clone(),
            storage_handle,
        };
        let pow_manager = Arc::new(PowManager(RwLock::new(state)));

        let (rend_req_tx, rend_req_rx) = super::make_rend_queue();
        let rend_req_rx = RendRequestReceiver::new(runtime, pow_manager.clone(), rend_req_rx);

        Ok(NewPowManager {
            pow_manager,
            rend_req_tx,
            rend_req_rx: Box::pin(rend_req_rx),
            publisher_update_rx,
        })
    }

    /// Launch background task to rotate seeds.
    pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
        let pow_manager = self.clone();
        let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();

        runtime
            .spawn(pow_manager.main_loop_task())
            .map_err(|cause| StartupError::Spawn {
                spawning: "pow manager",
                cause: cause.into(),
            })?;
        Ok(())
    }

    /// Main loop for rotating seeds.
    async fn main_loop_task(self: Arc<Self>) {
        let runtime = self.0.write().expect("Lock poisoned").runtime.clone();

        loop {
            let next_update_time = self.rotate_seeds_if_expiring().await;

            // A new TimePeriod that we don't know about (and thus that isn't in next_update_time)
            // might get added at any point. Making sure that our maximum delay is the minimum
            // amount of time that it might take for a seed to expire means that we can be sure
            // that we will rotate newly-added seeds properly.
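            // With the current constants this works out to 105 - 5 = 100 minutes.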
            let max_delay =
                Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60) - SEED_EARLY_ROTATION_TIME;
            let delay = next_update_time
                .map(|x| x.duration_since(SystemTime::now()).unwrap_or(max_delay))
                .unwrap_or(max_delay)
                .min(max_delay);

            tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");

            runtime.sleep(delay).await;
        }
    }

    /// Make a randomized seed expiration time.
    fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
        SystemTime::now()
            + Duration::from_secs(
                60 * rng
                    .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
                    .expect("Can't generate expiration_time"),
            )
    }

    /// Make a new [`Verifier`] for a given [`TimePeriod`] and [`Seed`].
    ///
    /// If a key is not available for this TP, returns None.
    ///
    /// This takes individual arguments instead of `&self` to avoid getting into any trouble with
    /// locking.
    fn make_verifier(
        keymgr: &Arc<KeyMgr>,
        nickname: HsNickname,
        time_period: TimePeriod,
        seed: Seed,
    ) -> Option<Verifier> {
        let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
        let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
            Ok(blind_id_key) => blind_id_key,
            Err(err) => {
                tracing::warn!(?err, "KeyMgr error when getting blinded ID key for PoW");
                None
            }
        };
        let instance = Instance::new(blind_id_key?.id(), seed);
        Some(Verifier::new(instance))
    }

    /// Calculate a time when we want to rotate a seed, slightly before it expires, in order to
    /// ensure that clients don't ever download a seed that is already out of date.
    fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
        // Underflow cannot happen because:
        //
        // * We set the expiration time to the current time plus at least the minimum
        //   expiration time
        // * We know (backed up by a compile-time assertion) that SEED_EARLY_ROTATION_TIME is
        //   less than the minimum expiration time.
        //
        // Thus, the only way this subtraction can underflow is if the system time at the
        // moment we set the expiration time was before the epoch, which is not possible on
        // reasonable platforms.
        expiration_time
            .checked_sub(SEED_EARLY_ROTATION_TIME)
            .expect("PoW seed expiration underflow")
    }

    /// Rotate any seeds that will expire soon.
    ///
    /// This also pokes the publisher when needed to cause rotated seeds to be published.
    ///
    /// Returns the next time this function should be called again.
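    ///
    /// As a worked example with the current constants: a seed whose lifetime was
    /// drawn as 110 minutes gets rotated roughly 105 minutes after creation (that
    /// is, [`SEED_EARLY_ROTATION_TIME`] before it expires), and the previous seed
    /// is kept alongside the new one so solves against a recently-downloaded
    /// descriptor can still be verified.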
    async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
        let mut expired_verifiers = vec![];
        let mut new_verifiers = vec![];

        let mut update_times = vec![];
        let mut updated_tps = vec![];
        let mut expired_tps = vec![];

        let mut publisher_update_tx = {
            // TODO POW: get rng from the right place...
            let mut rng = rand::rng();

            let mut state = self.0.write().expect("Lock poisoned");

            let keymgr = state.keymgr.clone();
            let nickname = state.nickname.clone();

            for (time_period, info) in state.seeds.iter_mut() {
                let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
                update_times.push(rotation_time);

                if rotation_time <= SystemTime::now() {
                    let seed = Seed::new(&mut rng, None);
                    let verifier = match Self::make_verifier(
                        &keymgr,
                        nickname.clone(),
                        *time_period,
                        seed.clone(),
                    ) {
                        Some(verifier) => verifier,
                        None => {
                            // We use not having a key for a given TP as the signal that we should
                            // stop keeping track of seeds for that TP.
                            expired_tps.push(*time_period);
                            continue;
                        }
                    };

                    let expired_seed = if info.seeds.is_full() {
                        info.seeds.pop_at(0)
                    } else {
                        None
                    };
                    // .push() is safe, since we just made space above.
                    info.seeds.push(seed.clone());
                    info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
                    update_times.push(info.next_expiration_time);

                    // Make a note to add the new verifier and remove the old one.
                    new_verifiers.push((seed, verifier));
                    if let Some(expired_seed) = expired_seed {
                        expired_verifiers.push(expired_seed.head());
                    }

                    // Tell the publisher to update this TP
                    updated_tps.push(*time_period);

                    tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
                }
            }

            for time_period in expired_tps {
                if let Some(seeds) = state.seeds.remove(&time_period) {
                    for seed in seeds.seeds {
                        state.verifiers.remove(&seed.head());
                    }
                }
            }

            for (seed, verifier) in new_verifiers {
                let replay_log = Mutex::new(
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
                        .expect("Couldn't make ReplayLog."),
                );
                state.verifiers.insert(seed.head(), (verifier, replay_log));
            }

            for seed_head in expired_verifiers {
                state.verifiers.remove(&seed_head);
            }

            let record = state.to_record();
            if let Err(err) = state.storage_handle.store(&record) {
                tracing::warn!(?err, "Error saving PoW state");
            }

            state.publisher_update_tx.clone()
        };

        for time_period in updated_tps {
            if let Err(err) = publisher_update_tx.send(time_period).await {
                tracing::warn!(?err, "Couldn't send update message to publisher");
            }
        }

        update_times.iter().min().cloned()
    }

    /// Get [`PowParams`] for a given [`TimePeriod`].
    ///
    /// If we don't have any [`Seed`]s for the requested period, generate them. This is the only
    /// way that [`PowManager`] learns about new [`TimePeriod`]s.
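    ///
    /// The returned value is a [`PowParams::V1`] bundling the current seed (bounded
    /// by its expiration time) and the current suggested effort, ready to be used
    /// for the descriptor's pow-params line.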
    pub(crate) fn get_pow_params(
        self: &Arc<Self>,
        time_period: TimePeriod,
    ) -> Result<PowParams, InternalPowError> {
        let (seed_and_expiration, suggested_effort) = {
            let state = self.0.read().expect("Lock poisoned");
            let seed = state
                .seeds
                .get(&time_period)
                .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
            (seed, state.suggested_effort)
        };

        let (seed, expiration) = match seed_and_expiration {
            Some(seed) => seed,
            None => {
                // We don't have a seed for this time period, so we need to generate one.

                // TODO POW: get rng from the right place...
                let mut rng = rand::rng();

                let seed = Seed::new(&mut rng, None);
                let next_expiration_time = Self::make_next_expiration_time(&mut rng);

                let mut seeds = ArrayVec::new();
                seeds.push(seed.clone());

                let mut state = self.0.write().expect("Lock poisoned");

                state.seeds.insert(
                    time_period,
                    SeedsForTimePeriod {
                        seeds,
                        next_expiration_time,
                    },
                );

                let verifier = Self::make_verifier(
                    &state.keymgr,
                    state.nickname.clone(),
                    time_period,
                    seed.clone(),
                )
                .ok_or(InternalPowError::MissingKey)?;

                let replay_log = Mutex::new(
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
                        .map_err(InternalPowError::CreateIptError)?,
                );
                state.verifiers.insert(seed.head(), (verifier, replay_log));

                let record = state.to_record();
                state
                    .storage_handle
                    .store(&record)
                    .map_err(|_| InternalPowError::StorageError)?;

                (seed, next_expiration_time)
            }
        };

        Ok(PowParams::V1(PowParamsV1::new(
            TimerangeBound::new(seed, ..expiration),
            suggested_effort,
        )))
    }

    /// Verify a PoW solve.
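    ///
    /// The checks happen in order: the solve must reference a seed we still know
    /// about, its nonce must not have been seen before, its bytes must decode as an
    /// Equi-X solution, and the solution must actually satisfy the puzzle.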
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
        // TODO POW: This puts the nonce in the replay structure before we check if the solve is
        // valid, which could be a problem — a potential attack would be to send a large number of
        // invalid solves with the hope of causing collisions with valid requests. This is probably
        // highly impractical, but we should think through it before stabilizing PoW.
        {
            let state = self.0.write().expect("Lock poisoned");
            let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
                Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
                None => return Err(PowSolveError::InvalidSeedHead),
            };
            replay_log
                .check_for_replay(solve.nonce())
                .map_err(PowSolveError::NonceReplay)?;
        }

        // TODO: Once RwLock::downgrade is stabilized, it would make sense to use it here...

        let state = self.0.read().expect("Lock poisoned");
        let verifier = match state.verifiers.get(&solve.seed_head()) {
            Some((verifier, _)) => verifier,
            None => return Err(PowSolveError::InvalidSeedHead),
        };

        let solution = match Solution::try_from_bytes(
            solve.nonce().clone(),
            solve.effort(),
            solve.seed_head(),
            solve.solution(),
        ) {
            Ok(solution) => solution,
            Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
        };

        match verifier.check(&solution) {
            Ok(()) => Ok(()),
            Err(err) => Err(PowSolveError::InvalidSolve(err)),
        }
    }
}

/// Wrapper around [`RendRequest`] that implements [`std::cmp::Ord`] to sort by [`Effort`].
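///
/// Requests that did not carry a proof-of-work extension compare as zero effort,
/// so they sort behind any request with a nonzero effort.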
#[derive(Debug)]
struct RendRequestOrdByEffort {
    /// The underlying request.
    request: RendRequest,
    /// The proof-of-work options, if given.
    pow: Option<ProofOfWorkV1>,
}

impl RendRequestOrdByEffort {
    /// Create a new [`RendRequestOrdByEffort`].
    fn new(request: RendRequest) -> Result<Self, rend_handshake::IntroRequestError> {
        let pow = match request
            .intro_request()?
            .intro_payload()
            .proof_of_work_extension()
            .cloned()
        {
            Some(ProofOfWork::V1(pow)) => Some(pow),
            None | Some(_) => None,
        };

        Ok(Self { request, pow })
    }
}

impl Ord for RendRequestOrdByEffort {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
        let other_effort = other
            .pow
            .as_ref()
            .map_or(Effort::zero(), |pow| pow.effort());
        self_effort.cmp(&other_effort)
    }
}

impl PartialOrd for RendRequestOrdByEffort {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for RendRequestOrdByEffort {
    fn eq(&self, other: &Self) -> bool {
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
        let other_effort = other
            .pow
            .as_ref()
            .map_or(Effort::zero(), |pow| pow.effort());
        self_effort == other_effort
    }
}

impl Eq for RendRequestOrdByEffort {}

/// Implements [`Stream`] for incoming [`RendRequest`]s, using a priority queue system to dequeue
/// high-[`Effort`] requests first.
///
/// This is implemented on top of a [`mpsc::Receiver`]. There is a thread that dequeues from the
/// [`mpsc::Receiver`], checks the PoW solve, and if it is correct, adds it to a [`BinaryHeap`],
/// which the [`Stream`] implementation reads from.
///
/// This is not particularly optimized — queueing and dequeuing use a [`Mutex`], so there may be
/// some contention there. It's possible there may be some fancy lockless (or more optimized)
/// priority queue that we could use, but we should properly benchmark things before trying to make
/// an optimization like that.
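///
/// Sketch of the flow (illustrative only):
///
/// ```text
/// rend_req_tx --mpsc--> accept_loop (blocking thread)
///                         |  verifies the PoW solve, if any
///                         v
///                   BinaryHeap (ordered by Effort)
///                         |
///                         v
///            Stream::poll_next pops the highest-effort request
/// ```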
#[derive(Clone)]
pub(crate) struct RendRequestReceiver(Arc<Mutex<RendRequestReceiverInner>>);

/// Inner implementation for [`RendRequestReceiver`].
struct RendRequestReceiverInner {
    /// Internal priority queue of requests.
    queue: BinaryHeap<RendRequestOrdByEffort>,

    /// Waker to inform async readers when there is a new message on the queue.
    waker: Option<Waker>,
}

impl RendRequestReceiver {
    /// Create a new [`RendRequestReceiver`].
    fn new<R: Runtime>(
        runtime: R,
        pow_manager: Arc<PowManager<R>>,
        inner_receiver: mpsc::Receiver<RendRequest>,
    ) -> Self {
        let receiver = RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
            queue: BinaryHeap::new(),
            waker: None,
        })));
        let receiver_clone = receiver.clone();
        let accept_thread = runtime.clone().spawn_blocking(move || {
            receiver_clone.accept_loop(&runtime, &pow_manager, inner_receiver);
        });
        drop(accept_thread);
        receiver
    }

    /// Loop to accept messages from the wrapped [`mpsc::Receiver`], validate PoW solves, and
    /// enqueue them onto the priority queue.
    fn accept_loop<R: Runtime>(
        self,
        runtime: &R,
        pow_manager: &Arc<PowManager<R>>,
        mut receiver: mpsc::Receiver<RendRequest>,
    ) {
        loop {
            let rend_request = runtime
                .reenter_block_on(receiver.next())
                .expect("Other side of RendRequest queue hung up");
            let rend_request = match RendRequestOrdByEffort::new(rend_request) {
                Ok(rend_request) => rend_request,
                Err(err) => {
                    tracing::trace!(?err, "Error processing RendRequest");
                    continue;
                }
            };

            if let Some(ref pow) = rend_request.pow {
                if let Err(err) = pow_manager.check_solve(pow) {
                    tracing::debug!(?err, "PoW verification failed");
                    continue;
                }
            }

            let mut inner = self.0.lock().expect("Lock poisoned");
            inner.queue.push(rend_request);
            if let Some(waker) = &inner.waker {
                waker.wake_by_ref();
            }
        }
    }
}

impl Stream for RendRequestReceiver {
    type Item = RendRequest;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut inner = self.get_mut().0.lock().expect("Lock poisoned");
        match inner.queue.pop() {
            Some(item) => std::task::Poll::Ready(Some(item.request)),
            None => {
                inner.waker = Some(cx.waker().clone());
                std::task::Poll::Pending
            }
        }
    }
}