1use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_builder::Builder;
6use fs_mistrust::Mistrust;
7use futures::{stream::StreamExt, task::SpawnExt, AsyncReadExt};
8use listener::{RpcListenerMap, RpcListenerMapBuilder};
9use serde::{Deserialize, Serialize};
10use session::ArtiRpcSession;
11use std::{io::Result as IoResult, sync::Arc};
12use tor_config::{define_list_builder_helper, impl_standard_builder, ConfigBuildError};
13use tor_config_path::CfgPathResolver;
14use tracing::{debug, info};
15
16use arti_client::TorClient;
17use tor_rtcompat::{general, NetStreamListener as _, Runtime};
18
19pub(crate) mod conntarget;
20pub(crate) mod listener;
21mod proxyinfo;
22mod session;
23
24pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
25
/// Configuration for Arti's RPC subsystem.
///
/// Values are produced through the derived `RpcConfigBuilder`, which derives
/// `Serialize`/`Deserialize` and reports failures as `ConfigBuildError`.
#[derive(Debug, Clone, Builder, Eq, PartialEq)]
#[builder(build_fn(error = "ConfigBuildError"))]
#[builder(derive(Debug, Serialize, Deserialize))]
#[builder_struct_attr(non_exhaustive)]
#[non_exhaustive]
pub struct RpcConfig {
    /// Whether RPC is turned on at all.
    ///
    /// When this is false, `launch_rpc_mgr` returns `Ok(None)` without binding
    /// anything.  Defaults to `false`.
    #[builder(default = "false")] enable: bool,

    /// Named listener configurations.
    ///
    /// `launch_all_listeners` binds every entry of this map, passing the
    /// entry's name along to `bind`.
    #[builder(sub_builder)]
    #[builder_field_attr(serde(default))]
    listen: RpcListenerMap,

