arti/subcommands/
proxy.rs
1use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use cfg_if::cfg_if;
7use clap::ArgMatches;
8#[allow(unused)]
9use tor_config_path::CfgPathResolver;
10use tracing::{info, warn};
11
12use arti_client::TorClientConfig;
13use tor_config::{ConfigurationSources, Listen};
14use tor_rtcompat::ToplevelRuntime;
15
16#[cfg(feature = "dns-proxy")]
17use crate::dns;
18use crate::{exit, process, reload_cfg, socks, ArtiConfig, TorClient};
19
20#[cfg(feature = "rpc")]
21use crate::rpc;
22
23#[cfg(feature = "onion-service-service")]
24use crate::onion_proxy;
25
26type PinnedFuture<T> = std::pin::Pin<Box<dyn futures::Future<Output = T>>>;
28
29pub(crate) fn run<R: ToplevelRuntime>(
31 runtime: R,
32 proxy_matches: &ArgMatches,
33 cfg_sources: ConfigurationSources,
34 config: ArtiConfig,
35 client_config: TorClientConfig,
36) -> Result<()> {
37 let socks_listen = match proxy_matches.get_one::<String>("socks-port") {
40 Some(p) => Listen::new_localhost(p.parse().expect("Invalid port specified")),
41 None => config.proxy().socks_listen.clone(),
42 };
43
44 let dns_listen = match proxy_matches.get_one::<String>("dns-port") {
45 Some(p) => Listen::new_localhost(p.parse().expect("Invalid port specified")),
46 None => config.proxy().dns_listen.clone(),
47 };
48
49 if !socks_listen.is_empty() {
50 info!(
51 "Starting Arti {} in SOCKS proxy mode on {} ...",
52 env!("CARGO_PKG_VERSION"),
53 socks_listen
54 );
55 }
56
57 if let Some(listen) = {
58 config
60 .metrics
61 .prometheus
62 .listen
63 .single_address_legacy()
64 .context("can only listen on a single address for Prometheus metrics")?
65 } {
66 cfg_if! {
67 if #[cfg(feature = "metrics")] {
68 metrics_exporter_prometheus::PrometheusBuilder::new()
69 .with_http_listener(listen)
70 .install()
71 .with_context(|| format!(
72 "set up Prometheus metrics exporter on {listen}"
73 ))?;
74 info!("Arti Prometheus metrics export scraper endpoint http://{listen}");
75 } else {
76 return Err(anyhow::anyhow!(
77 "`metrics.prometheus.listen` config set but `metrics` cargo feature compiled out in `arti` crate"
78 ));
79 }
80 }
81 }
82
83 process::use_max_file_limit(&config);
84
85 let rt_copy = runtime.clone();
86 rt_copy.block_on(run_proxy(
87 runtime,
88 socks_listen,
89 dns_listen,
90 cfg_sources,
91 config,
92 client_config,
93 ))?;
94
95 Ok(())
96}
97
98#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
104#[cfg_attr(docsrs, doc(cfg(feature = "experimental-api")))]
105async fn run_proxy<R: ToplevelRuntime>(
106 runtime: R,
107 socks_listen: Listen,
108 dns_listen: Listen,
109 config_sources: ConfigurationSources,
110 arti_config: ArtiConfig,
111 client_config: TorClientConfig,
112) -> Result<()> {
113 use arti_client::BootstrapBehavior::OnDemand;
116 use futures::FutureExt;
117
118 #[allow(unused)]
120 let fs_mistrust = client_config.fs_mistrust().clone();
121 #[allow(unused)]
122 let path_resolver: CfgPathResolver = AsRef::<CfgPathResolver>::as_ref(&client_config).clone();
123
124 let client_builder = TorClient::with_runtime(runtime.clone())
125 .config(client_config)
126 .bootstrap_behavior(OnDemand);
127 let client = client_builder.create_unbootstrapped_async().await?;
128
129 #[allow(unused_mut)]
130 let mut reconfigurable_modules: Vec<Arc<dyn reload_cfg::ReconfigurableModule>> = vec![
131 Arc::new(client.clone()),
132 Arc::new(reload_cfg::Application::new(arti_config.clone())),
133 ];
134
135 cfg_if::cfg_if! {
136 if #[cfg(feature = "onion-service-service")] {
137 let onion_services =
138 onion_proxy::ProxySet::launch_new(&client, arti_config.onion_services.clone())?;
139 let launched_onion_svc = !onion_services.is_empty();
140 reconfigurable_modules.push(Arc::new(onion_services));
141 } else {
142 let launched_onion_svc = false;
143 }
144 };
145
146 let weak_modules = reconfigurable_modules.iter().map(Arc::downgrade).collect();
152 reload_cfg::watch_for_config_changes(
153 client.runtime(),
154 config_sources,
155 &arti_config,
156 weak_modules,
157 )?;
158
159 cfg_if::cfg_if! {
160 if #[cfg(feature = "rpc")] {
161 let rpc_data = rpc::launch_rpc_mgr(
162 &runtime,
163 &arti_config.rpc,
164 &path_resolver,
165 &fs_mistrust,
166 client.clone(),
167 )
168 .await?;
169 } else {
170 let rpc_data = None;
171 }
172 }
173
174 let mut proxy: Vec<PinnedFuture<(Result<()>, &str)>> = Vec::new();
175 if !socks_listen.is_empty() {
176 let runtime = runtime.clone();
177 let client = client.isolated_client();
178 let socks_listen = socks_listen.clone();
179 proxy.push(Box::pin(async move {
180 let res = socks::run_socks_proxy(runtime, client, socks_listen, rpc_data).await;
181 (res, "SOCKS")
182 }));
183 }
184
185 #[cfg(feature = "dns-proxy")]
186 if !dns_listen.is_empty() {
187 let runtime = runtime.clone();
188 let client = client.isolated_client();
189 proxy.push(Box::pin(async move {
190 let res = dns::run_dns_resolver(runtime, client, dns_listen).await;
191 (res, "DNS")
192 }));
193 }
194
195 #[cfg(not(feature = "dns-proxy"))]
196 if !dns_listen.is_empty() {
197 warn!(
198 "Tried to specify a DNS proxy address, but Arti was built without dns-proxy support."
199 );
200 return Ok(());
201 }
202
203 if proxy.is_empty() {
204 if !launched_onion_svc {
205 warn!("No proxy port set; specify -p PORT (for `socks_port`) or -d PORT (for `dns_port`). Alternatively, use the `socks_port` or `dns_port` configuration option.");
206 return Ok(());
207 } else {
208 proxy.push(Box::pin(futures::future::pending()));
211 }
212 }
213
214 let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);
215 futures::select!(
216 r = exit::wait_for_ctrl_c().fuse()
217 => r.context("waiting for termination signal"),
218 r = proxy.fuse()
219 => r.0.context(format!("{} proxy failure", r.1)),
220 r = async {
221 client.bootstrap().await?;
222 if !socks_listen.is_empty() {
223 info!("Sufficiently bootstrapped; system SOCKS now functional.");
224 } else {
225 info!("Sufficiently bootstrapped.");
226 }
227 futures::future::pending::<Result<()>>().await
228 }.fuse()
229 => r.context("bootstrap"),
230 )?;
231
232 drop(reconfigurable_modules);
234
235 Ok(())
236}