1//! Declare the RPC session object as exposed from the RPC server run by the `arti` crate.
23use arti_client::TorClient;
4use arti_rpcserver::RpcAuthentication;
5use derive_deftly::Deftly;
6use futures::stream::StreamExt as _;
7use std::{net::SocketAddr, sync::Arc};
8use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9use tor_rpcbase::{self as rpc};
10use tor_rtcompat::Runtime;
1112use super::proxyinfo::{self, ProxyInfo};
1314/// A top-level RPC session object.
15///
16/// This is the first object that an RPC user receives upon authenticating;
17/// It is returned by `auth:authenticate`.
18///
19/// Other objects (`TorClient`,`RpcDataStream`, etc)
20/// are available using methods on this object.
21/// (See the list of available methods.)
22///
23/// This type wraps and delegates to [`arti_rpcserver::RpcSession`],
24/// but exposes additional functionality not available at the
25/// level of [`arti_rpcserver`], including information about configured proxies.
26///
27/// This ObjectID for this object can be used as the target of a SOCKS stream.
28#[derive(Deftly)]
29#[derive_deftly(rpc::Object)]
30#[deftly(rpc(
31 delegate_with = "|this: &Self| Some(this.session.clone())",
32 delegate_type = "arti_rpcserver::RpcSession"
33))]
34#[deftly(rpc(expose_outside_of_session))]
35pub(super) struct ArtiRpcSession {
36/// State about the `arti` server, as seen by the Rpc system.
37pub(super) arti_state: Arc<RpcVisibleArtiState>,
38/// The underlying RpcSession object that we delegate to.
39session: Arc<arti_rpcserver::RpcSession>,
40}
4142/// Information about the current global top-level Arti state,
43/// as exposed to an Rpc Session.
44//
45// TODO: This type is dangerously close to being a collection of globals.
46// We should refactor it aggressively when we refactor the `arti` crate.
47//
48// TODO: Right now this is constructed in the same form that it's used in
49// ArtiRpcSession. Later on, we could split it into one type that
50// the rest of this crate constructs, and another type that the
51// ArtiRpcSession actually uses. We should do that if the needs seem to diverge.
52pub(crate) struct RpcVisibleArtiState {
53/// A `ProxyInfo` that we hand out when asked to list our proxy ports.
54 ///
55 /// Right now it only lists Socks; in the future it may list more.
56proxy_info: postage::watch::Receiver<ProxyInfoState>,
57}
5859/// Handle to set RPC state across RPC sessions. (See `RpcVisibleArtiState`.)
60#[derive(Debug)]
61pub(crate) struct RpcStateSender {
62/// Sender for setting our list of proxy ports.
63proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
64}
6566impl ArtiRpcSession {
67/// Construct a new `ArtiRpcSession`.
68 ///
69 /// Privileges on the session (if any) are derived from `auth`, which describes
70 /// how the user authenticated.
71 ///
72 /// The session receives a new isolated TorClient, based on `client_root`.
73pub(super) fn new<R: Runtime>(
74 auth: &RpcAuthentication,
75 client_root: &TorClient<R>,
76 arti_state: &Arc<RpcVisibleArtiState>,
77 ) -> Arc<Self> {
78let _ = auth; // This is currently unused; any authentication gives the same result.
79let client = client_root.isolated_client();
80let session = arti_rpcserver::RpcSession::new_with_client(Arc::new(client));
81let arti_state = Arc::clone(arti_state);
82 Arc::new(ArtiRpcSession {
83 session,
84 arti_state,
85 })
86 }
87}
8889/// Possible state for a watched proxy_info.
90#[derive(Debug, Clone)]
91enum ProxyInfoState {
92/// We haven't set it yet.
93Unset,
94/// We've set it to a given value.
95Set(Arc<ProxyInfo>),
96/// The sender has been dropped.
97Eof,
98}
99100impl DropNotifyEofSignallable for ProxyInfoState {
101fn eof() -> Self {
102Self::Eof
103 }
104}
105106impl RpcVisibleArtiState {
107/// Construct a new `RpcVisibleArtiState`.
108pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
109let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
110let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
111 (
112 Arc::new(Self { proxy_info }),
113 RpcStateSender { proxy_info_sender },
114 )
115 }
116117/// Return the latest proxy info, waiting until it is set.
118 ///
119 /// Return an error if the sender has been closed.
120pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
121let mut proxy_info = self.proxy_info.clone();
122while let Some(v) = proxy_info.next().await {
123match v {
124 ProxyInfoState::Unset => {
125// Not yet set, try again.
126}
127 ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
128 ProxyInfoState::Eof => return Err(()),
129 }
130 }
131Err(())
132 }
133}
134135impl RpcStateSender {
136/// Set the list of socks listener addresses on this state.
137 ///
138 /// This method may only be called once per state.
139pub(crate) fn set_socks_listeners(&mut self, addrs: &[SocketAddr]) {
140let info = ProxyInfo {
141 proxies: addrs
142 .iter()
143 .map(|a| proxyinfo::Proxy {
144 listener: proxyinfo::ProxyListener::Socks5 {
145 tcp_address: Some(*a),
146 },
147 })
148 .collect(),
149 };
150*self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
151 }
152}
153154#[cfg(test)]
155mod test {
156// @@ begin test lint list maintained by maint/add_warning @@
157#![allow(clippy::bool_assert_comparison)]
158 #![allow(clippy::clone_on_copy)]
159 #![allow(clippy::dbg_macro)]
160 #![allow(clippy::mixed_attributes_style)]
161 #![allow(clippy::print_stderr)]
162 #![allow(clippy::print_stdout)]
163 #![allow(clippy::single_char_pattern)]
164 #![allow(clippy::unwrap_used)]
165 #![allow(clippy::unchecked_duration_subtraction)]
166 #![allow(clippy::useless_vec)]
167 #![allow(clippy::needless_pass_by_value)]
168//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
169170use futures::task::SpawnExt as _;
171use tor_rtmock::MockRuntime;
172173use super::*;
174175#[test]
176fn set_proxy_info() {
177 MockRuntime::test_with_various(|rt| async move {
178let (state, mut sender) = RpcVisibleArtiState::new();
179let _task = rt.clone().spawn_with_handle(async move {
180 sender.set_socks_listeners(&["8.8.4.4:99".parse().unwrap()]);
181 sender // keep sender alive
182});
183184let value = state.get_proxy_info().await;
185186// At this point, we've returned once, so this will test that we get a fresh answer even
187 // if we already set the inner value.
188let value_again = state.get_proxy_info().await;
189assert_eq!(value.unwrap(), value_again.unwrap());
190 });
191 }
192}