1//! Support for RPC-visible connections through Arti.
23use std::{
4 io::{Error as IoError, Read as _, Write as _},
5 net::{SocketAddr, TcpStream},
6 sync::Arc,
7};
89use serde::Deserialize;
1011use super::{ErrorResponse, NoParams, RpcConn};
12use crate::{
13 msgs::{request::Request, response::RpcErrorCode},
14 ObjectId,
15};
1617use tor_error::ErrorReport as _;
1819/// An error encountered while trying to open a data stream.
20#[derive(Clone, Debug, thiserror::Error)]
21#[non_exhaustive]
22pub enum StreamError {
23/// One of the RPC methods that we invoked to create the stream failed.
24#[error("An error occurred while invoking RPC methods")]
25RpcMethods(#[from] super::ProtoError),
2627/// We weren't able to find a working proxy address.
28#[error("Request for proxy info rejected: {0}")]
29ProxyInfoRejected(ErrorResponse),
3031/// We weren't able to register a new stream ID.
32#[error("Request for new stream ID rejected: {0}")]
33NewStreamRejected(ErrorResponse),
3435/// We weren't able to release a new stream ID.
36#[error("Request for new stream ID rejected: {0}")]
37StreamReleaseRejected(ErrorResponse),
3839/// Tried to open a stream on an unauthenticated RPC connection.
40 ///
41 /// (At present (Sep 2024) there is no way to get an unauthenticated connection from
42 /// `arti-rpc-client-core`, but that may change in the future.)
43#[error("RPC connection not authenticated")]
44NotAuthenticated,
4546/// Tried to open a stream after having dropped the RPC session
47 ///
48 /// (At present (Jan 2025) dropping the RPC session is possible, but is not supported
49 /// when opening RPC streams.)
50#[error("Unable to access Session object")]
51NoSession,
5253/// We encountered an internal error.
54 /// (This should be impossible.)
55#[error("Internal error: {0}")]
56Internal(String),
5758/// No SOCKS proxies were listed in the server's reply.
59#[error("No SOCKS proxy available")]
60NoProxy,
6162/// We encountered an IO error while trying to connect to the
63 /// proxy or negotiate SOCKS.
64#[error("IO error")]
65Io(#[source] Arc<IoError>),
6667/// The generated SOCKS request was invalid.
68 ///
69 /// (Most likely, a provided isolation string or hostname was too long for the
70 /// authentication system to support.)
71#[error("Invalid SOCKS request")]
72SocksRequest(#[source] tor_socksproto::Error),
7374/// The other side did not speak socks, or did not speak socks in the format
75 /// we expected.
76#[error("SOCKS protocol violation")]
77SocksProtocol(#[source] tor_socksproto::Error),
7879/// The other side gave us a SOCKS error.
80#[error("SOCKS error code {0}")]
81SocksError(tor_socksproto::SocksStatus),
82}
8384impl From<IoError> for StreamError {
85fn from(e: IoError) -> Self {
86Self::Io(Arc::new(e))
87 }
88}
8990/// A response with a single ID.
91#[derive(Deserialize, Debug)]
92struct SingleIdReply {
93/// The object ID of the response.
94id: ObjectId,
95}
9697/// Representation of a single proxy, as delivered by the RPC API.
98// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
99#[derive(Deserialize, Clone, Debug)]
100pub(super) struct Proxy {
101/// Where the proxy is listening, and what protocol-specific options it expects.
102pub(super) listener: ProxyListener,
103}
104105/// Representation of a single proxy's listener location, as delivered by the RPC API.
106#[derive(Deserialize, Clone, Debug)]
107// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
108pub(super) enum ProxyListener {
109/// A SOCKS5 proxy.
110#[serde(rename = "socks5")]
111Socks5 {
112/// The address at which we're listening for SOCKS connections.
113tcp_address: Option<SocketAddr>,
114 },
115/// Some other (unrecognized) listener type.
116#[serde(untagged)]
117Unrecognized {},
118}
119120impl Proxy {
121/// If this is a SOCKS proxy, return its address.
122fn socks_addr(&self) -> Option<SocketAddr> {
123match self.listener {
124 ProxyListener::Socks5 { tcp_address } => tcp_address,
125 ProxyListener::Unrecognized {} => None,
126 }
127 }
128}
129130impl ProxyInfoReply {
131/// Choose a SOCKS5 address to use from this list of proxies.
132fn find_socks_addr(&self) -> Option<SocketAddr> {
133// We choose the first usable Proxy.
134self.proxies.iter().find_map(Proxy::socks_addr)
135 }
136}
137138/// A representation of the set of proxy addresses available from the RPC API.
139// TODO RPC: This is duplicated from proxyinfo.rs; decide on our strategy for this stuff.
140#[derive(Deserialize, Clone, Debug)]
141pub(super) struct ProxyInfoReply {
142/// A list of the supported proxies.
143 ///
144 /// (So far, only SOCKS proxies are listed, but other kinds may be listed in the future.)
145pub(super) proxies: Vec<Proxy>,
146}
147148impl RpcConn {
149/// Open a new data stream, registering the stream with the RPC system.
150 ///
151 /// Behaves the same as [`open_stream()`](RpcConn::open_stream),
152 /// with the following exceptions:
153 ///
154 /// - Returns a `ObjectId` that can be used to identify the `DataStream`
155 /// for later RPC requests.
156 /// - Tells Arti not to wait for the stream to succeed or fail
157 /// over the Tor network.
158 /// (To wait for the stream to succeed or fail, use the appropriate method.)
159 ///
160 /// (TODO RPC: Implement such a method!)
161pub fn open_stream_as_object(
162&self,
163 on_object: Option<&ObjectId>,
164 target: (&str, u16),
165 isolation: &str,
166 ) -> Result<(ObjectId, TcpStream), StreamError> {
167let on_object = self.resolve_on_object(on_object)?;
168let new_stream_request =
169 Request::new(on_object.clone(), "arti:new_oneshot_client", NoParams {});
170let stream_id = self
171.execute_internal::<SingleIdReply>(&new_stream_request.encode()?)?
172.map_err(StreamError::NewStreamRejected)?
173.id;
174175match self.open_stream(Some(&stream_id), target, isolation) {
176Ok(tcp_stream) => Ok((stream_id, tcp_stream)),
177Err(e) => {
178if let Err(_inner) = self.release_obj(stream_id) {
179// TODO RPC: We should log this error or something
180}
181Err(e)
182 }
183 }
184 }
185186/// Open a new data stream, using Arti to connect anonymously to a given
187 /// address and port.
188 ///
189 /// If `on_object` is provided, it must be a an ID for a client-like RPC
190 /// object that supports opening data streams. If it is not provided,
191 /// the data stream is opened relative to the current session.
192 ///
193 /// We tell Arti that the stream must not share
194 /// a circuit with any other stream with a different value for `isolation`.
195 /// (If your application doesn't care about isolating its streams from one another,
196 /// it is acceptable to leave `isolation` as an empty string.)
197pub fn open_stream(
198&self,
199 on_object: Option<&ObjectId>,
200 (hostname, port): (&str, u16),
201 isolation: &str,
202 ) -> Result<TcpStream, StreamError> {
203let on_object = self.resolve_on_object(on_object)?;
204let socks_proxy_addr = self.lookup_socks_proxy_addr()?;
205let mut stream = TcpStream::connect(socks_proxy_addr)?;
206207// For information about this encoding,
208 // see https://spec.torproject.org/socks-extensions.html#extended-auth
209let username = format!("<torS0X>1{}", on_object.as_ref());
210let password = isolation;
211 negotiate_socks(&mut stream, hostname, port, &username, password)?;
212213Ok(stream)
214 }
215216/// Ask Arti for its supported SOCKS addresses; return the first one.
217//
218 // TODO: Currently we call this every time we want to open a stream.
219 // We could instead cache the value.
220fn lookup_socks_proxy_addr(&self) -> Result<SocketAddr, StreamError> {
221let session_id = self.session_id_required()?.clone();
222223let proxy_info_request: Request<NoParams> =
224 Request::new(session_id, "arti:get_rpc_proxy_info", NoParams {});
225let cmd = proxy_info_request.encode()?;
226let proxy_info = match self.execute_internal::<ProxyInfoReply>(&cmd)? {
227Ok(info) => info,
228Err(response) => {
229if response.decode().code() == RpcErrorCode::OBJECT_ERROR {
230// TODO: This is an unfortunate error; it would be better
231 // to tolerate this situation. See #1819.
232return Err(StreamError::NoSession);
233 } else {
234return Err(response.internal_error(&cmd).into());
235 }
236 }
237 };
238let socks_proxy_addr = proxy_info.find_socks_addr().ok_or(StreamError::NoProxy)?;
239240Ok(socks_proxy_addr)
241 }
242243/// Helper: Return the session ID, or an error.
244fn session_id_required(&self) -> Result<&ObjectId, StreamError> {
245self.session().ok_or(StreamError::NotAuthenticated)
246 }
247248/// Helper: Return on_object if it's present, or the session ID otherwise.
249fn resolve_on_object(&self, on_object: Option<&ObjectId>) -> Result<ObjectId, StreamError> {
250Ok(match on_object {
251Some(obj) => obj.clone(),
252None => self.session_id_required()?.clone(),
253 })
254 }
255}
256257/// Helper: Negotiate SOCKS5 on the provided stream, using the given parameters.
258//
259// NOTE: We could user `tor-socksproto` instead, but that pulls in a little more
260// code unnecessarily, has features we don't need, and has to handle variations
261// of SOCKS responses that we'll never see.
262fn negotiate_socks(
263 stream: &mut TcpStream,
264 hostname: &str,
265 port: u16,
266 username: &str,
267 password: &str,
268) -> Result<(), StreamError> {
269use tor_socksproto::{
270 Handshake as _, SocksAddr, SocksAuth, SocksClientHandshake, SocksCmd, SocksHostname,
271 SocksRequest, SocksStatus, SocksVersion,
272 };
273use StreamError as E;
274275let request = SocksRequest::new(
276 SocksVersion::V5,
277 SocksCmd::CONNECT,
278 SocksAddr::Hostname(SocksHostname::try_from(hostname.to_owned()).map_err(E::SocksRequest)?),
279 port,
280 SocksAuth::Username(
281 username.to_owned().into_bytes(),
282 password.to_owned().into_bytes(),
283 ),
284 )
285 .map_err(E::SocksRequest)?;
286287let mut buf = tor_socksproto::Buffer::new_precise();
288let mut state = SocksClientHandshake::new(request);
289let reply = loop {
290use tor_socksproto::NextStep as NS;
291match state.step(&mut buf).map_err(E::SocksProtocol)? {
292 NS::Recv(mut recv) => {
293let n = stream.read(recv.buf())?;
294 recv.note_received(n).map_err(E::SocksProtocol)?;
295 }
296 NS::Send(send) => stream.write_all(&send)?,
297 NS::Finished(fin) => {
298break fin
299 .into_output()
300 .map_err(|bug| E::Internal(bug.report().to_string()))?
301}
302 }
303 };
304305let status = reply.status();
306307if status == SocksStatus::SUCCEEDED {
308Ok(())
309 } else {
310Err(StreamError::SocksError(status))
311 }
312}
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;

    /// Proxy lists parse as expected, and entries we cannot use are skipped.
    #[test]
    fn unexpected_proxies() {
        // A single, well-formed SOCKS5 entry.
        let single = r#"
               { "proxies" : [ {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}} ] }
            "#;
        let p: ProxyInfoReply = serde_json::from_str(single).unwrap();
        assert_eq!(p.proxies.len(), 1);
        match &p.proxies[0].listener {
            ProxyListener::Socks5 { tcp_address } => {
                assert_eq!(tcp_address.unwrap(), "127.0.0.1:9090".parse().unwrap());
            }
            other => panic!("{:?}", other),
        };

        // An unknown listener type and a SOCKS5 entry without a TCP address
        // come first: both must be passed over in favor of the first entry
        // that does have an address.
        let mixed = r#"
               { "proxies" : [
                {"listener" : {"hypothetical" : {"tzitzel" : "buttered" }}},
                {"listener" : {"socks5" : {"unix_path" : "/home/username/.local/PROXY"}}},
                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9090" }}},
                {"listener" : {"socks5" : {"tcp_address" : "127.0.0.1:9999" }}}
               ] }
            "#;
        let p: ProxyInfoReply = serde_json::from_str(mixed).unwrap();
        assert_eq!(
            p.find_socks_addr().unwrap(),
            "127.0.0.1:9090".parse().unwrap()
        );
    }
}