1use arti_client::config::pt::TransportConfigBuilder;
3use arti_client::config::{BridgeConfigBuilder, CfgPath, TorClientConfigBuilder};
4use arti_client::{TorClient, TorClientConfig};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use time::OffsetDateTime;
8use tokio::sync::broadcast;
9use tokio::sync::mpsc::{self, Receiver, Sender};
10use tokio::time::{timeout, Duration};
11use tor_error::ErrorReport;
12use tor_guardmgr::bridge::{BridgeConfig, BridgeParseError};
13use tor_proto::channel::Channel;
14use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
15use tor_rtcompat::PreferredRuntime;
16
17use crate::BridgeResult;
18
/// Number of bridge lines that `test_bridges` tests concurrently in one batch.
const MAX_CONNECTIONS: usize = 10;

/// How long a single receive attempt on a channel may wait before the
/// polling loops in this file give up and move on.
pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(1);

/// Capacity used for the `mpsc` channels created in this file.
pub(crate) const CHANNEL_SIZE: usize = 100;
29
30async fn is_bridge_online(
37 bridge_config: &BridgeConfig,
38 tor_client: &TorClient<PreferredRuntime>,
39) -> Result<Arc<Channel>, tor_chanmgr::Error> {
40 let chanmgr = tor_client.chanmgr();
41 chanmgr
42 .build_unmanaged_channel(bridge_config, ChannelAccount::new_noop())
43 .await
44}
45
46async fn is_bridge_still_online(
49 channel: &Channel,
50 bridge_line: String,
51 expiry_tx: Sender<String>,
52) -> anyhow::Result<()> {
53 let _ = channel.wait_for_close().await;
54 expiry_tx.send(bridge_line).await?;
56 Ok(())
57}
58
59fn build_pt_bridge_config(
62 protocol: &str,
63 bin_path: &str,
64) -> anyhow::Result<TorClientConfigBuilder> {
65 let mut builder = TorClientConfig::builder();
66 let mut transport = TransportConfigBuilder::default();
67 let protocol_parsed = protocol.parse()?;
68 transport
69 .protocols(vec![protocol_parsed])
70 .path(CfgPath::new(bin_path.into()))
71 .run_on_startup(true);
72 builder.bridges().transports().push(transport);
73 Ok(builder)
74}
75
76async fn test_bridges(
85 bridge_lines: &[String],
86 common_tor_client: TorClient<PreferredRuntime>,
87) -> (HashMap<String, BridgeResult>, HashMap<String, Arc<Channel>>) {
88 let mut results = HashMap::new();
89 let mut channels = HashMap::new();
90 let mut counter = 0;
91 while counter < bridge_lines.len() {
92 let tasks: Vec<_> = bridge_lines
93 [counter..(counter + MAX_CONNECTIONS).min(bridge_lines.len())]
94 .iter()
95 .map(|rawbridgeline_ref| {
96 let rawbridgeline = rawbridgeline_ref.to_string();
97 let maybe_bridge: Result<BridgeConfigBuilder, BridgeParseError> =
98 rawbridgeline.parse();
99 match maybe_bridge {
100 Ok(bridge) => {
101 let bridge_config = bridge.build().unwrap();
102 let tor_client = common_tor_client.isolated_client();
103 tokio::spawn(async move {
104 let current_time = OffsetDateTime::now_utc();
105 match is_bridge_online(&bridge_config, &tor_client).await {
106 Ok(functional) => {
107 (rawbridgeline, Some(functional), current_time, None)
108 }
109 Err(er) => {
110 let error_report =
116 er.report().to_string().replace("error: ", "");
117 (rawbridgeline, None, current_time, Some(error_report))
118 }
119 }
120 })
121 }
122 Err(e) => tokio::spawn(async move {
123 let current_time = OffsetDateTime::now_utc();
124 (
130 rawbridgeline,
131 None,
132 current_time,
133 Some(e.report().to_string()),
134 )
135 }),
136 }
137 })
138 .collect();
139 counter += MAX_CONNECTIONS;
140 let task_results = futures::future::join_all(tasks).await;
141 for (bridgeline, chan, time, error) in task_results.into_iter().flatten() {
142 let res = BridgeResult {
143 functional: chan.is_some(),
144 last_tested: time,
145 error,
146 };
147 results.insert(bridgeline.clone(), res);
148 if let Some(channel) = chan {
149 channels.insert(bridgeline, channel);
150 }
151 }
152 }
153 (results, channels)
154}
155
156pub fn get_failed_bridges(
158 bridge_lines: &[String],
159 channels: &HashMap<String, Arc<Channel>>,
160) -> Vec<String> {
161 bridge_lines
162 .iter()
163 .filter_map(|bridge_line| {
164 if !channels.contains_key(bridge_line) {
165 Some(bridge_line.to_owned())
166 } else {
167 None
168 }
169 })
170 .collect::<Vec<_>>()
171}
172
173pub async fn check_failed_bridges_task(
175 initial_failed_bridges: Vec<String>,
176 common_tor_client: TorClient<PreferredRuntime>,
177 now_online_bridges_tx: Sender<HashMap<String, Arc<Channel>>>,
178 mut once_online_bridges_rx: Receiver<Vec<String>>,
179 updates_tx: broadcast::Sender<HashMap<String, BridgeResult>>,
180 mut new_bridges_rx: broadcast::Receiver<Vec<String>>,
181) {
182 let mut failed_bridges = initial_failed_bridges;
183 loop {
184 let (newresults, good_bridges) =
185 test_bridges(&failed_bridges, common_tor_client.isolated_client()).await;
186 failed_bridges = get_failed_bridges(&failed_bridges, &good_bridges);
188 now_online_bridges_tx.send(good_bridges).await.unwrap();
190 while let Ok(Some(new_failures)) =
192 timeout(RECEIVE_TIMEOUT, once_online_bridges_rx.recv()).await
193 {
194 if new_failures.is_empty() {
195 break;
196 }
197 failed_bridges.splice(..0, new_failures.iter().cloned());
198 }
199 while let Ok(Ok(new_failures)) = timeout(RECEIVE_TIMEOUT, new_bridges_rx.recv()).await {
202 if new_failures.is_empty() {
203 break;
204 }
205 let set1: HashSet<_> = new_failures.iter().cloned().collect();
206 let set2: HashSet<_> = failed_bridges.iter().cloned().collect();
207 failed_bridges = set1.union(&set2).cloned().collect();
208 }
209 if !newresults.is_empty() {
211 updates_tx.send(newresults).unwrap();
212 }
213 }
214}
215
216pub async fn detect_bridges_going_down(
220 initial_channels: HashMap<String, Arc<Channel>>,
221 once_online_bridges_tx: Sender<Vec<String>>,
222 mut now_online_bridges_rx: Receiver<HashMap<String, Arc<Channel>>>,
223) {
224 let mut channels = initial_channels;
225 let (expiry_tx, mut expiry_rx) = mpsc::channel::<String>(CHANNEL_SIZE);
226 loop {
227 let mut failed_bridges = Vec::new();
228 let mut new_channels = HashMap::new();
229 for (bridgeline, channel) in channels.into_iter() {
230 let new_expiry_tx = expiry_tx.clone();
231 tokio::spawn(async move {
232 if let Err(e) =
233 is_bridge_still_online(channel.as_ref(), bridgeline.clone(), new_expiry_tx)
234 .await
235 {
236 eprintln!("Error while waiting on close: {:#?}", e);
237 }
238 });
239 }
240 while let Ok(Some(bridgeline)) = timeout(RECEIVE_TIMEOUT, expiry_rx.recv()).await {
242 new_channels.remove(&bridgeline);
243 failed_bridges.push(bridgeline);
244 }
245 once_online_bridges_tx.send(failed_bridges).await.unwrap();
247 while let Ok(Some(just_online_bridges)) =
249 timeout(RECEIVE_TIMEOUT, now_online_bridges_rx.recv()).await
250 {
251 new_channels.extend(just_online_bridges);
252 }
253 channels = new_channels;
254 }
255}
256
257pub async fn continuous_check(
259 channels: HashMap<String, Arc<Channel>>,
260 failed_bridges: Vec<String>,
261 common_tor_client: TorClient<PreferredRuntime>,
262 updates_tx: broadcast::Sender<HashMap<String, BridgeResult>>,
263 new_bridges_rx: broadcast::Receiver<Vec<String>>,
264) {
265 let (once_online_sender, once_online_recv) = mpsc::channel(CHANNEL_SIZE);
266 let (now_online_sender, now_online_recv) = mpsc::channel(CHANNEL_SIZE);
267 let task1 = detect_bridges_going_down(channels, once_online_sender, now_online_recv);
268 let task2 = check_failed_bridges_task(
269 failed_bridges,
270 common_tor_client,
271 now_online_sender,
272 once_online_recv,
273 updates_tx,
274 new_bridges_rx,
275 );
276 tokio::join!(task1, task2);
277}
278
279pub async fn build_common_tor_client(
283 obfs4_path: &str,
284) -> anyhow::Result<TorClient<PreferredRuntime>> {
285 let builder = build_pt_bridge_config("obfs4", obfs4_path)?.build()?;
286 Ok(TorClient::create_bootstrapped(builder).await?)
287}
288
289pub async fn main_test(
299 bridge_lines: Vec<String>,
300 obfs4_path: &str,
301) -> Result<(HashMap<String, BridgeResult>, HashMap<String, Arc<Channel>>), arti_client::Error> {
302 let common_tor_client = build_common_tor_client(obfs4_path).await.unwrap();
303 Ok(test_bridges(&bridge_lines, common_tor_client).await)
304}