arti/
socks.rs

1//! Implement a simple SOCKS proxy that relays connections over Tor.
2//!
3//! A proxy is launched with [`run_socks_proxy()`], which listens for new
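//! connections and then handles each one by performing the SOCKS handshake
//! and relaying the requested traffic over the Tor network.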
5
6use futures::future::FutureExt;
7use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
8use futures::stream::StreamExt;
9use futures::task::SpawnExt;
10use safelog::sensitive;
11use std::io::Result as IoResult;
12use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
13use std::sync::Arc;
14use tracing::{debug, error, info, warn};
15
16#[allow(unused)]
17use arti_client::HasKind;
18use arti_client::{ErrorKind, IntoTorAddr as _, StreamPrefs, TorClient};
19#[cfg(feature = "rpc")]
20use arti_rpcserver::RpcMgr;
21use tor_config::Listen;
22use tor_error::warn_report;
23#[cfg(feature = "rpc")]
24use tor_rpcbase::{self as rpc};
25use tor_rtcompat::{NetStreamListener, Runtime};
26use tor_socksproto::{Handshake as _, SocksAddr, SocksAuth, SocksCmd, SocksRequest, SOCKS_BUF_LEN};
27
28use anyhow::{anyhow, Context, Result};
29
30use crate::rpc::RpcProxySupport;
31
/// Placeholder type when RPC is disabled at compile time.
///
/// This enum has no variants, so no value of it can ever be constructed.
/// It stands in for `arti_rpcserver::RpcMgr`, which is only imported when
/// the `rpc` feature is enabled.
#[cfg(not(feature = "rpc"))]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum RpcMgr {}
36
/// Payload to return when an HTTP connection arrives on a SOCKS port.
///
/// The status line and header are terminated with CRLF, as HTTP requires
/// (a bare LF is only tolerated, not conforming); the HTML body that follows
/// the blank line is sent verbatim.
const WRONG_PROTOCOL_PAYLOAD: &[u8] = concat!(
    "HTTP/1.0 501 Tor is not an HTTP Proxy\r\n",
    "Content-Type: text/html; charset=utf-8\r\n",
    "\r\n",
    r#"<!DOCTYPE html>
<html>
<head>
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
</head>
<body>
<h1>This is a SOCKS proxy, not an HTTP proxy.</h1>
<p>
It appears you have configured your web browser to use this Tor port as
an HTTP proxy.
</p>
<p>
This is not correct: This port is configured as a SOCKS proxy, not
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to
add support for it in place of, or in addition to, socks_port.
Please configure your client accordingly.
</p>
<p>
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
</p>
</body>
</html>"#
)
.as_bytes();
63
64/// Find out which kind of address family we can/should use for a
65/// given `SocksRequest`.
66#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
67fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
68    let mut prefs = StreamPrefs::new();
69    if addr.parse::<Ipv4Addr>().is_ok() {
70        // If they asked for an IPv4 address correctly, nothing else will do.
71        prefs.ipv4_only();
72    } else if addr.parse::<Ipv6Addr>().is_ok() {
73        // If they asked for an IPv6 address correctly, nothing else will do.
74        prefs.ipv6_only();
75    } else if req.version() == tor_socksproto::SocksVersion::V4 {
76        // SOCKS4 and SOCKS4a only support IPv4
77        prefs.ipv4_only();
78    } else {
79        // Otherwise, default to saying IPv4 is preferred.
80        prefs.ipv4_preferred();
81    }
82    prefs
83}
84
/// A Key used to isolate connections.
///
/// Composed of the `ConnIsolation` of the connection (the index of the
/// listener socket that accepted it, and the source `IpAddr` of the client)
/// together with the `ProvidedIsolation` derived from the client's SOCKS
/// authentication fields.
///
/// The `IsolationHelper` impl for this type treats two keys as compatible
/// only when they are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SocksIsolationKey(ConnIsolation, ProvidedIsolation);

/// Isolation information provided through the socks connection.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvidedIsolation {
    /// Isolation from a request that does not use the extended SOCKS5
    /// authentication format (including requests without a username/password):
    /// the whole `SocksAuth` value is used as-is.
    Legacy(SocksAuth),
    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
    Extended {
        /// Which format was negotiated?
        ///
        /// (At present, different format codes can't share a circuit.)
        format_code: u8,
        /// What's the isolation string?
        ///
        /// (This is the contents of the SOCKS5 password field.)
        isolation: Box<[u8]>,
    },
}
107
108impl arti_client::isolation::IsolationHelper for SocksIsolationKey {
109    fn compatible_same_type(&self, other: &Self) -> bool {
110        self == other
111    }
112
113    fn join_same_type(&self, other: &Self) -> Option<Self> {
114        if self == other {
115            Some(self.clone())
116        } else {
117            None
118        }
119    }
120}
121
/// The meaning of a SOCKS authentication field, according to our conventions.
///
/// Produced by `interpret_socks_auth`.
struct AuthInterpretation {
    /// Associate this stream with a DataStream created by using a particular RPC object
    /// as a Tor client.
    ///
    /// `None` means the stream should be opened with the default `TorClient`.
    #[cfg(feature = "rpc")]
    rpc_object: Option<rpc::ObjectId>,

    /// Isolate this stream from other streams that do not have the same
    /// value.
    isolation: ProvidedIsolation,
}
133
134/// NOTE: The following documentation belongs in a spec.
135/// But for now, it's our best attempt to document the design and protocol
136/// implemented here
137/// for integrating SOCKS with our RPC system. --nickm
138///
139/// Roughly speaking:
140///
141/// ## Key concepts
142///
143/// A data stream is "RPC-visible" if, when it is created via SOCKS,
144/// the RPC system is told about it.
145///
146/// Every RPC-visible stream is associated with a given RPC object when it is created.
147/// (Since the RPC object is being specified in the SOCKS protocol,
148/// it must be one with an externally visible Object ID.
149/// Such Object IDs are cryptographically unguessable and unforgeable,
150/// and are qualified with a unique identifier for their associated RPC session.)
151/// Call this RPC Object the "target" object for now.
152/// This target RPC object must implement
153/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
154///
155/// Right now, there are two general kinds of objects that implement this method:
156/// client-like objects, and one-shot clients.
157///
158/// A client-like object is either a `TorClient` or an RPC `Session`.
159/// It knows about and it is capable of opening multiple data streams.
160/// Using it as the target object for a SOCKS connection tells Arti
161/// that the resulting data stream (if any)
162/// should be built by it, and associated with its RPC session.
163///
164/// An application gets a TorClient by asking the session for one,
165/// or for asking a TorClient to give you a new variant clone of itself.
166///
167/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
168/// It is created from a client-like object, but can only be used for a single data stream.
/// When created, it is not yet connected or trying to connect anywhere:
170/// the act of using it as the target Object for a SOCKS connection causes
171/// it to begin connecting.
172///
173/// An application gets a `OneShotClient` by calling `arti:new_oneshot_client`
174/// on any client-like object.
175///
176/// ## The SOCKS protocol
177///
178/// See the specification for
179/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
180/// for full details.
181///
182/// ### Further restrictions on Object IDs and isolation
183///
184/// In some cases,
185/// the RPC Object ID may denote an object
186/// that already includes information about its intended stream isolation.
187/// In such cases, the stream isolation MUST be blank.
188/// Implementations MUST reject non-blank stream isolation in such cases.
189///
190/// In some cases, the RPC object ID may denote an object
191/// that already includes information
192/// about its intended destination address and port.
193/// In such cases, the destination address MUST be `0.0.0.0` or `::`
194/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
195/// and the destination port MUST be 0.
196/// Implementations MUST reject other addresses in such cases.
197///
198/// ### Another proposed change
199///
200/// We could add a new method to clients, with a name like
201/// "open_stream" or "connect_stream".
202/// This method would include all target and isolation information in its parameters.
203/// It would actually create a DataStream immediately, tell it to begin connecting,
204/// and return an externally visible object ID.
205/// The RPC protocol could be used to watch the DataStream object,
206/// to see when it was connected.
207///
208/// The resulting DataStream object could also be used as the target of a SOCKS connection.
209/// We would require in such a case that no isolation be provided in the SOCKS handshake,
210/// and that the target address was (e.g.) INADDR_ANY.
211///
212/// ## Intended use cases (examples)
213///
214/// (These examples assume that the application
215/// already knows the SOCKS port it should use.
216/// I'm leaving out the isolation strings as orthogonal.)
217///
218/// These are **NOT** the only possible use cases;
219/// they're just the two that help understand this system best (I hope).
220///
221/// ### Case 1: Using a client-like object directly.
222///
223/// Here the application has authenticated to RPC
224/// and gotten the session ID `SESSION-1`.
225/// (In reality, this would be a longer ID, and full of crypto).
226///
227/// The application wants to open a new stream to www.example.com.
228/// They don't particularly care about isolation,
229/// but they do want their stream to use their RPC session.
230/// They don't want an Object ID for the stream.
231///
232/// To do this, they make a SOCKS connection to arti,
233/// with target address www.example.com.
234/// They set the username to `<torS0X>0SESSION-1`,
235/// and the password to the empty string.
236///
237/// Arti looks up the Session object via the `SESSION-1` object ID
238/// and tells it (via the ConnectWithPrefs special method)
239/// to connect to www.example.com.
240/// The session creates a new DataStream using its internal TorClient,
241/// but does not register the stream with an RPC Object ID.
242/// Arti proxies the application's SOCKS connection through this DataStream.
243///
244///
245/// ### Case 2: Creating an identifiable stream.
246///
247/// Here the application wants to be able to refer to its DataStream
248/// after the stream is created.
249/// As before, we assume that it's on an RPC session
250/// where the Session ID is `SESSION-1`.
251///
252/// The application sends an RPC request of the form:
253/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
254///
255/// It receives a reply like:
256/// `{"id": 123, "result": {"id": "STREAM-1"} }`
257///
258/// (In reality, `STREAM-1` would also be longer and full of crypto.)
259///
260/// Now the application has an object called `STREAM-1` that is not yet a connected
261/// stream, but which may become one.
262///
263/// This time, it wants to set its isolation string to "xyzzy".
264///
265/// The application opens a socks connection as before.
266/// For the username it sends `<torS0X>0STREAM-1`,
267/// and for the password it sends `xyzzy`.
268///
269/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
270/// and tells it (via the ConnectWithPrefs special method)
271/// to connect to www.example.com.
272/// This causes the `RpcDataStream` internally to create a new `DataStream`,
273/// and to store that `DataStream` in itself.
274/// The `RpcDataStream` with Object ID `STREAM-1`
275/// is now an alias for the newly created `DataStream`.
276/// Arti proxies the application's SOCKS connection through that `DataStream`.
277///
// This module is intentionally empty: it exists only to carry the design
// notes above as its documentation.
#[cfg(feature = "rpc")]
#[allow(dead_code)]
mod socks_and_rpc {}
281
282/// Given the authentication object from a socks connection, determine what it's telling
283/// us to do.
284///
285/// (In no case is it actually SOCKS authentication: it can either be a message
286/// to the stream isolation system or the RPC system.)
287fn interpret_socks_auth(auth: &SocksAuth) -> Result<AuthInterpretation> {
288    /// Interpretation of a SOCKS5 username according to
289    /// the [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
290    /// specification.
291    enum Uname<'a> {
292        /// This is a legacy username; it's just part of the
293        /// isolation information.
294        //
295        // Note: We're not actually throwing away the username here;
296        // instead we're going to use the whole SocksAuth
297        // in a `ProvidedAuthentication::Legacy``.
298        // TODO RPC: Find a more idiomatic way to express this data flow.
299        Legacy,
300        /// This is using the socks extension: contains the extension
301        /// format code and the remaining information from the username.
302        Extended(u8, &'a [u8]),
303    }
304    /// Helper: Try to interpret a SOCKS5 username field as indicating the start of a set of
305    /// extended socks authentication information.
306    ///
307    /// Implements [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
308    ///
309    /// If it does indicate that extensions are in use,
310    /// return a `Uname::Extended` containing
311    /// the extension format type and the remaining information from the username.
312    ///
313    /// If it indicates that no extensions are in use,
314    /// return `Uname::Legacy`.
315    ///
316    /// If it is badly formatted, return an error.
317    fn interpret_socks5_username(username: &[u8]) -> Result<Uname<'_>> {
318        /// 8-byte "magic" sequence from
319        /// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
320        /// When it appears at the start of a username,
321        /// indicates that the username/password are to be interpreted as
322        /// as encoding SOCKS5 extended parameters,
323        /// but the format might not be one we recognize.
324        const SOCKS_EXT_CONST_ANY: &[u8] = b"<torS0X>";
325        let Some(remainder) = username.strip_prefix(SOCKS_EXT_CONST_ANY) else {
326            return Ok(Uname::Legacy);
327        };
328        if remainder.is_empty() {
329            return Err(anyhow!("Extended SOCKS information without format code."));
330        }
331        // TODO MSRV 1.80: use split_at_checked instead.
332        // This won't panic since we checked for an empty string above.
333        let (format_code, remainder) = remainder.split_at(1);
334        Ok(Uname::Extended(format_code[0], remainder))
335    }
336
337    let isolation = match auth {
338        SocksAuth::Username(user, pass) => match interpret_socks5_username(user)? {
339            Uname::Legacy => ProvidedIsolation::Legacy(auth.clone()),
340            Uname::Extended(b'1', b"") => {
341                return Err(anyhow!("Received empty RPC object ID"));
342            }
343            Uname::Extended(format_code @ b'1', remainder) => {
344                #[cfg(not(feature = "rpc"))]
345                return Err(anyhow!(
346                    "Received RPC object ID, but not built with support for RPC"
347                ));
348                #[cfg(feature = "rpc")]
349                return Ok(AuthInterpretation {
350                    rpc_object: Some(rpc::ObjectId::from(
351                        std::str::from_utf8(remainder).context("Rpc object ID was not utf-8")?,
352                    )),
353                    isolation: ProvidedIsolation::Extended {
354                        format_code,
355                        isolation: pass.clone().into(),
356                    },
357                });
358            }
359            Uname::Extended(format_code @ b'0', b"") => ProvidedIsolation::Extended {
360                format_code,
361                isolation: pass.clone().into(),
362            },
363            Uname::Extended(b'0', _) => {
364                return Err(anyhow!("Extraneous information in SOCKS username field."))
365            }
366            _ => return Err(anyhow!("Unrecognized SOCKS format code")),
367        },
368        _ => ProvidedIsolation::Legacy(auth.clone()),
369    };
370
371    Ok(AuthInterpretation {
372        #[cfg(feature = "rpc")]
373        rpc_object: None,
374        isolation,
375    })
376}
377
/// Information used to implement a SOCKS connection.
struct SocksConnContext<R: Runtime> {
    /// A TorClient to use (by default) to anonymize requests.
    tor_client: TorClient<R>,
    /// If present, an RpcMgr to use for attaching requests to RPC
    /// sessions.
    ///
    /// If a request names an RPC object while this is `None`, the request
    /// is rejected.
    #[cfg(feature = "rpc")]
    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
}
387
/// Type alias for the isolation information associated with a given SOCKS
/// connection _before_ SOCKS is negotiated.
///
/// Currently this is an index for which listener accepted the connection, plus
/// the address of the client that connected to the Socks port.
///
/// It becomes the first half of a `SocksIsolationKey` once the SOCKS
/// authentication fields have been interpreted.
type ConnIsolation = (usize, IpAddr);
394
// Select the type that `get_prefs_and_session` returns as its connection target.
cfg_if::cfg_if! {
    if #[cfg(feature="rpc")] {
        // With RPC support, the target can be either our own client or an
        // object looked up through the RPC manager.
        use crate::rpc::conntarget::ConnTarget;
    } else {
        /// A type returned by get_prefs_and_session,
        /// and used to launch data streams or resolve attempts.
        ///
        /// TODO RPC: This is quite ugly; we should do something better.
        /// At least, we should never expose this outside the socks module.
        type ConnTarget<R> = TorClient<R>;
    }
}
407
408impl<R: Runtime> SocksConnContext<R> {
409    /// Interpret a SOCKS request and our input information to determine which
410    /// TorClient / ClientConnectionTarget object and StreamPrefs we should use.
411    ///
412    /// TODO RPC: The return type here is a bit ugly.
413    fn get_prefs_and_session(
414        &self,
415        request: &SocksRequest,
416        target_addr: &str,
417        conn_isolation: ConnIsolation,
418    ) -> Result<(StreamPrefs, ConnTarget<R>)> {
419        // Determine whether we want to ask for IPv4/IPv6 addresses.
420        let mut prefs = stream_preference(request, target_addr);
421
422        // Interpret socks authentication to see whether we want to connect to an RPC connector.
423        let interp = interpret_socks_auth(request.auth())?;
424        prefs.set_isolation(SocksIsolationKey(conn_isolation, interp.isolation));
425
426        #[cfg(feature = "rpc")]
427        if let Some(session) = interp.rpc_object {
428            if let Some(mgr) = &self.rpc_mgr {
429                let (context, object) = mgr
430                    .lookup_object(&session)
431                    .context("no such session found")?;
432                let target = ConnTarget::Rpc { context, object };
433                return Ok((prefs, target));
434            } else {
435                return Err(anyhow!("no rpc manager found!?"));
436            }
437        }
438
439        let client = self.tor_client.clone();
440        #[cfg(feature = "rpc")]
441        let client = ConnTarget::Client(Box::new(client));
442
443        Ok((prefs, client))
444    }
445}
446
447/// Given a just-received TCP connection `S` on a SOCKS port, handle the
448/// SOCKS handshake and relay the connection over the Tor network.
449///
450/// Uses `isolation_info` to decide which circuits this connection
451/// may use.  Requires that `isolation_info` is a pair listing the listener
452/// id and the source address for the socks request.
453#[allow(clippy::cognitive_complexity)] // TODO: Refactor
454async fn handle_socks_conn<R, S>(
455    runtime: R,
456    context: SocksConnContext<R>,
457    mut socks_stream: S,
458    isolation_info: ConnIsolation,
459) -> Result<()>
460where
461    R: Runtime,
462    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
463{
464    // Part 1: Perform the SOCKS handshake, to learn where we are
465    // being asked to connect, and what we're being asked to do once
466    // we connect there.
467    //
468    // The SOCKS handshake can require multiple round trips (SOCKS5
469    // always does) so we we need to run this part of the process in a
470    // loop.
471    let mut handshake = tor_socksproto::SocksProxyHandshake::new();
472
473    let mut inbuf = tor_socksproto::Buffer::new();
474    let request = loop {
475        use tor_socksproto::NextStep as NS;
476
477        let rv = handshake.step(&mut inbuf);
478
479        let step = match rv {
480            Err(e) => {
481                if let tor_socksproto::Error::BadProtocol(version) = e {
482                    // check for HTTP methods: CONNECT, DELETE, GET, HEAD, OPTION, PUT, POST, PATCH and
483                    // TRACE.
484                    // To do so, check the first byte of the connection, which happen to be placed
485                    // where SOCKs version field is.
486                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
487                        write_all_and_close(&mut socks_stream, WRONG_PROTOCOL_PAYLOAD).await?;
488                    }
489                }
490                // if there is an handshake error, don't reply with a Socks error, remote does not
491                // seems to speak Socks.
492                return Err(e.into());
493            }
494            Ok(y) => y,
495        };
496
497        match step {
498            NS::Recv(mut recv) => {
499                let n = socks_stream
500                    .read(recv.buf())
501                    .await
502                    .context("Error while reading SOCKS handshake")?;
503                recv.note_received(n)?;
504            }
505            NS::Send(data) => write_all_and_flush(&mut socks_stream, &data).await?,
506            NS::Finished(fin) => break fin.into_output_forbid_pipelining()?,
507        }
508    };
509
510    // Unpack the socks request and find out where we're connecting to.
511    let addr = request.addr().to_string();
512    let port = request.port();
513    debug!(
514        "Got a socks request: {} {}:{}",
515        request.command(),
516        sensitive(&addr),
517        port
518    );
519
520    let (prefs, tor_client) = context.get_prefs_and_session(&request, &addr, isolation_info)?;
521
522    match request.command() {
523        SocksCmd::CONNECT => {
524            // The SOCKS request wants us to connect to a given address.
525            // So, launch a connection over Tor.
526            let tor_addr = (addr.clone(), port).into_tor_addr()?;
527            let tor_stream = tor_client.connect_with_prefs(&tor_addr, &prefs).await;
528            let tor_stream = match tor_stream {
529                Ok(s) => s,
530                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
531            };
532            // Okay, great! We have a connection over the Tor network.
533            debug!("Got a stream for {}:{}", sensitive(&addr), port);
534
535            // Send back a SOCKS response, telling the client that it
536            // successfully connected.
537            let reply = request
538                .reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
539                .context("Encoding socks reply")?;
540            write_all_and_flush(&mut socks_stream, &reply[..]).await?;
541
542            let (socks_r, socks_w) = socks_stream.split();
543            let (tor_r, tor_w) = tor_stream.split();
544
545            // Finally, spawn two background tasks to relay traffic between
546            // the socks stream and the tor stream.
547            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
548            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
549        }
550        SocksCmd::RESOLVE => {
551            // We've been asked to perform a regular hostname lookup.
552            // (This is a tor-specific SOCKS extension.)
553
554            let addr = if let Ok(addr) = addr.parse() {
555                // if this is a valid ip address, just parse it and reply.
556                Ok(addr)
557            } else {
558                tor_client
559                    .resolve_with_prefs(&addr, &prefs)
560                    .await
561                    .map_err(|e| e.kind())
562                    .and_then(|addrs| addrs.first().copied().ok_or(ErrorKind::Other))
563            };
564            match addr {
565                Ok(addr) => {
566                    let reply = request
567                        .reply(
568                            tor_socksproto::SocksStatus::SUCCEEDED,
569                            Some(&SocksAddr::Ip(addr)),
570                        )
571                        .context("Encoding socks reply")?;
572                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
573                }
574                Err(e) => return reply_error(&mut socks_stream, &request, e).await,
575            }
576        }
577        SocksCmd::RESOLVE_PTR => {
578            // We've been asked to perform a reverse hostname lookup.
579            // (This is a tor-specific SOCKS extension.)
580            let addr: IpAddr = match addr.parse() {
581                Ok(ip) => ip,
582                Err(e) => {
583                    let reply = request
584                        .reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
585                        .context("Encoding socks reply")?;
586                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
587                    return Err(anyhow!(e));
588                }
589            };
590            let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
591                Ok(hosts) => hosts,
592                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
593            };
594            if let Some(host) = hosts.into_iter().next() {
595                // this conversion should never fail, legal DNS names len must be <= 253 but Socks
596                // names can be up to 255 chars.
597                let hostname = SocksAddr::Hostname(host.try_into()?);
598                let reply = request
599                    .reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
600                    .context("Encoding socks reply")?;
601                write_all_and_close(&mut socks_stream, &reply[..]).await?;
602            }
603        }
604        _ => {
605            // We don't support this SOCKS command.
606            warn!("Dropping request; {:?} is unsupported", request.command());
607            let reply = request
608                .reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
609                .context("Encoding socks reply")?;
610            write_all_and_close(&mut socks_stream, &reply[..]).await?;
611        }
612    };
613
614    // TODO: we should close the TCP stream if either task fails. Do we?
615    // See #211 and #190.
616
617    Ok(())
618}
619
620/// write_all the data to the writer & flush the writer if write_all is successful.
621async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
622where
623    W: AsyncWrite + Unpin,
624{
625    writer
626        .write_all(buf)
627        .await
628        .context("Error while writing SOCKS reply")?;
629    writer
630        .flush()
631        .await
632        .context("Error while flushing SOCKS stream")
633}
634
635/// write_all the data to the writer & close the writer if write_all is successful.
636async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
637where
638    W: AsyncWrite + Unpin,
639{
640    writer
641        .write_all(buf)
642        .await
643        .context("Error while writing SOCKS reply")?;
644    writer
645        .close()
646        .await
647        .context("Error while closing SOCKS stream")
648}
649
650/// Reply a Socks error based on an arti-client Error and close the stream.
651/// Returns the error provided in parameter
652async fn reply_error<W>(
653    writer: &mut W,
654    request: &SocksRequest,
655    error: arti_client::ErrorKind,
656) -> Result<()>
657where
658    W: AsyncWrite + Unpin,
659{
660    use {tor_socksproto::SocksStatus as S, ErrorKind as EK};
661
662    // TODO: Currently we _always_ try to return extended SOCKS return values
663    // for onion service failures from proposal 304 when they are appropriate.
664    // But according to prop 304, this is something we should only do when it's
665    // requested, for compatibility with SOCKS implementations that can't handle
666    // unexpected REP codes.
667    //
668    // I suggest we make these extended error codes "always-on" for now, and
669    // later add a feature to disable them if it's needed. -nickm
670
671    // TODO: Perhaps we should map the extended SOCKS return values for onion
672    // service failures unconditionally, even if we haven't compiled in onion
673    // service client support.  We can make that change after the relevant
674    // ErrorKinds are no longer `experimental-api` in `tor-error`.
675
676    // We need to send an error. See what kind it is.
677    let status = match error {
678        EK::RemoteNetworkFailed => S::TTL_EXPIRED,
679
680        #[cfg(feature = "onion-service-client")]
681        EK::OnionServiceNotFound => S::HS_DESC_NOT_FOUND,
682        #[cfg(feature = "onion-service-client")]
683        EK::OnionServiceAddressInvalid => S::HS_BAD_ADDRESS,
684        #[cfg(feature = "onion-service-client")]
685        EK::OnionServiceMissingClientAuth => S::HS_MISSING_CLIENT_AUTH,
686        #[cfg(feature = "onion-service-client")]
687        EK::OnionServiceWrongClientAuth => S::HS_WRONG_CLIENT_AUTH,
688
689        // NOTE: This is not a perfect correspondence from these ErrorKinds to
690        // the errors we're returning here. In the longer run, we'll want to
691        // encourage other ways to indicate failure to clients.  Those ways might
692        // include encouraging HTTP CONNECT, or the RPC system, both of which
693        // would give us more robust ways to report different kinds of failure.
694        #[cfg(feature = "onion-service-client")]
695        EK::OnionServiceNotRunning
696        | EK::OnionServiceConnectionFailed
697        | EK::OnionServiceProtocolViolation => S::HS_INTRO_FAILED,
698
699        _ => S::GENERAL_FAILURE,
700    };
701    let reply = request
702        .reply(status, None)
703        .context("Encoding socks reply")?;
704    // if writing back the error fail, still return the original error
705    let _ = write_all_and_close(writer, &reply[..]).await;
706
707    Err(anyhow!(error))
708}
709
710/// Copy all the data from `reader` into `writer` until we encounter an EOF or
711/// an error.
712///
713/// Unlike as futures::io::copy(), this function is meant for use with
714/// interactive readers and writers, where the reader might pause for
715/// a while, but where we want to send data on the writer as soon as
716/// it is available.
717///
718/// This function assumes that the writer might need to be flushed for
719/// any buffered data to be sent.  It tries to minimize the number of
720/// flushes, however, by only flushing the writer when the reader has no data.
721async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
722where
723    R: AsyncRead + Unpin,
724    W: AsyncWrite + Unpin,
725{
726    use futures::{poll, task::Poll};
727
728    let mut buf = [0_u8; SOCKS_BUF_LEN];
729
730    // At this point we could just loop, calling read().await,
731    // write_all().await, and flush().await.  But we want to be more
732    // clever than that: we only want to flush when the reader is
733    // stalled.  That way we can pack our data into as few cells as
734    // possible, but flush it immediately whenever there's no more
735    // data coming.
736    let loop_result: IoResult<()> = loop {
737        let mut read_future = reader.read(&mut buf[..]);
738        match poll!(&mut read_future) {
739            Poll::Ready(Err(e)) => break Err(e),
740            Poll::Ready(Ok(0)) => break Ok(()), // EOF
741            Poll::Ready(Ok(n)) => {
742                writer.write_all(&buf[..n]).await?;
743                continue;
744            }
745            Poll::Pending => writer.flush().await?,
746        }
747
748        // The read future is pending, so we should wait on it.
749        match read_future.await {
750            Err(e) => break Err(e),
751            Ok(0) => break Ok(()),
752            Ok(n) => writer.write_all(&buf[..n]).await?,
753        }
754    };
755
756    // Make sure that we flush any lingering data if we can.
757    //
758    // If there is a difference between closing and dropping, then we
759    // only want to do a "proper" close if the reader closed cleanly.
760    let flush_result = if loop_result.is_ok() {
761        writer.close().await
762    } else {
763        writer.flush().await
764    };
765
766    loop_result.or(flush_result)
767}
768
769/// Return true if a given IoError, when received from accept, is a fatal
770/// error.
771fn accept_err_is_fatal(err: &IoError) -> bool {
772    #![allow(clippy::match_like_matches_macro)]
773
774    /// Re-declaration of WSAEMFILE with the right type to match
775    /// `raw_os_error()`.
776    #[cfg(windows)]
777    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
778
779    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
780    // we need to use OS-specific errors. :P
781    match err.raw_os_error() {
782        #[cfg(unix)]
783        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
784        #[cfg(windows)]
785        Some(WSAEMFILE) => false,
786        _ => true,
787    }
788}
789
790/// Launch a SOCKS proxy to listen on a given localhost port, and run
791/// indefinitely.
792///
793/// Requires a `runtime` to use for launching tasks and handling
794/// timeouts, and a `tor_client` to use in connecting over the Tor
795/// network.
796#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
797#[allow(clippy::cognitive_complexity)] // TODO: Refactor
798pub(crate) async fn run_socks_proxy<R: Runtime>(
799    runtime: R,
800    tor_client: TorClient<R>,
801    listen: Listen,
802    rpc_data: Option<RpcProxySupport>,
803) -> Result<()> {
804    #[cfg(feature = "rpc")]
805    let (rpc_mgr, mut rpc_state_sender) = match rpc_data {
806        Some(RpcProxySupport {
807            rpc_mgr,
808            rpc_state_sender,
809        }) => (Some(rpc_mgr), Some(rpc_state_sender)),
810        None => (None, None),
811    };
812    #[cfg(not(feature = "rpc"))]
813    let rpc_mgr = None;
814
815    if !listen.is_localhost_only() {
816        warn!("Configured to listen for SOCKS on non-local addresses. This is usually insecure! We recommend listening on localhost only.");
817    }
818
819    let mut listeners = Vec::new();
820    let mut listening_on_addrs = Vec::new();
821
822    // Try to bind to the SOCKS ports.
823    match listen.ip_addrs() {
824        Ok(addrgroups) => {
825            for addrgroup in addrgroups {
826                for addr in addrgroup {
827                    match runtime.listen(&addr).await {
828                        Ok(listener) => {
829                            info!("Listening on {:?}.", addr);
830                            listeners.push(listener);
831                            listening_on_addrs.push(addr);
832                        }
833                        #[cfg(unix)]
834                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
835                            warn_report!(e, "Address family not supported {}", addr);
836                        }
837                        Err(ref e) => {
838                            return Err(anyhow!("Can't listen on {}: {e}", addr));
839                        }
840                    }
841                }
842            }
843        }
844        Err(e) => warn_report!(e, "Invalid listen spec"),
845    }
846
847    // We weren't able to bind any ports: There's nothing to do.
848    if listeners.is_empty() {
849        error!("Couldn't open any SOCKS listeners.");
850        return Err(anyhow!("Couldn't open SOCKS listeners"));
851    }
852
853    cfg_if::cfg_if! {
854        if #[cfg(feature="rpc")] {
855            if let Some(rpc_state_sender) = &mut rpc_state_sender {
856                rpc_state_sender.set_socks_listeners(&listening_on_addrs[..]);
857            }
858        } else {
859            let _ = listening_on_addrs;
860        }
861    }
862
863    run_socks_proxy_with_listeners(tor_client, listeners, rpc_mgr).await
864}
865
866/// Launch a SOCKS proxy from a given set of already bound listeners.
867#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
868pub(crate) async fn run_socks_proxy_with_listeners<R: Runtime>(
869    tor_client: TorClient<R>,
870    listeners: Vec<<R as tor_rtcompat::NetStreamProvider>::Listener>,
871    rpc_mgr: Option<Arc<RpcMgr>>,
872) -> Result<()> {
873    // Create a stream of (incoming socket, listener_id) pairs, selected
874    // across all the listeners.
875    let mut incoming = futures::stream::select_all(
876        listeners
877            .into_iter()
878            .map(NetStreamListener::incoming)
879            .enumerate()
880            .map(|(listener_id, incoming_conns)| {
881                incoming_conns.map(move |socket| (socket, listener_id))
882            }),
883    );
884
885    // Loop over all incoming connections.  For each one, call
886    // handle_socks_conn() in a new task.
887    while let Some((stream, sock_id)) = incoming.next().await {
888        let (stream, addr) = match stream {
889            Ok((s, a)) => (s, a),
890            Err(err) => {
891                if accept_err_is_fatal(&err) {
892                    return Err(err).context("Failed to receive incoming stream on SOCKS port");
893                } else {
894                    warn_report!(err, "Incoming stream failed");
895                    continue;
896                }
897            }
898        };
899        let socks_context = SocksConnContext {
900            tor_client: tor_client.clone(),
901            #[cfg(feature = "rpc")]
902            rpc_mgr: rpc_mgr.clone(),
903        };
904        let runtime_copy = tor_client.runtime().clone();
905        tor_client.runtime().spawn(async move {
906            let res =
907                handle_socks_conn(runtime_copy, socks_context, stream, (sock_id, addr.ip())).await;
908            if let Err(e) = res {
909                // TODO: warn_report doesn't work on anyhow::Error.
910                warn!("connection exited with error: {}", tor_error::Report(e));
911            }
912        })?;
913    }
914
915    Ok(())
916}