1
//! Implement a simple SOCKS proxy that relays connections over Tor.
2
//!
3
//! A proxy is launched with [`run_socks_proxy()`], which listens for new
4
//! connections and then, for each one, performs the SOCKS handshake and relays
//! the connection over the Tor network.
5

            
6
use futures::future::FutureExt;
7
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
8
use futures::stream::StreamExt;
9
use futures::task::SpawnExt;
10
use safelog::sensitive;
11
use std::io::Result as IoResult;
12
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
13
#[cfg(feature = "rpc")]
14
use std::sync::Arc;
15
use tracing::{debug, error, info, warn};
16

            
17
#[allow(unused)]
18
use arti_client::HasKind;
19
use arti_client::{ErrorKind, IntoTorAddr as _, StreamPrefs, TorClient};
20
use tor_config::Listen;
21
use tor_error::warn_report;
22
#[cfg(feature = "rpc")]
23
use tor_rpcbase::{self as rpc};
24
use tor_rtcompat::{NetStreamListener, Runtime};
25
use tor_socksproto::{Handshake as _, SocksAddr, SocksAuth, SocksCmd, SocksRequest, SOCKS_BUF_LEN};
26

            
27
use anyhow::{anyhow, Context, Result};
28

            
29
use crate::rpc::RpcProxySupport;
30

            
31
/// Payload to return when an HTTP request arrives on a SOCKS port.
///
/// The status line and header lines are terminated with CRLF, as HTTP
/// requires. The escapes are written out explicitly so that the bytes we send
/// do not depend on the line endings used by this source file.
const WRONG_PROTOCOL_PAYLOAD: &[u8] = b"HTTP/1.0 501 Tor is not an HTTP Proxy\r\n\
Content-Type: text/html; charset=utf-8\r\n\
\r\n\
<!DOCTYPE html>\n\
<html>\n\
<head>\n\
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>\n\
</head>\n\
<body>\n\
<h1>This is a SOCKS proxy, not an HTTP proxy.</h1>\n\
<p>\n\
It appears you have configured your web browser to use this Tor port as\n\
an HTTP proxy.\n\
</p>\n\
<p>\n\
This is not correct: This port is configured as a SOCKS proxy, not\n\
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to\n\
add support for it in place of, or in addition to, socks_port.\n\
Please configure your client accordingly.\n\
</p>\n\
<p>\n\
See <a href=\"https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation\">https://gitlab.torproject.org/tpo/core/arti</a> for more information.\n\
</p>\n\
</body>\n\
</html>";
57

            
58
/// Find out which kind of address family we can/should use for a
59
/// given `SocksRequest`.
60
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
61
fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
62
    let mut prefs = StreamPrefs::new();
63
    if addr.parse::<Ipv4Addr>().is_ok() {
64
        // If they asked for an IPv4 address correctly, nothing else will do.
65
        prefs.ipv4_only();
66
    } else if addr.parse::<Ipv6Addr>().is_ok() {
67
        // If they asked for an IPv6 address correctly, nothing else will do.
68
        prefs.ipv6_only();
69
    } else if req.version() == tor_socksproto::SocksVersion::V4 {
70
        // SOCKS4 and SOCKS4a only support IPv4
71
        prefs.ipv4_only();
72
    } else {
73
        // Otherwise, default to saying IPv4 is preferred.
74
        prefs.ipv4_preferred();
75
    }
76
    prefs
77
}
78

            
79
/// A Key used to isolate connections.
80
///
81
/// Composed of an usize (representing which listener socket accepted
82
/// the connection, the source IpAddr of the client, and the
83
/// authentication string provided by the client).
84
#[derive(Debug, Clone, PartialEq, Eq)]
85
struct SocksIsolationKey(ConnIsolation, ProvidedIsolation);
86
/// Isolation information provided through the socks connection
87
#[derive(Debug, Clone, PartialEq, Eq)]
88
enum ProvidedIsolation {
89
    /// The socks isolation itself.
90
    Legacy(SocksAuth),
91
    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
92
    Extended {
93
        /// Which format was negotiated?
94
        ///
95
        /// (At present, different format codes can't share a circuit.)
96
        format_code: u8,
97
        /// What's the isolation string?
98
        isolation: Box<[u8]>,
99
    },
100
}
101

            
102
impl arti_client::isolation::IsolationHelper for SocksIsolationKey {
    /// Two SOCKS isolation keys are compatible exactly when they are equal.
    fn compatible_same_type(&self, other: &Self) -> bool {
        other == self
    }

