Skip to main content

arti/
rpc.rs

1//! Experimental RPC support.
2
3use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_deftly::Deftly;
6use fs_mistrust::Mistrust;
7use futures::{AsyncReadExt, stream::StreamExt};
8use session::ArtiRpcSession;
9use std::collections::BTreeMap;
10use std::{io::Result as IoResult, sync::Arc};
11use tor_config::derive::prelude::*;
12use tor_config_path::CfgPathResolver;
13use tracing::{debug, info};
14
15use arti_client::TorClient;
16use tor_rtcompat::{NetStreamListener as _, Runtime, SpawnExt, general};
17
18pub(crate) mod conntarget;
19pub(crate) mod listener;
20mod proxyinfo;
21mod session;
22mod superuser;
23
24use listener::RpcListenerSetConfig;
25pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
26
27use crate::rpc::superuser::RpcSuperuser;
28
29/// Configuration for Arti's RPC subsystem.
30///
31/// You cannot change this section on a running Arti client.
32#[derive(Debug, Clone, Deftly, Eq, PartialEq)]
33#[derive_deftly(TorConfig)]
34#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
35#[cfg_attr(feature = "experimental-api", deftly(tor_config(vis = "pub")))]
36pub(crate) struct RpcConfig {
37    /// If true, then the RPC subsystem is enabled and will listen for connections.
38    #[deftly(tor_config(default = "false"))] // TODO RPC make this true once we are stable.
39    enable: bool,
40
41    /// A set of named locations in which to find connect files.
42    #[deftly(tor_config(map, default = "listener::listener_map_defaults()"))]
43    listen: BTreeMap<String, RpcListenerSetConfig>,
44
45    /// A list of default connect points to bind
46    /// if no enabled connect points are found under `listen`.
47    #[deftly(tor_config(list(element(clone)), default = "listen_defaults_defaults()"))]
48    listen_default: Vec<String>,
49}
50
51/// Return default values for `RpcConfig.listen_default`
52fn listen_defaults_defaults() -> Vec<String> {
53    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
54}
55
56/// Information about an incoming connection.
57///
58/// Yielded in a stream from our RPC listeners.
59type IncomingConn = (
60    general::Stream,
61    general::SocketAddr,
62    Arc<listener::RpcConnInfo>,
63);
64
65/// Bind to all configured RPC listeners in `cfg`.
66///
67/// On success, return a stream of `IncomingConn`.
68#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
69async fn launch_all_listeners<R: Runtime>(
70    runtime: &R,
71    cfg: &RpcConfig,
72    resolver: &CfgPathResolver,
73    mistrust: &Mistrust,
74) -> anyhow::Result<(
75    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin + use<R>,
76    Vec<tor_rpc_connect::server::ListenerGuard>,
77)> {
78    let mut listeners = Vec::new();
79    let mut guards = Vec::new();
80    for (name, listener_cfg) in cfg.listen.iter() {
81        for (lis, info, guard) in listener_cfg
82            .bind(runtime, name.as_str(), resolver, mistrust)
83            .await?
84        {
85            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
86            debug!(
87                "Listening at {} for {}",
88                lis.local_addr()
89                    .expect("general::listener without address?")
90                    .display_lossy(),
91                info.name,
92            );
93            listeners.push((lis, info));
94            guards.push(guard);
95        }
96    }
97    if listeners.is_empty() {
98        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
99            let display_index = idx + 1; // One-indexed values are more human-readable.
100            let (lis, info, guard) =
101                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
102            debug!(
103                "Listening at {} for {}",
104                lis.local_addr()
105                    .expect("general::listener without address?")
106                    .display_lossy(),
107                info.name,
108            );
109            listeners.push((lis, info));
110            guards.push(guard);
111        }
112    }
113    if listeners.is_empty() {
114        info!("No RPC listeners configured.");
115    }
116
117    let streams = listeners.into_iter().map(|(listener, info)| {
118        listener
119            .incoming()
120            .map(move |accept_result| match accept_result {
121                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
122                Err(e) => Err(e),
123            })
124    });
125
126    Ok((futures::stream::select_all(streams), guards))
127}
128
129/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
130/// RPC connections.
131pub(crate) async fn launch_rpc_mgr<R: Runtime>(
132    runtime: &R,
133    cfg: &RpcConfig,
134    resolver: &CfgPathResolver,
135    mistrust: &Mistrust,
136    client: TorClient<R>,
137) -> Result<Option<RpcProxySupport>> {
138    if !cfg.enable {
139        return Ok(None);
140    }
141    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
142
143    let rpc_mgr = RpcMgr::new()?;
144    // Register methods. Needed since TorClient is generic.
145    //
146    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
147    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
148    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
149    rpc_mgr.register_rpc_methods(RpcSuperuser::<R>::rpc_methods());
150
151    let rt_clone = runtime.clone();
152    let rpc_mgr_clone = rpc_mgr.clone();
153
154    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
155
156    // TODO: Using spawn in this way makes it hard to report whether we
157    // succeeded or not. This is something we should fix when we refactor
158    // our service-launching code.
159    runtime.spawn(async move {
160        let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone, client, rpc_state).await;
161        if let Err(e) = result {
162            tracing::warn!("RPC manager quit with an error: {}", e);
163        }
164        drop(guards);
165    })?;
166    Ok(Some(RpcProxySupport {
167        rpc_mgr,
168        rpc_state_sender,
169    }))
170}
171
172/// Backend function to implement an RPC listener: runs in a loop.
173async fn run_rpc_listener<R: Runtime>(
174    runtime: R,
175    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
176    rpc_mgr: Arc<RpcMgr>,
177    client: TorClient<R>,
178    rpc_state: Arc<RpcVisibleArtiState>,
179) -> Result<()> {
180    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
181        debug!("Received incoming RPC connection from {}", &info.name);
182
183        let client_clone = client.clone();
184        let rpc_state_clone = rpc_state.clone();
185        let connection = rpc_mgr.new_connection(info.auth.clone(), move |auth| {
186            ArtiRpcSession::new(auth, &client_clone, &rpc_state_clone, &info) as _
187        });
188        let (input, output) = stream.split();
189
190        runtime.spawn(async {
191            let result = connection.run(input, output).await;
192            if let Err(e) = result {
193                tracing::warn!("RPC session ended with an error: {}", e);
194            }
195        })?;
196    }
197    Ok(())
198}
199
200/// Information passed to a proxy or similar stream provider when running with RPC support.
201pub(crate) struct RpcProxySupport {
202    /// An RPC manager to use for looking up objects as possible stream targets.
203    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
204    /// An RPCStateSender to use for registering the list of known proxy ports.
205    pub(crate) rpc_state_sender: RpcStateSender,
206}
207
208#[cfg(test)]
209mod test {
210    // @@ begin test lint list maintained by maint/add_warning @@
211    #![allow(clippy::bool_assert_comparison)]
212    #![allow(clippy::clone_on_copy)]
213    #![allow(clippy::dbg_macro)]
214    #![allow(clippy::mixed_attributes_style)]
215    #![allow(clippy::print_stderr)]
216    #![allow(clippy::print_stdout)]
217    #![allow(clippy::single_char_pattern)]
218    #![allow(clippy::unwrap_used)]
219    #![allow(clippy::unchecked_time_subtraction)]
220    #![allow(clippy::useless_vec)]
221    #![allow(clippy::needless_pass_by_value)]
222    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
223
224    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
225    use tor_config_path::CfgPath;
226    use tor_rpc_connect::ParsedConnectPoint;
227
228    use super::*;
229
230    #[test]
231    fn rpc_method_names() {
232        // We run this from a nice high level module, to ensure that as many method names as
233        // possible will be in-scope.
234        let problems = tor_rpcbase::check_method_names([]);
235
236        for (m, err) in &problems {
237            eprintln!("Bad method name {m:?}: {err}");
238        }
239        assert!(problems.is_empty());
240    }
241
242    #[test]
243    fn parse_listener_defaults() {
244        for string in listen_defaults_defaults() {
245            let _parsed: ParsedConnectPoint = string.parse().unwrap();
246        }
247    }
248
249    #[test]
250    fn parsing_and_building() {
251        fn build(s: &str) -> Result<RpcConfig, anyhow::Error> {
252            let b: RpcConfigBuilder = toml::from_str(s)?;
253            Ok(b.build()?)
254        }
255
256        let mut user_defaults_builder = RpcListenerSetConfigBuilder::default();
257        user_defaults_builder.listener_options().enable(true);
258        user_defaults_builder.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
259        let mut system_defaults_builder = RpcListenerSetConfigBuilder::default();
260        system_defaults_builder.listener_options().enable(false);
261        system_defaults_builder.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));
262
263        // Make sure that an empty configuration gets us the defaults.
264        let defaults = build("").unwrap();
265        assert_eq!(
266            defaults,
267            RpcConfig {
268                enable: false,
269                listen: vec![
270                    (
271                        "user-default".to_string(),
272                        user_defaults_builder.build().unwrap()
273                    ),
274                    (
275                        "system-default".to_string(),
276                        system_defaults_builder.build().unwrap()
277                    ),
278                ]
279                .into_iter()
280                .collect(),
281                listen_default: listen_defaults_defaults()
282            }
283        );
284
285        // Make sure that overriding specific options works as expected.
286        let altered = build(
287            r#"
288[listen."user-default"]
289enable = false
290[listen."system-default"]
291dir = "/usr/local/etc/arti-rpc/connect.d"
292file_options = { "tmp.toml" = { "enable" = false } }
293[listen."my-connpt"]
294file = "/home/dante/.paradiso/connpt.toml"
295"#,
296        )
297        .unwrap();
298        let mut altered_user_defaults = user_defaults_builder.clone();
299        altered_user_defaults.listener_options().enable(false);
300        let mut altered_system_defaults = system_defaults_builder.clone();
301        altered_system_defaults.dir(CfgPath::new(
302            "/usr/local/etc/arti-rpc/connect.d".to_string(),
303        ));
304        let mut opt = ConnectPointOptionsBuilder::default();
305        opt.enable(false);
306        altered_system_defaults
307            .file_options()
308            .insert("tmp.toml".to_string(), opt);
309        let mut my_connpt = RpcListenerSetConfigBuilder::default();
310        my_connpt.file(CfgPath::new(
311            "/home/dante/.paradiso/connpt.toml".to_string(),
312        ));
313
314        assert_eq!(
315            altered,
316            RpcConfig {
317                enable: false,
318                listen: vec![
319                    (
320                        "user-default".to_string(),
321                        altered_user_defaults.build().unwrap()
322                    ),
323                    (
324                        "system-default".to_string(),
325                        altered_system_defaults.build().unwrap()
326                    ),
327                    ("my-connpt".to_string(), my_connpt.build().unwrap()),
328                ]
329                .into_iter()
330                .collect(),
331                listen_default: listen_defaults_defaults()
332            }
333        );
334    }
335}