tor_hsservice/pow/v1.rs

//! Code implementing version 1 proof-of-work for onion service hosts.
//!
//! Spec links:
//! * <https://spec.torproject.org/hspow-spec/common-protocol.html>
//! * <https://spec.torproject.org/hspow-spec/v1-equix.html>

use std::{
    collections::{BinaryHeap, HashMap},
    sync::{Arc, Mutex, RwLock},
    task::Waker,
    time::{Duration, SystemTime},
};

use arrayvec::ArrayVec;
use futures::task::SpawnExt;
use futures::{channel::mpsc, Stream};
use futures::{SinkExt, StreamExt};
use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use tor_basic_utils::RngExt as _;
use tor_cell::relaycell::hs::pow::{v1::ProofOfWorkV1, ProofOfWork};
use tor_checkable::timed::TimerangeBound;
use tor_hscrypto::{
    pk::HsBlindIdKey,
    pow::v1::{Effort, Instance, Seed, SeedHead, Solution, SolutionErrorV1, Verifier},
    time::TimePeriod,
};
use tor_keymgr::KeyMgr;
use tor_netdoc::doc::hsdesc::pow::{v1::PowParamsV1, PowParams};
use tor_persist::{
    hsnickname::HsNickname,
    state_dir::{InstanceRawSubdir, StorageHandle},
};
use tor_rtcompat::Runtime;

use crate::{
    rend_handshake, replay::PowNonceReplayLog, BlindIdPublicKeySpecifier, CreateIptError,
    RendRequest, ReplayError, StartupError,
};

use super::NewPowManager;

/// Responsible for rotating proof-of-work seeds and verifying PoW solves.
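///
/// A rough sketch of how this fits together with the rest of the onion service code
/// (a non-compiling outline, not a verbatim call site; binding names are illustrative):
///
/// ```ignore
/// // `new()` builds the manager along with the queues the service and publisher use.
/// let NewPowManager {
///     pow_manager,
///     rend_req_tx,
///     rend_req_rx,
///     publisher_update_rx,
/// } = PowManager::new(runtime, nickname, instance_dir, keymgr, storage_handle)?;
///
/// // `launch()` spawns the background task that rotates seeds as they near expiry.
/// pow_manager.launch()?;
///
/// // The descriptor publisher fetches `pow-params` for each time period it publishes,
/// // and listens on `publisher_update_rx` to learn when a seed has rotated.
/// let pow_params = pow_manager.get_pow_params(time_period)?;
/// ```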
pub(crate) struct PowManager<R>(RwLock<State<R>>);

/// Internal state for [`PowManager`].
struct State<R> {
    /// The [`Seed`]s for a given [`TimePeriod`]
    ///
    /// The [`ArrayVec`] contains the current and previous seed, and the [`SystemTime`] is when the
    /// current seed will expire.
    seeds: HashMap<TimePeriod, SeedsForTimePeriod>,

    /// Verifiers for all the seeds that exist in `seeds`.
    verifiers: HashMap<SeedHead, (Verifier, Mutex<PowNonceReplayLog>)>,

    /// The nickname for this hidden service.
    ///
    /// We need this so we can get the blinded keys from the [`KeyMgr`].
    nickname: HsNickname,

    /// Directory used to store nonce replay log.
    instance_dir: InstanceRawSubdir,

    /// Key manager.
    keymgr: Arc<KeyMgr>,

    /// Current suggested effort that we publish in the pow-params line.
    suggested_effort: Effort,

    /// Runtime
    runtime: R,

    /// Handle for storing state we need to persist to disk.
    storage_handle: StorageHandle<PowManagerStateRecord>,

    /// Queue to tell the publisher to re-upload a descriptor for a given TP, since we've rotated
    /// that seed.
    publisher_update_tx: mpsc::Sender<TimePeriod>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
/// Information about the current and previous [`Seed`] for a given [`TimePeriod`].
struct SeedsForTimePeriod {
    /// The previous and current [`Seed`].
    ///
    /// The last element in this array is the current seed.
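    ///
    /// (Rotation pushes the new seed onto the end and pops the oldest seed from the
    /// front, so `seeds.last()` is always the seed currently being advertised.)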
    seeds: ArrayVec<Seed, 2>,

    /// When the current seed will expire.
    next_expiration_time: SystemTime,
}

#[derive(Debug)]
#[allow(unused)]
/// A PoW solve was invalid.
///
/// While this contains the reason for the failure, we probably just want to use that for
/// debugging; we shouldn't make any logical decisions based on what the particular error was.
pub(crate) enum PowSolveError {
    /// Seed head was not recognized; it may be expired.
    InvalidSeedHead,
    /// We have already seen a solve with this nonce.
    NonceReplay(ReplayError),
    /// The bytes given as a solution do not form a valid Equi-X puzzle.
    InvalidEquixSolution(SolutionErrorV1),
    /// The solution given was invalid.
    InvalidSolve(tor_hscrypto::pow::Error),
}

/// On-disk record of [`PowManager`] state.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct PowManagerStateRecord {
    /// Seeds for each time period.
    ///
    /// Conceptually, this is a map between TimePeriod and SeedsForTimePeriod, but since
    /// TimePeriod can't be serialized to a string, serde can't easily represent it as a
    /// map, so we instead store it as a list of tuples and convert it to/from the map
    /// when saving/loading.
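    ///
    /// The round-trip is just a `collect()` in each direction; a sketch of what
    /// [`State::to_record`] and [`PowManager::new`] do (variable names illustrative):
    ///
    /// ```ignore
    /// // Saving: map -> list of pairs.
    /// let record_seeds: Vec<(TimePeriod, SeedsForTimePeriod)> = seeds_map.clone().into_iter().collect();
    /// // Loading: list of pairs -> map.
    /// let seeds_map: HashMap<TimePeriod, SeedsForTimePeriod> = record_seeds.into_iter().collect();
    /// ```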
    seeds: Vec<(TimePeriod, SeedsForTimePeriod)>,
    // TODO POW: suggested_effort / etc should be serialized
}

impl<R: Runtime> State<R> {
    /// Make a [`PowManagerStateRecord`] for this state.
    pub(crate) fn to_record(&self) -> PowManagerStateRecord {
        PowManagerStateRecord {
            seeds: self.seeds.clone().into_iter().collect(),
        }
    }
}

/// How soon before a seed's expiration time we should rotate it and publish a new seed.
const SEED_EARLY_ROTATION_TIME: Duration = Duration::from_secs(60 * 5);

/// Minimum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MIN: u64 = 105;

/// Maximum seed expiration time in minutes. See:
/// <https://spec.torproject.org/hspow-spec/v1-equix.html#parameter-descriptor>
const EXPIRATION_TIME_MINS_MAX: u64 = 120;

/// Enforce that early rotation time is less than or equal to min expiration time.
const _: () = assert!(
    SEED_EARLY_ROTATION_TIME.as_secs() <= EXPIRATION_TIME_MINS_MIN * 60,
147    "Early rotation time must be less than minimum expiration time"
);

/// Enforce that min expiration time is less than or equal to max.
const _: () = assert!(
    EXPIRATION_TIME_MINS_MIN <= EXPIRATION_TIME_MINS_MAX,
    "Minimum expiration time must be less than or equal to max"
);

/// Depth of the queue used to signal the publisher that it needs to update a given time period.
///
/// 32 is likely way larger than we need but the messages are tiny so we might as well.
const PUBLISHER_UPDATE_QUEUE_DEPTH: usize = 32;

#[derive(Debug)]
#[allow(dead_code)] // We want to show fields in Debug even if we don't use them.
/// Internal error within the PoW subsystem.
pub(crate) enum InternalPowError {
    /// We don't have a key that is needed.
    MissingKey,
    /// Error in the underlying storage layer.
    StorageError,
    /// Error from the ReplayLog.
    CreateIptError(CreateIptError),
}

impl<R: Runtime> PowManager<R> {
    /// Create a new [`PowManager`].
    #[allow(clippy::new_ret_no_self)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        instance_dir: InstanceRawSubdir,
        keymgr: Arc<KeyMgr>,
        storage_handle: StorageHandle<PowManagerStateRecord>,
    ) -> Result<NewPowManager<R>, StartupError> {
        let on_disk_state = storage_handle.load().map_err(StartupError::LoadState)?;
        let seeds = on_disk_state.map_or(vec![], |on_disk_state| on_disk_state.seeds);
        let seeds = seeds.into_iter().collect();

        // This queue is extremely small, and we only make one of it per onion service, so it's
        // fine to not use memquota tracking.
        let (publisher_update_tx, publisher_update_rx) =
            crate::mpsc_channel_no_memquota(PUBLISHER_UPDATE_QUEUE_DEPTH);

        let state = State {
            seeds,
            nickname,
            instance_dir,
            keymgr,
            publisher_update_tx,
            verifiers: HashMap::new(),
            suggested_effort: Effort::zero(),
            runtime: runtime.clone(),
            storage_handle,
        };
        let pow_manager = Arc::new(PowManager(RwLock::new(state)));

        let (rend_req_tx, rend_req_rx) = super::make_rend_queue();
        let rend_req_rx = RendRequestReceiver::new(runtime, pow_manager.clone(), rend_req_rx);

        Ok(NewPowManager {
            pow_manager,
            rend_req_tx,
            rend_req_rx: Box::pin(rend_req_rx),
            publisher_update_rx,
        })
    }

    /// Launch background task to rotate seeds.
    pub(crate) fn launch(self: &Arc<Self>) -> Result<(), StartupError> {
        let pow_manager = self.clone();
        let runtime = pow_manager.0.read().expect("Lock poisoned").runtime.clone();

        runtime
            .spawn(pow_manager.main_loop_task())
            .map_err(|cause| StartupError::Spawn {
                spawning: "pow manager",
                cause: cause.into(),
            })?;
        Ok(())
    }

    /// Main loop for rotating seeds.
    async fn main_loop_task(self: Arc<Self>) {
        let runtime = self.0.write().expect("Lock poisoned").runtime.clone();

        loop {
            let next_update_time = self.rotate_seeds_if_expiring().await;

            // A new TimePeriod that we don't know about (and thus isn't reflected in
            // next_update_time) might get added at any point. Capping our sleep at the minimum
            // amount of time before a freshly-created seed would need rotation ensures that we
            // also rotate newly-added seeds on time.
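            // With the current constants, that cap is 105 * 60 - 300 = 6000 seconds (100 minutes).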
            let max_delay =
                Duration::from_secs(EXPIRATION_TIME_MINS_MIN * 60) - SEED_EARLY_ROTATION_TIME;
            let delay = next_update_time
                .map(|x| x.duration_since(SystemTime::now()).unwrap_or(max_delay))
                .unwrap_or(max_delay)
                .min(max_delay);

            tracing::debug!(next_wakeup = ?delay, "Recalculated PoW seeds.");

            runtime.sleep(delay).await;
        }
    }

    /// Make a randomized seed expiration time.
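    ///
    /// The result is between `EXPIRATION_TIME_MINS_MIN` and `EXPIRATION_TIME_MINS_MAX`
    /// minutes from now.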
    fn make_next_expiration_time<Rng: RngCore + CryptoRng>(rng: &mut Rng) -> SystemTime {
        SystemTime::now()
            + Duration::from_secs(
                60 * rng
                    .gen_range_checked(EXPIRATION_TIME_MINS_MIN..=EXPIRATION_TIME_MINS_MAX)
                    .expect("Can't generate expiration_time"),
            )
    }

    /// Make a new [`Verifier`] for a given [`TimePeriod`] and [`Seed`].
    ///
    /// If a key is not available for this TP, returns None.
    ///
    /// This takes individual arguments instead of `&self` to avoid getting into any trouble with
    /// locking.
    fn make_verifier(
        keymgr: &Arc<KeyMgr>,
        nickname: HsNickname,
        time_period: TimePeriod,
        seed: Seed,
    ) -> Option<Verifier> {
        let blind_id_spec = BlindIdPublicKeySpecifier::new(nickname, time_period);
        let blind_id_key = match keymgr.get::<HsBlindIdKey>(&blind_id_spec) {
            Ok(blind_id_key) => blind_id_key,
            Err(err) => {
                tracing::warn!(?err, "KeyMgr error when getting blinded ID key for PoW");
                None
            }
        };
        let instance = Instance::new(blind_id_key?.id(), seed);
        Some(Verifier::new(instance))
    }

    /// Calculate a time when we want to rotate a seed, slightly before it expires, in order to
    /// ensure that clients don't ever download a seed that is already out of date.
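    ///
    /// For example, with a 5-minute `SEED_EARLY_ROTATION_TIME`, a seed expiring at 12:00
    /// becomes eligible for rotation at 11:55.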
    fn calculate_early_rotation_time(expiration_time: SystemTime) -> SystemTime {
        // Underflow cannot happen because:
        //
        // * We set the expiration time to the current time plus at least the minimum
        //   expiration time
        // * We know (backed up by a compile-time assertion) that SEED_EARLY_ROTATION_TIME is
        //   less than the minimum expiration time.
        //
        // Thus, the only way this subtraction can underflow is if the system time at the
        // moment we set the expiration time was before the epoch, which is not possible on
        // reasonable platforms.
        expiration_time
            .checked_sub(SEED_EARLY_ROTATION_TIME)
            .expect("PoW seed expiration underflow")
    }

    /// Rotate any seeds that will expire soon.
    ///
    /// This also pokes the publisher when needed to cause rotated seeds to be published.
    ///
    /// Returns the next time this function should be called again.
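    ///
    /// In outline: under the write lock, rotate any seed whose early-rotation time has
    /// passed, drop seeds for time periods we no longer have a blinded key for, rebuild
    /// the affected verifiers, and persist the new state; then, with the lock released,
    /// notify the publisher of each time period that changed.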
    #[allow(clippy::cognitive_complexity)]
    async fn rotate_seeds_if_expiring(&self) -> Option<SystemTime> {
        let mut expired_verifiers = vec![];
        let mut new_verifiers = vec![];

        let mut update_times = vec![];
        let mut updated_tps = vec![];
        let mut expired_tps = vec![];

        let mut publisher_update_tx = {
            // TODO POW: get rng from the right place...
            let mut rng = rand::rng();

            let mut state = self.0.write().expect("Lock poisoned");

            let keymgr = state.keymgr.clone();
            let nickname = state.nickname.clone();

            for (time_period, info) in state.seeds.iter_mut() {
                let rotation_time = Self::calculate_early_rotation_time(info.next_expiration_time);
                update_times.push(rotation_time);

                if rotation_time <= SystemTime::now() {
                    let seed = Seed::new(&mut rng, None);
                    let verifier = match Self::make_verifier(
                        &keymgr,
                        nickname.clone(),
                        *time_period,
                        seed.clone(),
                    ) {
                        Some(verifier) => verifier,
                        None => {
                            // We use not having a key for a given TP as the signal that we should
                            // stop keeping track of seeds for that TP.
                            expired_tps.push(*time_period);
                            continue;
                        }
                    };

                    let expired_seed = if info.seeds.is_full() {
                        info.seeds.pop_at(0)
                    } else {
                        None
                    };
                    // .push() is safe, since we just made space above.
                    info.seeds.push(seed.clone());
                    info.next_expiration_time = Self::make_next_expiration_time(&mut rng);
                    update_times.push(info.next_expiration_time);

                    // Make a note to add the new verifier and remove the old one.
                    new_verifiers.push((seed, verifier));
                    if let Some(expired_seed) = expired_seed {
                        expired_verifiers.push(expired_seed.head());
                    }

                    // Tell the publisher to update this TP
                    updated_tps.push(*time_period);

                    tracing::debug!(time_period = ?time_period, "Rotated PoW seed");
                }
            }

            for time_period in expired_tps {
                if let Some(seeds) = state.seeds.remove(&time_period) {
                    for seed in seeds.seeds {
                        state.verifiers.remove(&seed.head());
                    }
                }
            }

            for (seed, verifier) in new_verifiers {
                let replay_log = Mutex::new(
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
                        .expect("Couldn't make ReplayLog."),
                );
                state.verifiers.insert(seed.head(), (verifier, replay_log));
            }

            for seed_head in expired_verifiers {
                state.verifiers.remove(&seed_head);
            }

            let record = state.to_record();
            if let Err(err) = state.storage_handle.store(&record) {
                tracing::warn!(?err, "Error saving PoW state");
            }

            state.publisher_update_tx.clone()
        };

        for time_period in updated_tps {
            if let Err(err) = publisher_update_tx.send(time_period).await {
                tracing::warn!(?err, "Couldn't send update message to publisher");
            }
        }

        update_times.iter().min().cloned()
    }

    /// Get [`PowParams`] for a given [`TimePeriod`].
    ///
    /// If we don't have any [`Seed`]s for the requested period, generate them. This is the only
    /// way that [`PowManager`] learns about new [`TimePeriod`]s.
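    ///
    /// A minimal usage sketch (the caller shown here is hypothetical; in this crate the
    /// descriptor publisher is the main consumer):
    ///
    /// ```ignore
    /// let pow_params: PowParams = pow_manager.get_pow_params(time_period)?;
    /// // `pow_params` carries the current seed (bounded by its expiration time) and the
    /// // suggested effort, ready to be encoded as the descriptor's `pow-params` line.
    /// ```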
    pub(crate) fn get_pow_params(
        self: &Arc<Self>,
        time_period: TimePeriod,
    ) -> Result<PowParams, InternalPowError> {
        let (seed_and_expiration, suggested_effort) = {
            let state = self.0.read().expect("Lock poisoned");
            let seed = state
                .seeds
                .get(&time_period)
                .and_then(|x| Some((x.seeds.last()?.clone(), x.next_expiration_time)));
            (seed, state.suggested_effort)
        };

        let (seed, expiration) = match seed_and_expiration {
            Some(seed) => seed,
            None => {
                // We don't have a seed for this time period, so we need to generate one.

                // TODO POW: get rng from the right place...
                let mut rng = rand::rng();

                let seed = Seed::new(&mut rng, None);
                let next_expiration_time = Self::make_next_expiration_time(&mut rng);

                let mut seeds = ArrayVec::new();
                seeds.push(seed.clone());

                let mut state = self.0.write().expect("Lock poisoned");

                state.seeds.insert(
                    time_period,
                    SeedsForTimePeriod {
                        seeds,
                        next_expiration_time,
                    },
                );

                let verifier = Self::make_verifier(
                    &state.keymgr,
                    state.nickname.clone(),
                    time_period,
                    seed.clone(),
                )
                .ok_or(InternalPowError::MissingKey)?;

                let replay_log = Mutex::new(
                    PowNonceReplayLog::new_logged(&state.instance_dir, &seed)
                        .map_err(InternalPowError::CreateIptError)?,
                );
                state.verifiers.insert(seed.head(), (verifier, replay_log));

                let record = state.to_record();
                state
                    .storage_handle
                    .store(&record)
                    .map_err(|_| InternalPowError::StorageError)?;

                (seed, next_expiration_time)
            }
        };

        Ok(PowParams::V1(PowParamsV1::new(
            TimerangeBound::new(seed, ..expiration),
            suggested_effort,
        )))
    }

    /// Verify a PoW solve.
    fn check_solve(self: &Arc<Self>, solve: &ProofOfWorkV1) -> Result<(), PowSolveError> {
        // TODO POW: This puts the nonce in the replay structure before we check if the solve is
        // valid, which could be a problem — a potential attack would be to send a large number of
        // invalid solves with the hope of causing collisions with valid requests. This is probably
        // highly impractical, but we should think through it before stabilizing PoW.
        {
            let state = self.0.write().expect("Lock poisoned");
            let mut replay_log = match state.verifiers.get(&solve.seed_head()) {
                Some((_, replay_log)) => replay_log.lock().expect("Lock poisoned"),
                None => return Err(PowSolveError::InvalidSeedHead),
            };
            replay_log
                .check_for_replay(solve.nonce())
                .map_err(PowSolveError::NonceReplay)?;
        }

        // TODO: Once RwLock::downgrade is stabilized, it would make sense to use it here...

        let state = self.0.read().expect("Lock poisoned");
        let verifier = match state.verifiers.get(&solve.seed_head()) {
            Some((verifier, _)) => verifier,
            None => return Err(PowSolveError::InvalidSeedHead),
        };

        let solution = match Solution::try_from_bytes(
            solve.nonce().clone(),
            solve.effort(),
            solve.seed_head(),
            solve.solution(),
        ) {
            Ok(solution) => solution,
            Err(err) => return Err(PowSolveError::InvalidEquixSolution(err)),
        };

        match verifier.check(&solution) {
            Ok(()) => Ok(()),
            Err(err) => Err(PowSolveError::InvalidSolve(err)),
        }
    }
}

/// Wrapper around [`RendRequest`] that implements [`std::cmp::Ord`] to sort by [`Effort`].
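///
/// Requests without a PoW extension compare as `Effort::zero()`, so once higher-effort
/// requests are queued they are always dequeued first.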
#[derive(Debug)]
struct RendRequestOrdByEffort {
    /// The underlying request.
    request: RendRequest,
    /// The proof-of-work options, if given.
    pow: Option<ProofOfWorkV1>,
}

impl RendRequestOrdByEffort {
    /// Create a new [`RendRequestOrdByEffort`].
    fn new(request: RendRequest) -> Result<Self, rend_handshake::IntroRequestError> {
        let pow = match request
            .intro_request()?
            .intro_payload()
            .proof_of_work_extension()
            .cloned()
        {
            Some(ProofOfWork::V1(pow)) => Some(pow),
            None | Some(_) => None,
        };

        Ok(Self { request, pow })
    }
}

impl Ord for RendRequestOrdByEffort {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
        let other_effort = other
            .pow
            .as_ref()
            .map_or(Effort::zero(), |pow| pow.effort());
        self_effort.cmp(&other_effort)
    }
}

impl PartialOrd for RendRequestOrdByEffort {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for RendRequestOrdByEffort {
    fn eq(&self, other: &Self) -> bool {
        let self_effort = self.pow.as_ref().map_or(Effort::zero(), |pow| pow.effort());
        let other_effort = other
            .pow
            .as_ref()
            .map_or(Effort::zero(), |pow| pow.effort());
        self_effort == other_effort
    }
}

impl Eq for RendRequestOrdByEffort {}

/// Implements [`Stream`] for incoming [`RendRequest`]s, using a priority queue system to dequeue
/// high-[`Effort`] requests first.
///
/// This is implemented on top of a [`mpsc::Receiver`]. There is a thread that dequeues from the
/// [`mpsc::Receiver`], checks the PoW solve, and if it is correct, adds it to a [`BinaryHeap`],
/// which the [`Stream`] implementation reads from.
///
/// This is not particularly optimized — queueing and dequeuing use a [`Mutex`], so there may be
/// some contention there. It's possible there may be some fancy lockless (or more optimized)
/// priority queue that we could use, but we should properly benchmark things before trying to make
/// an optimization like that.
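///
/// A minimal sketch of the consumer side (the handler is hypothetical; the stream is the
/// `rend_req_rx` handed out in [`NewPowManager`]):
///
/// ```ignore
/// // The accept loop fills the heap from the mpsc channel on a blocking thread;
/// // polling the stream pops the highest-effort verified request first.
/// while let Some(rend_request) = rend_req_rx.next().await {
///     handle_rend_request(rend_request).await;
/// }
/// ```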
#[derive(Clone)]
pub(crate) struct RendRequestReceiver(Arc<Mutex<RendRequestReceiverInner>>);

/// Inner implementation for [`RendRequestReceiver`].
struct RendRequestReceiverInner {
    /// Internal priority queue of requests.
    queue: BinaryHeap<RendRequestOrdByEffort>,

    /// Waker to inform async readers when there is a new message on the queue.
    waker: Option<Waker>,
}

impl RendRequestReceiver {
    /// Create a new [`RendRequestReceiver`].
    fn new<R: Runtime>(
        runtime: R,
        pow_manager: Arc<PowManager<R>>,
        inner_receiver: mpsc::Receiver<RendRequest>,
    ) -> Self {
        let receiver = RendRequestReceiver(Arc::new(Mutex::new(RendRequestReceiverInner {
            queue: BinaryHeap::new(),
            waker: None,
        })));
        let receiver_clone = receiver.clone();
        let accept_thread = runtime.clone().spawn_blocking(move || {
            receiver_clone.accept_loop(&runtime, &pow_manager, inner_receiver);
        });
        drop(accept_thread);
        receiver
    }

    /// Loop to accept messages from the wrapped [`mpsc::Receiver`], validate PoW solves, and
    /// enqueue onto the priority queue.
    fn accept_loop<R: Runtime>(
        self,
        runtime: &R,
        pow_manager: &Arc<PowManager<R>>,
        mut receiver: mpsc::Receiver<RendRequest>,
    ) {
        loop {
            let rend_request = runtime
                .reenter_block_on(receiver.next())
                .expect("Other side of RendRequest queue hung up");
            let rend_request = match RendRequestOrdByEffort::new(rend_request) {
                Ok(rend_request) => rend_request,
                Err(err) => {
                    tracing::trace!(?err, "Error processing RendRequest");
                    continue;
                }
            };

            if let Some(ref pow) = rend_request.pow {
                if let Err(err) = pow_manager.check_solve(pow) {
                    tracing::debug!(?err, "PoW verification failed");
                    continue;
                }
            }

            let mut inner = self.0.lock().expect("Lock poisoned");
            inner.queue.push(rend_request);
            if let Some(waker) = &inner.waker {
                waker.wake_by_ref();
            }
        }
    }
}

impl Stream for RendRequestReceiver {
    type Item = RendRequest;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut inner = self.get_mut().0.lock().expect("Lock poisoned");
        match inner.queue.pop() {
            Some(item) => std::task::Poll::Ready(Some(item.request)),
            None => {
                inner.waker = Some(cx.waker().clone());
                std::task::Poll::Pending
            }
        }
    }
}