1
//! Implement background tasks used by guard managers.
2
//!
3
//! These background tasks keep a weak reference to the [`GuardMgrInner`]
4
//! and use that to notice when they should shut down.
5

            
6
use crate::pending::{GuardStatus, RequestId};
7
use crate::GuardMgrInner;
8

            
9
use futures::{channel::mpsc, stream::StreamExt};
10
#[cfg(test)]
11
use oneshot_fused_workaround as oneshot;
12
use tor_proto::ClockSkew;
13

            
14
use std::sync::{Mutex, Weak};
15

            
16
/// A message sent by to the [`report_status_events()`] task.
17
#[derive(Debug)]
18
pub(crate) enum Msg {
19
    /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
20
    /// report the status of an attempt to use a guard.
21
    Status(RequestId, GuardStatus, Option<ClockSkew>),
22
    /// Tells the task to reply on the provided oneshot::Sender once
23
    /// it has seen this message.  Used to indicate that the message
24
    /// queue is flushed.
25
    #[cfg(test)]
26
    Ping(oneshot::Sender<()>),
27
}
28

            
29
/// Background task: wait for messages about guard statuses, and
30
/// tell a guard manager about them.  Runs indefinitely.
31
///
32
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
33
/// manager goes away, then this task exits.
34
///
35
/// Requires a `mpsc::Receiver` that is used to tell the task about
36
/// new status events to wait for.
37
640
pub(crate) async fn report_status_events(
38
640
    runtime: impl tor_rtcompat::SleepProvider,
39
640
    inner: Weak<Mutex<GuardMgrInner>>,
40
640
    mut events: mpsc::UnboundedReceiver<Msg>,
41
640
) {
42
    loop {
43
16852
        match events.next().await {
44
16678
            Some(Msg::Status(id, status, skew)) => {
45
                // We've got a report about a guard status.
46
16678
                if let Some(inner) = inner.upgrade() {
47
16568
                    let mut inner = inner.lock().expect("Poisoned lock");
48
16568
                    inner.handle_msg(id, status, skew, &runtime);
49
16568
                } else {
50
                    // The guard manager has gone away.
51
110
                    return;
52
                }
53
            }
54
            #[cfg(test)]
55
48
            Some(Msg::Ping(sender)) => {
56
48
                let _ignore = sender.send(());
57
48
            }
58
            // The streams have all closed.  (I think this is impossible?)
59
82
            None => return,
60
        }
61
        // TODO: Is this task guaranteed to exit?
62
    }
63
192
}
64

            
65
/// Background task to run periodic events on the guard manager.
66
///
67
/// The only role of this task is to invoke
68
/// [`GuardMgrInner::run_periodic_events`] from time to time, so that
69
/// it can perform housekeeping tasks.
70
///
71
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
72
/// manager goes away, then this task exits.
73
640
pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
74
640
    runtime: R,
75
640
    inner: Weak<Mutex<GuardMgrInner>>,
76
640
) {
77
    loop {
78
488
        let delay = if let Some(inner) = inner.upgrade() {
79
350
            let mut inner = inner.lock().expect("Poisoned lock");
80
350
            let wallclock = runtime.wallclock();
81
350
            let now = runtime.now();
82
350
            inner.run_periodic_events(wallclock, now)
83
        } else {
84
            // The guard manager has gone away.
85
138
            return;
86
        };
87
350
        runtime.sleep(delay).await;
88
    }
89
138
}
90

            
91
/// Background task to keep a guard manager up-to-date with a given network
92
/// directory provider.
93
568
pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
94
568
    runtime: RT,
95
568
    inner: Weak<Mutex<GuardMgrInner>>,
96
568
    netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
97
568
) {
98
    use tor_netdir::DirEvent;
99

            
100
164
    let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
101
104
        Some(s) => s,
102
60
        None => return,
103
    };
104

            
105
104
    while let Some(event) = event_stream.next().await {
106
        match event {
107
            DirEvent::NewConsensus | DirEvent::NewDescriptors => {
108
                if let Some(inner) = inner.upgrade() {
109
                    let mut inner = inner.lock().expect("Poisoned lock");
110
                    inner.update(runtime.wallclock(), runtime.now());
111
                } else {
112
                    return;
113
                }
114
            }
115
            _ => {}
116
        }
117
    }
118
156
}
119

            
120
/// Background task to keep a guard manager up-to-date with a given bridge
121
/// descriptor provider.
122
#[cfg(feature = "bridge-client")]
123
pub(crate) async fn keep_bridge_descs_updated<RT: tor_rtcompat::Runtime>(
124
    runtime: RT,
125
    inner: Weak<Mutex<GuardMgrInner>>,
126
    bridge_desc_provider: Weak<dyn crate::bridge::BridgeDescProvider>,
127
) {
128
    use crate::bridge::BridgeDescEvent as E;
129
    let mut event_stream = match bridge_desc_provider.upgrade().map(|p| p.events()) {
130
        Some(s) => s,
131
        None => return,
132
    };
133

            
134
    while let Some(event) = event_stream.next().await {
135
        match event {
136
            E::SomethingChanged => {
137
                if let Some(inner) = inner.upgrade() {
138
                    let mut inner = inner.lock().expect("Poisoned lock");
139
                    inner.update(runtime.wallclock(), runtime.now());
140
                } else {
141
                    return;
142
                }
143
            }
144
        }
145
    }
146
}