1use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_deftly::Deftly;
6use fs_mistrust::Mistrust;
7use futures::{AsyncReadExt, stream::StreamExt};
8use session::ArtiRpcSession;
9use std::collections::BTreeMap;
10use std::{io::Result as IoResult, sync::Arc};
11use tor_config::derive::prelude::*;
12use tor_config_path::CfgPathResolver;
13use tracing::{debug, info};
14
15use arti_client::TorClient;
16use tor_rtcompat::{NetStreamListener as _, Runtime, SpawnExt, general};
17
18pub(crate) mod conntarget;
19pub(crate) mod listener;
20mod proxyinfo;
21mod session;
22mod superuser;
23
24use listener::RpcListenerSetConfig;
25pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
26
27use crate::rpc::superuser::RpcSuperuser;
28
29#[derive(Debug, Clone, Deftly, Eq, PartialEq)]
33#[derive_deftly(TorConfig)]
34#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
35#[cfg_attr(feature = "experimental-api", deftly(tor_config(vis = "pub")))]
36pub(crate) struct RpcConfig {
37 #[deftly(tor_config(default = "false"))] enable: bool,
40
41 #[deftly(tor_config(map, default = "listener::listener_map_defaults()"))]
43 listen: BTreeMap<String, RpcListenerSetConfig>,
44
45 #[deftly(tor_config(list(element(clone)), default = "listen_defaults_defaults()"))]
48 listen_default: Vec<String>,
49}
50
51fn listen_defaults_defaults() -> Vec<String> {
53 vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
54}
55
56type IncomingConn = (
60 general::Stream,
61 general::SocketAddr,
62 Arc<listener::RpcConnInfo>,
63);
64
65#[allow(clippy::cognitive_complexity)] async fn launch_all_listeners<R: Runtime>(
70 runtime: &R,
71 cfg: &RpcConfig,
72 resolver: &CfgPathResolver,
73 mistrust: &Mistrust,
74) -> anyhow::Result<(
75 impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin + use<R>,
76 Vec<tor_rpc_connect::server::ListenerGuard>,
77)> {
78 let mut listeners = Vec::new();
79 let mut guards = Vec::new();
80 for (name, listener_cfg) in cfg.listen.iter() {
81 for (lis, info, guard) in listener_cfg
82 .bind(runtime, name.as_str(), resolver, mistrust)
83 .await?
84 {
85 debug!(
87 "Listening at {} for {}",
88 lis.local_addr()
89 .expect("general::listener without address?")
90 .display_lossy(),
91 info.name,
92 );
93 listeners.push((lis, info));
94 guards.push(guard);
95 }
96 }
97 if listeners.is_empty() {
98 for (idx, connpt) in cfg.listen_default.iter().enumerate() {
99 let display_index = idx + 1; let (lis, info, guard) =
101 listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
102 debug!(
103 "Listening at {} for {}",
104 lis.local_addr()
105 .expect("general::listener without address?")
106 .display_lossy(),
107 info.name,
108 );
109 listeners.push((lis, info));
110 guards.push(guard);
111 }
112 }
113 if listeners.is_empty() {
114 info!("No RPC listeners configured.");
115 }
116
117 let streams = listeners.into_iter().map(|(listener, info)| {
118 listener
119 .incoming()
120 .map(move |accept_result| match accept_result {
121 Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
122 Err(e) => Err(e),
123 })
124 });
125
126 Ok((futures::stream::select_all(streams), guards))
127}
128
129pub(crate) async fn launch_rpc_mgr<R: Runtime>(
132 runtime: &R,
133 cfg: &RpcConfig,
134 resolver: &CfgPathResolver,
135 mistrust: &Mistrust,
136 client: TorClient<R>,
137) -> Result<Option<RpcProxySupport>> {
138 if !cfg.enable {
139 return Ok(None);
140 }
141 let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
142
143 let rpc_mgr = RpcMgr::new()?;
144 rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
148 rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
149 rpc_mgr.register_rpc_methods(RpcSuperuser::<R>::rpc_methods());
150
151 let rt_clone = runtime.clone();
152 let rpc_mgr_clone = rpc_mgr.clone();
153
154 let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
155
156 runtime.spawn(async move {
160 let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone, client, rpc_state).await;
161 if let Err(e) = result {
162 tracing::warn!("RPC manager quit with an error: {}", e);
163 }
164 drop(guards);
165 })?;
166 Ok(Some(RpcProxySupport {
167 rpc_mgr,
168 rpc_state_sender,
169 }))
170}
171
172async fn run_rpc_listener<R: Runtime>(
174 runtime: R,
175 mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
176 rpc_mgr: Arc<RpcMgr>,
177 client: TorClient<R>,
178 rpc_state: Arc<RpcVisibleArtiState>,
179) -> Result<()> {
180 while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
181 debug!("Received incoming RPC connection from {}", &info.name);
182
183 let client_clone = client.clone();
184 let rpc_state_clone = rpc_state.clone();
185 let connection = rpc_mgr.new_connection(info.auth.clone(), move |auth| {
186 ArtiRpcSession::new(auth, &client_clone, &rpc_state_clone, &info) as _
187 });
188 let (input, output) = stream.split();
189
190 runtime.spawn(async {
191 let result = connection.run(input, output).await;
192 if let Err(e) = result {
193 tracing::warn!("RPC session ended with an error: {}", e);
194 }
195 })?;
196 }
197 Ok(())
198}
199
200pub(crate) struct RpcProxySupport {
202 pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
204 pub(crate) rpc_state_sender: RpcStateSender,
206}
207
208#[cfg(test)]
209mod test {
210 #![allow(clippy::bool_assert_comparison)]
212 #![allow(clippy::clone_on_copy)]
213 #![allow(clippy::dbg_macro)]
214 #![allow(clippy::mixed_attributes_style)]
215 #![allow(clippy::print_stderr)]
216 #![allow(clippy::print_stdout)]
217 #![allow(clippy::single_char_pattern)]
218 #![allow(clippy::unwrap_used)]
219 #![allow(clippy::unchecked_time_subtraction)]
220 #![allow(clippy::useless_vec)]
221 #![allow(clippy::needless_pass_by_value)]
222 use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
225 use tor_config_path::CfgPath;
226 use tor_rpc_connect::ParsedConnectPoint;
227
228 use super::*;
229
230 #[test]
231 fn rpc_method_names() {
232 let problems = tor_rpcbase::check_method_names([]);
235
236 for (m, err) in &problems {
237 eprintln!("Bad method name {m:?}: {err}");
238 }
239 assert!(problems.is_empty());
240 }
241
242 #[test]
243 fn parse_listener_defaults() {
244 for string in listen_defaults_defaults() {
245 let _parsed: ParsedConnectPoint = string.parse().unwrap();
246 }
247 }
248
249 #[test]
250 fn parsing_and_building() {
251 fn build(s: &str) -> Result<RpcConfig, anyhow::Error> {
252 let b: RpcConfigBuilder = toml::from_str(s)?;
253 Ok(b.build()?)
254 }
255
256 let mut user_defaults_builder = RpcListenerSetConfigBuilder::default();
257 user_defaults_builder.listener_options().enable(true);
258 user_defaults_builder.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
259 let mut system_defaults_builder = RpcListenerSetConfigBuilder::default();
260 system_defaults_builder.listener_options().enable(false);
261 system_defaults_builder.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));
262
263 let defaults = build("").unwrap();
265 assert_eq!(
266 defaults,
267 RpcConfig {
268 enable: false,
269 listen: vec![
270 (
271 "user-default".to_string(),
272 user_defaults_builder.build().unwrap()
273 ),
274 (
275 "system-default".to_string(),
276 system_defaults_builder.build().unwrap()
277 ),
278 ]
279 .into_iter()
280 .collect(),
281 listen_default: listen_defaults_defaults()
282 }
283 );
284
285 let altered = build(
287 r#"
288[listen."user-default"]
289enable = false
290[listen."system-default"]
291dir = "/usr/local/etc/arti-rpc/connect.d"
292file_options = { "tmp.toml" = { "enable" = false } }
293[listen."my-connpt"]
294file = "/home/dante/.paradiso/connpt.toml"
295"#,
296 )
297 .unwrap();
298 let mut altered_user_defaults = user_defaults_builder.clone();
299 altered_user_defaults.listener_options().enable(false);
300 let mut altered_system_defaults = system_defaults_builder.clone();
301 altered_system_defaults.dir(CfgPath::new(
302 "/usr/local/etc/arti-rpc/connect.d".to_string(),
303 ));
304 let mut opt = ConnectPointOptionsBuilder::default();
305 opt.enable(false);
306 altered_system_defaults
307 .file_options()
308 .insert("tmp.toml".to_string(), opt);
309 let mut my_connpt = RpcListenerSetConfigBuilder::default();
310 my_connpt.file(CfgPath::new(
311 "/home/dante/.paradiso/connpt.toml".to_string(),
312 ));
313
314 assert_eq!(
315 altered,
316 RpcConfig {
317 enable: false,
318 listen: vec![
319 (
320 "user-default".to_string(),
321 altered_user_defaults.build().unwrap()
322 ),
323 (
324 "system-default".to_string(),
325 altered_system_defaults.build().unwrap()
326 ),
327 ("my-connpt".to_string(), my_connpt.build().unwrap()),
328 ]
329 .into_iter()
330 .collect(),
331 listen_default: listen_defaults_defaults()
332 }
333 );
334 }
335}