    /// Joining succeeds (yielding a copy of the key) only for identical keys.
    fn join_same_type(&self, other: &Self) -> Option<Self> {
        self.compatible_same_type(other).then(|| self.clone())
    }
}
115

            
116
/// The meaning of a SOCKS authentication field, according to our conventions.
///
/// Produced by `interpret_socks_auth`.
struct AuthInterpretation {
    /// Associate this stream with a DataStream created by using a particular RPC object
    /// as a Tor client.
    ///
    /// Only set when the client used extended-authentication format code `1`;
    /// `None` otherwise.
    #[cfg(feature = "rpc")]
    rpc_object: Option<rpc::ObjectId>,

    /// Isolate this stream from other streams that do not have the same
    /// value.
    isolation: ProvidedIsolation,
}
127

            
128
/// NOTE: The following documentation belongs in a spec.
129
/// But for now, it's our best attempt to document the design and protocol
130
/// implemented here
131
/// for integrating SOCKS with our RPC system. --nickm
132
///
133
/// Roughly speaking:
134
///
135
/// ## Key concepts
136
///
137
/// A data stream is "RPC-visible" if, when it is created via SOCKS,
138
/// the RPC system is told about it.
139
///
140
/// Every RPC-visible stream is associated with a given RPC object when it is created.
141
/// (Since the RPC object is being specified in the SOCKS protocol,
142
/// it must be one with an externally visible Object ID.
143
/// Such Object IDs are cryptographically unguessable and unforgeable,
144
/// and are qualified with a unique identifier for their associated RPC session.)
145
/// Call this RPC Object the "target" object for now.
146
/// This target RPC object must implement
147
/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
148
///
149
/// Right now, there are two general kinds of objects that implement this method:
150
/// client-like objects, and one-shot clients.
151
///
152
/// A client-like object is either a `TorClient` or an RPC `Session`.
153
/// It knows about and it is capable of opening multiple data streams.
154
/// Using it as the target object for a SOCKS connection tells Arti
155
/// that the resulting data stream (if any)
156
/// should be built by it, and associated with its RPC session.
157
///
158
/// An application gets a TorClient by asking the session for one,
159
/// or by asking a TorClient to give it a new variant clone of itself.
160
///
161
/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
162
/// It is created from a client-like object, but can only be used for a single data stream.
163
/// When created, it is not yet connected or trying to connect anywhere:
164
/// the act of using it as the target Object for a SOCKS connection causes
165
/// it to begin connecting.
166
///
167
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
168
/// on any client-like object.
169
///
170
/// ## The SOCKS protocol
171
///
172
/// See the specification for
173
/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
174
/// for full details.
175
///
176
/// ### Further restrictions on Object IDs and isolation
177
///
178
/// In some cases,
179
/// the RPC Object ID may denote an object
180
/// that already includes information about its intended stream isolation.
181
/// In such cases, the stream isolation MUST be blank.
182
/// Implementations MUST reject non-blank stream isolation in such cases.
183
///
184
/// In some cases, the RPC object ID may denote an object
185
/// that already includes information
186
/// about its intended destination address and port.
187
/// In such cases, the destination address MUST be `0.0.0.0` or `::`
188
/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
189
/// and the destination port MUST be 0.
190
/// Implementations MUST reject other addresses in such cases.
191
///
192
/// ### Another proposed change
193
///
194
/// We could add a new method to clients, with a name like
195
/// "open_stream" or "connect_stream".
196
/// This method would include all target and isolation information in its parameters.
197
/// It would actually create a DataStream immediately, tell it to begin connecting,
198
/// and return an externally visible object ID.
199
/// The RPC protocol could be used to watch the DataStream object,
200
/// to see when it was connected.
201
///
202
/// The resulting DataStream object could also be used as the target of a SOCKS connection.
203
/// We would require in such a case that no isolation be provided in the SOCKS handshake,
204
/// and that the target address was (e.g.) INADDR_ANY.
205
///
206
/// ## Intended use cases (examples)
207
///
208
/// (These examples assume that the application
209
/// already knows the SOCKS port it should use.
210
/// I'm leaving out the isolation strings as orthogonal.)
211
///
212
/// These are **NOT** the only possible use cases;
213
/// they're just the two that help understand this system best (I hope).
214
///
215
/// ### Case 1: Using a client-like object directly.
216
///
217
/// Here the application has authenticated to RPC
218
/// and gotten the session ID `SESSION-1`.
219
/// (In reality, this would be a longer ID, and full of crypto).
220
///
221
/// The application wants to open a new stream to www.example.com.
222
/// They don't particularly care about isolation,
223
/// but they do want their stream to use their RPC session.
224
/// They don't want an Object ID for the stream.
225
///
226
/// To do this, they make a SOCKS connection to arti,
227
/// with target address www.example.com.
228
/// They set the username to `<torS0X>0SESSION-1`,
229
/// and the password to the empty string.
230
///
231
/// Arti looks up the Session object via the `SESSION-1` object ID
232
/// and tells it (via the ConnectWithPrefs special method)
233
/// to connect to www.example.com.
234
/// The session creates a new DataStream using its internal TorClient,
235
/// but does not register the stream with an RPC Object ID.
236
/// Arti proxies the application's SOCKS connection through this DataStream.
237
///
238
///
239
/// ### Case 2: Creating an identifiable stream.
240
///
241
/// Here the application wants to be able to refer to its DataStream
242
/// after the stream is created.
243
/// As before, we assume that it's on an RPC session
244
/// where the Session ID is `SESSION-1`.
245
///
246
/// The application sends an RPC request of the form:
247
/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
248
///
249
/// It receives a reply like:
250
/// `{"id": 123, "result": {"id": "STREAM-1"} }`
251
///
252
/// (In reality, `STREAM-1` would also be longer and full of crypto.)
253
///
254
/// Now the application has an object called `STREAM-1` that is not yet a connected
255
/// stream, but which may become one.
256
///
257
/// This time, it wants to set its isolation string to "xyzzy".
258
///
259
/// The application opens a socks connection as before.
260
/// For the username it sends `<torS0X>0STREAM-1`,
261
/// and for the password it sends `xyzzy`.
262
///
263
/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
264
/// and tells it (via the ConnectWithPrefs special method)
265
/// to connect to www.example.com.
266
/// This causes the `RpcDataStream` internally to create a new `DataStream`,
267
/// and to store that `DataStream` in itself.
268
/// The `RpcDataStream` with Object ID `STREAM-1`
269
/// is now an alias for the newly created `DataStream`.
270
/// Arti proxies the application's SOCKS connection through that `DataStream`.
271
///
272
#[cfg(feature = "rpc")]
#[allow(dead_code)]
// This module is intentionally empty: it exists only to carry the design
// notes in the doc comment above.
mod socks_and_rpc {}
275

            
276
/// Given the authentication object from a socks connection, determine what it's telling
277
/// us to do.
278
///
279
/// (In no case is it actually SOCKS authentication: it can either be a message
280
/// to the stream isolation system or the RPC system.)
281
fn interpret_socks_auth(auth: &SocksAuth) -> Result<AuthInterpretation> {
282
    /// Interpretation of a SOCKS5 username according to
283
    /// the [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
284
    /// specification.
285
    enum Uname<'a> {
286
        /// This is a legacy username; it's just part of the
287
        /// isolation information.
288
        //
289
        // Note: We're not actually throwing away the username here;
290
        // instead we're going to use the whole SocksAuth
291
        // in a `ProvidedAuthentication::Legacy``.
292
        // TODO RPC: Find a more idiomatic way to express this data flow.
293
        Legacy,
294
        /// This is using the socks extension: contains the extension
295
        /// format code and the remaining information from the username.
296
        Extended(u8, &'a [u8]),
297
    }
298
    /// Helper: Try to interpret a SOCKS5 username field as indicating the start of a set of
299
    /// extended socks authentication information.
300
    ///
301
    /// Implements [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
302
    ///
303
    /// If it does indicate that extensions are in use,
304
    /// return a `Uname::Extended` containing
305
    /// the extension format type and the remaining information from the username.
306
    ///
307
    /// If it indicates that no extensions are in use,
308
    /// return `Uname::Legacy`.
309
    ///
310
    /// If it is badly formatted, return an error.
311
    fn interpret_socks5_username(username: &[u8]) -> Result<Uname<'_>> {
312
        /// 8-byte "magic" sequence from
313
        /// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
314
        /// When it appears at the start of a username,
315
        /// indicates that the username/password are to be interpreted as
316
        /// as encoding SOCKS5 extended parameters,
317
        /// but the format might not be one we recognize.
318
        const SOCKS_EXT_CONST_ANY: &[u8] = b"<torS0X>";
319
        let Some(remainder) = username.strip_prefix(SOCKS_EXT_CONST_ANY) else {
320
            return Ok(Uname::Legacy);
321
        };
322
        if remainder.is_empty() {
323
            return Err(anyhow!("Extended SOCKS information without format code."));
324
        }
325
        // TODO MSRV 1.80: use split_at_checked instead.
326
        // This won't panic since we checked for an empty string above.
327
        let (format_code, remainder) = remainder.split_at(1);
328
        Ok(Uname::Extended(format_code[0], remainder))
329
    }
