1use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_builder::Builder;
6use fs_mistrust::Mistrust;
7use futures::{stream::StreamExt, task::SpawnExt, AsyncReadExt};
8use listener::{RpcListenerMap, RpcListenerMapBuilder};
9use serde::{Deserialize, Serialize};
10use session::ArtiRpcSession;
11use std::{io::Result as IoResult, sync::Arc};
12use tor_config::{define_list_builder_helper, impl_standard_builder, ConfigBuildError};
13use tor_config_path::CfgPathResolver;
14use tracing::{debug, info};
15
16use arti_client::TorClient;
17use tor_rtcompat::{general, NetStreamListener as _, Runtime};
18
19pub(crate) mod conntarget;
20pub(crate) mod listener;
21mod proxyinfo;
22mod session;
23
24pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
25
/// Configuration for Arti's RPC subsystem.
///
/// Values are assembled through the generated `RpcConfigBuilder`, which
/// derives `Serialize` and `Deserialize` so it can be read from configuration.
#[derive(Debug, Clone, Builder, Eq, PartialEq)]
#[builder(build_fn(error = "ConfigBuildError"))]
#[builder(derive(Debug, Serialize, Deserialize))]
#[builder_struct_attr(non_exhaustive)]
#[non_exhaustive]
pub struct RpcConfig {
    /// Whether the RPC subsystem should be started at all.
    ///
    /// Defaults to `false`; when it is false, `launch_rpc_mgr` returns
    /// `Ok(None)` without binding anything.
    #[builder(default = "false")]
    enable: bool,

    /// Named sets of RPC listeners.
    ///
    /// `launch_all_listeners` walks this map and binds every entry, passing
    /// the entry's name along to the bind call.
    #[builder(sub_builder)]
    #[builder_field_attr(serde(default))]
    listen: RpcListenerMap,

    /// Connect points, given as strings, used as a fallback.
    ///
    /// These are bound only when binding everything in `listen` produced no
    /// listeners at all.
    #[builder(sub_builder)]
    #[builder_field_attr(serde(default))]
    listen_default: ListenDefaults,
}
impl_standard_builder! { RpcConfig }
51
/// Type of the `listen_default` field of `RpcConfig`: a plain list of strings.
///
/// NOTE(review): presumably this alias exists so that `sub_builder` can find a
/// builder named `ListenDefaultsBuilder` for the field; confirm against the
/// derive_builder / tor-config conventions.
type ListenDefaults = Vec<String>;
54
// Declares `ListenDefaultsBuilder`, the list builder behind `listen_default`.
//
// Items are `String`s and the built value is a `Vec<String>`. The default
// contents come from `listen_defaults_defaults()`, and each item is "built"
// by cloning it, which cannot fail.
define_list_builder_helper! {
    pub struct ListenDefaultsBuilder {
        values: [String],
    }
    built: Vec<String> = values;
    default = listen_defaults_defaults();
    item_build: |item| Ok(item.clone());
}
63
64fn listen_defaults_defaults() -> Vec<String> {
66 vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
67}
68
/// A single accepted RPC connection, as yielded by the stream that
/// `launch_all_listeners` returns.
///
/// The tuple holds, in order: the accepted stream, the address reported
/// alongside it by the listener, and the shared `RpcConnInfo` of the listener
/// that accepted the connection.
type IncomingConn = (
    general::Stream,
    general::SocketAddr,
    Arc<listener::RpcConnInfo>,
);
77
78async fn launch_all_listeners<R: Runtime>(
82 runtime: &R,
83 cfg: &RpcConfig,
84 resolver: &CfgPathResolver,
85 mistrust: &Mistrust,
86) -> anyhow::Result<(
87 impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
88 Vec<tor_rpc_connect::server::ListenerGuard>,
89)> {
90 let mut listeners = Vec::new();
91 let mut guards = Vec::new();
92 for (name, listener_cfg) in cfg.listen.iter() {
93 for (lis, info, guard) in listener_cfg
94 .bind(runtime, name.as_str(), resolver, mistrust)
95 .await?
96 {
97 debug!(
99 "Listening at {} for {}",
100 lis.local_addr()
101 .expect("general::listener without address?")
102 .display_lossy(),
103 info.name,
104 );
105 listeners.push((lis, info));
106 guards.push(guard);
107 }
108 }
109 if listeners.is_empty() {
110 for (idx, connpt) in cfg.listen_default.iter().enumerate() {
111 let display_index = idx + 1; let (lis, info, guard) =
113 listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
114 debug!(
115 "Listening at {} for {}",
116 lis.local_addr()
117 .expect("general::listener without address?")
118 .display_lossy(),
119 info.name,
120 );
121 listeners.push((lis, info));
122 guards.push(guard);
123 }
124 }
125 if listeners.is_empty() {
126 info!("No RPC listeners configured.");
127 }
128
129 let streams = listeners.into_iter().map(|(listener, info)| {
130 listener
131 .incoming()
132 .map(move |accept_result| match accept_result {
133 Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
134 Err(e) => Err(e),
135 })
136 });
137
138 Ok((futures::stream::select_all(streams), guards))
139}
140
141pub(crate) async fn launch_rpc_mgr<R: Runtime>(
144 runtime: &R,
145 cfg: &RpcConfig,
146 resolver: &CfgPathResolver,
147 mistrust: &Mistrust,
148 client: TorClient<R>,
149) -> Result<Option<RpcProxySupport>> {
150 if !cfg.enable {
151 return Ok(None);
152 }
153 let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
154
155 let rpc_mgr = RpcMgr::new(move |auth| ArtiRpcSession::new(auth, &client, &rpc_state))?;
156 rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
160 rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
161
162 let rt_clone = runtime.clone();
163 let rpc_mgr_clone = rpc_mgr.clone();
164
165 let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
166
167 runtime.spawn(async move {
171 let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone).await;
172 if let Err(e) = result {
173 tracing::warn!("RPC manager quit with an error: {}", e);
174 }
175 drop(guards);
176 })?;
177 Ok(Some(RpcProxySupport {
178 rpc_mgr,
179 rpc_state_sender,
180 }))
181}
182
183async fn run_rpc_listener<R: Runtime>(
185 runtime: R,
186 mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
187 rpc_mgr: Arc<RpcMgr>,
188) -> Result<()> {
189 while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
190 debug!("Received incoming RPC connection from {}", &info.name);
191
192 let connection = rpc_mgr.new_connection(info.auth.clone());
193 let (input, output) = stream.split();
194
195 runtime.spawn(async {
196 let result = connection.run(input, output).await;
197 if let Err(e) = result {
198 tracing::warn!("RPC session ended with an error: {}", e);
199 }
200 })?;
201 }
202 Ok(())
203}
204
/// Handles returned by `launch_rpc_mgr` once the RPC subsystem is running.
///
/// Crate-private by default; the `experimental-api` feature makes it `pub`.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) struct RpcProxySupport {
    /// The RPC manager that accepted connections are handed to.
    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
    /// The sender created together with the `RpcVisibleArtiState` that RPC
    /// sessions are constructed with.
    pub(crate) rpc_state_sender: RpcStateSender,
}
213
#[cfg(test)]
mod test {
    // Lints relaxed for test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    /// Report every problem found by the method-name check, then require
    /// that there were none.
    #[test]
    fn rpc_method_names() {
        let problems = tor_rpcbase::check_method_names([]);

        // Print all offenders before asserting, so a failure lists them all.
        for (m, err) in &problems {
            eprintln!("Bad method name {m:?}: {err}");
        }
        assert!(problems.is_empty());
    }

