arti_rpc_client_core/conn/
stream.rs

1//! Support for RPC-visible connections through Arti.
2
3use std::{
4    io::{Error as IoError, Read as _, Write as _},
5    net::{SocketAddr, TcpStream},
6    sync::Arc,
7};
8
9use serde::Deserialize;
10
11use super::{ErrorResponse, NoParams, RpcConn};
12use crate::{
13    msgs::{request::Request, response::RpcErrorCode},
14    ObjectId,
15};
16
17use tor_error::ErrorReport as _;
18
19/// An error encountered while trying to open a data stream.
20#[derive(Clone, Debug, thiserror::Error)]
21#[non_exhaustive]
22pub enum StreamError {
23    /// One of the RPC methods that we invoked to create the stream failed.
24    #[error("An error occurred while invoking RPC methods")]
25    RpcMethods(#[from] super::ProtoError),
26
27    /// We weren't able to find a working proxy address.
28    #[error("Request for proxy info rejected: {0}")]
29    ProxyInfoRejected(ErrorResponse),
30
31    /// We weren't able to register a new stream ID.
32    #[error("Request for new stream ID rejected: {0}")]
33    NewStreamRejected(ErrorResponse),
34
35    /// We weren't able to release a new stream ID.
36    #[error("Request for new stream ID rejected: {0}")]
37    StreamReleaseRejected(ErrorResponse),
38
39    /// Tried to open a stream on an unauthenticated RPC connection.
40    ///
41    /// (At present (Sep 2024) there is no way to get an unauthenticated connection from
42    /// `arti-rpc-client-core`, but that may change in the future.)
43    #[error("RPC connection not authenticated")]
44    NotAuthenticated,
45
46    /// Tried to open a stream after having dropped the RPC session
47    ///
48    /// (At present (Jan 2025) dropping the RPC session is possible, but is not supported
49    /// when opening RPC streams.)
50    #[error("Unable to access Session object")]
51    NoSession,
52
53    /// We encountered an internal error.
54    /// (This should be impossible.)
55    #[error("Internal error: {0}")]
56    Internal(String),
57
58    /// No SOCKS proxies were listed in the server's reply.
59    #[error("No SOCKS proxy available")]
60    NoProxy,
61
62    /// We encountered an IO error while trying to connect to the
63    /// proxy or negotiate SOCKS.
64    #[error("IO error")]
65    Io(#[source] Arc<IoError>),
66
67    /// The generated SOCKS request was invalid.
68    ///
69    /// (Most likely, a provided isolation string or hostname was too long for the
70    /// authentication system to support.)
71    #[error("Invalid SOCKS request")]
72    SocksRequest(#[source] tor_socksproto::Error),
73
74    /// The other side did not speak socks, or did not speak socks in the format
75    /// we expected.
76    #[error("SOCKS protocol violation")]
77    SocksProtocol(#[source] tor_socksproto::Error),
78
79    /// The other side gave us a SOCKS error.
80    #[error("SOCKS error code {0}")]
81    SocksError(tor_socksproto::SocksStatus),
82}
83
84impl From<IoError> for StreamError {
85    fn from(e: IoError) -> Self {
86        Self::Io(Arc::new(e))
87    }
88}
89
90/// A response with a single ID.
91#[derive(Deserialize, Debug)]
92struct SingleIdReply {
93    /// The object ID of the response.
94    id: ObjectId,
95}
96
97/// Representation of a single proxy, as delivered by the RPC API.
98// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
99#[derive(Deserialize, Clone, Debug)]
100pub(super) struct Proxy {
101    /// Where the proxy is listening, and what protocol-specific options it expects.
102    pub(super) listener: ProxyListener,
103}
104
105/// Representation of a single proxy's listener location, as delivered by the RPC API.
106#[derive(Deserialize, Clone, Debug)]
107// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
108pub(super) enum ProxyListener {
109    /// A SOCKS5 proxy.
110    #[serde(rename = "socks5")]
111    Socks5 {
112        /// The address at which we're listening for SOCKS connections.
113        tcp_address: Option<SocketAddr>,
114    },
115    /// Some other (unrecognized) listener type.
116    #[serde(untagged)]
117    Unrecognized {},
118}
119
120impl Proxy {
121    /// If this is a SOCKS proxy, return its address.
122    fn socks_addr(&self) -> Option<SocketAddr> {
123        match self.listener {
124            ProxyListener::Socks5 { tcp_address } => tcp_address,
125            ProxyListener::Unrecognized {} => None,
126        }
127    }
128}
129
130impl ProxyInfoReply {
131    /// Choose a SOCKS5 address to use from this list of proxies.
132    fn find_socks_addr(&self) -> Option<SocketAddr> {
133        // We choose the first usable Proxy.
134        self.proxies.iter().find_map(Proxy::socks_addr)
135    }
136}
137
138/// A representation of the set of proxy addresses available from the RPC API.
139// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
140#[derive(Deserialize, Clone, Debug)]
141pub(super) struct ProxyInfoReply {
142    /// A list of the supported proxies.
143    ///
144    /// (So far, only SOCKS proxies are listed, but other kinds may be listed in the future.)
145    pub(super) proxies: Vec<Proxy>,
146}
147
148impl RpcConn {
149    /// Open a new data stream, registering the stream with the RPC system.
150    ///
151    /// Behaves the same as [`open_stream()`](RpcConn::open_stream),
152    /// with the following exceptions:
153    ///
154    /// - Returns a `ObjectId` that can be used to identify the `DataStream`
155    ///   for later RPC requests.
156    /// - Tells Arti not to wait for the stream to succeed or fail
157    ///   over the Tor network.
158    ///   (To wait for the stream to succeed or fail, use the appropriate method.)
159    ///
160    ///  (TODO RPC: Implement such a method!)
161    pub fn open_stream_as_object(
162        &self,
163        on_object: Option<&ObjectId>,
164        target: (&str, u16),
165        isolation: &str,
166    ) -> Result<(ObjectId, TcpStream), StreamError> {
167        let on_object = self.resolve_on_object(on_object)?;
168        let new_stream_request =
169            Request::new(on_object.clone(), "arti:new_oneshot_client", NoParams {});
170        let stream_id = self
171            .execute_internal::<SingleIdReply>(&new_stream_request.encode()?)?
172            .map_err(StreamError::NewStreamRejected)?
173            .id;
174
175        match self.open_stream(Some(&stream_id), target, isolation) {
176            Ok(tcp_stream) => Ok((stream_id, tcp_stream)),
177            Err(e) => {
178                if let Err(_inner) = self.release_obj(stream_id) {
179                    // TODO RPC: We should log this error or something
180                }
181                Err(e)
182            }
183        }
184    }
185
186    /// Open a new data stream, using Arti to connect anonymously to a given
187    /// address and port.
188    ///
189    /// If `on_object` is provided, it must be a an ID for a  client-like RPC
190    /// object that supports opening data streams.  If it is not provided,
191    /// the data stream is opened relative to the current session.
192    ///
193    /// We tell Arti that the stream must not share
194    /// a circuit with any other stream with a different value for `isolation`.
195    /// (If your application doesn't care about isolating its streams from one another,
196    /// it is acceptable to leave `isolation` as an empty string.)
197    pub fn open_stream(
198        &self,
199        on_object: Option<&ObjectId>,
200        (hostname, port): (&str, u16),
201        isolation: &str,
202    ) -> Result<TcpStream, StreamError> {
203        let on_object = self.resolve_on_object(on_object)?;
204        let socks_proxy_addr = self.lookup_socks_proxy_addr()?;
205        let mut stream = TcpStream::connect(socks_proxy_addr)?;
206
207        // For information about this encoding,
208        // see https://spec.torproject.org/socks-extensions.html#extended-auth
209        let username = format!("<torS0X>1{}", on_object.as_ref());
210        let password = isolation;
211        negotiate_socks(&mut stream, hostname, port, &username, password)?;
212
213        Ok(stream)
214    }
215
216    /// Ask Arti for its supported SOCKS addresses; return the first one.
217    //
218    // TODO: Currently we call this every time we want to open a stream.
219    // We could instead cache the value.
220    fn lookup_socks_proxy_addr(&self) -> Result<SocketAddr, StreamError> {
221        let session_id = self.session_id_required()?.clone();
222
223        let proxy_info_request: Request<NoParams> =
224            Request::new(session_id, "arti:get_rpc_proxy_info", NoParams {});
225        let cmd = proxy_info_request.encode()?;
226        let proxy_info = match self.execute_internal::<ProxyInfoReply>(&cmd)? {
227            Ok(info) => info,
228            Err(response) => {
229                if response.decode().code() == RpcErrorCode::OBJECT_ERROR {
230                    // TODO: This is an unfortunate error; it would be better
231                    // to tolerate this situation.  See #1819.
232                    return Err(StreamError::NoSession);
233                } else {
234                    return Err(response.internal_error(&cmd).into());
235                }
236            }
237        };
238        let socks_proxy_addr = proxy_info.find_socks_addr().ok_or(StreamError::NoProxy)?;
239
240        Ok(socks_proxy_addr)
241    }
242
243    /// Helper: Return the session ID, or an error.
244    fn session_id_required(&self) -> Result<&ObjectId, StreamError> {
245        self.session().ok_or(StreamError::NotAuthenticated)
246    }
247
248    /// Helper: Return on_object if it's present, or the session ID otherwise.
249    fn resolve_on_object(&self, on_object: Option<&ObjectId>) -> Result<ObjectId, StreamError> {
250        Ok(match on_object {
251            Some(obj) => obj.clone(),
252            None => self.session_id_required()?.clone(),
253        })
254    }
255}
256
257/// Helper: Negotiate SOCKS5 on the provided stream, using the given parameters.
258//
259// NOTE: We could user `tor-socksproto` instead, but that pulls in a little more
260// code unnecessarily, has features we don't need, and has to handle variations
261// of SOCKS responses that we'll never see.
262fn negotiate_socks(
263    stream: &mut TcpStream,
264    hostname: &str,
265    port: u16,
266    username: &str,
267    password: &str,
268) -> Result<(), StreamError> {
269    use tor_socksproto::{
270        Handshake as _, SocksAddr, SocksAuth, SocksClientHandshake, SocksCmd, SocksHostname,
271        SocksRequest, SocksStatus, SocksVersion,
272    };
273    use StreamError as E;
274
275    let request = SocksRequest::new(
276        SocksVersion::V5,
277        SocksCmd::CONNECT,
278        SocksAddr::Hostname(SocksHostname::try_from(hostname.to_owned()).map_err(E::SocksRequest)?),
279        port,
280        SocksAuth::Username(
281            username.to_owned().into_bytes(),
282            password.to_owned().into_bytes(),
283        ),
284    )
285    .map_err(E::SocksRequest)?;
286
287    let mut buf = tor_socksproto::Buffer::new_precise();
288    let mut state = SocksClientHandshake::new(request);
289    let reply = loop {
290        use tor_socksproto::NextStep as NS;
291        match state.step(&mut buf).map_err(E::SocksProtocol)? {
292            NS::Recv(mut recv) => {
293                let n = stream.read(recv.buf())?;
294                recv.note_received(n).map_err(E::SocksProtocol)?;
295            }
296            NS::Send(send) => stream.write_all(&send)?,
297            NS::Finished(fin) => {
298                break fin
299                    .into_output()
300                    .map_err(|bug| E::Internal(bug.report().to_string()))?
301            }
302        }
303    };
304
305    let status = reply.status();
306
307    if status == SocksStatus::SUCCEEDED {
308        Ok(())
309    } else {
310        Err(StreamError::SocksError(status))
311    }
312}
313
314#[cfg(test)]
315mod test {
316    // @@ begin test lint list maintained by maint/add_warning @@
317    #![allow(clippy::bool_assert_comparison)]
318    #![allow(clippy::clone_on_copy)]
319    #![allow(clippy::dbg_macro)]
320    #![allow(clippy::mixed_attributes_style)]
321    #![allow(clippy::print_stderr)]
322    #![allow(clippy::print_stdout)]
323    #![allow(clippy::single_char_pattern)]
324    #![allow(clippy::unwrap_used)]
325    #![allow(clippy::unchecked_duration_subtraction)]
326    #![allow(clippy::useless_vec)]
327    #![allow(clippy::needless_pass_by_value)]
328    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
329
330    use super::*;
331
332    #[test]
333    fn unexpected_proxies() {
334        let p: ProxyInfoReply = serde_json::from_str(
335            r#"
336               { "proxies" : [ {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}} ] }
337            "#,
338        )
339        .unwrap();
340        assert_eq!(p.proxies.len(), 1);
341        match p.proxies[0].listener {
342            ProxyListener::Socks5 {
343                tcp_address: address,
344            } => {
345                assert_eq!(address.unwrap(), "127.0.0.1:9090".parse().unwrap());
346            }
347            ref other => panic!("{:?}", other),
348        };
349
350        let p: ProxyInfoReply = serde_json::from_str(
351            r#"
352               { "proxies" : [
353                {"listener" : {"hypothetical" : {"tzitzel" : "buttered" }}},
354                {"listener" : {"socks5" : {"unix_path" : "/home/username/.local/PROXY"}}},
355                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}},
356                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9999" }}}
357               ] }
358            "#,
359        )
360        .unwrap();
361        assert_eq!(
362            p.find_socks_addr().unwrap(),
363            "127.0.0.1:9090".parse().unwrap()
364        );
365    }
366}