arti/subcommands/
proxy.rs

1//! The `proxy` subcommand.
2
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use cfg_if::cfg_if;
7use clap::ArgMatches;
8#[allow(unused)]
9use tor_config_path::CfgPathResolver;
10use tracing::{info, warn};
11
12use arti_client::TorClientConfig;
13use tor_config::{ConfigurationSources, Listen};
14use tor_rtcompat::ToplevelRuntime;
15
16#[cfg(feature = "dns-proxy")]
17use crate::dns;
18use crate::{exit, process, reload_cfg, socks, ArtiConfig, TorClient};
19
20#[cfg(feature = "rpc")]
21use crate::rpc;
22
23#[cfg(feature = "onion-service-service")]
24use crate::onion_proxy;
25
/// Shorthand for a boxed and pinned Future.
///
/// The future is type-erased (`dyn Future`), so futures with different concrete types but the
/// same `Output` can be stored together, as `run_proxy` does with its list of proxy tasks.
///
/// Note that the trait object carries no `Send` bound, so a `PinnedFuture` is not `Send`.
type PinnedFuture<T> = std::pin::Pin<Box<dyn futures::Future<Output = T>>>;
28
29/// Run the `proxy` subcommand.
30pub(crate) fn run<R: ToplevelRuntime>(
31    runtime: R,
32    proxy_matches: &ArgMatches,
33    cfg_sources: ConfigurationSources,
34    config: ArtiConfig,
35    client_config: TorClientConfig,
36) -> Result<()> {
37    // Override configured SOCKS and DNS listen addresses from the command line.
38    // This implies listening on localhost ports.
39    let socks_listen = match proxy_matches.get_one::<String>("socks-port") {
40        Some(p) => Listen::new_localhost(p.parse().expect("Invalid port specified")),
41        None => config.proxy().socks_listen.clone(),
42    };
43
44    let dns_listen = match proxy_matches.get_one::<String>("dns-port") {
45        Some(p) => Listen::new_localhost(p.parse().expect("Invalid port specified")),
46        None => config.proxy().dns_listen.clone(),
47    };
48
49    if !socks_listen.is_empty() {
50        info!(
51            "Starting Arti {} in SOCKS proxy mode on {} ...",
52            env!("CARGO_PKG_VERSION"),
53            socks_listen
54        );
55    }
56
57    if let Some(listen) = {
58        // https://github.com/metrics-rs/metrics/issues/567
59        config
60            .metrics
61            .prometheus
62            .listen
63            .single_address_legacy()
64            .context("can only listen on a single address for Prometheus metrics")?
65    } {
66        cfg_if! {
67            if #[cfg(feature = "metrics")] {
68                metrics_exporter_prometheus::PrometheusBuilder::new()
69                    .with_http_listener(listen)
70                    .install()
71                    .with_context(|| format!(
72                        "set up Prometheus metrics exporter on {listen}"
73                    ))?;
74                info!("Arti Prometheus metrics export scraper endpoint http://{listen}");
75            } else {
76                return Err(anyhow::anyhow!(
77        "`metrics.prometheus.listen` config set but `metrics` cargo feature compiled out in `arti` crate"
78                ));
79            }
80        }
81    }
82
83    process::use_max_file_limit(&config);
84
85    let rt_copy = runtime.clone();
86    rt_copy.block_on(run_proxy(
87        runtime,
88        socks_listen,
89        dns_listen,
90        cfg_sources,
91        config,
92        client_config,
93    ))?;
94
95    Ok(())
96}
97
98/// Run the main loop of the proxy.
99///
100/// # Panics
101///
102/// Currently, might panic if things go badly enough wrong
103#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
104#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
105async fn run_proxy<R: ToplevelRuntime>(
106    runtime: R,
107    socks_listen: Listen,
108    dns_listen: Listen,
109    config_sources: ConfigurationSources,
110    arti_config: ArtiConfig,
111    client_config: TorClientConfig,
112) -> Result<()> {
113    // Using OnDemand arranges that, while we are bootstrapping, incoming connections wait
114    // for bootstrap to complete, rather than getting errors.
115    use arti_client::BootstrapBehavior::OnDemand;
116    use futures::FutureExt;
117
118    // TODO RPC: We may instead want to provide a way to get these items out of TorClient.
119    #[allow(unused)]
120    let fs_mistrust = client_config.fs_mistrust().clone();
121    #[allow(unused)]
122    let path_resolver: CfgPathResolver = AsRef::<CfgPathResolver>::as_ref(&client_config).clone();
123
124    let client_builder = TorClient::with_runtime(runtime.clone())
125        .config(client_config)
126        .bootstrap_behavior(OnDemand);
127    let client = client_builder.create_unbootstrapped_async().await?;
128
129    #[allow(unused_mut)]
130    let mut reconfigurable_modules: Vec<Arc<dyn reload_cfg::ReconfigurableModule>> = vec![
131        Arc::new(client.clone()),
132        Arc::new(reload_cfg::Application::new(arti_config.clone())),
133    ];
134
135    cfg_if::cfg_if! {
136        if #[cfg(feature = "onion-service-service")] {
137            let onion_services =
138                onion_proxy::ProxySet::launch_new(&client, arti_config.onion_services.clone())?;
139            let launched_onion_svc = !onion_services.is_empty();
140            reconfigurable_modules.push(Arc::new(onion_services));
141        } else {
142            let launched_onion_svc = false;
143        }
144    };
145
146    // We weak references here to prevent the thread spawned by watch_for_config_changes from
147    // keeping these modules alive after this function exits.
148    //
149    // NOTE: reconfigurable_modules stores the only strong references to these modules,
150    // so we must keep the variable alive until the end of the function
151    let weak_modules = reconfigurable_modules.iter().map(Arc::downgrade).collect();
152    reload_cfg::watch_for_config_changes(
153        client.runtime(),
154        config_sources,
155        &arti_config,
156        weak_modules,
157    )?;
158
159    cfg_if::cfg_if! {
160        if #[cfg(feature = "rpc")] {
161            let rpc_data = rpc::launch_rpc_mgr(
162                &runtime,
163                &arti_config.rpc,
164                &path_resolver,
165                &fs_mistrust,
166                client.clone(),
167            )
168            .await?;
169        } else {
170            let rpc_data = None;
171        }
172    }
173
174    let mut proxy: Vec<PinnedFuture<(Result<()>, &str)>> = Vec::new();
175    if !socks_listen.is_empty() {
176        let runtime = runtime.clone();
177        let client = client.isolated_client();
178        let socks_listen = socks_listen.clone();
179        proxy.push(Box::pin(async move {
180            let res = socks::run_socks_proxy(runtime, client, socks_listen, rpc_data).await;
181            (res, "SOCKS")
182        }));
183    }
184
185    #[cfg(feature = "dns-proxy")]
186    if !dns_listen.is_empty() {
187        let runtime = runtime.clone();
188        let client = client.isolated_client();
189        proxy.push(Box::pin(async move {
190            let res = dns::run_dns_resolver(runtime, client, dns_listen).await;
191            (res, "DNS")
192        }));
193    }
194
195    #[cfg(not(feature = "dns-proxy"))]
196    if !dns_listen.is_empty() {
197        warn!(
198            "Tried to specify a DNS proxy address, but Arti was built without dns-proxy support."
199        );
200        return Ok(());
201    }
202
203    if proxy.is_empty() {
204        if !launched_onion_svc {
205            warn!("No proxy port set; specify -p PORT (for `socks_port`) or -d PORT (for `dns_port`). Alternatively, use the `socks_port` or `dns_port` configuration option.");
206            return Ok(());
207        } else {
208            // Push a dummy future to appease future::select_all,
209            // which expects a non-empty list
210            proxy.push(Box::pin(futures::future::pending()));
211        }
212    }
213
214    let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);
215    futures::select!(
216        r = exit::wait_for_ctrl_c().fuse()
217            => r.context("waiting for termination signal"),
218        r = proxy.fuse()
219            => r.0.context(format!("{} proxy failure", r.1)),
220        r = async {
221            client.bootstrap().await?;
222            if !socks_listen.is_empty() {
223                info!("Sufficiently bootstrapped; system SOCKS now functional.");
224            } else {
225                info!("Sufficiently bootstrapped.");
226            }
227            futures::future::pending::<Result<()>>().await
228        }.fuse()
229            => r.context("bootstrap"),
230    )?;
231
232    // The modules can be dropped now, because we are exiting.
233    drop(reconfigurable_modules);
234
235    Ok(())
236}