obfs4_checker/
main.rs
1#![warn(clippy::missing_docs_in_private_items)]
2#![doc = include_str!("../README.md")]
3use crate::checking::RECEIVE_TIMEOUT;
4use axum::{
5 http::StatusCode,
6 routing::{get, post},
7 Json, Router,
8};
9use clap::Parser;
10use serde::{Deserialize, Serialize};
11use std::{collections::HashMap, net::SocketAddr};
12use time::OffsetDateTime;
13use tokio::sync::broadcast::{self, Receiver, Sender};
14use tokio::time::timeout;
15use tor_error::ErrorReport;
16mod checking;
17
18#[derive(Parser)]
20#[command(author, version, about, long_about = None)]
21struct Args {
22 #[arg(short, long, required = true)]
23 obfs4_bin: String,
25}
26
27#[derive(Deserialize)]
31struct BridgeLines {
32 pub bridge_lines: Vec<String>,
34}
35
36#[derive(Serialize, Clone, Debug)]
38pub struct BridgeResult {
39 functional: bool,
41 last_tested: OffsetDateTime,
43 #[serde(skip_serializing_if = "Option::is_none")]
47 error: Option<String>,
48}
49
50#[derive(Serialize)]
56struct BridgesResult {
57 bridge_results: HashMap<String, BridgeResult>,
59 #[serde(skip_serializing_if = "Option::is_none")]
61 error: Option<String>,
62 time: f64,
64}
65
66async fn check_bridges(
68 bridge_lines: Vec<String>,
69 updates_tx: Sender<HashMap<String, BridgeResult>>,
70 obfs4_path: String,
71 new_bridges_rx: broadcast::Receiver<Vec<String>>,
72) -> (StatusCode, Json<BridgesResult>) {
73 let commencement_time = OffsetDateTime::now_utc();
74 let mainop = crate::checking::main_test(bridge_lines.clone(), &obfs4_path).await;
75 let end_time = OffsetDateTime::now_utc();
76 let diff = (end_time - commencement_time).as_seconds_f64();
77 let (bridge_results, error) = match mainop {
78 Ok((bridge_results, channels)) => {
79 let failed_bridges = crate::checking::get_failed_bridges(&bridge_lines, &channels);
80 let common_tor_client = crate::checking::build_common_tor_client(&obfs4_path)
81 .await
82 .unwrap();
83 tokio::spawn(async move {
84 crate::checking::continuous_check(
85 channels,
86 failed_bridges,
87 common_tor_client,
88 updates_tx,
89 new_bridges_rx,
90 )
91 .await
92 });
93 (bridge_results, None)
94 }
95 Err(e) => {
96 let error_report = e.report().to_string().replace("error: ", "");
97 (HashMap::new(), Some(error_report))
98 }
99 };
100 let finalresult = BridgesResult {
101 bridge_results,
102 error,
103 time: diff,
104 };
105 (StatusCode::OK, Json(finalresult))
106}
107
108async fn updates(
110 mut updates_rx: Receiver<HashMap<String, BridgeResult>>,
111) -> (StatusCode, Json<BridgesResult>) {
112 let mut bridge_results = HashMap::new();
113 while let Ok(Ok(update)) = timeout(RECEIVE_TIMEOUT, updates_rx.recv()).await {
114 if update.is_empty() {
115 break;
116 }
117 bridge_results.extend(update);
118 }
119 let finalresult = BridgesResult {
120 bridge_results,
121 error: None,
122 time: 0.0,
123 };
124 (StatusCode::OK, Json(finalresult))
125}
126
127async fn add_new_bridges(
129 new_bridge_lines: Vec<String>,
130 new_bridges_tx: Sender<Vec<String>>,
131) -> StatusCode {
132 match new_bridges_tx.send(new_bridge_lines) {
133 Ok(_) => StatusCode::OK,
134 Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
135 }
136}
137
138#[tokio::main]
140async fn main() {
141 tracing_subscriber::fmt::init();
142 let args = Args::parse();
143 let obfs4_bin_path = args.obfs4_bin;
144 let (updates_tx, _updates_rx_unused) =
146 broadcast::channel::<HashMap<String, BridgeResult>>(crate::checking::CHANNEL_SIZE);
147 let (new_bridges_tx, _new_bridges_rx) =
148 broadcast::channel::<Vec<String>>(crate::checking::CHANNEL_SIZE);
149 let updates_sender_clone = updates_tx.clone();
150 let new_bridges_tx_clone = new_bridges_tx.clone();
151 let bridges_check_callback = move |Json(payload): Json<BridgeLines>| {
152 let new_bridges_recv_clone = new_bridges_tx_clone.subscribe();
153 async {
154 check_bridges(
155 payload.bridge_lines,
156 updates_sender_clone,
157 obfs4_bin_path,
158 new_bridges_recv_clone,
159 )
160 .await
161 }
162 };
163 let updates_callback = move || {
164 let updates_rx = updates_tx.subscribe();
165 async move { updates(updates_rx).await }
166 };
167 let add_new_bridges_callback = move |Json(payload): Json<BridgeLines>| async move {
168 add_new_bridges(payload.bridge_lines, new_bridges_tx).await
169 };
170 let app = Router::new()
171 .route("/bridge-state", post(bridges_check_callback))
172 .route("/add-bridges", post(add_new_bridges_callback))
173 .route("/updates", get(updates_callback));
174
175 let addr = SocketAddr::from(([127, 0, 0, 1], 5000));
176
177 let listener = tokio::net::TcpListener::bind(addr)
178 .await
179 .expect("failed to bind to TCP address");
180
181 axum::serve(listener, app).await.expect("failed to serve");
182}