arti/rpc/
session.rs

1//! Declare the RPC session object as exposed from the RPC server run by the `arti` crate.
2
3use arti_client::TorClient;
4use arti_rpcserver::RpcAuthentication;
5use derive_deftly::Deftly;
6use futures::stream::StreamExt as _;
7use std::{net::SocketAddr, sync::Arc};
8use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9use tor_rpcbase::{self as rpc};
10use tor_rtcompat::Runtime;
11
12use super::proxyinfo::{self, ProxyInfo};
13
/// A top-level RPC session object.
///
/// This is the first object that an RPC user receives upon authenticating;
/// it is returned by `auth:authenticate`.
///
/// Other objects (`TorClient`, `RpcDataStream`, etc.)
/// are available using methods on this object.
/// (See the list of available methods.)
///
/// This type wraps and delegates to [`arti_rpcserver::RpcSession`],
/// but exposes additional functionality not available at the
/// level of [`arti_rpcserver`], including information about configured proxies.
///
/// The ObjectID for this object can be used as the target of a SOCKS stream.
//
// The `delegate_with` closure below hands out a clone of the inner `session`
// `Arc`, typed as `arti_rpcserver::RpcSession` via `delegate_type`.
#[derive(Deftly)]
#[derive_deftly(rpc::Object)]
#[deftly(rpc(
    delegate_with = "|this: &Self| Some(this.session.clone())",
    delegate_type = "arti_rpcserver::RpcSession"
))]
#[deftly(rpc(expose_outside_of_session))]
pub(super) struct ArtiRpcSession {
    /// State about the `arti` server, as seen by the RPC system.
    ///
    /// This is a shared handle: `ArtiRpcSession::new` stores a clone of the
    /// `Arc` it is given rather than building fresh state.
    pub(super) arti_state: Arc<RpcVisibleArtiState>,
    /// The underlying `RpcSession` object that we delegate to.
    session: Arc<arti_rpcserver::RpcSession>,
}
41
/// Information about the current global top-level Arti state,
/// as exposed to an RPC session.
///
/// Constructed via `RpcVisibleArtiState::new`, which also returns the
/// `RpcStateSender` used to update it.
//
// TODO: This type is dangerously close to being a collection of globals.
// We should refactor it aggressively when we refactor the `arti` crate.
//
// TODO: Right now this is constructed in the same form that it's used in
// ArtiRpcSession.  Later on, we could split it into one type that
// the rest of this crate constructs, and another type that the
// ArtiRpcSession actually uses. We should do that if the needs seem to diverge.
pub(crate) struct RpcVisibleArtiState {
    /// Watch receiver for the `ProxyInfo` that we hand out when asked to list
    /// our proxy ports.
    ///
    /// The watched value is a `ProxyInfoState`, so readers can tell apart
    /// "not set yet", "set", and "sender gone".
    ///
    /// Right now it only lists Socks; in the future it may list more.
    proxy_info: postage::watch::Receiver<ProxyInfoState>,
}
58
/// Handle to set RPC state across RPC sessions.  (See `RpcVisibleArtiState`.)
///
/// Returned alongside the state by `RpcVisibleArtiState::new`.
#[derive(Debug)]
pub(crate) struct RpcStateSender {
    /// Sender for setting our list of proxy ports.
    ///
    /// NOTE(review): the `DropNotifyWatchSender` wrapper presumably publishes
    /// `ProxyInfoState::Eof` to receivers when this sender is dropped; confirm
    /// against `tor_async_utils`.
    proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
}
65
66impl ArtiRpcSession {
67    /// Construct a new `ArtiRpcSession`.
68    ///
69    /// Privileges on the session (if any) are derived from `auth`, which describes
70    /// how the user authenticated.
71    ///
72    /// The session receives a new isolated TorClient, based on `client_root`.
73    pub(super) fn new<R: Runtime>(
74        auth: &RpcAuthentication,
75        client_root: &TorClient<R>,
76        arti_state: &Arc<RpcVisibleArtiState>,
77    ) -> Arc<Self> {
78        let _ = auth; // This is currently unused; any authentication gives the same result.
79        let client = client_root.isolated_client();
80        let session = arti_rpcserver::RpcSession::new_with_client(Arc::new(client));
81        let arti_state = Arc::clone(arti_state);
82        Arc::new(ArtiRpcSession {
83            session,
84            arti_state,
85        })
86    }
87}
88
/// Possible state for a watched proxy_info.
///
/// This is the value carried by the watch channel between `RpcStateSender`
/// and `RpcVisibleArtiState`.
#[derive(Debug, Clone)]
enum ProxyInfoState {
    /// We haven't set it yet.
    ///
    /// This is the initial value installed by `RpcVisibleArtiState::new`.
    Unset,
    /// We've set it to a given value.
    ///
    /// Stored by `RpcStateSender::set_socks_listeners`.
    Set(Arc<ProxyInfo>),
    /// The sender has been dropped.
    ///
    /// This is the value produced by the `DropNotifyEofSignallable::eof` impl.
    Eof,
}
99
100impl DropNotifyEofSignallable for ProxyInfoState {
101    fn eof() -> Self {
102        Self::Eof
103    }
104}
105
106impl RpcVisibleArtiState {
107    /// Construct a new `RpcVisibleArtiState`.
108    pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
109        let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
110        let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
111        (
112            Arc::new(Self { proxy_info }),
113            RpcStateSender { proxy_info_sender },
114        )
115    }
116
117    /// Return the latest proxy info, waiting until it is set.
118    ///
119    /// Return an error if the sender has been closed.
120    pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
121        let mut proxy_info = self.proxy_info.clone();
122        while let Some(v) = proxy_info.next().await {
123            match v {
124                ProxyInfoState::Unset => {
125                    // Not yet set, try again.
126                }
127                ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
128                ProxyInfoState::Eof => return Err(()),
129            }
130        }
131        Err(())
132    }
133}
134
135impl RpcStateSender {
136    /// Set the list of socks listener addresses on this state.
137    ///
138    /// This method may only be called once per state.
139    pub(crate) fn set_socks_listeners(&mut self, addrs: &[SocketAddr]) {
140        let info = ProxyInfo {
141            proxies: addrs
142                .iter()
143                .map(|a| proxyinfo::Proxy {
144                    listener: proxyinfo::ProxyListener::Socks5 {
145                        tcp_address: Some(*a),
146                    },
147                })
148                .collect(),
149        };
150        *self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
151    }
152}
153
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use futures::task::SpawnExt as _;
    use tor_rtmock::MockRuntime;

    use super::*;

    /// Proxy info set from another task is visible to `get_proxy_info`, and
    /// asking a second time gives the same answer.
    #[test]
    fn set_proxy_info() {
        MockRuntime::test_with_various(|rt| async move {
            let (visible, mut tx) = RpcVisibleArtiState::new();
            let _setter = rt.clone().spawn_with_handle(async move {
                let addr = "8.8.4.4:99".parse().unwrap();
                tx.set_socks_listeners(&[addr]);
                tx // hand the sender back so that it stays alive
            });

            let first = visible.get_proxy_info().await;

            // We have already received one answer by now, so the next call
            // checks that a value set earlier is still reported to a later caller.
            let second = visible.get_proxy_info().await;
            assert_eq!(first.unwrap(), second.unwrap());
        });
    }
}