arti/
rpc.rs

1//! Experimental RPC support.
2
3use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_builder::Builder;
6use fs_mistrust::Mistrust;
7use futures::{stream::StreamExt, task::SpawnExt, AsyncReadExt};
8use listener::{RpcListenerMap, RpcListenerMapBuilder};
9use serde::{Deserialize, Serialize};
10use session::ArtiRpcSession;
11use std::{io::Result as IoResult, sync::Arc};
12use tor_config::{define_list_builder_helper, impl_standard_builder, ConfigBuildError};
13use tor_config_path::CfgPathResolver;
14use tracing::{debug, info};
15
16use arti_client::TorClient;
17use tor_rtcompat::{general, NetStreamListener as _, Runtime};
18
19pub(crate) mod conntarget;
20pub(crate) mod listener;
21mod proxyinfo;
22mod session;
23
24pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
25
26/// Configuration for Arti's RPC subsystem.
27///
28/// You cannot change this section on a running Arti client.
29#[derive(Debug, Clone, Builder, Eq, PartialEq)]
30#[builder(build_fn(error = "ConfigBuildError"))]
31#[builder(derive(Debug, Serialize, Deserialize))]
32#[builder_struct_attr(non_exhaustive)]
33#[non_exhaustive]
34pub struct RpcConfig {
35    /// If true, then the RPC subsystem is enabled and will listen for connections.
36    #[builder(default = "false")] // TODO RPC make this true once we are stable.
37    enable: bool,
38
39    /// A set of named locations in which to find connect files.
40    #[builder(sub_builder)]
41    #[builder_field_attr(serde(default))]
42    listen: RpcListenerMap,
43
44    /// A list of default connect points to bind
45    /// if no enabled connect points are found under `listen`.
46    #[builder(sub_builder)]
47    #[builder_field_attr(serde(default))]
48    listen_default: ListenDefaults,
49}
50impl_standard_builder! { RpcConfig }
51
/// The type of `RpcConfig::listen_default`: a list of connect points, each
/// given as a string.
///
/// This alias exists so that `#[builder(sub_builder)]` works on that field
/// (presumably because the sub-builder's name, `ListenDefaultsBuilder`, is
/// derived from the field's type name).
type ListenDefaults = Vec<String>;
54
55define_list_builder_helper! {
56    pub struct ListenDefaultsBuilder {
57        values: [String],
58    }
59    built: Vec<String> = values;
60    default = listen_defaults_defaults();
61    item_build: |item| Ok(item.clone());
62}
63
64/// Return default values for `RpcConfig.listen_default`
65fn listen_defaults_defaults() -> Vec<String> {
66    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
67}
68
69/// Information about an incoming connection.
70///
71/// Yielded in a stream from our RPC listeners.
72type IncomingConn = (
73    general::Stream,
74    general::SocketAddr,
75    Arc<listener::RpcConnInfo>,
76);
77
78/// Bind to all configured RPC listeners in `cfg`.
79///
80/// On success, return a stream of `IncomingConn`.
81async fn launch_all_listeners<R: Runtime>(
82    runtime: &R,
83    cfg: &RpcConfig,
84    resolver: &CfgPathResolver,
85    mistrust: &Mistrust,
86) -> anyhow::Result<(
87    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
88    Vec<tor_rpc_connect::server::ListenerGuard>,
89)> {
90    let mut listeners = Vec::new();
91    let mut guards = Vec::new();
92    for (name, listener_cfg) in cfg.listen.iter() {
93        for (lis, info, guard) in listener_cfg
94            .bind(runtime, name.as_str(), resolver, mistrust)
95            .await?
96        {
97            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
98            debug!(
99                "Listening at {} for {}",
100                lis.local_addr()
101                    .expect("general::listener without address?")
102                    .display_lossy(),
103                info.name,
104            );
105            listeners.push((lis, info));
106            guards.push(guard);
107        }
108    }
109    if listeners.is_empty() {
110        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
111            let display_index = idx + 1; // One-indexed values are more human-readable.
112            let (lis, info, guard) =
113                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
114            debug!(
115                "Listening at {} for {}",
116                lis.local_addr()
117                    .expect("general::listener without address?")
118                    .display_lossy(),
119                info.name,
120            );
121            listeners.push((lis, info));
122            guards.push(guard);
123        }
124    }
125    if listeners.is_empty() {
126        info!("No RPC listeners configured.");
127    }
128
129    let streams = listeners.into_iter().map(|(listener, info)| {
130        listener
131            .incoming()
132            .map(move |accept_result| match accept_result {
133                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
134                Err(e) => Err(e),
135            })
136    });
137
138    Ok((futures::stream::select_all(streams), guards))
139}
140
141/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
142/// RPC connections.
143pub(crate) async fn launch_rpc_mgr<R: Runtime>(
144    runtime: &R,
145    cfg: &RpcConfig,
146    resolver: &CfgPathResolver,
147    mistrust: &Mistrust,
148    client: TorClient<R>,
149) -> Result<Option<RpcProxySupport>> {
150    if !cfg.enable {
151        return Ok(None);
152    }
153    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
154
155    let rpc_mgr = RpcMgr::new(move |auth| ArtiRpcSession::new(auth, &client, &rpc_state))?;
156    // Register methods. Needed since TorClient is generic.
157    //
158    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
159    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
160    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
161
162    let rt_clone = runtime.clone();
163    let rpc_mgr_clone = rpc_mgr.clone();
164
165    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
166
167    // TODO: Using spawn in this way makes it hard to report whether we
168    // succeeded or not. This is something we should fix when we refactor
169    // our service-launching code.
170    runtime.spawn(async move {
171        let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone).await;
172        if let Err(e) = result {
173            tracing::warn!("RPC manager quit with an error: {}", e);
174        }
175        drop(guards);
176    })?;
177    Ok(Some(RpcProxySupport {
178        rpc_mgr,
179        rpc_state_sender,
180    }))
181}
182
183/// Backend function to implement an RPC listener: runs in a loop.
184async fn run_rpc_listener<R: Runtime>(
185    runtime: R,
186    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
187    rpc_mgr: Arc<RpcMgr>,
188) -> Result<()> {
189    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
190        debug!("Received incoming RPC connection from {}", &info.name);
191
192        let connection = rpc_mgr.new_connection(info.auth.clone());
193        let (input, output) = stream.split();
194
195        runtime.spawn(async {
196            let result = connection.run(input, output).await;
197            if let Err(e) = result {
198                tracing::warn!("RPC session ended with an error: {}", e);
199            }
200        })?;
201    }
202    Ok(())
203}
204
205/// Information passed to a SOCKS proxy or similar stream provider when running with RPC support.
206#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
207pub(crate) struct RpcProxySupport {
208    /// An RPC manager to use for looking up objects as possible stream targets.
209    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
210    /// An RPCStateSender to use for registering the list of known proxy ports.
211    pub(crate) rpc_state_sender: RpcStateSender,
212}
213
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
    use tor_config_path::CfgPath;
    use tor_rpc_connect::ParsedConnectPoint;

    use super::*;

    /// Deserialize `toml_text` into an `RpcConfigBuilder`, and build it.
    fn config_from_toml(toml_text: &str) -> Result<RpcConfig, anyhow::Error> {
        let builder: RpcConfigBuilder = toml::from_str(toml_text)?;
        Ok(builder.build()?)
    }

    /// Build every `(name, builder)` pair in `entries`, and collect the
    /// results into a listener map.
    fn listener_map(entries: Vec<(&str, &RpcListenerSetConfigBuilder)>) -> RpcListenerMap {
        entries
            .into_iter()
            .map(|(name, builder)| (name.to_string(), builder.build().unwrap()))
            .collect()
    }

    #[test]
    fn rpc_method_names() {
        // This check lives in a high-level module on purpose, so that as many
        // method names as possible are in scope when it runs.
        let bad_names = tor_rpcbase::check_method_names([]);

        for (method, problem) in &bad_names {
            eprintln!("Bad method name {method:?}: {problem}");
        }
        assert!(bad_names.is_empty());
    }

    #[test]
    fn parse_listener_defaults() {
        // Every default connect point has to parse.
        for connpt in listen_defaults_defaults() {
            let _parsed = connpt.parse::<ParsedConnectPoint>().unwrap();
        }
    }

    #[test]
    fn parsing_and_building() {
        let mut user_defaults = RpcListenerSetConfigBuilder::default();
        user_defaults.listener_options().enable(true);
        user_defaults.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
        let mut system_defaults = RpcListenerSetConfigBuilder::default();
        system_defaults.listener_options().enable(false);
        system_defaults.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));

        // An empty configuration should give us the defaults.
        let defaults = config_from_toml("").unwrap();
        let expected_defaults = RpcConfig {
            enable: false,
            listen: listener_map(vec![
                ("user-default", &user_defaults),
                ("system-default", &system_defaults),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(defaults, expected_defaults);

        // Overriding specific options should change those options only.
        let altered = config_from_toml(
            r#"
[listen."user-default"]
enable = false
[listen."system-default"]
dir = "/usr/local/etc/arti-rpc/connect.d"
file_options = { "tmp.toml" = { "enable" = false } }
[listen."my-connpt"]
file = "/home/dante/.paradiso/connpt.toml"
"#,
        )
        .unwrap();

        let mut user_disabled = user_defaults.clone();
        user_disabled.listener_options().enable(false);

        let mut system_relocated = system_defaults.clone();
        system_relocated.dir(CfgPath::new(
            "/usr/local/etc/arti-rpc/connect.d".to_string(),
        ));
        let mut tmp_disabled = ConnectPointOptionsBuilder::default();
        tmp_disabled.enable(false);
        system_relocated
            .file_options()
            .insert("tmp.toml".to_string(), tmp_disabled);

        let mut my_connpt = RpcListenerSetConfigBuilder::default();
        my_connpt.file(CfgPath::new(
            "/home/dante/.paradiso/connpt.toml".to_string(),
        ));

        let expected_altered = RpcConfig {
            enable: false,
            listen: listener_map(vec![
                ("user-default", &user_disabled),
                ("system-default", &system_relocated),
                ("my-connpt", &my_connpt),
            ]),
            listen_default: listen_defaults_defaults(),
        };
        assert_eq!(altered, expected_altered);
    }
}