    /// Fallback connect points, given as strings.
    ///
    /// These are bound only when binding the entries of `listen` produced no
    /// listeners at all.
    #[builder(sub_builder)]
    #[builder_field_attr(serde(default))]
    listen_default: ListenDefaults,
}
impl_standard_builder! { RpcConfig }
51
/// The type of `RpcConfig::listen_default`: a list of connect-point strings.
///
// NOTE(review): presumably this alias exists so that `#[builder(sub_builder)]`
// can pair the field with `ListenDefaultsBuilder` by name — confirm.
type ListenDefaults = Vec<String>;
54
define_list_builder_helper! {
    /// Builder for the list of fallback connect points (`ListenDefaults`).
    pub struct ListenDefaultsBuilder {
        values: [String],
    }
    // The built value is the list of strings itself.
    built: Vec<String> = values;
    // Default contents of the list come from `listen_defaults_defaults()`.
    default = listen_defaults_defaults();
    // Each item is already a `String`, so building it is just a clone.
    item_build: |item| Ok(item.clone());
}
63
64fn listen_defaults_defaults() -> Vec<String> {
66 vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
67}
68
/// One accepted RPC connection, as yielded by the merged listener stream.
///
/// The tuple holds the accepted stream, the address the listener returned
/// alongside it, and the shared `RpcConnInfo` of the listener that accepted it.
type IncomingConn = (
    general::Stream,
    general::SocketAddr,
    Arc<listener::RpcConnInfo>,
);
77
78#[allow(clippy::cognitive_complexity)] async fn launch_all_listeners<R: Runtime>(
83 runtime: &R,
84 cfg: &RpcConfig,
85 resolver: &CfgPathResolver,
86 mistrust: &Mistrust,
87) -> anyhow::Result<(
88 impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
89 Vec<tor_rpc_connect::server::ListenerGuard>,
90)> {
91 let mut listeners = Vec::new();
92 let mut guards = Vec::new();
93 for (name, listener_cfg) in cfg.listen.iter() {
94 for (lis, info, guard) in listener_cfg
95 .bind(runtime, name.as_str(), resolver, mistrust)
96 .await?
97 {
98 debug!(
100 "Listening at {} for {}",
101 lis.local_addr()
102 .expect("general::listener without address?")
103 .display_lossy(),
104 info.name,
105 );
106 listeners.push((lis, info));
107 guards.push(guard);
108 }
109 }
110 if listeners.is_empty() {
111 for (idx, connpt) in cfg.listen_default.iter().enumerate() {
112 let display_index = idx + 1; let (lis, info, guard) =
114 listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
115 debug!(
116 "Listening at {} for {}",
117 lis.local_addr()
118 .expect("general::listener without address?")
119 .display_lossy(),
120 info.name,
121 );
122 listeners.push((lis, info));
123 guards.push(guard);
124 }
125 }
126 if listeners.is_empty() {
127 info!("No RPC listeners configured.");
128 }
129
130 let streams = listeners.into_iter().map(|(listener, info)| {
131 listener
132 .incoming()
133 .map(move |accept_result| match accept_result {
134 Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
135 Err(e) => Err(e),
136 })
137 });
138
139 Ok((futures::stream::select_all(streams), guards))
140}
141
142pub(crate) async fn launch_rpc_mgr<R: Runtime>(
145 runtime: &R,
146 cfg: &RpcConfig,
147 resolver: &CfgPathResolver,
148 mistrust: &Mistrust,
149 client: TorClient<R>,
150) -> Result<Option<RpcProxySupport>> {
151 if !cfg.enable {
152 return Ok(None);
153 }
154 let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
155
156 let rpc_mgr = RpcMgr::new(move |auth| ArtiRpcSession::new(auth, &client, &rpc_state))?;
157 rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
161 rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
162
163 let rt_clone = runtime.clone();
164 let rpc_mgr_clone = rpc_mgr.clone();
165
166 let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
167
168 runtime.spawn(async move {
172 let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone).await;
173 if let Err(e) = result {
174 tracing::warn!("RPC manager quit with an error: {}", e);
175 }
176 drop(guards);
177 })?;
178 Ok(Some(RpcProxySupport {
179 rpc_mgr,
180 rpc_state_sender,
181 }))
182}
183
184async fn run_rpc_listener<R: Runtime>(
186 runtime: R,
187 mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
188 rpc_mgr: Arc<RpcMgr>,
189) -> Result<()> {
190 while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
191 debug!("Received incoming RPC connection from {}", &info.name);
192
193 let connection = rpc_mgr.new_connection(info.auth.clone());
194 let (input, output) = stream.split();
195
196 runtime.spawn(async {
197 let result = connection.run(input, output).await;
198 if let Err(e) = result {
199 tracing::warn!("RPC session ended with an error: {}", e);
200 }
201 })?;
202 }
203 Ok(())
204}
205
/// What `launch_rpc_mgr` hands back when RPC is enabled.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) struct RpcProxySupport {
    /// The RPC manager that the spawned listener task feeds connections to.
    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
    /// The sender created together with the `RpcVisibleArtiState` that RPC sessions are given.
    pub(crate) rpc_state_sender: RpcStateSender,
}
214
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    /// `check_method_names` must not report any problem.
    #[test]
    fn rpc_method_names() {
        let problems = tor_rpcbase::check_method_names([]);

        // Print every offender before failing, so the log names all of them.
        problems
            .iter()
            .for_each(|(m, err)| eprintln!("Bad method name {m:?}: {err}"));
        assert!(problems.is_empty());
    }

    /// Every built-in default connect point must parse.
    #[test]
    fn parse_listener_defaults() {
        listen_defaults_defaults().iter().for_each(|connpt| {
            let _parsed = connpt.parse::<ParsedConnectPoint>().unwrap();
        });
    }

    /// Parsing TOML into a builder and building it yields the expected configs.
    #[test]
    fn parsing_and_building() {
        /// Parse `s` as TOML into an `RpcConfigBuilder`, then build it.
        fn build(s: &str) -> anyhow::Result<RpcConfig> {
            let builder = toml::from_str::<RpcConfigBuilder>(s)?;
            let cfg = builder.build()?;
            Ok(cfg)
        }

        /// Assemble the expected config from named listener-set builders.
        ///
        /// `enable` is always false and `listen_default` is always the built-in list.
        fn expected(sets: &[(&str, &RpcListenerSetConfigBuilder)]) -> RpcConfig {
            RpcConfig {
                enable: false,
                listen: sets
                    .iter()
                    .map(|(name, set)| (name.to_string(), set.build().unwrap()))
                    .collect(),
                listen_default: listen_defaults_defaults(),
            }
        }

        // Builders describing the two listener sets expected from an empty config.
        let mut user_set = RpcListenerSetConfigBuilder::default();
        user_set.listener_options().enable(true);
        user_set.dir(CfgPath::new(String::from(
            "${ARTI_LOCAL_DATA}/rpc/connect.d",
        )));
        let mut system_set = RpcListenerSetConfigBuilder::default();
        system_set.listener_options().enable(false);
        system_set.dir(CfgPath::new(String::from("/etc/arti-rpc/connect.d")));

        let defaults = build("").unwrap();
        assert_eq!(
            defaults,
            expected(&[("user-default", &user_set), ("system-default", &system_set)])
        );

        let altered = build(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();

        // Start from the default builders and apply the same overrides as the TOML.
        let mut user_set_altered = user_set.clone();
        user_set_altered.listener_options().enable(false);

        let mut system_set_altered = system_set.clone();
        system_set_altered.dir(CfgPath::new(String::from(
            "/usr/local/etc/arti-rpc/connect.d",
        )));
        let mut tmp_opts = ConnectPointOptionsBuilder::default();
        tmp_opts.enable(false);
        system_set_altered
            .file_options()
            .insert(String::from("tmp.toml"), tmp_opts);

        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(CfgPath::new(String::from(
            "/home/dante/.paradiso/connpt.toml",
        )));

        assert_eq!(
            altered,
            expected(&[
                ("user-default", &user_set_altered),
                ("system-default", &system_set_altered),
                ("my-connpt", &my_connpt),
            ])
        );
    }
}