    /// Every built-in default connect point must parse.
    #[test]
    fn parse_listener_defaults() {
        listen_defaults_defaults().iter().for_each(|connpt| {
            let _parsed = connpt.parse::<ParsedConnectPoint>().unwrap();
        });
    }

    /// An empty document yields the default configuration, and overrides in
    /// TOML are merged on top of those defaults.
    #[test]
    fn parsing_and_building() {
        /// Deserialize `s` as TOML into a builder, then build the config.
        fn build(s: &str) -> Result<RpcConfig, anyhow::Error> {
            let builder = toml::from_str::<RpcConfigBuilder>(s)?;
            Ok(builder.build()?)
        }

        // Builders describing the two listener sets expected by default.
        let mut user_set = RpcListenerSetConfigBuilder::default();
        user_set.listener_options().enable(true);
        user_set.dir(CfgPath::new(String::from(
            "${ARTI_LOCAL_DATA}/rpc/connect.d",
        )));
        let mut system_set = RpcListenerSetConfigBuilder::default();
        system_set.listener_options().enable(false);
        system_set.dir(CfgPath::new(String::from("/etc/arti-rpc/connect.d")));

        let parsed_defaults = build("").unwrap();
        let expected_defaults = RpcConfig {
            enable: false,
            listen: vec![
                (String::from("user-default"), user_set.build().unwrap()),
                (String::from("system-default"), system_set.build().unwrap()),
            ]
            .into_iter()
            .collect(),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(parsed_defaults, expected_defaults);

        let parsed_altered = build(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();

        // Apply the same overrides by hand to copies of the default builders.
        let mut altered_user_set = user_set.clone();
        altered_user_set.listener_options().enable(false);

        let mut altered_system_set = system_set.clone();
        altered_system_set.dir(CfgPath::new(String::from(
            "/usr/local/etc/arti-rpc/connect.d",
        )));
        let mut tmp_file_opts = ConnectPointOptionsBuilder::default();
        tmp_file_opts.enable(false);
        altered_system_set
            .file_options()
            .insert(String::from("tmp.toml"), tmp_file_opts);

        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(CfgPath::new(String::from(
            "/home/dante/.paradiso/connpt.toml",
        )));

        let expected_altered = RpcConfig {
            enable: false,
            listen: vec![
                (
                    String::from("user-default"),
                    altered_user_set.build().unwrap(),
                ),
                (
                    String::from("system-default"),
                    altered_system_set.build().unwrap(),
                ),
                (String::from("my-connpt"), my_connpt.build().unwrap()),
            ]
            .into_iter()
            .collect(),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(parsed_altered, expected_altered);
    }
}