330

            
331
    let isolation = match auth {
332
        SocksAuth::Username(user, pass) => match interpret_socks5_username(user)? {
333
            Uname::Legacy => ProvidedIsolation::Legacy(auth.clone()),
334
            Uname::Extended(b'1', b"") => {
335
                return Err(anyhow!("Received empty RPC object ID"));
336
            }
337
            Uname::Extended(format_code @ b'1', remainder) => {
338
                #[cfg(not(feature = "rpc"))]
339
                return Err(anyhow!(
340
                    "Received RPC object ID, but not built with support for RPC"
341
                ));
342
                #[cfg(feature = "rpc")]
343
                return Ok(AuthInterpretation {
344
                    rpc_object: Some(rpc::ObjectId::from(
345
                        std::str::from_utf8(remainder).context("Rpc object ID was not utf-8")?,
346
                    )),
347
                    isolation: ProvidedIsolation::Extended {
348
                        format_code,
349
                        isolation: pass.clone().into(),
350
                    },
351
                });
352
            }
353
            Uname::Extended(format_code @ b'0', b"") => ProvidedIsolation::Extended {
354
                format_code,
355
                isolation: pass.clone().into(),
356
            },
357
            Uname::Extended(b'0', _) => {
358
                return Err(anyhow!("Extraneous information in SOCKS username field."))
359
            }
360
            _ => return Err(anyhow!("Unrecognized SOCKS format code")),
361
        },
362
        _ => ProvidedIsolation::Legacy(auth.clone()),
363
    };
