arti_testing/
main.rs
1#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
55#![warn(noop_method_call)]
56#![warn(unreachable_pub)]
57#![warn(clippy::all)]
58#![deny(clippy::await_holding_lock)]
59#![deny(clippy::cargo_common_metadata)]
60#![deny(clippy::cast_lossless)]
61#![deny(clippy::checked_conversions)]
62#![warn(clippy::cognitive_complexity)]
63#![deny(clippy::debug_assert_with_mut_call)]
64#![deny(clippy::exhaustive_enums)]
65#![deny(clippy::exhaustive_structs)]
66#![deny(clippy::expl_impl_clone_on_copy)]
67#![deny(clippy::fallible_impl_from)]
68#![deny(clippy::implicit_clone)]
69#![deny(clippy::large_stack_arrays)]
70#![warn(clippy::manual_ok_or)]
71#![deny(clippy::missing_docs_in_private_items)]
72#![warn(clippy::needless_borrow)]
73#![warn(clippy::needless_pass_by_value)]
74#![warn(clippy::option_option)]
75#![deny(clippy::print_stderr)]
76#![deny(clippy::print_stdout)]
77#![warn(clippy::rc_buffer)]
78#![deny(clippy::ref_option_ref)]
79#![warn(clippy::semicolon_if_nothing_returned)]
80#![warn(clippy::trait_duplication_in_bounds)]
81#![deny(clippy::unchecked_duration_subtraction)]
82#![deny(clippy::unnecessary_wraps)]
83#![warn(clippy::unseparated_literal_suffix)]
84#![deny(clippy::unwrap_used)]
85#![deny(clippy::mod_module_files)]
86#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
88#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(clippy::print_stderr)] #![allow(clippy::print_stdout)] mod config;
98mod dirfilter;
99mod rt;
100mod traces;
101
102use arti::ArtiCombinedConfig;
103use arti_client::TorClient;
104use futures::task::SpawnExt;
105use rt::badtcp::BrokenTcpProvider;
106use tor_config::ConfigurationSources;
107use tor_dirmgr::filter::DirFilter;
108use tor_rtcompat::{PreferredRuntime, Runtime, RuntimeSubstExt as _, SleepProviderExt};
109
110use anyhow::{anyhow, Result};
111use tracing_subscriber::prelude::*;
112use std::str::FromStr;
114use std::sync::Arc;
115use std::time::Duration;
116
117#[derive(Debug, Clone)]
119enum Action {
120 Bootstrap,
122 Connect {
126 target: String,
128 retry_delay: Option<Duration>,
131 },
132}
133
134#[derive(Debug, Clone)]
136enum Expectation {
137 Success,
139 Failure,
141 Timeout,
143}
144
145impl FromStr for Expectation {
146 type Err = anyhow::Error;
147
148 fn from_str(s: &str) -> Result<Self, Self::Err> {
149 Ok(match s {
150 "success" => Expectation::Success,
151 "failure" => Expectation::Failure,
152 "timeout" => Expectation::Timeout,
153 _ => return Err(anyhow!("Unrecognized expectation {:?}", s)),
154 })
155 }
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
160enum BreakageStage {
161 Bootstrap,
163 Connect,
165}
166
167impl FromStr for BreakageStage {
168 type Err = anyhow::Error;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 Ok(match s {
172 "bootstrap" => BreakageStage::Bootstrap,
173 "connect" => BreakageStage::Connect,
174 _ => return Err(anyhow!("unrecognized breakage stage {:?}", s)),
175 })
176 }
177}
178
179#[derive(Debug, Clone)]
181struct TcpBreakage {
182 action: rt::badtcp::ConditionalAction,
184 stage: BreakageStage,
186 delay: Option<Duration>,
188}
189
190impl TcpBreakage {
191 fn apply<R: Runtime, R2: Send + Sync + 'static>(
193 &self,
194 main_runtime: &R,
195 breakage_provider: BrokenTcpProvider<R2>,
196 ) {
197 if let Some(delay) = self.delay {
198 let rt_clone = main_runtime.clone();
199 let action = self.action.clone();
200 main_runtime
201 .spawn(async move {
202 rt_clone.sleep(delay).await;
203 breakage_provider.set_action(action);
204 })
205 .expect("can't spawn.");
206 } else {
207 breakage_provider.set_action(self.action.clone());
208 }
209 }
210}
211
212#[derive(Debug, Clone)]
214struct Job {
215 action: Action,
217
218 tcp_breakage: TcpBreakage,
220
221 dir_filter: Arc<dyn DirFilter + 'static>,
223
224 console_log: String,
226
227 config: ConfigurationSources,
229
230 expectation: Option<Expectation>,
232
233 timeout: Duration,
235}
236
237impl Job {
238 fn make_client<R: Runtime>(&self, runtime: R) -> Result<TorClient<R>> {
240 let (_arti, tcc) = tor_config::resolve::<ArtiCombinedConfig>(self.config.load()?)?;
241 let client = TorClient::with_runtime(runtime)
242 .config(tcc)
243 .dirfilter(self.dir_filter.clone())
244 .create_unbootstrapped()?;
245 Ok(client)
246 }
247
248 async fn run_job_inner<R: Runtime, R2: Send + Sync + Clone + 'static>(
250 &self,
251 broken_tcp: rt::badtcp::BrokenTcpProvider<R2>,
252 client: TorClient<R>,
253 ) -> Result<()> {
254 if self.tcp_breakage.stage == BreakageStage::Bootstrap {
255 self.tcp_breakage
256 .apply(client.runtime(), broken_tcp.clone());
257 }
258
259 client.bootstrap().await?; match &self.action {
262 Action::Bootstrap => {}
263 Action::Connect {
264 target,
265 retry_delay,
266 } => {
267 if self.tcp_breakage.stage == BreakageStage::Connect {
268 self.tcp_breakage
269 .apply(client.runtime(), broken_tcp.clone());
270 }
271
272 loop {
273 let outcome = client.connect(target).await;
274 match (outcome, retry_delay) {
275 (Ok(_stream), _) => break,
276 (Err(e), None) => return Err(e.into()),
277 (Err(_e), Some(delay)) => client.runtime().sleep(*delay).await, }
279 }
280 }
281 }
282
283 Ok(())
284 }
285
286 async fn run_job(&self) -> Result<()> {
290 let runtime = PreferredRuntime::current()?;
291 let broken_tcp = rt::badtcp::BrokenTcpProvider::new(
292 runtime.clone(),
293 rt::badtcp::ConditionalAction::default(),
294 );
295 let counting_tcp = rt::count::Counting::new_zeroed(broken_tcp.clone());
299 let runtime = runtime.with_tcp_provider(counting_tcp.clone());
300 let client = self.make_client(runtime)?;
301
302 let outcome = client
303 .clone()
304 .runtime()
305 .timeout(self.timeout, self.run_job_inner(broken_tcp.clone(), client))
306 .await;
307
308 let result = match (&self.expectation, outcome) {
309 (Some(Expectation::Timeout), Err(tor_rtcompat::TimeoutError)) => {
310 println!("Timeout occurred [as expected]");
311 Ok(())
312 }
313 (Some(Expectation::Failure), Ok(Err(e))) => {
314 println!("Got an error as [as expected]");
315 println!("Error was: {}", e);
316 Ok(())
317 }
318 (Some(Expectation::Success), Ok(Ok(()))) => {
319 println!("Operation succeeded [as expected]");
320 Ok(())
321 }
322 (Some(expectation), outcome) => Err(anyhow!(
323 "Test failed. Expected {:?} but got: {:?}",
324 expectation,
325 outcome
326 )),
327 (None, outcome) => {
328 println!("Outcome: {:?}", outcome);
330 Ok(())
331 }
332 };
333
334 println!("TCP stats: {:?}", counting_tcp.counts());
335
336 result
337 }
338}
339
340#[tokio::main]
341async fn main() -> Result<()> {
342 let job = config::parse_cmdline()?;
343
344 let targets: tracing_subscriber::filter::Targets = job.console_log.parse()?;
345 let console_layer = tracing_subscriber::fmt::Layer::default().with_filter(targets);
346 let trace_count = Arc::new(traces::TraceCount::default());
347 tracing_subscriber::registry()
348 .with(console_layer)
349 .with(traces::TraceCounter(trace_count.clone()))
350 .init();
351
352 let outcome = job.run_job().await;
353
354 println!("Total events: {}", trace_count);
355
356 outcome
357}