arti_testing/
main.rs
1#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
55#![warn(noop_method_call)]
56#![warn(unreachable_pub)]
57#![warn(clippy::all)]
58#![deny(clippy::await_holding_lock)]
59#![deny(clippy::cargo_common_metadata)]
60#![deny(clippy::cast_lossless)]
61#![deny(clippy::checked_conversions)]
62#![warn(clippy::cognitive_complexity)]
63#![deny(clippy::debug_assert_with_mut_call)]
64#![deny(clippy::exhaustive_enums)]
65#![deny(clippy::exhaustive_structs)]
66#![deny(clippy::expl_impl_clone_on_copy)]
67#![deny(clippy::fallible_impl_from)]
68#![deny(clippy::implicit_clone)]
69#![deny(clippy::large_stack_arrays)]
70#![warn(clippy::manual_ok_or)]
71#![deny(clippy::missing_docs_in_private_items)]
72#![warn(clippy::needless_borrow)]
73#![warn(clippy::needless_pass_by_value)]
74#![warn(clippy::option_option)]
75#![deny(clippy::print_stderr)]
76#![deny(clippy::print_stdout)]
77#![warn(clippy::rc_buffer)]
78#![deny(clippy::ref_option_ref)]
79#![warn(clippy::semicolon_if_nothing_returned)]
80#![warn(clippy::trait_duplication_in_bounds)]
81#![deny(clippy::unchecked_duration_subtraction)]
82#![deny(clippy::unnecessary_wraps)]
83#![warn(clippy::unseparated_literal_suffix)]
84#![deny(clippy::unwrap_used)]
85#![deny(clippy::mod_module_files)]
86#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
88#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] #![allow(clippy::print_stderr)] #![allow(clippy::print_stdout)] mod config;
99mod dirfilter;
100mod rt;
101mod traces;
102
103use arti::ArtiCombinedConfig;
104use arti_client::TorClient;
105use futures::task::SpawnExt;
106use rt::badtcp::BrokenTcpProvider;
107use tor_config::ConfigurationSources;
108use tor_dirmgr::filter::DirFilter;
109use tor_rtcompat::{PreferredRuntime, Runtime, RuntimeSubstExt as _, SleepProviderExt};
110
111use anyhow::{anyhow, Result};
112use tracing_subscriber::prelude::*;
113use std::str::FromStr;
115use std::sync::Arc;
116use std::time::Duration;
117
118#[derive(Debug, Clone)]
120enum Action {
121 Bootstrap,
123 Connect {
127 target: String,
129 retry_delay: Option<Duration>,
132 },
133}
134
135#[derive(Debug, Clone)]
137enum Expectation {
138 Success,
140 Failure,
142 Timeout,
144}
145
146impl FromStr for Expectation {
147 type Err = anyhow::Error;
148
149 fn from_str(s: &str) -> Result<Self, Self::Err> {
150 Ok(match s {
151 "success" => Expectation::Success,
152 "failure" => Expectation::Failure,
153 "timeout" => Expectation::Timeout,
154 _ => return Err(anyhow!("Unrecognized expectation {:?}", s)),
155 })
156 }
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
161enum BreakageStage {
162 Bootstrap,
164 Connect,
166}
167
168impl FromStr for BreakageStage {
169 type Err = anyhow::Error;
170
171 fn from_str(s: &str) -> Result<Self, Self::Err> {
172 Ok(match s {
173 "bootstrap" => BreakageStage::Bootstrap,
174 "connect" => BreakageStage::Connect,
175 _ => return Err(anyhow!("unrecognized breakage stage {:?}", s)),
176 })
177 }
178}
179
180#[derive(Debug, Clone)]
182struct TcpBreakage {
183 action: rt::badtcp::ConditionalAction,
185 stage: BreakageStage,
187 delay: Option<Duration>,
189}
190
191impl TcpBreakage {
192 fn apply<R: Runtime, R2: Send + Sync + 'static>(
194 &self,
195 main_runtime: &R,
196 breakage_provider: BrokenTcpProvider<R2>,
197 ) {
198 if let Some(delay) = self.delay {
199 let rt_clone = main_runtime.clone();
200 let action = self.action.clone();
201 main_runtime
202 .spawn(async move {
203 rt_clone.sleep(delay).await;
204 breakage_provider.set_action(action);
205 })
206 .expect("can't spawn.");
207 } else {
208 breakage_provider.set_action(self.action.clone());
209 }
210 }
211}
212
213#[derive(Debug, Clone)]
215struct Job {
216 action: Action,
218
219 tcp_breakage: TcpBreakage,
221
222 dir_filter: Arc<dyn DirFilter + 'static>,
224
225 console_log: String,
227
228 config: ConfigurationSources,
230
231 expectation: Option<Expectation>,
233
234 timeout: Duration,
236}
237
238impl Job {
239 fn make_client<R: Runtime>(&self, runtime: R) -> Result<TorClient<R>> {
241 let (_arti, tcc) = tor_config::resolve::<ArtiCombinedConfig>(self.config.load()?)?;
242 let client = TorClient::with_runtime(runtime)
243 .config(tcc)
244 .dirfilter(self.dir_filter.clone())
245 .create_unbootstrapped()?;
246 Ok(client)
247 }
248
249 async fn run_job_inner<R: Runtime, R2: Send + Sync + Clone + 'static>(
251 &self,
252 broken_tcp: rt::badtcp::BrokenTcpProvider<R2>,
253 client: TorClient<R>,
254 ) -> Result<()> {
255 if self.tcp_breakage.stage == BreakageStage::Bootstrap {
256 self.tcp_breakage
257 .apply(client.runtime(), broken_tcp.clone());
258 }
259
260 client.bootstrap().await?; match &self.action {
263 Action::Bootstrap => {}
264 Action::Connect {
265 target,
266 retry_delay,
267 } => {
268 if self.tcp_breakage.stage == BreakageStage::Connect {
269 self.tcp_breakage
270 .apply(client.runtime(), broken_tcp.clone());
271 }
272
273 loop {
274 let outcome = client.connect(target).await;
275 match (outcome, retry_delay) {
276 (Ok(_stream), _) => break,
277 (Err(e), None) => return Err(e.into()),
278 (Err(_e), Some(delay)) => client.runtime().sleep(*delay).await, }
280 }
281 }
282 }
283
284 Ok(())
285 }
286
287 async fn run_job(&self) -> Result<()> {
291 let runtime = PreferredRuntime::current()?;
292 let broken_tcp = rt::badtcp::BrokenTcpProvider::new(
293 runtime.clone(),
294 rt::badtcp::ConditionalAction::default(),
295 );
296 let counting_tcp = rt::count::Counting::new_zeroed(broken_tcp.clone());
300 let runtime = runtime.with_tcp_provider(counting_tcp.clone());
301 let client = self.make_client(runtime)?;
302
303 let outcome = client
304 .clone()
305 .runtime()
306 .timeout(self.timeout, self.run_job_inner(broken_tcp.clone(), client))
307 .await;
308
309 let result = match (&self.expectation, outcome) {
310 (Some(Expectation::Timeout), Err(tor_rtcompat::TimeoutError)) => {
311 println!("Timeout occurred [as expected]");
312 Ok(())
313 }
314 (Some(Expectation::Failure), Ok(Err(e))) => {
315 println!("Got an error as [as expected]");
316 println!("Error was: {}", e);
317 Ok(())
318 }
319 (Some(Expectation::Success), Ok(Ok(()))) => {
320 println!("Operation succeeded [as expected]");
321 Ok(())
322 }
323 (Some(expectation), outcome) => Err(anyhow!(
324 "Test failed. Expected {:?} but got: {:?}",
325 expectation,
326 outcome
327 )),
328 (None, outcome) => {
329 println!("Outcome: {:?}", outcome);
331 Ok(())
332 }
333 };
334
335 println!("TCP stats: {:?}", counting_tcp.counts());
336
337 result
338 }
339}
340
341#[tokio::main]
342async fn main() -> Result<()> {
343 let job = config::parse_cmdline()?;
344
345 let targets: tracing_subscriber::filter::Targets = job.console_log.parse()?;
346 let console_layer = tracing_subscriber::fmt::Layer::default().with_filter(targets);
347 let trace_count = Arc::new(traces::TraceCount::default());
348 tracing_subscriber::registry()
349 .with(console_layer)
350 .with(traces::TraceCounter(trace_count.clone()))
351 .init();
352
353 let outcome = job.run_job().await;
354
355 println!("Total events: {}", trace_count);
356
357 outcome
358}