364

            
365
    Ok(AuthInterpretation {
366
        #[cfg(feature = "rpc")]
367
        rpc_object: None,
368
        isolation,
369
    })
370
}
371

            
372
/// Information used to implement a SOCKS connection.
struct SocksConnContext<R: Runtime> {
    /// A TorClient to use (by default) to anonymize requests.
    ///
    /// Used whenever the SOCKS request does not name an RPC object.
    tor_client: TorClient<R>,
    /// If present, an RpcMgr to use for attaching requests to RPC
    /// sessions.
    ///
    /// If a request names an RPC object while this is `None`, the request is
    /// rejected with an error.
    #[cfg(feature = "rpc")]
    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
}
381

            
382
/// Type alias for the isolation information associated with a given SOCKS
/// connection _before_ SOCKS is negotiated.
///
/// Currently this is an index for which listener accepted the connection, plus
/// the address of the client that connected to the Socks port.
///
/// It forms the first half of a `SocksIsolationKey`. The second half comes from
/// the SOCKS authentication fields.
type ConnIsolation = (usize, IpAddr);
388

            
389
// `ConnTarget<R>` is what `get_prefs_and_session` hands back for opening
// streams and performing lookups. With the `rpc` feature it is the RPC-aware
// type from `crate::rpc::conntarget`. Without it, it is just a `TorClient`.
cfg_if::cfg_if! {
    if #[cfg(feature="rpc")] {
        use crate::rpc::conntarget::ConnTarget;
    } else {
        /// A type returned by get_prefs_and_session,
        /// and used to launch data streams or resolve attempts.
        ///
        /// TODO RPC: This is quite ugly; we should do something better.
        /// At least, we should never expose this outside the socks module.
        type ConnTarget<R> = TorClient<R>;
    }
}
401

            
402
impl<R: Runtime> SocksConnContext<R> {
403
    /// Interpret a SOCKS request and our input information to determine which
404
    /// TorClient / ClientConnectionTarget object and StreamPrefs we should use.
405
    ///
406
    /// TODO RPC: The return type here is a bit ugly.
407
    fn get_prefs_and_session(
408
        &self,
409
        request: &SocksRequest,
410
        target_addr: &str,
411
        conn_isolation: ConnIsolation,
412
    ) -> Result<(StreamPrefs, ConnTarget<R>)> {
413
        // Determine whether we want to ask for IPv4/IPv6 addresses.
414
        let mut prefs = stream_preference(request, target_addr);
415

            
416
        // Interpret socks authentication to see whether we want to connect to an RPC connector.
417
        let interp = interpret_socks_auth(request.auth())?;
418
        prefs.set_isolation(SocksIsolationKey(conn_isolation, interp.isolation));
419

            
420
        #[cfg(feature = "rpc")]
421
        if let Some(session) = interp.rpc_object {
422
            if let Some(mgr) = &self.rpc_mgr {
423
                let (context, object) = mgr
424
                    .lookup_object(&session)
425
                    .context("no such session found")?;
426
                let target = ConnTarget::Rpc { context, object };
427
                return Ok((prefs, target));
428
            } else {
429
                return Err(anyhow!("no rpc manager found!?"));
430
            }
431
        }
432

            
433
        let client = self.tor_client.clone();
434
        #[cfg(feature = "rpc")]
435
        let client = ConnTarget::Client(Box::new(client));
436

            
437
        Ok((prefs, client))
438
    }
439
}
440

            
441
/// Given a just-received TCP connection `S` on a SOCKS port, handle the
442
/// SOCKS handshake and relay the connection over the Tor network.
443
///
444
/// Uses `isolation_info` to decide which circuits this connection
445
/// may use.  Requires that `isolation_info` is a pair listing the listener
446
/// id and the source address for the socks request.
447
#[allow(clippy::cognitive_complexity)] // TODO: Refactor
448
async fn handle_socks_conn<R, S>(
449
    runtime: R,
450
    context: SocksConnContext<R>,
451
    mut socks_stream: S,
452
    isolation_info: ConnIsolation,
453
) -> Result<()>
454
where
455
    R: Runtime,
