tor_guardmgr/
daemon.rs

1//! Implement background tasks used by guard managers.
2//!
3//! These background tasks keep a weak reference to the [`GuardMgrInner`]
4//! and use that to notice when they should shut down.
5
6use crate::pending::{GuardStatus, RequestId};
7use crate::GuardMgrInner;
8
9use futures::{channel::mpsc, stream::StreamExt};
10#[cfg(test)]
11use oneshot_fused_workaround as oneshot;
12use tor_proto::ClockSkew;
13
14use std::sync::{Mutex, Weak};
15
16/// A message sent by to the [`report_status_events()`] task.
17#[derive(Debug)]
18pub(crate) enum Msg {
19    /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
20    /// report the status of an attempt to use a guard.
21    Status(RequestId, GuardStatus, Option<ClockSkew>),
22    /// Tells the task to reply on the provided oneshot::Sender once
23    /// it has seen this message.  Used to indicate that the message
24    /// queue is flushed.
25    #[cfg(test)]
26    Ping(oneshot::Sender<()>),
27}
28
29/// Background task: wait for messages about guard statuses, and
30/// tell a guard manager about them.  Runs indefinitely.
31///
32/// Takes the [`GuardMgrInner`] by weak reference; if the guard
33/// manager goes away, then this task exits.
34///
35/// Requires a `mpsc::Receiver` that is used to tell the task about
36/// new status events to wait for.
37pub(crate) async fn report_status_events(
38    runtime: impl tor_rtcompat::SleepProvider,
39    inner: Weak<Mutex<GuardMgrInner>>,
40    mut events: mpsc::UnboundedReceiver<Msg>,
41) {
42    loop {
43        match events.next().await {
44            Some(Msg::Status(id, status, skew)) => {
45                // We've got a report about a guard status.
46                if let Some(inner) = inner.upgrade() {
47                    let mut inner = inner.lock().expect("Poisoned lock");
48                    inner.handle_msg(id, status, skew, &runtime);
49                } else {
50                    // The guard manager has gone away.
51                    return;
52                }
53            }
54            #[cfg(test)]
55            Some(Msg::Ping(sender)) => {
56                let _ignore = sender.send(());
57            }
58            // The streams have all closed.  (I think this is impossible?)
59            None => return,
60        }
61        // TODO: Is this task guaranteed to exit?
62    }
63}
64
65/// Background task to run periodic events on the guard manager.
66///
67/// The only role of this task is to invoke
68/// [`GuardMgrInner::run_periodic_events`] from time to time, so that
69/// it can perform housekeeping tasks.
70///
71/// Takes the [`GuardMgrInner`] by weak reference; if the guard
72/// manager goes away, then this task exits.
73pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
74    runtime: R,
75    inner: Weak<Mutex<GuardMgrInner>>,
76) {
77    loop {
78        let delay = if let Some(inner) = inner.upgrade() {
79            let mut inner = inner.lock().expect("Poisoned lock");
80            let wallclock = runtime.wallclock();
81            let now = runtime.now();
82            inner.run_periodic_events(wallclock, now)
83        } else {
84            // The guard manager has gone away.
85            return;
86        };
87        runtime.sleep(delay).await;
88    }
89}
90
91/// Background task to keep a guard manager up-to-date with a given network
92/// directory provider.
93pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
94    runtime: RT,
95    inner: Weak<Mutex<GuardMgrInner>>,
96    netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
97) {
98    use tor_netdir::DirEvent;
99
100    let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
101        Some(s) => s,
102        None => return,
103    };
104
105    while let Some(event) = event_stream.next().await {
106        match event {
107            DirEvent::NewConsensus | DirEvent::NewDescriptors => {
108                if let Some(inner) = inner.upgrade() {
109                    let mut inner = inner.lock().expect("Poisoned lock");
110                    inner.update(runtime.wallclock(), runtime.now());
111                } else {
112                    return;
113                }
114            }
115            _ => {}
116        }
117    }
118}
119
/// Background task that refreshes a guard manager whenever a bridge
/// descriptor provider reports a change.
///
/// Subscribes to the provider's event stream and, for every
/// `BridgeDescEvent::SomethingChanged`, calls `update` on the
/// [`GuardMgrInner`] with the runtime's current times.
///
/// Both `inner` and `bridge_desc_provider` are weak references.  The task
/// returns immediately if the provider is already gone, returns when the
/// event stream ends, and returns if the guard manager can no longer be
/// upgraded when an event arrives.
#[cfg(feature = "bridge-client")]
pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
    runtime: RT,
    inner: Weak<Mutex<GuardMgrInner>>,
    bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
) {
    use crate::bridge::BridgeDescEvent as E;

    // The strong reference to the provider lives only for the duration of
    // this `match`; afterwards we hold nothing but the stream.
    let mut event_stream = match bridge_desc_provider.upgrade() {
        Some(provider) => provider.events(),
        None => return,
    };

    while let Some(event) = event_stream.next().await {
        // Deliberately matched without a wildcard arm, so that adding a new
        // event variant forces a decision here.
        match event {
            E::SomethingChanged => match inner.upgrade() {
                Some(mgr) => {
                    let mut guard = mgr.lock().expect("Poisoned lock");
                    guard.update(runtime.wallclock(), runtime.now());
                }
                // The guard manager has been dropped.
                None => return,
            },
        }
    }
}