obfs4_checker/
checking.rs

1//! This module contains the code that actually runs checks on bridges
2use arti_client::config::pt::TransportConfigBuilder;
3use arti_client::config::{BridgeConfigBuilder, CfgPath, TorClientConfigBuilder};
4use arti_client::{TorClient, TorClientConfig};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use time::OffsetDateTime;
8use tokio::sync::broadcast;
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::time::{timeout, Duration};
11use tor_error::ErrorReport;
12use tor_guardmgr::bridge::{BridgeConfig, BridgeParseError};
13use tor_proto::channel::Channel;
14use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
15use tor_rtcompat::PreferredRuntime;
16
17use crate::BridgeResult;
18
/// Upper bound on how many bridge connection attempts `test_bridges` runs
/// concurrently.
const MAX_CONNECTIONS: usize = 10;

/// How long a single receive on one of the internal channels may wait before
/// we give up on it, so that no loop can get the program stuck waiting for a
/// message that never comes.
pub const RECEIVE_TIMEOUT: Duration = Duration::from_millis(1_000);

/// Capacity (number of buffered, in-transit messages) of the internal mpsc
/// channels.
pub(crate) const CHANNEL_SIZE: usize = 100;
29
30/// Attempt to create a Channel to a provided bridge
31///
32/// If successful, we will obtain a Channel, if not we get an error.
33///
34/// The channel is created using [tor_chanmgr::ChanMgr], accessed using
35/// [TorClient::chanmgr()]
36async fn is_bridge_online(
37    bridge_config: &BridgeConfig,
38    tor_client: &TorClient<PreferredRuntime>,
39) -> Result<Arc<Channel>, tor_chanmgr::Error> {
40    let chanmgr = tor_client.chanmgr();
41    chanmgr
42        .build_unmanaged_channel(bridge_config, ChannelAccount::new_noop())
43        .await
44}
45
46/// Waits for given channel to expire and sends this info through specified
47/// channel
48async fn is_bridge_still_online(
49    channel: &Channel,
50    bridge_line: String,
51    expiry_tx: Sender<String>,
52) -> anyhow::Result<()> {
53    let _ = channel.wait_for_close().await;
54    // if we reached this statement, it means the channel has expired
55    expiry_tx.send(bridge_line).await?;
56    Ok(())
57}
58
59/// Return a [TorClientConfigBuilder] which is set to use a pluggable transport
60/// for all connections
61fn build_pt_bridge_config(
62    protocol: &str,
63    bin_path: &str,
64) -> anyhow::Result<TorClientConfigBuilder> {
65    let mut builder = TorClientConfig::builder();
66    let mut transport = TransportConfigBuilder::default();
67    let protocol_parsed = protocol.parse()?;
68    transport
69        .protocols(vec![protocol_parsed])
70        .path(CfgPath::new(bin_path.into()))
71        .run_on_startup(true);
72    builder.bridges().transports().push(transport);
73    Ok(builder)
74}
75
76/// Contains the main logic for testing each bridge.
77///
78/// It ends up taking in a slice of bridge lines, and creates [MAX_CONNECTIONS]
79/// number of connections as tasks, then waits for these requests to be resolved,
80/// either by successfully connecting or not (for a variety of reasons). The
81/// actual work to check each single bridge is done by [is_bridge_online()]
82///
83/// This is done up until all the bridges in the slice are covered
84async fn test_bridges(
85    bridge_lines: &[String],
86    common_tor_client: TorClient<PreferredRuntime>,
87) -> (HashMap<String, BridgeResult>, HashMap<String, Arc<Channel>>) {
88    let mut results = HashMap::new();
89    let mut channels = HashMap::new();
90    let mut counter = 0;
91    while counter < bridge_lines.len() {
92        let tasks: Vec<_> = bridge_lines
93            [counter..(counter + MAX_CONNECTIONS).min(bridge_lines.len())]
94            .iter()
95            .map(|rawbridgeline_ref| {
96                let rawbridgeline = rawbridgeline_ref.to_string();
97                let maybe_bridge: Result<BridgeConfigBuilder, BridgeParseError> =
98                    rawbridgeline.parse();
99                match maybe_bridge {
100                    Ok(bridge) => {
101                        let bridge_config = bridge.build().unwrap();
102                        let tor_client = common_tor_client.isolated_client();
103                        tokio::spawn(async move {
104                            let current_time = OffsetDateTime::now_utc();
105                            match is_bridge_online(&bridge_config, &tor_client).await {
106                                Ok(functional) => {
107                                    (rawbridgeline, Some(functional), current_time, None)
108                                }
109                                Err(er) => {
110                                    // Build error here since we can't
111                                    // represent the actual Arti-related errors
112                                    // by `dyn ErrorReport` and we need the
113                                    // `.report()` method's output to pretty print
114                                    // errors in the JSON we return to the user
115                                    let error_report =
116                                        er.report().to_string().replace("error: ", "");
117                                    (rawbridgeline, None, current_time, Some(error_report))
118                                }
119                            }
120                        })
121                    }
122                    Err(e) => tokio::spawn(async move {
123                        let current_time = OffsetDateTime::now_utc();
124                        // Build error here since we can't
125                        // represent the actual Arti-related errors
126                        // by `dyn ErrorReport` and we need the
127                        // `.report()` method's output to pretty print
128                        // errors in the JSON we return to the user
129                        (
130                            rawbridgeline,
131                            None,
132                            current_time,
133                            Some(e.report().to_string()),
134                        )
135                    }),
136                }
137            })
138            .collect();
139        counter += MAX_CONNECTIONS;
140        let task_results = futures::future::join_all(tasks).await;
141        for (bridgeline, chan, time, error) in task_results.into_iter().flatten() {
142            let res = BridgeResult {
143                functional: chan.is_some(),
144                last_tested: time,
145                error,
146            };
147            results.insert(bridgeline.clone(), res);
148            if let Some(channel) = chan {
149                channels.insert(bridgeline, channel);
150            }
151        }
152    }
153    (results, channels)
154}
155
156/// Calculates a list of bridge lines that have no channels
157pub fn get_failed_bridges(
158    bridge_lines: &[String],
159    channels: &HashMap<String, Arc<Channel>>,
160) -> Vec<String> {
161    bridge_lines
162        .iter()
163        .filter_map(|bridge_line| {
164            if !channels.contains_key(bridge_line) {
165                Some(bridge_line.to_owned())
166            } else {
167                None
168            }
169        })
170        .collect::<Vec<_>>()
171}
172
173/// Task which checks if failed bridges have come up online
174pub async fn check_failed_bridges_task(
175    initial_failed_bridges: Vec<String>,
176    common_tor_client: TorClient<PreferredRuntime>,
177    now_online_bridges_tx: Sender<HashMap<String, Arc<Channel>>>,
178    mut once_online_bridges_rx: Receiver<Vec<String>>,
179    updates_tx: broadcast::Sender<HashMap<String, BridgeResult>>,
180    mut new_bridges_rx: broadcast::Receiver<Vec<String>>,
181) {
182    let mut failed_bridges = initial_failed_bridges;
183    loop {
184        let (newresults, good_bridges) =
185            test_bridges(&failed_bridges, common_tor_client.isolated_client()).await;
186        // detect which bridges failed again
187        failed_bridges = get_failed_bridges(&failed_bridges, &good_bridges);
188        // report online bridges to the appropriate task
189        now_online_bridges_tx.send(good_bridges).await.unwrap();
190        // get new failures from the other task
191        while let Ok(Some(new_failures)) =
192            timeout(RECEIVE_TIMEOUT, once_online_bridges_rx.recv()).await
193        {
194            if new_failures.is_empty() {
195                break;
196            }
197            failed_bridges.splice(..0, new_failures.iter().cloned());
198        }
199        // get new bridges to test from API call and merge them with known bad
200        // bridges
201        while let Ok(Ok(new_failures)) = timeout(RECEIVE_TIMEOUT, new_bridges_rx.recv()).await {
202            if new_failures.is_empty() {
203                break;
204            }
205            let set1: HashSet<_> = new_failures.iter().cloned().collect();
206            let set2: HashSet<_> = failed_bridges.iter().cloned().collect();
207            failed_bridges = set1.union(&set2).cloned().collect();
208        }
209        // write newresults into the updates channel
210        if !newresults.is_empty() {
211            updates_tx.send(newresults).unwrap();
212        }
213    }
214}
215
216/// Task which checks if online bridges have gone down
217///
218/// TODO: use new Arti APIs for detecting bridges going down
219pub async fn detect_bridges_going_down(
220    initial_channels: HashMap<String, Arc<Channel>>,
221    once_online_bridges_tx: Sender<Vec<String>>,
222    mut now_online_bridges_rx: Receiver<HashMap<String, Arc<Channel>>>,
223) {
224    let mut channels = initial_channels;
225    let (expiry_tx, mut expiry_rx) = mpsc::channel::<String>(CHANNEL_SIZE);
226    loop {
227        let mut failed_bridges = Vec::new();
228        let mut new_channels = HashMap::new();
229        for (bridgeline, channel) in channels.into_iter() {
230            let new_expiry_tx = expiry_tx.clone();
231            tokio::spawn(async move {
232                if let Err(e) =
233                    is_bridge_still_online(channel.as_ref(), bridgeline.clone(), new_expiry_tx)
234                        .await
235                {
236                    eprintln!("Error while waiting on close: {:#?}", e);
237                }
238            });
239        }
240        // detect any bridges failing
241        while let Ok(Some(bridgeline)) = timeout(RECEIVE_TIMEOUT, expiry_rx.recv()).await {
242            new_channels.remove(&bridgeline);
243            failed_bridges.push(bridgeline);
244        }
245        // report failures to the appropriate task
246        once_online_bridges_tx.send(failed_bridges).await.unwrap();
247        // get new channels from the other task
248        while let Ok(Some(just_online_bridges)) =
249            timeout(RECEIVE_TIMEOUT, now_online_bridges_rx.recv()).await
250        {
251            new_channels.extend(just_online_bridges);
252        }
253        channels = new_channels;
254    }
255}
256
257/// Function which keeps track of the state of all the bridges given to it
258pub async fn continuous_check(
259    channels: HashMap<String, Arc<Channel>>,
260    failed_bridges: Vec<String>,
261    common_tor_client: TorClient<PreferredRuntime>,
262    updates_tx: broadcast::Sender<HashMap<String, BridgeResult>>,
263    new_bridges_rx: broadcast::Receiver<Vec<String>>,
264) {
265    let (once_online_sender, once_online_recv) = mpsc::channel(CHANNEL_SIZE);
266    let (now_online_sender, now_online_recv) = mpsc::channel(CHANNEL_SIZE);
267    let task1 = detect_bridges_going_down(channels, once_online_sender, now_online_recv);
268    let task2 = check_failed_bridges_task(
269        failed_bridges,
270        common_tor_client,
271        now_online_sender,
272        once_online_recv,
273        updates_tx,
274        new_bridges_rx,
275    );
276    tokio::join!(task1, task2);
277}
278
279/// Build a [TorClient] that is intended to be used purely for creating isolated clients off of.
280///
281/// Note that this is mainly a wrapper for convenience purposes
282pub async fn build_common_tor_client(
283    obfs4_path: &str,
284) -> anyhow::Result<TorClient<PreferredRuntime>> {
285    let builder = build_pt_bridge_config("obfs4", obfs4_path)?.build()?;
286    Ok(TorClient::create_bootstrapped(builder).await?)
287}
288
289/// Main function to unite everything together
290///
291/// In summary,
292///
293/// 1. Create the common [`TorClient`] which will be used for every connection
294///
295/// 2. Give [test_bridges()] the bridge lines
296///
297/// 3. Return the results
298pub async fn main_test(
299    bridge_lines: Vec<String>,
300    obfs4_path: &str,
301) -> Result<(HashMap<String, BridgeResult>, HashMap<String, Arc<Channel>>), arti_client::Error> {
302    let common_tor_client = build_common_tor_client(obfs4_path).await.unwrap();
303    Ok(test_bridges(&bridge_lines, common_tor_client).await)
304}