456
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
457
{
458
    // Part 1: Perform the SOCKS handshake, to learn where we are
459
    // being asked to connect, and what we're being asked to do once
460
    // we connect there.
461
    //
462
    // The SOCKS handshake can require multiple round trips (SOCKS5
463
    // always does) so we we need to run this part of the process in a
464
    // loop.
465
    let mut handshake = tor_socksproto::SocksProxyHandshake::new();
466

            
467
    let mut inbuf = tor_socksproto::Buffer::new();
468
    let request = loop {
469
        use tor_socksproto::NextStep as NS;
470

            
471
        let rv = handshake.step(&mut inbuf);
472

            
473
        let step = match rv {
474
            Err(e) => {
475
                if let tor_socksproto::Error::BadProtocol(version) = e {
476
                    // check for HTTP methods: CONNECT, DELETE, GET, HEAD, OPTION, PUT, POST, PATCH and
477
                    // TRACE.
478
                    // To do so, check the first byte of the connection, which happen to be placed
479
                    // where SOCKs version field is.
480
                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
481
                        write_all_and_close(&mut socks_stream, WRONG_PROTOCOL_PAYLOAD).await?;
482
                    }
483
                }
484
                // if there is an handshake error, don't reply with a Socks error, remote does not
485
                // seems to speak Socks.
486
                return Err(e.into());
487
            }
488
            Ok(y) => y,
489
        };
490

            
491
        match step {
492
            NS::Recv(mut recv) => {
493
                let n = socks_stream
494
                    .read(recv.buf())
495
                    .await
496
                    .context("Error while reading SOCKS handshake")?;
497
                recv.note_received(n)?;
498
            }
499
            NS::Send(data) => write_all_and_flush(&mut socks_stream, &data).await?,
500
            NS::Finished(fin) => break fin.into_output_forbid_pipelining()?,
501
        }
502
    };
503

            
504
    // Unpack the socks request and find out where we're connecting to.
505
    let addr = request.addr().to_string();
506
    let port = request.port();
507
    debug!(
508
        "Got a socks request: {} {}:{}",
509
        request.command(),
510
        sensitive(&addr),
511
        port
512
    );
513

            
514
    let (prefs, tor_client) = context.get_prefs_and_session(&request, &addr, isolation_info)?;
