arti/
rpc.rs

1//! Experimental RPC support.
2
3use anyhow::Result;
4use arti_rpcserver::RpcMgr;
5use derive_builder::Builder;
6use fs_mistrust::Mistrust;
7use futures::{stream::StreamExt, task::SpawnExt, AsyncReadExt};
8use listener::{RpcListenerMap, RpcListenerMapBuilder};
9use serde::{Deserialize, Serialize};
10use session::ArtiRpcSession;
11use std::{io::Result as IoResult, sync::Arc};
12use tor_config::{define_list_builder_helper, impl_standard_builder, ConfigBuildError};
13use tor_config_path::CfgPathResolver;
14use tracing::{debug, info};
15
16use arti_client::TorClient;
17use tor_rtcompat::{general, NetStreamListener as _, Runtime};
18
19pub(crate) mod conntarget;
20pub(crate) mod listener;
21mod proxyinfo;
22mod session;
23
24pub(crate) use session::{RpcStateSender, RpcVisibleArtiState};
25
26/// Configuration for Arti's RPC subsystem.
27///
28/// You cannot change this section on a running Arti client.
29#[derive(Debug, Clone, Builder, Eq, PartialEq)]
30#[builder(build_fn(error = "ConfigBuildError"))]
31#[builder(derive(Debug, Serialize, Deserialize))]
32#[builder_struct_attr(non_exhaustive)]
33#[non_exhaustive]
34pub struct RpcConfig {
35    /// If true, then the RPC subsystem is enabled and will listen for connections.
36    #[builder(default = "false")] // TODO RPC make this true once we are stable.
37    enable: bool,
38
39    /// A set of named locations in which to find connect files.
40    #[builder(sub_builder)]
41    #[builder_field_attr(serde(default))]
42    listen: RpcListenerMap,
43
44    /// A list of default connect points to bind
45    /// if no enabled connect points are found under `listen`.
46    #[builder(sub_builder)]
47    #[builder_field_attr(serde(default))]
48    listen_default: ListenDefaults,
49}
50impl_standard_builder! { RpcConfig }
51
52/// Type alias to enable sub_builder to work.
53type ListenDefaults = Vec<String>;
54
55define_list_builder_helper! {
56    pub struct ListenDefaultsBuilder {
57        values: [String],
58    }
59    built: Vec<String> = values;
60    default = listen_defaults_defaults();
61    item_build: |item| Ok(item.clone());
62}
63
64/// Return default values for `RpcConfig.listen_default`
65fn listen_defaults_defaults() -> Vec<String> {
66    vec![tor_rpc_connect::USER_DEFAULT_CONNECT_POINT.to_string()]
67}
68
69/// Information about an incoming connection.
70///
71/// Yielded in a stream from our RPC listeners.
72type IncomingConn = (
73    general::Stream,
74    general::SocketAddr,
75    Arc<listener::RpcConnInfo>,
76);
77
78/// Bind to all configured RPC listeners in `cfg`.
79///
80/// On success, return a stream of `IncomingConn`.
81#[allow(clippy::cognitive_complexity)] // TODO: Refactor?
82async fn launch_all_listeners<R: Runtime>(
83    runtime: &R,
84    cfg: &RpcConfig,
85    resolver: &CfgPathResolver,
86    mistrust: &Mistrust,
87) -> anyhow::Result<(
88    impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
89    Vec<tor_rpc_connect::server::ListenerGuard>,
90)> {
91    let mut listeners = Vec::new();
92    let mut guards = Vec::new();
93    for (name, listener_cfg) in cfg.listen.iter() {
94        for (lis, info, guard) in listener_cfg
95            .bind(runtime, name.as_str(), resolver, mistrust)
96            .await?
97        {
98            // (Note that `bind` only returns enabled listeners, so we don't need to check here.
99            debug!(
100                "Listening at {} for {}",
101                lis.local_addr()
102                    .expect("general::listener without address?")
103                    .display_lossy(),
104                info.name,
105            );
106            listeners.push((lis, info));
107            guards.push(guard);
108        }
109    }
110    if listeners.is_empty() {
111        for (idx, connpt) in cfg.listen_default.iter().enumerate() {
112            let display_index = idx + 1; // One-indexed values are more human-readable.
113            let (lis, info, guard) =
114                listener::bind_string(connpt, display_index, runtime, resolver, mistrust).await?;
115            debug!(
116                "Listening at {} for {}",
117                lis.local_addr()
118                    .expect("general::listener without address?")
119                    .display_lossy(),
120                info.name,
121            );
122            listeners.push((lis, info));
123            guards.push(guard);
124        }
125    }
126    if listeners.is_empty() {
127        info!("No RPC listeners configured.");
128    }
129
130    let streams = listeners.into_iter().map(|(listener, info)| {
131        listener
132            .incoming()
133            .map(move |accept_result| match accept_result {
134                Ok((netstream, addr)) => Ok((netstream, addr, Arc::clone(&info))),
135                Err(e) => Err(e),
136            })
137    });
138
139    Ok((futures::stream::select_all(streams), guards))
140}
141
142/// Create an RPC manager, bind to connect points, and open a listener task to accept incoming
143/// RPC connections.
144pub(crate) async fn launch_rpc_mgr<R: Runtime>(
145    runtime: &R,
146    cfg: &RpcConfig,
147    resolver: &CfgPathResolver,
148    mistrust: &Mistrust,
149    client: TorClient<R>,
150) -> Result<Option<RpcProxySupport>> {
151    if !cfg.enable {
152        return Ok(None);
153    }
154    let (rpc_state, rpc_state_sender) = RpcVisibleArtiState::new();
155
156    let rpc_mgr = RpcMgr::new(move |auth| ArtiRpcSession::new(auth, &client, &rpc_state))?;
157    // Register methods. Needed since TorClient is generic.
158    //
159    // TODO: If we accumulate a large number of generics like this, we should do this elsewhere.
160    rpc_mgr.register_rpc_methods(TorClient::<R>::rpc_methods());
161    rpc_mgr.register_rpc_methods(arti_rpcserver::rpc_methods::<R>());
162
163    let rt_clone = runtime.clone();
164    let rpc_mgr_clone = rpc_mgr.clone();
165
166    let (incoming, guards) = launch_all_listeners(runtime, cfg, resolver, mistrust).await?;
167
168    // TODO: Using spawn in this way makes it hard to report whether we
169    // succeeded or not. This is something we should fix when we refactor
170    // our service-launching code.
171    runtime.spawn(async move {
172        let result = run_rpc_listener(rt_clone, incoming, rpc_mgr_clone).await;
173        if let Err(e) = result {
174            tracing::warn!("RPC manager quit with an error: {}", e);
175        }
176        drop(guards);
177    })?;
178    Ok(Some(RpcProxySupport {
179        rpc_mgr,
180        rpc_state_sender,
181    }))
182}
183
184/// Backend function to implement an RPC listener: runs in a loop.
185async fn run_rpc_listener<R: Runtime>(
186    runtime: R,
187    mut incoming: impl futures::Stream<Item = IoResult<IncomingConn>> + Unpin,
188    rpc_mgr: Arc<RpcMgr>,
189) -> Result<()> {
190    while let Some((stream, _addr, info)) = incoming.next().await.transpose()? {
191        debug!("Received incoming RPC connection from {}", &info.name);
192
193        let connection = rpc_mgr.new_connection(info.auth.clone());
194        let (input, output) = stream.split();
195
196        runtime.spawn(async {
197            let result = connection.run(input, output).await;
198            if let Err(e) = result {
199                tracing::warn!("RPC session ended with an error: {}", e);
200            }
201        })?;
202    }
203    Ok(())
204}
205
206/// Information passed to a SOCKS proxy or similar stream provider when running with RPC support.
207#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
208pub(crate) struct RpcProxySupport {
209    /// An RPC manager to use for looking up objects as possible stream targets.
210    pub(crate) rpc_mgr: Arc<arti_rpcserver::RpcMgr>,
211    /// An RPCStateSender to use for registering the list of known proxy ports.
212    pub(crate) rpc_state_sender: RpcStateSender,
213}
214
215#[cfg(test)]
216mod test {
217    // @@ begin test lint list maintained by maint/add_warning @@
218    #![allow(clippy::bool_assert_comparison)]
219    #![allow(clippy::clone_on_copy)]
220    #![allow(clippy::dbg_macro)]
221    #![allow(clippy::mixed_attributes_style)]
222    #![allow(clippy::print_stderr)]
223    #![allow(clippy::print_stdout)]
224    #![allow(clippy::single_char_pattern)]
225    #![allow(clippy::unwrap_used)]
226    #![allow(clippy::unchecked_duration_subtraction)]
227    #![allow(clippy::useless_vec)]
228    #![allow(clippy::needless_pass_by_value)]
229    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
230
231    use listener::{ConnectPointOptionsBuilder, RpcListenerSetConfigBuilder};
232    use tor_config_path::CfgPath;
233    use tor_rpc_connect::ParsedConnectPoint;
234
235    use super::*;
236
237    #[test]
238    fn rpc_method_names() {
239        // We run this from a nice high level module, to ensure that as many method names as
240        // possible will be in-scope.
241        let problems = tor_rpcbase::check_method_names([]);
242
243        for (m, err) in &problems {
244            eprintln!("Bad method name {m:?}: {err}");
245        }
246        assert!(problems.is_empty());
247    }
248
249    #[test]
250    fn parse_listener_defaults() {
251        for string in listen_defaults_defaults() {
252            let _parsed: ParsedConnectPoint = string.parse().unwrap();
253        }
254    }
255
256    #[test]
257    fn parsing_and_building() {
258        fn build(s: &str) -> Result<RpcConfig, anyhow::Error> {
259            let b: RpcConfigBuilder = toml::from_str(s)?;
260            Ok(b.build()?)
261        }
262
263        let mut user_defaults_builder = RpcListenerSetConfigBuilder::default();
264        user_defaults_builder.listener_options().enable(true);
265        user_defaults_builder.dir(CfgPath::new("${ARTI_LOCAL_DATA}/rpc/connect.d".to_string()));
266        let mut system_defaults_builder = RpcListenerSetConfigBuilder::default();
267        system_defaults_builder.listener_options().enable(false);
268        system_defaults_builder.dir(CfgPath::new("/etc/arti-rpc/connect.d".to_string()));
269
270        // Make sure that an empty configuration gets us the defaults.
271        let defaults = build("").unwrap();
272        assert_eq!(
273            defaults,
274            RpcConfig {
275                enable: false,
276                listen: vec![
277                    (
278                        "user-default".to_string(),
279                        user_defaults_builder.build().unwrap()
280                    ),
281                    (
282                        "system-default".to_string(),
283                        system_defaults_builder.build().unwrap()
284                    ),
285                ]
286                .into_iter()
287                .collect(),
288                listen_default: listen_defaults_defaults()
289            }
290        );
291
292        // Make sure that overriding specific options works as expected.
293        let altered = build(
294            r#"
295[listen."user-default"]
296enable = false
297[listen."system-default"]
298dir = "/usr/local/etc/arti-rpc/connect.d"
299file_options = { "tmp.toml" = { "enable" = false } }
300[listen."my-connpt"]
301file = "/home/dante/.paradiso/connpt.toml"
302"#,
303        )
304        .unwrap();
305        let mut altered_user_defaults = user_defaults_builder.clone();
306        altered_user_defaults.listener_options().enable(false);
307        let mut altered_system_defaults = system_defaults_builder.clone();
308        altered_system_defaults.dir(CfgPath::new(
309            "/usr/local/etc/arti-rpc/connect.d".to_string(),
310        ));
311        let mut opt = ConnectPointOptionsBuilder::default();
312        opt.enable(false);
313        altered_system_defaults
314            .file_options()
315            .insert("tmp.toml".to_string(), opt);
316        let mut my_connpt = RpcListenerSetConfigBuilder::default();
317        my_connpt.file(CfgPath::new(
318            "/home/dante/.paradiso/connpt.toml".to_string(),
319        ));
320
321        assert_eq!(
322            altered,
323            RpcConfig {
324                enable: false,
325                listen: vec![
326                    (
327                        "user-default".to_string(),
328                        altered_user_defaults.build().unwrap()
329                    ),
330                    (
331                        "system-default".to_string(),
332                        altered_system_defaults.build().unwrap()
333                    ),
334                    ("my-connpt".to_string(), my_connpt.build().unwrap()),
335                ]
336                .into_iter()
337                .collect(),
338                listen_default: listen_defaults_defaults()
339            }
340        );
341    }
342}