1
//! Declare the RPC session object as exposed from the RPC server run by the `arti` crate.
2

            
3
use arti_client::TorClient;
4
use arti_rpcserver::RpcAuthentication;
5
use derive_deftly::Deftly;
6
use futures::stream::StreamExt as _;
7
use std::{net::SocketAddr, sync::Arc};
8
use tor_async_utils::{DropNotifyEofSignallable, DropNotifyWatchSender};
9
use tor_rpcbase::{self as rpc};
10
use tor_rtcompat::Runtime;
11

            
12
use super::proxyinfo::{self, ProxyInfo};
13

            
14
/// A top-level RPC session object.
15
///
16
/// This is the first object that an RPC user receives upon authenticating;
17
/// It is returned by `auth:authenticate`.
18
///
19
/// Other objects (`TorClient`,`RpcDataStream`, etc)
20
/// are available using methods on this object.
21
/// (See the list of available methods.)
22
///
23
/// This type wraps and delegates to [`arti_rpcserver::RpcSession`],
24
/// but exposes additional functionality not available at the
25
/// level of [`arti_rpcserver`], including information about configured proxies.
26
///
27
/// This ObjectID for this object can be used as the target of a SOCKS stream.
28
#[derive(Deftly)]
29
#[derive_deftly(rpc::Object)]
30
#[deftly(rpc(
31
    delegate_with = "|this: &Self| Some(this.session.clone())",
32
    delegate_type = "arti_rpcserver::RpcSession"