515

            
516
    match request.command() {
517
        SocksCmd::CONNECT => {
518
            // The SOCKS request wants us to connect to a given address.
519
            // So, launch a connection over Tor.
520
            let tor_addr = (addr.clone(), port).into_tor_addr()?;
521
            let tor_stream = tor_client.connect_with_prefs(&tor_addr, &prefs).await;
522
            let tor_stream = match tor_stream {
523
                Ok(s) => s,
524
                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
525
            };
526
            // Okay, great! We have a connection over the Tor network.
527
            debug!("Got a stream for {}:{}", sensitive(&addr), port);
528

            
529
            // Send back a SOCKS response, telling the client that it
530
            // successfully connected.
531
            let reply = request
532
                .reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
533
                .context("Encoding socks reply")?;
534
            write_all_and_flush(&mut socks_stream, &reply[..]).await?;
535

            
536
            let (socks_r, socks_w) = socks_stream.split();
537
            let (tor_r, tor_w) = tor_stream.split();
538

            
539
            // Finally, spawn two background tasks to relay traffic between
540
            // the socks stream and the tor stream.
541
            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
542
            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
543
        }
544
        SocksCmd::RESOLVE => {
545
            // We've been asked to perform a regular hostname lookup.
546
            // (This is a tor-specific SOCKS extension.)
547

            
548
            let addr = if let Ok(addr) = addr.parse() {
549
                // if this is a valid ip address, just parse it and reply.
550
                Ok(addr)
551
            } else {
552
                tor_client
553
                    .resolve_with_prefs(&addr, &prefs)
554
                    .await
555
                    .map_err(|e| e.kind())
556
                    .and_then(|addrs| addrs.first().copied().ok_or(ErrorKind::Other))
557
            };
558
            match addr {
559
                Ok(addr) => {
560
                    let reply = request
561
                        .reply(
562
                            tor_socksproto::SocksStatus::SUCCEEDED,
563
                            Some(&SocksAddr::Ip(addr)),
564
                        )
565
                        .context("Encoding socks reply")?;
566
                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
567
                }
568
                Err(e) => return reply_error(&mut socks_stream, &request, e).await,
569
            }
570
        }
571
        SocksCmd::RESOLVE_PTR => {
572
            // We've been asked to perform a reverse hostname lookup.
573
            // (This is a tor-specific SOCKS extension.)
574
            let addr: IpAddr = match addr.parse() {
575
                Ok(ip) => ip,
576
                Err(e) => {
577
                    let reply = request
578
                        .reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
579
                        .context("Encoding socks reply")?;
580
                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
581
                    return Err(anyhow!(e));
582
                }
583
            };
584
            let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
585
                Ok(hosts) => hosts,
586
                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
587
            };
588
            if let Some(host) = hosts.into_iter().next() {
589
                // this conversion should never fail, legal DNS names len must be <= 253 but Socks
590
                // names can be up to 255 chars.
591
                let hostname = SocksAddr::Hostname(host.try_into()?);
592
                let reply = request
593
                    .reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
594
                    .context("Encoding socks reply")?;
595
                write_all_and_close(&mut socks_stream, &reply[..]).await?;
596
            }
597
        }
598
        _ => {
599
            // We don't support this SOCKS command.
600
            warn!("Dropping request; {:?} is unsupported", request.command());
601
            let reply = request
602
                .reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
603
                .context("Encoding socks reply")?;
604
            write_all_and_close(&mut socks_stream, &reply[..]).await?;
605
        }
606
    };
607

            
608
    // TODO: we should close the TCP stream if either task fails. Do we?
609
    // See #211 and #190.
610

            
611
    Ok(())
612
}
613

            
614
/// write_all the data to the writer & flush the writer if write_all is successful.
615
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
616
where
617
    W: AsyncWrite + Unpin,
618
{
619
    writer
620
        .write_all(buf)
621
        .await
622
        .context("Error while writing SOCKS reply")?;
623
    writer
624
        .flush()
625
        .await
626
        .context("Error while flushing SOCKS stream")
627
}
628

            
629
/// write_all the data to the writer & close the writer if write_all is successful.
630
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
631
where
632
    W: AsyncWrite + Unpin,
633
{
634
    writer
635
        .write_all(buf)
636
        .await
637
        .context("Error while writing SOCKS reply")?;
638
    writer
639
        .close()
640
        .await
641
        .context("Error while closing SOCKS stream")
642
}
643

            
644
/// Reply a Socks error based on an arti-client Error and close the stream.
645
/// Returns the error provided in parameter
646
async fn reply_error<W>(
647
    writer: &mut W,
648
    request: &SocksRequest,
649
    error: arti_client::ErrorKind,
650
) -> Result<()>
651
where
652
    W: AsyncWrite + Unpin,
