arti/
socks.rs

1//! Implement a simple SOCKS proxy that relays connections over Tor.
2//!
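//! A proxy is launched with [`run_socks_proxy()`], which listens for new
//! connections and then handles each one: it performs the SOCKS handshake
//! and relays the resulting stream over the Tor network.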
5
6use futures::future::FutureExt;
7use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
8use futures::stream::StreamExt;
9use futures::task::SpawnExt;
10use safelog::sensitive;
11use std::io::Result as IoResult;
12use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
13#[cfg(feature = "rpc")]
14use std::sync::Arc;
15use tracing::{debug, error, info, warn};
16
17#[allow(unused)]
18use arti_client::HasKind;
19use arti_client::{ErrorKind, IntoTorAddr as _, StreamPrefs, TorClient};
20use tor_config::Listen;
21use tor_error::warn_report;
22#[cfg(feature = "rpc")]
23use tor_rpcbase::{self as rpc};
24use tor_rtcompat::{NetStreamListener, Runtime};
25use tor_socksproto::{Handshake as _, SocksAddr, SocksAuth, SocksCmd, SocksRequest, SOCKS_BUF_LEN};
26
27use anyhow::{anyhow, Context, Result};
28
29use crate::rpc::RpcProxySupport;
30
/// Payload to return when an HTTP connection arrives on a SOCKS port.
///
/// The status line and header fields are terminated with CRLF, as HTTP
/// requires; the HTML body that follows the blank separator line is free-form.
const WRONG_PROTOCOL_PAYLOAD: &[u8] = concat!(
    "HTTP/1.0 501 Tor is not an HTTP Proxy\r\n",
    "Content-Type: text/html; charset=utf-8\r\n",
    "\r\n",
    r#"<!DOCTYPE html>
<html>
<head>
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
</head>
<body>
<h1>This is a SOCKS proxy, not an HTTP proxy.</h1>
<p>
It appears you have configured your web browser to use this Tor port as
an HTTP proxy.
</p>
<p>
This is not correct: This port is configured as a SOCKS proxy, not
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to
add support for it in place of, or in addition to, socks_port.
Please configure your client accordingly.
</p>
<p>
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
</p>
</body>
</html>"#,
)
.as_bytes();
57
58/// Find out which kind of address family we can/should use for a
59/// given `SocksRequest`.
60#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
61fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
62    let mut prefs = StreamPrefs::new();
63    if addr.parse::<Ipv4Addr>().is_ok() {
64        // If they asked for an IPv4 address correctly, nothing else will do.
65        prefs.ipv4_only();
66    } else if addr.parse::<Ipv6Addr>().is_ok() {
67        // If they asked for an IPv6 address correctly, nothing else will do.
68        prefs.ipv6_only();
69    } else if req.version() == tor_socksproto::SocksVersion::V4 {
70        // SOCKS4 and SOCKS4a only support IPv4
71        prefs.ipv4_only();
72    } else {
73        // Otherwise, default to saying IPv4 is preferred.
74        prefs.ipv4_preferred();
75    }
76    prefs
77}
78
79/// A Key used to isolate connections.
80///
81/// Composed of an usize (representing which listener socket accepted
82/// the connection, the source IpAddr of the client, and the
83/// authentication string provided by the client).
84#[derive(Debug, Clone, PartialEq, Eq)]
85struct SocksIsolationKey(ConnIsolation, ProvidedIsolation);
86/// Isolation information provided through the socks connection
87#[derive(Debug, Clone, PartialEq, Eq)]
88enum ProvidedIsolation {
89    /// The socks isolation itself.
90    Legacy(SocksAuth),
91    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
92    Extended {
93        /// Which format was negotiated?
94        ///
95        /// (At present, different format codes can't share a circuit.)
96        format_code: u8,
97        /// What's the isolation string?
98        isolation: Box<[u8]>,
99    },
100}
101
102impl arti_client::isolation::IsolationHelper for SocksIsolationKey {
103    fn compatible_same_type(&self, other: &Self) -> bool {
104        self == other
105    }
106
107    fn join_same_type(&self, other: &Self) -> Option<Self> {
108        if self == other {
109            Some(self.clone())
110        } else {
111            None
112        }
113    }
114}
115
116/// The meaning of a SOCKS authentication field, according to our conventions.
117struct AuthInterpretation {
118    /// Associate this stream with a DataStream created by using a particular RPC object
119    /// as a Tor client.
120    #[cfg(feature = "rpc")]
121    rpc_object: Option<rpc::ObjectId>,
122
123    /// Isolate this stream from other streams that do not have the same
124    /// value.
125    isolation: ProvidedIsolation,
126}
127
128/// NOTE: The following documentation belongs in a spec.
129/// But for now, it's our best attempt to document the design and protocol
130/// implemented here
131/// for integrating SOCKS with our RPC system. --nickm
132///
133/// Roughly speaking:
134///
135/// ## Key concepts
136///
137/// A data stream is "RPC-visible" if, when it is created via SOCKS,
138/// the RPC system is told about it.
139///
140/// Every RPC-visible stream is associated with a given RPC object when it is created.
141/// (Since the RPC object is being specified in the SOCKS protocol,
142/// it must be one with an externally visible Object ID.
143/// Such Object IDs are cryptographically unguessable and unforgeable,
144/// and are qualified with a unique identifier for their associated RPC session.)
145/// Call this RPC Object the "target" object for now.
146/// This target RPC object must implement
147/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
148///
149/// Right now, there are two general kinds of objects that implement this method:
150/// client-like objects, and one-shot clients.
151///
152/// A client-like object is either a `TorClient` or an RPC `Session`.
153/// It knows about and it is capable of opening multiple data streams.
154/// Using it as the target object for a SOCKS connection tells Arti
155/// that the resulting data stream (if any)
156/// should be built by it, and associated with its RPC session.
157///
/// An application gets a TorClient by asking the session for one,
/// or by asking an existing TorClient for a new variant clone of itself.
160///
161/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
162/// It is created from a client-like object, but can only be used for a single data stream.
/// When created, it is not yet connected or trying to connect anywhere:
164/// the act of using it as the target Object for a SOCKS connection causes
165/// it to begin connecting.
166///
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
168/// on any client-like object.
169///
170/// ## The SOCKS protocol
171///
172/// See the specification for
173/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
174/// for full details.
175///
176/// ### Further restrictions on Object IDs and isolation
177///
178/// In some cases,
179/// the RPC Object ID may denote an object
180/// that already includes information about its intended stream isolation.
181/// In such cases, the stream isolation MUST be blank.
182/// Implementations MUST reject non-blank stream isolation in such cases.
183///
184/// In some cases, the RPC object ID may denote an object
185/// that already includes information
186/// about its intended destination address and port.
187/// In such cases, the destination address MUST be `0.0.0.0` or `::`
188/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
189/// and the destination port MUST be 0.
190/// Implementations MUST reject other addresses in such cases.
191///
192/// ### Another proposed change
193///
194/// We could add a new method to clients, with a name like
195/// "open_stream" or "connect_stream".
196/// This method would include all target and isolation information in its parameters.
197/// It would actually create a DataStream immediately, tell it to begin connecting,
198/// and return an externally visible object ID.
199/// The RPC protocol could be used to watch the DataStream object,
200/// to see when it was connected.
201///
202/// The resulting DataStream object could also be used as the target of a SOCKS connection.
203/// We would require in such a case that no isolation be provided in the SOCKS handshake,
204/// and that the target address was (e.g.) INADDR_ANY.
205///
206/// ## Intended use cases (examples)
207///
208/// (These examples assume that the application
209/// already knows the SOCKS port it should use.
210/// I'm leaving out the isolation strings as orthogonal.)
211///
212/// These are **NOT** the only possible use cases;
213/// they're just the two that help understand this system best (I hope).
214///
215/// ### Case 1: Using a client-like object directly.
216///
217/// Here the application has authenticated to RPC
218/// and gotten the session ID `SESSION-1`.
219/// (In reality, this would be a longer ID, and full of crypto).
220///
221/// The application wants to open a new stream to www.example.com.
222/// They don't particularly care about isolation,
223/// but they do want their stream to use their RPC session.
224/// They don't want an Object ID for the stream.
225///
226/// To do this, they make a SOCKS connection to arti,
227/// with target address www.example.com.
228/// They set the username to `<torS0X>0SESSION-1`,
229/// and the password to the empty string.
230///
231/// Arti looks up the Session object via the `SESSION-1` object ID
232/// and tells it (via the ConnectWithPrefs special method)
233/// to connect to www.example.com.
234/// The session creates a new DataStream using its internal TorClient,
235/// but does not register the stream with an RPC Object ID.
236/// Arti proxies the application's SOCKS connection through this DataStream.
237///
238///
239/// ### Case 2: Creating an identifiable stream.
240///
241/// Here the application wants to be able to refer to its DataStream
242/// after the stream is created.
243/// As before, we assume that it's on an RPC session
244/// where the Session ID is `SESSION-1`.
245///
246/// The application sends an RPC request of the form:
247/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
248///
249/// It receives a reply like:
250/// `{"id": 123, "result": {"id": "STREAM-1"} }`
251///
252/// (In reality, `STREAM-1` would also be longer and full of crypto.)
253///
254/// Now the application has an object called `STREAM-1` that is not yet a connected
255/// stream, but which may become one.
256///
257/// This time, it wants to set its isolation string to "xyzzy".
258///
/// The application opens a SOCKS connection as before,
/// with target address www.example.com.
260/// For the username it sends `<torS0X>0STREAM-1`,
261/// and for the password it sends `xyzzy`.
262///
263/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
264/// and tells it (via the ConnectWithPrefs special method)
265/// to connect to www.example.com.
266/// This causes the `RpcDataStream` internally to create a new `DataStream`,
267/// and to store that `DataStream` in itself.
268/// The `RpcDataStream` with Object ID `STREAM-1`
269/// is now an alias for the newly created `DataStream`.
270/// Arti proxies the application's SOCKS connection through that `DataStream`.
271///
#[allow(dead_code)]
#[cfg(feature = "rpc")]
mod socks_and_rpc {
    // Intentionally empty: this module only exists to carry the design
    // notes in its documentation.
}
275
276/// Given the authentication object from a socks connection, determine what it's telling
277/// us to do.
278///
279/// (In no case is it actually SOCKS authentication: it can either be a message
280/// to the stream isolation system or the RPC system.)
281fn interpret_socks_auth(auth: &SocksAuth) -> Result<AuthInterpretation> {
282    /// Interpretation of a SOCKS5 username according to
283    /// the [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
284    /// specification.
285    enum Uname<'a> {
286        /// This is a legacy username; it's just part of the
287        /// isolation information.
288        //
289        // Note: We're not actually throwing away the username here;
290        // instead we're going to use the whole SocksAuth
291        // in a `ProvidedAuthentication::Legacy``.
292        // TODO RPC: Find a more idiomatic way to express this data flow.
293        Legacy,
294        /// This is using the socks extension: contains the extension
295        /// format code and the remaining information from the username.
296        Extended(u8, &'a [u8]),
297    }
298    /// Helper: Try to interpret a SOCKS5 username field as indicating the start of a set of
299    /// extended socks authentication information.
300    ///
301    /// Implements [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
302    ///
303    /// If it does indicate that extensions are in use,
304    /// return a `Uname::Extended` containing
305    /// the extension format type and the remaining information from the username.
306    ///
307    /// If it indicates that no extensions are in use,
308    /// return `Uname::Legacy`.
309    ///
310    /// If it is badly formatted, return an error.
311    fn interpret_socks5_username(username: &[u8]) -> Result<Uname<'_>> {
312        /// 8-byte "magic" sequence from
313        /// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
314        /// When it appears at the start of a username,
315        /// indicates that the username/password are to be interpreted as
316        /// as encoding SOCKS5 extended parameters,
317        /// but the format might not be one we recognize.
318        const SOCKS_EXT_CONST_ANY: &[u8] = b"<torS0X>";
319        let Some(remainder) = username.strip_prefix(SOCKS_EXT_CONST_ANY) else {
320            return Ok(Uname::Legacy);
321        };
322        if remainder.is_empty() {
323            return Err(anyhow!("Extended SOCKS information without format code."));
324        }
325        // TODO MSRV 1.80: use split_at_checked instead.
326        // This won't panic since we checked for an empty string above.
327        let (format_code, remainder) = remainder.split_at(1);
328        Ok(Uname::Extended(format_code[0], remainder))
329    }
330
331    let isolation = match auth {
332        SocksAuth::Username(user, pass) => match interpret_socks5_username(user)? {
333            Uname::Legacy => ProvidedIsolation::Legacy(auth.clone()),
334            Uname::Extended(b'1', b"") => {
335                return Err(anyhow!("Received empty RPC object ID"));
336            }
337            Uname::Extended(format_code @ b'1', remainder) => {
338                #[cfg(not(feature = "rpc"))]
339                return Err(anyhow!(
340                    "Received RPC object ID, but not built with support for RPC"
341                ));
342                #[cfg(feature = "rpc")]
343                return Ok(AuthInterpretation {
344                    rpc_object: Some(rpc::ObjectId::from(
345                        std::str::from_utf8(remainder).context("Rpc object ID was not utf-8")?,
346                    )),
347                    isolation: ProvidedIsolation::Extended {
348                        format_code,
349                        isolation: pass.clone().into(),
350                    },
351                });
352            }
353            Uname::Extended(format_code @ b'0', b"") => ProvidedIsolation::Extended {
354                format_code,
355                isolation: pass.clone().into(),
356            },
357            Uname::Extended(b'0', _) => {
358                return Err(anyhow!("Extraneous information in SOCKS username field."))
359            }
360            _ => return Err(anyhow!("Unrecognized SOCKS format code")),
361        },
362        _ => ProvidedIsolation::Legacy(auth.clone()),
363    };
364
365    Ok(AuthInterpretation {
366        #[cfg(feature = "rpc")]
367        rpc_object: None,
368        isolation,
369    })
370}
371
372/// Information used to implement a SOCKS connection.
373struct SocksConnContext<R: Runtime> {
374    /// A TorClient to use (by default) to anonymize requests.
375    tor_client: TorClient<R>,
376    /// If present, an RpcMgr to use when for attaching requests to RPC
377    /// sessions.
378    #[cfg(feature = "rpc")]
379    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
380}
381
/// Isolation information that is known about a SOCKS connection as soon as it
/// is accepted, _before_ any SOCKS negotiation takes place.
///
/// The first element is the index of the listener that accepted the
/// connection; the second is the address of the client that connected to the
/// SOCKS port.
type ConnIsolation = (usize, IpAddr);
388
389cfg_if::cfg_if! {
390    if #[cfg(feature="rpc")] {
391        use crate::rpc::conntarget::ConnTarget;
392    } else {
393        /// A type returned by get_prefs_and_session,
394        /// and used to launch data streams or resolve attempts.
395        ///
396        /// TODO RPC: This is quite ugly; we should do something better.
397        /// At least, we should never expose this outside the socks module.
398        type ConnTarget<R> = TorClient<R>;
399    }
400}
401
402impl<R: Runtime> SocksConnContext<R> {
403    /// Interpret a SOCKS request and our input information to determine which
404    /// TorClient / ClientConnectionTarget object and StreamPrefs we should use.
405    ///
406    /// TODO RPC: The return type here is a bit ugly.
407    fn get_prefs_and_session(
408        &self,
409        request: &SocksRequest,
410        target_addr: &str,
411        conn_isolation: ConnIsolation,
412    ) -> Result<(StreamPrefs, ConnTarget<R>)> {
413        // Determine whether we want to ask for IPv4/IPv6 addresses.
414        let mut prefs = stream_preference(request, target_addr);
415
416        // Interpret socks authentication to see whether we want to connect to an RPC connector.
417        let interp = interpret_socks_auth(request.auth())?;
418        prefs.set_isolation(SocksIsolationKey(conn_isolation, interp.isolation));
419
420        #[cfg(feature = "rpc")]
421        if let Some(session) = interp.rpc_object {
422            if let Some(mgr) = &self.rpc_mgr {
423                let (context, object) = mgr
424                    .lookup_object(&session)
425                    .context("no such session found")?;
426                let target = ConnTarget::Rpc { context, object };
427                return Ok((prefs, target));
428            } else {
429                return Err(anyhow!("no rpc manager found!?"));
430            }
431        }
432
433        let client = self.tor_client.clone();
434        #[cfg(feature = "rpc")]
435        let client = ConnTarget::Client(client);
436
437        Ok((prefs, client))
438    }
439}
440
441/// Given a just-received TCP connection `S` on a SOCKS port, handle the
442/// SOCKS handshake and relay the connection over the Tor network.
443///
444/// Uses `isolation_info` to decide which circuits this connection
445/// may use.  Requires that `isolation_info` is a pair listing the listener
446/// id and the source address for the socks request.
447async fn handle_socks_conn<R, S>(
448    runtime: R,
449    context: SocksConnContext<R>,
450    mut socks_stream: S,
451    isolation_info: ConnIsolation,
452) -> Result<()>
453where
454    R: Runtime,
455    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
456{
457    // Part 1: Perform the SOCKS handshake, to learn where we are
458    // being asked to connect, and what we're being asked to do once
459    // we connect there.
460    //
461    // The SOCKS handshake can require multiple round trips (SOCKS5
462    // always does) so we we need to run this part of the process in a
463    // loop.
464    let mut handshake = tor_socksproto::SocksProxyHandshake::new();
465
466    let mut inbuf = tor_socksproto::Buffer::new();
467    let request = loop {
468        use tor_socksproto::NextStep as NS;
469
470        let rv = handshake.step(&mut inbuf);
471
472        let step = match rv {
473            Err(e) => {
474                if let tor_socksproto::Error::BadProtocol(version) = e {
475                    // check for HTTP methods: CONNECT, DELETE, GET, HEAD, OPTION, PUT, POST, PATCH and
476                    // TRACE.
477                    // To do so, check the first byte of the connection, which happen to be placed
478                    // where SOCKs version field is.
479                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
480                        write_all_and_close(&mut socks_stream, WRONG_PROTOCOL_PAYLOAD).await?;
481                    }
482                }
483                // if there is an handshake error, don't reply with a Socks error, remote does not
484                // seems to speak Socks.
485                return Err(e.into());
486            }
487            Ok(y) => y,
488        };
489
490        match step {
491            NS::Recv(mut recv) => {
492                let n = socks_stream
493                    .read(recv.buf())
494                    .await
495                    .context("Error while reading SOCKS handshake")?;
496                recv.note_received(n)?;
497            }
498            NS::Send(data) => write_all_and_flush(&mut socks_stream, &data).await?,
499            NS::Finished(fin) => break fin.into_output_forbid_pipelining()?,
500        }
501    };
502
503    // Unpack the socks request and find out where we're connecting to.
504    let addr = request.addr().to_string();
505    let port = request.port();
506    debug!(
507        "Got a socks request: {} {}:{}",
508        request.command(),
509        sensitive(&addr),
510        port
511    );
512
513    let (prefs, tor_client) = context.get_prefs_and_session(&request, &addr, isolation_info)?;
514
515    match request.command() {
516        SocksCmd::CONNECT => {
517            // The SOCKS request wants us to connect to a given address.
518            // So, launch a connection over Tor.
519            let tor_addr = (addr.clone(), port).into_tor_addr()?;
520            let tor_stream = tor_client.connect_with_prefs(&tor_addr, &prefs).await;
521            let tor_stream = match tor_stream {
522                Ok(s) => s,
523                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
524            };
525            // Okay, great! We have a connection over the Tor network.
526            debug!("Got a stream for {}:{}", sensitive(&addr), port);
527
528            // Send back a SOCKS response, telling the client that it
529            // successfully connected.
530            let reply = request
531                .reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
532                .context("Encoding socks reply")?;
533            write_all_and_flush(&mut socks_stream, &reply[..]).await?;
534
535            let (socks_r, socks_w) = socks_stream.split();
536            let (tor_r, tor_w) = tor_stream.split();
537
538            // Finally, spawn two background tasks to relay traffic between
539            // the socks stream and the tor stream.
540            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
541            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
542        }
543        SocksCmd::RESOLVE => {
544            // We've been asked to perform a regular hostname lookup.
545            // (This is a tor-specific SOCKS extension.)
546
547            let addr = if let Ok(addr) = addr.parse() {
548                // if this is a valid ip address, just parse it and reply.
549                Ok(addr)
550            } else {
551                tor_client
552                    .resolve_with_prefs(&addr, &prefs)
553                    .await
554                    .map_err(|e| e.kind())
555                    .and_then(|addrs| addrs.first().copied().ok_or(ErrorKind::Other))
556            };
557            match addr {
558                Ok(addr) => {
559                    let reply = request
560                        .reply(
561                            tor_socksproto::SocksStatus::SUCCEEDED,
562                            Some(&SocksAddr::Ip(addr)),
563                        )
564                        .context("Encoding socks reply")?;
565                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
566                }
567                Err(e) => return reply_error(&mut socks_stream, &request, e).await,
568            }
569        }
570        SocksCmd::RESOLVE_PTR => {
571            // We've been asked to perform a reverse hostname lookup.
572            // (This is a tor-specific SOCKS extension.)
573            let addr: IpAddr = match addr.parse() {
574                Ok(ip) => ip,
575                Err(e) => {
576                    let reply = request
577                        .reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
578                        .context("Encoding socks reply")?;
579                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
580                    return Err(anyhow!(e));
581                }
582            };
583            let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
584                Ok(hosts) => hosts,
585                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
586            };
587            if let Some(host) = hosts.into_iter().next() {
588                // this conversion should never fail, legal DNS names len must be <= 253 but Socks
589                // names can be up to 255 chars.
590                let hostname = SocksAddr::Hostname(host.try_into()?);
591                let reply = request
592                    .reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
593                    .context("Encoding socks reply")?;
594                write_all_and_close(&mut socks_stream, &reply[..]).await?;
595            }
596        }
597        _ => {
598            // We don't support this SOCKS command.
599            warn!("Dropping request; {:?} is unsupported", request.command());
600            let reply = request
601                .reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
602                .context("Encoding socks reply")?;
603            write_all_and_close(&mut socks_stream, &reply[..]).await?;
604        }
605    };
606
607    // TODO: we should close the TCP stream if either task fails. Do we?
608    // See #211 and #190.
609
610    Ok(())
611}
612
613/// write_all the data to the writer & flush the writer if write_all is successful.
614async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
615where
616    W: AsyncWrite + Unpin,
617{
618    writer
619        .write_all(buf)
620        .await
621        .context("Error while writing SOCKS reply")?;
622    writer
623        .flush()
624        .await
625        .context("Error while flushing SOCKS stream")
626}
627
628/// write_all the data to the writer & close the writer if write_all is successful.
629async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
630where
631    W: AsyncWrite + Unpin,
632{
633    writer
634        .write_all(buf)
635        .await
636        .context("Error while writing SOCKS reply")?;
637    writer
638        .close()
639        .await
640        .context("Error while closing SOCKS stream")
641}
642
643/// Reply a Socks error based on an arti-client Error and close the stream.
644/// Returns the error provided in parameter
645async fn reply_error<W>(
646    writer: &mut W,
647    request: &SocksRequest,
648    error: arti_client::ErrorKind,
649) -> Result<()>
650where
651    W: AsyncWrite + Unpin,
652{
653    use {tor_socksproto::SocksStatus as S, ErrorKind as EK};
654
655    // TODO: Currently we _always_ try to return extended SOCKS return values
656    // for onion service failures from proposal 304 when they are appropriate.
657    // But according to prop 304, this is something we should only do when it's
658    // requested, for compatibility with SOCKS implementations that can't handle
659    // unexpected REP codes.
660    //
661    // I suggest we make these extended error codes "always-on" for now, and
662    // later add a feature to disable them if it's needed. -nickm
663
664    // TODO: Perhaps we should map the extended SOCKS return values for onion
665    // service failures unconditionally, even if we haven't compiled in onion
666    // service client support.  We can make that change after the relevant
667    // ErrorKinds are no longer `experimental-api` in `tor-error`.
668
669    // We need to send an error. See what kind it is.
670    let status = match error {
671        EK::RemoteNetworkFailed => S::TTL_EXPIRED,
672
673        #[cfg(feature = "onion-service-client")]
674        EK::OnionServiceNotFound => S::HS_DESC_NOT_FOUND,
675        #[cfg(feature = "onion-service-client")]
676        EK::OnionServiceAddressInvalid => S::HS_BAD_ADDRESS,
677        #[cfg(feature = "onion-service-client")]
678        EK::OnionServiceMissingClientAuth => S::HS_MISSING_CLIENT_AUTH,
679        #[cfg(feature = "onion-service-client")]
680        EK::OnionServiceWrongClientAuth => S::HS_WRONG_CLIENT_AUTH,
681
682        // NOTE: This is not a perfect correspondence from these ErrorKinds to
683        // the errors we're returning here. In the longer run, we'll want to
684        // encourage other ways to indicate failure to clients.  Those ways might
685        // include encouraging HTTP CONNECT, or the RPC system, both of which
686        // would give us more robust ways to report different kinds of failure.
687        #[cfg(feature = "onion-service-client")]
688        EK::OnionServiceNotRunning
689        | EK::OnionServiceConnectionFailed
690        | EK::OnionServiceProtocolViolation => S::HS_INTRO_FAILED,
691
692        _ => S::GENERAL_FAILURE,
693    };
694    let reply = request
695        .reply(status, None)
696        .context("Encoding socks reply")?;
697    // if writing back the error fail, still return the original error
698    let _ = write_all_and_close(writer, &reply[..]).await;
699
700    Err(anyhow!(error))
701}
702
703/// Copy all the data from `reader` into `writer` until we encounter an EOF or
704/// an error.
705///
706/// Unlike as futures::io::copy(), this function is meant for use with
707/// interactive readers and writers, where the reader might pause for
708/// a while, but where we want to send data on the writer as soon as
709/// it is available.
710///
711/// This function assumes that the writer might need to be flushed for
712/// any buffered data to be sent.  It tries to minimize the number of
713/// flushes, however, by only flushing the writer when the reader has no data.
714async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
715where
716    R: AsyncRead + Unpin,
717    W: AsyncWrite + Unpin,
718{
719    use futures::{poll, task::Poll};
720
721    let mut buf = [0_u8; SOCKS_BUF_LEN];
722
723    // At this point we could just loop, calling read().await,
724    // write_all().await, and flush().await.  But we want to be more
725    // clever than that: we only want to flush when the reader is
726    // stalled.  That way we can pack our data into as few cells as
727    // possible, but flush it immediately whenever there's no more
728    // data coming.
729    let loop_result: IoResult<()> = loop {
730        let mut read_future = reader.read(&mut buf[..]);
731        match poll!(&mut read_future) {
732            Poll::Ready(Err(e)) => break Err(e),
733            Poll::Ready(Ok(0)) => break Ok(()), // EOF
734            Poll::Ready(Ok(n)) => {
735                writer.write_all(&buf[..n]).await?;
736                continue;
737            }
738            Poll::Pending => writer.flush().await?,
739        }
740
741        // The read future is pending, so we should wait on it.
742        match read_future.await {
743            Err(e) => break Err(e),
744            Ok(0) => break Ok(()),
745            Ok(n) => writer.write_all(&buf[..n]).await?,
746        }
747    };
748
749    // Make sure that we flush any lingering data if we can.
750    //
751    // If there is a difference between closing and dropping, then we
752    // only want to do a "proper" close if the reader closed cleanly.
753    let flush_result = if loop_result.is_ok() {
754        writer.close().await
755    } else {
756        writer.flush().await
757    };
758
759    loop_result.or(flush_result)
760}
761
762/// Return true if a given IoError, when received from accept, is a fatal
763/// error.
764fn accept_err_is_fatal(err: &IoError) -> bool {
765    #![allow(clippy::match_like_matches_macro)]
766
767    /// Re-declaration of WSAEMFILE with the right type to match
768    /// `raw_os_error()`.
769    #[cfg(windows)]
770    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
771
772    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
773    // we need to use OS-specific errors. :P
774    match err.raw_os_error() {
775        #[cfg(unix)]
776        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
777        #[cfg(windows)]
778        Some(WSAEMFILE) => false,
779        _ => true,
780    }
781}
782
783/// Launch a SOCKS proxy to listen on a given localhost port, and run
784/// indefinitely.
785///
786/// Requires a `runtime` to use for launching tasks and handling
787/// timeouts, and a `tor_client` to use in connecting over the Tor
788/// network.
789#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
790pub(crate) async fn run_socks_proxy<R: Runtime>(
791    runtime: R,
792    tor_client: TorClient<R>,
793    listen: Listen,
794    rpc_data: Option<RpcProxySupport>,
795) -> Result<()> {
796    #[cfg(feature = "rpc")]
797    let (rpc_mgr, mut rpc_state_sender) = match rpc_data {
798        Some(RpcProxySupport {
799            rpc_mgr,
800            rpc_state_sender,
801        }) => (Some(rpc_mgr), Some(rpc_state_sender)),
802        None => (None, None),
803    };
804
805    if !listen.is_localhost_only() {
806        warn!("Configured to listen for SOCKS on non-local addresses. This is usually insecure! We recommend listening on localhost only.");
807    }
808
809    let mut listeners = Vec::new();
810    let mut listening_on_addrs = Vec::new();
811
812    // Try to bind to the SOCKS ports.
813    match listen.ip_addrs() {
814        Ok(addrgroups) => {
815            for addrgroup in addrgroups {
816                for addr in addrgroup {
817                    match runtime.listen(&addr).await {
818                        Ok(listener) => {
819                            info!("Listening on {:?}.", addr);
820                            listeners.push(listener);
821                            listening_on_addrs.push(addr);
822                        }
823                        #[cfg(unix)]
824                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
825                            warn_report!(e, "Address family not supported {}", addr);
826                        }
827                        Err(ref e) => {
828                            return Err(anyhow!("Can't listen on {}: {e}", addr));
829                        }
830                    }
831                }
832            }
833        }
834        Err(e) => warn_report!(e, "Invalid listen spec"),
835    }
836
837    // We weren't able to bind any ports: There's nothing to do.
838    if listeners.is_empty() {
839        error!("Couldn't open any SOCKS listeners.");
840        return Err(anyhow!("Couldn't open SOCKS listeners"));
841    }
842
843    cfg_if::cfg_if! {
844        if #[cfg(feature="rpc")] {
845            if let Some(rpc_state_sender) = &mut rpc_state_sender {
846                rpc_state_sender.set_socks_listeners(&listening_on_addrs[..]);
847            }
848        } else {
849            let _ = listening_on_addrs;
850        }
851    }
852
853    // Create a stream of (incoming socket, listener_id) pairs, selected
854    // across all the listeners.
855    let mut incoming = futures::stream::select_all(
856        listeners
857            .into_iter()
858            .map(NetStreamListener::incoming)
859            .enumerate()
860            .map(|(listener_id, incoming_conns)| {
861                incoming_conns.map(move |socket| (socket, listener_id))
862            }),
863    );
864
865    // Loop over all incoming connections.  For each one, call
866    // handle_socks_conn() in a new task.
867    while let Some((stream, sock_id)) = incoming.next().await {
868        let (stream, addr) = match stream {
869            Ok((s, a)) => (s, a),
870            Err(err) => {
871                if accept_err_is_fatal(&err) {
872                    return Err(err).context("Failed to receive incoming stream on SOCKS port");
873                } else {
874                    warn_report!(err, "Incoming stream failed");
875                    continue;
876                }
877            }
878        };
879        let socks_context = SocksConnContext {
880            tor_client: tor_client.clone(),
881            #[cfg(feature = "rpc")]
882            rpc_mgr: rpc_mgr.clone(),
883        };
884        let runtime_copy = runtime.clone();
885        runtime.spawn(async move {
886            let res =
887                handle_socks_conn(runtime_copy, socks_context, stream, (sock_id, addr.ip())).await;
888            if let Err(e) = res {
889                // TODO: warn_report doesn't work on anyhow::Error.
890                warn!("connection exited with error: {}", tor_error::Report(e));
891            }
892        })?;
893    }
894
895    Ok(())
896}