arti_testing/
main.rs

1//! Tool for running an Arti client with unusual behavior or limitations.
2//!
3//! Example use:
4//!
5//! ```sh
6//! $ cat ~/.arti_testing.toml
7//! [storage]
8//!
9//! cache_dir = "${USER_HOME}/.arti_testing/cache"
10//! state_dir = "${USER_HOME}/.arti_testing/state"
11//!
12//! $ ./target/debug/arti-testing bootstrap --config ~/.arti-testing.toml \
13//!           --timeout 120 --expect=success
14//! [...lots of logs]
15//! Operation succeeded [as expected]
16//! TCP stats: TcpCount { n_connect_attempt: 4, n_connect_ok: 2, n_accept: 0, n_bytes_send: 461102, n_bytes_recv: 3502811 }
17//! Total events: Trace: 6943, Debug: 17, Info: 13, Warn: 0, Error: 0
18//!
19//! $ faketime '1 year ago' ./target/debug/arti-testing connect \
20//!           --config ~/.arti-testing.toml
21//!           --target www.torproject.org:80
22//!           --timeout 60
23//!           --expect=timeout
24//! [...lots of logs...]
25//! Timeout occurred [as expected]
26//! TCP stats: TcpCount { n_connect_attempt: 3, n_connect_ok: 3, n_accept: 0, n_bytes_send: 10917, n_bytes_recv: 16704 }
27//! Total events: Trace: 77, Debug: 21, Info: 10, Warn: 2, Error: 0
28//! ```
29//!
30//! # TODO
31//!
32//! - More ways to break
33//!   - make TCP connections fail only sporadically
34//!   - make TLS fail
35//!      - With wrong cert
36//!      - Mysteriously
37//!      - With complete junk
38//!      - TLS succeeds, then sends nonsense
39//!      - Authenticating with wrong ID.
40//!   - Munge directory before using it
41//!      - May require some dirmgr plug-in. :p
42//!      - May require
43//!
44//! - More things to look at
45//!   - do something on the connection
46//!   - look at bootstrapping status and events
47//!   - Make streams repeatedly on different circuits with some delay.
48//! - Make sure we can replicate all/most test situations from arti#329
49//! - Actually implement those tests.
50
51// @@ begin lint list maintained by maint/add_warning @@
52#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
53#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
54#![warn(missing_docs)]
55#![warn(noop_method_call)]
56#![warn(unreachable_pub)]
57#![warn(clippy::all)]
58#![deny(clippy::await_holding_lock)]
59#![deny(clippy::cargo_common_metadata)]
60#![deny(clippy::cast_lossless)]
61#![deny(clippy::checked_conversions)]
62#![warn(clippy::cognitive_complexity)]
63#![deny(clippy::debug_assert_with_mut_call)]
64#![deny(clippy::exhaustive_enums)]
65#![deny(clippy::exhaustive_structs)]
66#![deny(clippy::expl_impl_clone_on_copy)]
67#![deny(clippy::fallible_impl_from)]
68#![deny(clippy::implicit_clone)]
69#![deny(clippy::large_stack_arrays)]
70#![warn(clippy::manual_ok_or)]
71#![deny(clippy::missing_docs_in_private_items)]
72#![warn(clippy::needless_borrow)]
73#![warn(clippy::needless_pass_by_value)]
74#![warn(clippy::option_option)]
75#![deny(clippy::print_stderr)]
76#![deny(clippy::print_stdout)]
77#![warn(clippy::rc_buffer)]
78#![deny(clippy::ref_option_ref)]
79#![warn(clippy::semicolon_if_nothing_returned)]
80#![warn(clippy::trait_duplication_in_bounds)]
81#![deny(clippy::unchecked_duration_subtraction)]
82#![deny(clippy::unnecessary_wraps)]
83#![warn(clippy::unseparated_literal_suffix)]
84#![deny(clippy::unwrap_used)]
85#![deny(clippy::mod_module_files)]
86#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
87#![allow(clippy::uninlined_format_args)]
88#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
89#![allow(clippy::result_large_err)] // temporary workaround for arti#587
90#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
91#![allow(clippy::needless_lifetimes)] // See arti#1765
92//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
93
94#![allow(clippy::print_stderr)] // Allowed in this crate only.
95#![allow(clippy::print_stdout)] // Allowed in this crate only.
96
97mod config;
98mod dirfilter;
99mod rt;
100mod traces;
101
102use arti::ArtiCombinedConfig;
103use arti_client::TorClient;
104use futures::task::SpawnExt;
105use rt::badtcp::BrokenTcpProvider;
106use tor_config::ConfigurationSources;
107use tor_dirmgr::filter::DirFilter;
108use tor_rtcompat::{PreferredRuntime, Runtime, RuntimeSubstExt as _, SleepProviderExt};
109
110use anyhow::{anyhow, Result};
111use tracing_subscriber::prelude::*;
112//use std::path::PathBuf;
113use std::str::FromStr;
114use std::sync::Arc;
115use std::time::Duration;
116
117/// A possible action for the tool to try to take
118#[derive(Debug, Clone)]
119enum Action {
120    /// Bootstrap the client and exit.
121    Bootstrap,
122    /// Bootstrap the client, then try to connect to a target
123    ///
124    /// Exit when successful.
125    Connect {
126        /// The target address.
127        target: String,
128        /// How long to wait between attempts?  If None, exit on the first
129        /// failure.
130        retry_delay: Option<Duration>,
131    },
132}
133
134/// What we expect to happen when we run a given job.
135#[derive(Debug, Clone)]
136enum Expectation {
137    /// The operation should complete successfully.
138    Success,
139    /// The operation should terminate with an error.
140    Failure,
141    /// The operation should time out
142    Timeout,
143}
144
145impl FromStr for Expectation {
146    type Err = anyhow::Error;
147
148    fn from_str(s: &str) -> Result<Self, Self::Err> {
149        Ok(match s {
150            "success" => Expectation::Success,
151            "failure" => Expectation::Failure,
152            "timeout" => Expectation::Timeout,
153            _ => return Err(anyhow!("Unrecognized expectation {:?}", s)),
154        })
155    }
156}
157
158/// At what stage to install a kind of breakage
159#[derive(Debug, Clone, PartialEq, Eq)]
160enum BreakageStage {
161    /// Create breakage while bootstrapping
162    Bootstrap,
163    /// Create breakage while connecting
164    Connect,
165}
166
167impl FromStr for BreakageStage {
168    type Err = anyhow::Error;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        Ok(match s {
172            "bootstrap" => BreakageStage::Bootstrap,
173            "connect" => BreakageStage::Connect,
174            _ => return Err(anyhow!("unrecognized breakage stage {:?}", s)),
175        })
176    }
177}
178
179/// Describes how (if at all) to break TCP connection attempts
180#[derive(Debug, Clone)]
181struct TcpBreakage {
182    /// What kind of breakage to install (if any)
183    action: rt::badtcp::ConditionalAction,
184    /// What stage to apply the breakage at.
185    stage: BreakageStage,
186    /// Delay (if any) after the start of the stage to apply breakage
187    delay: Option<Duration>,
188}
189
190impl TcpBreakage {
191    /// Apply the configured breakage to breakage_provider.  Use `main_runtime` to sleep if necessary.
192    fn apply<R: Runtime, R2: Send + Sync + 'static>(
193        &self,
194        main_runtime: &R,
195        breakage_provider: BrokenTcpProvider<R2>,
196    ) {
197        if let Some(delay) = self.delay {
198            let rt_clone = main_runtime.clone();
199            let action = self.action.clone();
200            main_runtime
201                .spawn(async move {
202                    rt_clone.sleep(delay).await;
203                    breakage_provider.set_action(action);
204                })
205                .expect("can't spawn.");
206        } else {
207            breakage_provider.set_action(self.action.clone());
208        }
209    }
210}
211
212/// Descriptions of an action to take, and what to expect as an outcome.
213#[derive(Debug, Clone)]
214struct Job {
215    /// The action that the client should try to take
216    action: Action,
217
218    /// Describes how (if at all) to break the TCP connections.
219    tcp_breakage: TcpBreakage,
220
221    /// Describes how (if at all) to mess with directories.
222    dir_filter: Arc<dyn DirFilter + 'static>,
223
224    /// The tracing configuration for our console log.
225    console_log: String,
226
227    /// Where we're getting our configuration from.
228    config: ConfigurationSources,
229
230    /// What we expect to happen.
231    expectation: Option<Expectation>,
232
233    /// How long to wait for the action to succeed or fail.
234    timeout: Duration,
235}
236
237impl Job {
238    /// Make a new unbootstrapped client for this job.
239    fn make_client<R: Runtime>(&self, runtime: R) -> Result<TorClient<R>> {
240        let (_arti, tcc) = tor_config::resolve::<ArtiCombinedConfig>(self.config.load()?)?;
241        let client = TorClient::with_runtime(runtime)
242            .config(tcc)
243            .dirfilter(self.dir_filter.clone())
244            .create_unbootstrapped()?;
245        Ok(client)
246    }
247
248    /// Run the body of a job.
249    async fn run_job_inner<R: Runtime, R2: Send + Sync + Clone + 'static>(
250        &self,
251        broken_tcp: rt::badtcp::BrokenTcpProvider<R2>,
252        client: TorClient<R>,
253    ) -> Result<()> {
254        if self.tcp_breakage.stage == BreakageStage::Bootstrap {
255            self.tcp_breakage
256                .apply(client.runtime(), broken_tcp.clone());
257        }
258
259        client.bootstrap().await?; // all jobs currently start with a bootstrap.
260
261        match &self.action {
262            Action::Bootstrap => {}
263            Action::Connect {
264                target,
265                retry_delay,
266            } => {
267                if self.tcp_breakage.stage == BreakageStage::Connect {
268                    self.tcp_breakage
269                        .apply(client.runtime(), broken_tcp.clone());
270                }
271
272                loop {
273                    let outcome = client.connect(target).await;
274                    match (outcome, retry_delay) {
275                        (Ok(_stream), _) => break,
276                        (Err(e), None) => return Err(e.into()),
277                        (Err(_e), Some(delay)) => client.runtime().sleep(*delay).await, // TODO log error
278                    }
279                }
280            }
281        }
282
283        Ok(())
284    }
285
286    /// Run a provided job.
287    ///
288    /// TUDO Eventually this should come up with some kind of result that's meaningful.
289    async fn run_job(&self) -> Result<()> {
290        let runtime = PreferredRuntime::current()?;
291        let broken_tcp = rt::badtcp::BrokenTcpProvider::new(
292            runtime.clone(),
293            rt::badtcp::ConditionalAction::default(),
294        );
295        // We put the counting TCP provider outside the one that breaks: we want
296        // to know how many attempts to connect there are, and BrokenTcpProvider
297        // eats the attempts that it fails without passing them down the stack.
298        let counting_tcp = rt::count::Counting::new_zeroed(broken_tcp.clone());
299        let runtime = runtime.with_tcp_provider(counting_tcp.clone());
300        let client = self.make_client(runtime)?;
301
302        let outcome = client
303            .clone()
304            .runtime()
305            .timeout(self.timeout, self.run_job_inner(broken_tcp.clone(), client))
306            .await;
307
308        let result = match (&self.expectation, outcome) {
309            (Some(Expectation::Timeout), Err(tor_rtcompat::TimeoutError)) => {
310                println!("Timeout occurred [as expected]");
311                Ok(())
312            }
313            (Some(Expectation::Failure), Ok(Err(e))) => {
314                println!("Got an error as [as expected]");
315                println!("Error was: {}", e);
316                Ok(())
317            }
318            (Some(Expectation::Success), Ok(Ok(()))) => {
319                println!("Operation succeeded [as expected]");
320                Ok(())
321            }
322            (Some(expectation), outcome) => Err(anyhow!(
323                "Test failed. Expected {:?} but got: {:?}",
324                expectation,
325                outcome
326            )),
327            (None, outcome) => {
328                // no expectation.
329                println!("Outcome: {:?}", outcome);
330                Ok(())
331            }
332        };
333
334        println!("TCP stats: {:?}", counting_tcp.counts());
335
336        result
337    }
338}
339
340#[tokio::main]
341async fn main() -> Result<()> {
342    let job = config::parse_cmdline()?;
343
344    let targets: tracing_subscriber::filter::Targets = job.console_log.parse()?;
345    let console_layer = tracing_subscriber::fmt::Layer::default().with_filter(targets);
346    let trace_count = Arc::new(traces::TraceCount::default());
347    tracing_subscriber::registry()
348        .with(console_layer)
349        .with(traces::TraceCounter(trace_count.clone()))
350        .init();
351
352    let outcome = job.run_job().await;
353
354    println!("Total events: {}", trace_count);
355
356    outcome
357}