1
//! Support for RPC-visible connections through Arti.
2

            
3
use std::{
4
    io::{Error as IoError, Read as _, Write as _},
5
    net::{SocketAddr, TcpStream},
6
    sync::Arc,
7
};
8

            
9
use serde::Deserialize;
10

            
11
use super::{ErrorResponse, NoParams, RpcConn};
12
use crate::{
13
    msgs::{request::Request, response::RpcErrorCode},
14
    ObjectId,
15
};
16

            
17
use tor_error::ErrorReport as _;
18

            
19
/// An error encountered while trying to open a data stream.
20
#[derive(Clone, Debug, thiserror::Error)]
21
#[non_exhaustive]
22
pub enum StreamError {
23
    /// One of the RPC methods that we invoked to create the stream failed.
24
    #[error("An error occurred while invoking RPC methods")]
25
    RpcMethods(#[from] super::ProtoError),
26

            
27
    /// We weren't able to find a working proxy address.
28
    #[error("Request for proxy info rejected: {0}")]
29
    ProxyInfoRejected(ErrorResponse),
30

            
31
    /// We weren't able to register a new stream ID.
32
    #[error("Request for new stream ID rejected: {0}")]
33
    NewStreamRejected(ErrorResponse),
34

            
35
    /// We weren't able to release a new stream ID.
36
    #[error("Request for new stream ID rejected: {0}")]
37
    StreamReleaseRejected(ErrorResponse),
38

            
39
    /// Tried to open a stream on an unauthenticated RPC connection.
40
    ///
41
    /// (At present (Sep 2024) there is no way to get an unauthenticated connection from
42
    /// `arti-rpc-client-core`, but that may change in the future.)
43
    #[error("RPC connection not authenticated")]
44
    NotAuthenticated,
45

            
46
    /// Tried to open a stream after having dropped the RPC session
47
    ///
48
    /// (At present (Jan 2025) dropping the RPC session is possible, but is not supported
49
    /// when opening RPC streams.)
50
    #[error("Unable to access Session object")]
51
    NoSession,
52

            
53
    /// We encountered an internal error.
54
    /// (This should be impossible.)
55
    #[error("Internal error: {0}")]
56
    Internal(String),
57

            
58
    /// No SOCKS proxies were listed in the server's reply.
59
    #[error("No SOCKS proxy available")]
60
    NoProxy,
61

            
62
    /// We encountered an IO error while trying to connect to the
63
    /// proxy or negotiate SOCKS.
64
    #[error("IO error")]
65
    Io(#[source] Arc<IoError>),
66

            
67
    /// The generated SOCKS request was invalid.
68
    ///
69
    /// (Most likely, a provided isolation string or hostname was too long for the
70
    /// authentication system to support.)
71
    #[error("Invalid SOCKS request")]
72
    SocksRequest(#[source] tor_socksproto::Error),
73

            
74
    /// The other side did not speak socks, or did not speak socks in the format
75
    /// we expected.
76
    #[error("SOCKS protocol violation")]
77
    SocksProtocol(#[source] tor_socksproto::Error),
78

            
79
    /// The other side gave us a SOCKS error.
80
    #[error("SOCKS error code {0}")]
81
    SocksError(tor_socksproto::SocksStatus),
82
}
83

            
84
impl From<IoError> for StreamError {
85
    fn from(e: IoError) -> Self {
86
        Self::Io(Arc::new(e))
87
    }
88
}
89

            
90
/// A response with a single ID.
91
#[derive(Deserialize, Debug)]
92
struct SingleIdReply {
93
    /// The object ID of the response.
94
    id: ObjectId,
95
}
96

            
97
/// Representation of a single proxy, as delivered by the RPC API.
98
// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
99
#[derive(Deserialize, Clone, Debug)]
100
pub(super) struct Proxy {
101
    /// Where the proxy is listening, and what protocol-specific options it expects.
102
    pub(super) listener: ProxyListener,
103
}
104

            
105
/// Representation of a single proxy's listener location, as delivered by the RPC API.
106
#[derive(Deserialize, Clone, Debug)]
107
// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
108
pub(super) enum ProxyListener {
109
    /// A SOCKS5 proxy.
110
    #[serde(rename = "socks5")]
111
    Socks5 {
112
        /// The address at which we're listening for SOCKS connections.
113
        tcp_address: Option<SocketAddr>,
114
    },
115
    /// Some other (unrecognized) listener type.
116
    #[serde(untagged)]
117
    Unrecognized {},
118
}
119

            
120
impl Proxy {
121
    /// If this is a SOCKS proxy, return its address.
122
6
    fn socks_addr(&self) -> Option<SocketAddr> {
123
6
        match self.listener {
124
4
            ProxyListener::Socks5 { tcp_address } => tcp_address,
125
2
            ProxyListener::Unrecognized {} => None,
126
        }
127
6
    }
128
}
129

            
130
impl ProxyInfoReply {
131
    /// Choose a SOCKS5 address to use from this list of proxies.
132
2
    fn find_socks_addr(&self) -> Option<SocketAddr> {
133
2
        // We choose the first usable Proxy.
134
2
        self.proxies.iter().find_map(Proxy::socks_addr)
135
2
    }
136
}
137

            
138
/// A representation of the set of proxy addresses available from the RPC API.
139
// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
140
#[derive(Deserialize, Clone, Debug)]
141
pub(super) struct ProxyInfoReply {
142
    /// A list of the supported proxies.
143
    ///
144
    /// (So far, only SOCKS proxies are listed, but other kinds may be listed in the future.)
145
    pub(super) proxies: Vec<Proxy>,
146
}
147

            
148
impl RpcConn {
149
    /// Open a new data stream, registering the stream with the RPC system.
150
    ///
151
    /// Behaves the same as [`open_stream()`](RpcConn::open_stream),
152
    /// with the following exceptions:
153
    ///
154
    /// - Returns a `ObjectId` that can be used to identify the `DataStream`
155
    ///   for later RPC requests.
156
    /// - Tells Arti not to wait for the stream to succeed or fail
157
    ///   over the Tor network.
158
    ///   (To wait for the stream to succeed or fail, use the appropriate method.)
159
    ///
160
    ///  (TODO RPC: Implement such a method!)
161
    pub fn open_stream_as_object(
162
        &self,
163
        on_object: Option<&ObjectId>,
164
        target: (&str, u16),
165
        isolation: &str,
166
    ) -> Result<(ObjectId, TcpStream), StreamError> {
167
        let on_object = self.resolve_on_object(on_object)?;
168
        let new_stream_request =
169
            Request::new(on_object.clone(), "arti:new_oneshot_client", NoParams {});
170
        let stream_id = self
171
            .execute_internal::<SingleIdReply>(&new_stream_request.encode()?)?
172
            .map_err(StreamError::NewStreamRejected)?
173
            .id;
174

            
175
        match self.open_stream(Some(&stream_id), target, isolation) {
176
            Ok(tcp_stream) => Ok((stream_id, tcp_stream)),
177
            Err(e) => {
178
                if let Err(_inner) = self.release_obj(stream_id) {
179
                    // TODO RPC: We should log this error or something
180
                }
181
                Err(e)
182
            }
183
        }
184
    }
185

            
186
    /// Open a new data stream, using Arti to connect anonymously to a given
187
    /// address and port.
188
    ///
189
    /// If `on_object` is provided, it must be a an ID for a  client-like RPC
190
    /// object that supports opening data streams.  If it is not provided,
191
    /// the data stream is opened relative to the current session.
192
    ///
193
    /// We tell Arti that the stream must not share
194
    /// a circuit with any other stream with a different value for `isolation`.
195
    /// (If your application doesn't care about isolating its streams from one another,
196
    /// it is acceptable to leave `isolation` as an empty string.)
197
    pub fn open_stream(
198
        &self,
199
        on_object: Option<&ObjectId>,
200
        (hostname, port): (&str, u16),
201
        isolation: &str,
202
    ) -> Result<TcpStream, StreamError> {
203
        let on_object = self.resolve_on_object(on_object)?;
204
        let socks_proxy_addr = self.lookup_socks_proxy_addr()?;
205
        let mut stream = TcpStream::connect(socks_proxy_addr)?;
206

            
207
        // For information about this encoding,
208
        // see https://spec.torproject.org/socks-extensions.html#extended-auth
209
        let username = format!("<torS0X>1{}", on_object.as_ref());
210
        let password = isolation;
211
        negotiate_socks(&mut stream, hostname, port, &username, password)?;
212

            
213
        Ok(stream)
214
    }
215

            
216
    /// Ask Arti for its supported SOCKS addresses; return the first one.
217
    //
218
    // TODO: Currently we call this every time we want to open a stream.
219
    // We could instead cache the value.
220
    fn lookup_socks_proxy_addr(&self) -> Result<SocketAddr, StreamError> {
221
        let session_id = self.session_id_required()?.clone();
222

            
223
        let proxy_info_request: Request<NoParams> =
224
            Request::new(session_id, "arti:get_rpc_proxy_info", NoParams {});
225
        let cmd = proxy_info_request.encode()?;
226
        let proxy_info = match self.execute_internal::<ProxyInfoReply>(&cmd)? {
227
            Ok(info) => info,
228
            Err(response) => {
229
                if response.decode().code() == RpcErrorCode::OBJECT_ERROR {
230
                    // TODO: This is an unfortunate error; it would be better
231
                    // to tolerate this situation.  See #1819.
232
                    return Err(StreamError::NoSession);
233
                } else {
234
                    return Err(response.internal_error(&cmd).into());
235
                }
236
            }
237
        };
238
        let socks_proxy_addr = proxy_info.find_socks_addr().ok_or(StreamError::NoProxy)?;
239

            
240
        Ok(socks_proxy_addr)
241
    }
242

            
243
    /// Helper: Return the session ID, or an error.
244
    fn session_id_required(&self) -> Result<&ObjectId, StreamError> {
245
        self.session().ok_or(StreamError::NotAuthenticated)
246
    }
247

            
248
    /// Helper: Return on_object if it's present, or the session ID otherwise.
249
    fn resolve_on_object(&self, on_object: Option<&ObjectId>) -> Result<ObjectId, StreamError> {
250
        Ok(match on_object {
251
            Some(obj) => obj.clone(),
252
            None => self.session_id_required()?.clone(),
253
        })
254
    }
255
}
256

            
257
/// Helper: Negotiate SOCKS5 on the provided stream, using the given parameters.
258
//
259
// NOTE: We could user `tor-socksproto` instead, but that pulls in a little more
260
// code unnecessarily, has features we don't need, and has to handle variations
261
// of SOCKS responses that we'll never see.
262
fn negotiate_socks(
263
    stream: &mut TcpStream,
264
    hostname: &str,
265
    port: u16,
266
    username: &str,
267
    password: &str,
268
) -> Result<(), StreamError> {
269
    use tor_socksproto::{
270
        Handshake as _, SocksAddr, SocksAuth, SocksClientHandshake, SocksCmd, SocksHostname,
271
        SocksRequest, SocksStatus, SocksVersion,
272
    };
273
    use StreamError as E;
274

            
275
    let request = SocksRequest::new(
276
        SocksVersion::V5,
277
        SocksCmd::CONNECT,
278
        SocksAddr::Hostname(SocksHostname::try_from(hostname.to_owned()).map_err(E::SocksRequest)?),
279
        port,
280
        SocksAuth::Username(
281
            username.to_owned().into_bytes(),
282
            password.to_owned().into_bytes(),
283
        ),
284
    )
285
    .map_err(E::SocksRequest)?;
286

            
287
    let mut buf = tor_socksproto::Buffer::new_precise();
288
    let mut state = SocksClientHandshake::new(request);
289
    let reply = loop {
290
        use tor_socksproto::NextStep as NS;
291
        match state.step(&mut buf).map_err(E::SocksProtocol)? {
292
            NS::Recv(mut recv) => {
293
                let n = stream.read(recv.buf())?;
294
                recv.note_received(n).map_err(E::SocksProtocol)?;
295
            }
296
            NS::Send(send) => stream.write_all(&send)?,
297
            NS::Finished(fin) => {
298
                break fin
299
                    .into_output()
300
                    .map_err(|bug| E::Internal(bug.report().to_string()))?
301
            }
302
        }
303
    };
304

            
305
    let status = reply.status();
306

            
307
    if status == SocksStatus::SUCCEEDED {
308
        Ok(())
309
    } else {
310
        Err(StreamError::SocksError(status))
311
    }
312
}
313

            
314
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;

    /// Check that proxy lists parse, and that unrecognized or unusable
    /// entries are skipped when choosing a SOCKS address.
    #[test]
    fn unexpected_proxies() {
        let expected: SocketAddr = "127.0.0.1:9090".parse().unwrap();

        // A reply with exactly one SOCKS5 proxy that has a TCP address.
        let single: ProxyInfoReply = serde_json::from_str(
            r#"
               { "proxies" : [ {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}} ] }
            "#,
        )
        .unwrap();
        assert_eq!(single.proxies.len(), 1);
        match &single.proxies[0].listener {
            ProxyListener::Socks5 { tcp_address } => {
                assert_eq!(tcp_address.unwrap(), expected);
            }
            other => panic!("{:?}", other),
        };

        // A reply with an unknown listener type and a SOCKS5 entry without a
        // TCP address ahead of two usable entries: the first usable one wins.
        let mixed: ProxyInfoReply = serde_json::from_str(
            r#"
               { "proxies" : [
                {"listener" : {"hypothetical" : {"tzitzel" : "buttered" }}},
                {"listener" : {"socks5" : {"unix_path" : "/home/username/.local/PROXY"}}},
                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}},
                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9999" }}}
               ] }
            "#,
        )
        .unwrap();
        assert_eq!(mixed.find_socks_addr().unwrap(), expected);
    }
}