33
))]
34
#[deftly(rpc(expose_outside_of_session))]
35
pub(super) struct ArtiRpcSession {
36
    /// State about the `arti` server, as seen by the Rpc system.
37
    pub(super) arti_state: Arc<RpcVisibleArtiState>,
38
    /// The underlying RpcSession object that we delegate to.
39
    session: Arc<arti_rpcserver::RpcSession>,
40
}
41

            
42
/// Information about the current global top-level Arti state,
43
/// as exposed to an Rpc Session.
44
//
45
// TODO: This type is dangerously close to being a collection of globals.
46
// We should refactor it aggressively when we refactor the `arti` crate.
47
//
48
// TODO: Right now this is constructed in the same form that it's used in
49
// ArtiRpcSession.  Later on, we could split it into one type that
50
// the rest of this crate constructs, and another type that the
51
// ArtiRpcSession actually uses. We should do that if the needs seem to diverge.
52
pub(crate) struct RpcVisibleArtiState {
53
    /// A `ProxyInfo` that we hand out when asked to list our proxy ports.
54
    ///
55
    /// Right now it only lists Socks; in the future it may list more.
56
    proxy_info: postage::watch::Receiver<ProxyInfoState>,
57
}
58

            
59
/// Handle to set RPC state across RPC sessions.  (See `RpcVisibleArtiState`.)
60
#[derive(Debug)]
61
pub(crate) struct RpcStateSender {
62
    /// Sender for setting our list of proxy ports.
63
    proxy_info_sender: DropNotifyWatchSender<ProxyInfoState>,
64
}
65

            
66
impl ArtiRpcSession {
67
    /// Construct a new `ArtiRpcSession`.
68
    ///
69
    /// Privileges on the session (if any) are derived from `auth`, which describes
70
    /// how the user authenticated.
71
    ///
72
    /// The session receives a new isolated TorClient, based on `client_root`.
73
    pub(super) fn new<R: Runtime>(
74
        auth: &RpcAuthentication,
75
        client_root: &TorClient<R>,
76
        arti_state: &Arc<RpcVisibleArtiState>,
77
    ) -> Arc<Self> {
78
        let _ = auth; // This is currently unused; any authentication gives the same result.
79
        let client = client_root.isolated_client();
80
        let session = arti_rpcserver::RpcSession::new_with_client(Arc::new(client));
81
        let arti_state = Arc::clone(arti_state);
82
        Arc::new(ArtiRpcSession {
83
            session,
84
            arti_state,
85
        })
86
    }
87
}
88

            
89
/// Possible state for a watched proxy_info.
90
#[derive(Debug, Clone)]
91
enum ProxyInfoState {
92
    /// We haven't set it yet.
93
    Unset,
94
    /// We've set it to a given value.
95
    Set(Arc<ProxyInfo>),
96
    /// The sender has been dropped.
97
    Eof,
98
}
99

            
100
impl DropNotifyEofSignallable for ProxyInfoState {
101
4
    fn eof() -> Self {
102
4
        Self::Eof
103
4
    }
104
}
105

            
106
impl RpcVisibleArtiState {
107
    /// Construct a new `RpcVisibleArtiState`.
108
4
    pub(crate) fn new() -> (Arc<Self>, RpcStateSender) {
109
4
        let (proxy_info_sender, proxy_info) = postage::watch::channel_with(ProxyInfoState::Unset);
110
4
        let proxy_info_sender = DropNotifyWatchSender::new(proxy_info_sender);
111
4
        (
112
4
            Arc::new(Self { proxy_info }),
113
4
            RpcStateSender { proxy_info_sender },
114
4
        )
115
4
    }
116

            
117
    /// Return the latest proxy info, waiting until it is set.
118
    ///
119
    /// Return an error if the sender has been closed.
120
12
    pub(super) async fn get_proxy_info(&self) -> Result<Arc<ProxyInfo>, ()> {
121
8
        let mut proxy_info = self.proxy_info.clone();
122
12
        while let Some(v) = proxy_info.next().await {
123
12
            match v {
124
4
                ProxyInfoState::Unset => {
125
4
                    // Not yet set, try again.
126
4
                }
127
8
                ProxyInfoState::Set(proxyinfo) => return Ok(Arc::clone(&proxyinfo)),
128
                ProxyInfoState::Eof => return Err(()),
129
            }
130
        }
131
        Err(())
132
8
    }
133
}
134

            
135
impl RpcStateSender {
136
    /// Set the list of socks listener addresses on this state.
137
    ///
138
    /// This method may only be called once per state.
139
4
    pub(crate) fn set_socks_listeners(&mut self, addrs: &[SocketAddr]) {
140
4
        let info = ProxyInfo {
141
4
            proxies: addrs
142
4
                .iter()
143
6
                .flat_map(|a| {
144
4
                    [
145
4
                        proxyinfo::Proxy {
146
4
                            listener: proxyinfo::ProxyListener::Socks5 {
147
4
                                tcp_address: Some(*a),
148
4
                            },
149
4
                        },
150
4
                        // When http-connect is enabled, every SOCKS proxy is also an HTTP CONNECT
151
4
                        // proxy.
152
4
                        #[cfg(feature = "http-connect")]
153
4
                        proxyinfo::Proxy {
154
4
                            listener: proxyinfo::ProxyListener::HttpConnect {
155
4
                                tcp_address: Some(*a),
156
4
                            },
157
4
                        },
158
4
                    ]
159
4
                })
160
4
                .collect(),
161
        };
162
4
        *self.proxy_info_sender.borrow_mut() = ProxyInfoState::Set(Arc::new(info));
163
4
    }
164
}
165

            
166
#[cfg(test)]
167
mod test {
168
    // @@ begin test lint list maintained by maint/add_warning @@
169
    #![allow(clippy::bool_assert_comparison)]
170
    #![allow(clippy::clone_on_copy)]
171
    #![allow(clippy::dbg_macro)]
172
    #![allow(clippy::mixed_attributes_style)]
173
    #![allow(clippy::print_stderr)]
174
    #![allow(clippy::print_stdout)]
175
    #![allow(clippy::single_char_pattern)]
176
    #![allow(clippy::unwrap_used)]
177
    #![allow(clippy::unchecked_time_subtraction)]
178
    #![allow(clippy::useless_vec)]
179
    #![allow(clippy::needless_pass_by_value)]
180
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
181

            
182
    use tor_rtcompat::SpawnExt as _;
183
    use tor_rtmock::MockRuntime;
184

            
185
    use super::*;
186

            
187
    #[test]
188
    fn set_proxy_info() {
189
        MockRuntime::test_with_various(|rt| async move {
190
            let (state, mut sender) = RpcVisibleArtiState::new();
191
            let _task = rt.clone().spawn_with_handle(async move {
192
                sender.set_socks_listeners(&["8.8.4.4:99".parse().unwrap()]);
193
                sender // keep sender alive
194
            });
195

            
196
            let value = state.get_proxy_info().await;
197

            
198
            // At this point, we've returned once, so this will test that we get a fresh answer even
199
            // if we already set the inner value.
200
            let value_again = state.get_proxy_info().await;
201
            assert_eq!(value.unwrap(), value_again.unwrap());
202
        });
203
    }
204
}