653
{
654
    use {tor_socksproto::SocksStatus as S, ErrorKind as EK};
655

            
656
    // TODO: Currently we _always_ try to return extended SOCKS return values
657
    // for onion service failures from proposal 304 when they are appropriate.
658
    // But according to prop 304, this is something we should only do when it's
659
    // requested, for compatibility with SOCKS implementations that can't handle
660
    // unexpected REP codes.
661
    //
662
    // I suggest we make these extended error codes "always-on" for now, and
663
    // later add a feature to disable them if it's needed. -nickm
664

            
665
    // TODO: Perhaps we should map the extended SOCKS return values for onion
666
    // service failures unconditionally, even if we haven't compiled in onion
667
    // service client support.  We can make that change after the relevant
668
    // ErrorKinds are no longer `experimental-api` in `tor-error`.
669

            
670
    // We need to send an error. See what kind it is.
671
    let status = match error {
672
        EK::RemoteNetworkFailed => S::TTL_EXPIRED,
673

            
674
        #[cfg(feature = "onion-service-client")]
675
        EK::OnionServiceNotFound => S::HS_DESC_NOT_FOUND,
676
        #[cfg(feature = "onion-service-client")]
677
        EK::OnionServiceAddressInvalid => S::HS_BAD_ADDRESS,
678
        #[cfg(feature = "onion-service-client")]
679
        EK::OnionServiceMissingClientAuth => S::HS_MISSING_CLIENT_AUTH,
680
        #[cfg(feature = "onion-service-client")]
681
        EK::OnionServiceWrongClientAuth => S::HS_WRONG_CLIENT_AUTH,
682

            
683
        // NOTE: This is not a perfect correspondence from these ErrorKinds to
684
        // the errors we're returning here. In the longer run, we'll want to
685
        // encourage other ways to indicate failure to clients.  Those ways might
686
        // include encouraging HTTP CONNECT, or the RPC system, both of which
687
        // would give us more robust ways to report different kinds of failure.
688
        #[cfg(feature = "onion-service-client")]
689
        EK::OnionServiceNotRunning
690
        | EK::OnionServiceConnectionFailed
691
        | EK::OnionServiceProtocolViolation => S::HS_INTRO_FAILED,
692

            
693
        _ => S::GENERAL_FAILURE,
694
    };
695
    let reply = request
696
        .reply(status, None)
697
        .context("Encoding socks reply")?;
698
    // if writing back the error fail, still return the original error
699
    let _ = write_all_and_close(writer, &reply[..]).await;
700

            
701
    Err(anyhow!(error))
702
}
703

            
704
/// Copy all the data from `reader` into `writer` until we encounter an EOF or
705
/// an error.
706
///
707
/// Unlike as futures::io::copy(), this function is meant for use with
708
/// interactive readers and writers, where the reader might pause for
709
/// a while, but where we want to send data on the writer as soon as
710
/// it is available.
711
///
712
/// This function assumes that the writer might need to be flushed for
713
/// any buffered data to be sent.  It tries to minimize the number of
714
/// flushes, however, by only flushing the writer when the reader has no data.
715
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
716
where
717
    R: AsyncRead + Unpin,
718
    W: AsyncWrite + Unpin,
