obfs4_checker/
main.rs

1#![warn(clippy::missing_docs_in_private_items)]
2#![doc = include_str!("../README.md")]
3use crate::checking::RECEIVE_TIMEOUT;
4use axum::{
5    http::StatusCode,
6    routing::{get, post},
7    Json, Router,
8};
9use clap::Parser;
10use serde::{Deserialize, Serialize};
11use std::{collections::HashMap, net::SocketAddr};
12use time::OffsetDateTime;
13use tokio::sync::broadcast::{self, Receiver, Sender};
14use tokio::time::timeout;
15use tor_error::ErrorReport;
16mod checking;
17
18/// Utility to deliver real-time updates on bridge health
19#[derive(Parser)]
20#[command(author, version, about, long_about = None)]
21struct Args {
22    #[arg(short, long, required = true)]
23    /// Path to the `lyrebird` or `obfs4proxy`, required for making obfs4 connections
24    obfs4_bin: String,
25}
26
27/// The input to our `bridge-state` handler
28///
29/// Just contains a list of bridge lines to test
30#[derive(Deserialize)]
31struct BridgeLines {
32    /// List of bridge lines to test
33    pub bridge_lines: Vec<String>,
34}
35
36/// Struct which represents one bridge's result
37#[derive(Serialize, Clone, Debug)]
38pub struct BridgeResult {
39    /// Is bridge online or not?
40    functional: bool,
41    /// The time at which the bridge was last tested, written as a nice string
42    last_tested: OffsetDateTime,
43    /// Error encountered while trying to connect to the bridge, if any
44    ///
45    /// It is generated using [tor_error::ErrorReport]
46    #[serde(skip_serializing_if = "Option::is_none")]
47    error: Option<String>,
48}
49
50/// The output to our `bridge-state` handler
51///
52/// Contains the [BridgeResult] for each bridgeline,
53/// an error (if any), and the total time it took
54/// to run the entire test
55#[derive(Serialize)]
56struct BridgesResult {
57    /// All the bridge results, mapped by bridge line
58    bridge_results: HashMap<String, BridgeResult>,
59    /// General error encountered, if any
60    #[serde(skip_serializing_if = "Option::is_none")]
61    error: Option<String>,
62    /// The time it took to generate this result
63    time: f64,
64}
65
66/// Wrapper around the main testing function
67async fn check_bridges(
68    bridge_lines: Vec<String>,
69    updates_tx: Sender<HashMap<String, BridgeResult>>,
70    obfs4_path: String,
71    new_bridges_rx: broadcast::Receiver<Vec<String>>,
72) -> (StatusCode, Json<BridgesResult>) {
73    let commencement_time = OffsetDateTime::now_utc();
74    let mainop = crate::checking::main_test(bridge_lines.clone(), &obfs4_path).await;
75    let end_time = OffsetDateTime::now_utc();
76    let diff = (end_time - commencement_time).as_seconds_f64();
77    let (bridge_results, error) = match mainop {
78        Ok((bridge_results, channels)) => {
79            let failed_bridges = crate::checking::get_failed_bridges(&bridge_lines, &channels);
80            let common_tor_client = crate::checking::build_common_tor_client(&obfs4_path)
81                .await
82                .unwrap();
83            tokio::spawn(async move {
84                crate::checking::continuous_check(
85                    channels,
86                    failed_bridges,
87                    common_tor_client,
88                    updates_tx,
89                    new_bridges_rx,
90                )
91                .await
92            });
93            (bridge_results, None)
94        }
95        Err(e) => {
96            let error_report = e.report().to_string().replace("error: ", "");
97            (HashMap::new(), Some(error_report))
98        }
99    };
100    let finalresult = BridgesResult {
101        bridge_results,
102        error,
103        time: diff,
104    };
105    (StatusCode::OK, Json(finalresult))
106}
107
108/// Wrapper around the main testing function
109async fn updates(
110    mut updates_rx: Receiver<HashMap<String, BridgeResult>>,
111) -> (StatusCode, Json<BridgesResult>) {
112    let mut bridge_results = HashMap::new();
113    while let Ok(Ok(update)) = timeout(RECEIVE_TIMEOUT, updates_rx.recv()).await {
114        if update.is_empty() {
115            break;
116        }
117        bridge_results.extend(update);
118    }
119    let finalresult = BridgesResult {
120        bridge_results,
121        error: None,
122        time: 0.0,
123    };
124    (StatusCode::OK, Json(finalresult))
125}
126
127/// Add new bridges to the main testing tasks
128async fn add_new_bridges(
129    new_bridge_lines: Vec<String>,
130    new_bridges_tx: Sender<Vec<String>>,
131) -> StatusCode {
132    match new_bridges_tx.send(new_bridge_lines) {
133        Ok(_) => StatusCode::OK,
134        Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
135    }
136}
137
138/// Run the HTTP server and call the required methods to initialize the testing
139#[tokio::main]
140async fn main() {
141    tracing_subscriber::fmt::init();
142    let args = Args::parse();
143    let obfs4_bin_path = args.obfs4_bin;
144    // unused Receiver prevents SendErrors
145    let (updates_tx, _updates_rx_unused) =
146        broadcast::channel::<HashMap<String, BridgeResult>>(crate::checking::CHANNEL_SIZE);
147    let (new_bridges_tx, _new_bridges_rx) =
148        broadcast::channel::<Vec<String>>(crate::checking::CHANNEL_SIZE);
149    let updates_sender_clone = updates_tx.clone();
150    let new_bridges_tx_clone = new_bridges_tx.clone();
151    let bridges_check_callback = move |Json(payload): Json<BridgeLines>| {
152        let new_bridges_recv_clone = new_bridges_tx_clone.subscribe();
153        async {
154            check_bridges(
155                payload.bridge_lines,
156                updates_sender_clone,
157                obfs4_bin_path,
158                new_bridges_recv_clone,
159            )
160            .await
161        }
162    };
163    let updates_callback = move || {
164        let updates_rx = updates_tx.subscribe();
165        async move { updates(updates_rx).await }
166    };
167    let add_new_bridges_callback = move |Json(payload): Json<BridgeLines>| async move {
168        add_new_bridges(payload.bridge_lines, new_bridges_tx).await
169    };
170    let app = Router::new()
171        .route("/bridge-state", post(bridges_check_callback))
172        .route("/add-bridges", post(add_new_bridges_callback))
173        .route("/updates", get(updates_callback));
174
175    let addr = SocketAddr::from(([127, 0, 0, 1], 5000));
176
177    let listener = tokio::net::TcpListener::bind(addr)
178        .await
179        .expect("failed to bind to TCP address");
180
181    axum::serve(listener, app).await.expect("failed to serve");
182}