719
{
720
    use futures::{poll, task::Poll};
721

            
722
    let mut buf = [0_u8; SOCKS_BUF_LEN];
723

            
724
    // At this point we could just loop, calling read().await,
725
    // write_all().await, and flush().await.  But we want to be more
726
    // clever than that: we only want to flush when the reader is
727
    // stalled.  That way we can pack our data into as few cells as
728
    // possible, but flush it immediately whenever there's no more
729
    // data coming.
730
    let loop_result: IoResult<()> = loop {
731
        let mut read_future = reader.read(&mut buf[..]);
732
        match poll!(&mut read_future) {
733
            Poll::Ready(Err(e)) => break Err(e),
734
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
735
            Poll::Ready(Ok(n)) => {
736
                writer.write_all(&buf[..n]).await?;
737
                continue;
738
            }
739
            Poll::Pending => writer.flush().await?,
740
        }
741

            
742
        // The read future is pending, so we should wait on it.
743
        match read_future.await {
744
            Err(e) => break Err(e),
745
            Ok(0) => break Ok(()),
746
            Ok(n) => writer.write_all(&buf[..n]).await?,
747
        }
748
    };
749

            
750
    // Make sure that we flush any lingering data if we can.
751
    //
752
    // If there is a difference between closing and dropping, then we
753
    // only want to do a "proper" close if the reader closed cleanly.
754
    let flush_result = if loop_result.is_ok() {
755
        writer.close().await
756
    } else {
757
        writer.flush().await
758
    };
759

            
760
    loop_result.or(flush_result)
761
}
762

            
763
/// Return true if a given IoError, when received from accept, is a fatal
764
/// error.
765
fn accept_err_is_fatal(err: &IoError) -> bool {
766
    #![allow(clippy::match_like_matches_macro)]
767

            
768
    /// Re-declaration of WSAEMFILE with the right type to match
769
    /// `raw_os_error()`.
770
    #[cfg(windows)]
771
    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
772

            
773
    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
774
    // we need to use OS-specific errors. :P
775
    match err.raw_os_error() {
776
        #[cfg(unix)]
777
        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
778
        #[cfg(windows)]
779
        Some(WSAEMFILE) => false,
780
        _ => true,
781
    }
782
}
783

            
784
/// Launch a SOCKS proxy to listen on a given localhost port, and run
785
/// indefinitely.
786
///
787
/// Requires a `runtime` to use for launching tasks and handling
788
/// timeouts, and a `tor_client` to use in connecting over the Tor
789
/// network.
790
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
791
#[allow(clippy::cognitive_complexity)] // TODO: Refactor
792
pub(crate) async fn run_socks_proxy<R: Runtime>(
793
    runtime: R,
794
    tor_client: TorClient<R>,
795
    listen: Listen,
796
    rpc_data: Option<RpcProxySupport>,
797
) -> Result<()> {
798
    #[cfg(feature = "rpc")]
799
    let (rpc_mgr, mut rpc_state_sender) = match rpc_data {
800
        Some(RpcProxySupport {
801
            rpc_mgr,
802
            rpc_state_sender,
803
        }) => (Some(rpc_mgr), Some(rpc_state_sender)),
804
        None => (None, None),
805
    };
806

            
807
    if !listen.is_localhost_only() {
808
        warn!("Configured to listen for SOCKS on non-local addresses. This is usually insecure! We recommend listening on localhost only.");
809
    }
810

            
811
    let mut listeners = Vec::new();
812
    let mut listening_on_addrs = Vec::new();
813

            
814
    // Try to bind to the SOCKS ports.
815
    match listen.ip_addrs() {
816
        Ok(addrgroups) => {
817
            for addrgroup in addrgroups {
818
                for addr in addrgroup {
819
                    match runtime.listen(&addr).await {
820
                        Ok(listener) => {
821
                            info!("Listening on {:?}.", addr);
822
                            listeners.push(listener);
823
                            listening_on_addrs.push(addr);
824
                        }
825
                        #[cfg(unix)]
826
                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
827
                            warn_report!(e, "Address family not supported {}", addr);
828
                        }
829
                        Err(ref e) => {
830
                            return Err(anyhow!("Can't listen on {}: {e}", addr));
831
                        }
832
                    }
833
                }
834
            }
835
        }
836
        Err(e) => warn_report!(e, "Invalid listen spec"),
837
    }
838

            
839
    // We weren't able to bind any ports: There's nothing to do.
840
    if listeners.is_empty() {
841
        error!("Couldn't open any SOCKS listeners.");
842
        return Err(anyhow!("Couldn't open SOCKS listeners"));
843
    }
844

            
845
    cfg_if::cfg_if! {
846
        if #[cfg(feature="rpc")] {
847
            if let Some(rpc_state_sender) = &mut rpc_state_sender {
848
                rpc_state_sender.set_socks_listeners(&listening_on_addrs[..]);
849
            }
850
        } else {
851
            let _ = listening_on_addrs;
852
        }
853
    }
854

            
855
    // Create a stream of (incoming socket, listener_id) pairs, selected
856
    // across all the listeners.
857
    let mut incoming = futures::stream::select_all(
858
        listeners
859
            .into_iter()
860
            .map(NetStreamListener::incoming)
861
            .enumerate()
862
            .map(|(listener_id, incoming_conns)| {
863
                incoming_conns.map(move |socket| (socket, listener_id))
864
            }),
865
    );
866

            
867
    // Loop over all incoming connections.  For each one, call
868
    // handle_socks_conn() in a new task.
869
    while let Some((stream, sock_id)) = incoming.next().await {
870
        let (stream, addr) = match stream {
871
            Ok((s, a)) => (s, a),
872
            Err(err) => {
873
                if accept_err_is_fatal(&err) {
874
                    return Err(err).context("Failed to receive incoming stream on SOCKS port");
875
                } else {
876
                    warn_report!(err, "Incoming stream failed");
877
                    continue;
878
                }
879
            }
880
        };
881
        let socks_context = SocksConnContext {
882
            tor_client: tor_client.clone(),
883
            #[cfg(feature = "rpc")]
884
            rpc_mgr: rpc_mgr.clone(),
885
        };
886
        let runtime_copy = runtime.clone();
887
        runtime.spawn(async move {
888
            let res =
889
                handle_socks_conn(runtime_copy, socks_context, stream, (sock_id, addr.ip())).await;
890
            if let Err(e) = res {
891
                // TODO: warn_report doesn't work on anyhow::Error.
892
                warn!("connection exited with error: {}", tor_error::Report(e));
893
            }
894
        })?;
895
    }
896

            
897
    Ok(())
898
}