1
//! Implement a simple SOCKS proxy that relays connections over Tor.
2
//!
3
//! A proxy is launched with [`run_socks_proxy()`], which listens for new
4
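//! connections and then runs `handle_socks_conn()` for each accepted connection,
//! which performs the SOCKS handshake and relays the connection over the Tor network.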
5

            
6
use futures::future::FutureExt;
7
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
8
use futures::stream::StreamExt;
9
use futures::task::SpawnExt;
10
use safelog::sensitive;
11
use std::io::Result as IoResult;
12
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
13
#[cfg(feature = "rpc")]
14
use std::sync::Arc;
15
use tracing::{debug, error, info, warn};
16

            
17
#[allow(unused)]
18
use arti_client::HasKind;
19
use arti_client::{ErrorKind, IntoTorAddr as _, StreamPrefs, TorClient};
20
use tor_config::Listen;
21
use tor_error::warn_report;
22
#[cfg(feature = "rpc")]
23
use tor_rpcbase::{self as rpc};
24
use tor_rtcompat::{NetStreamListener, Runtime};
25
use tor_socksproto::{Handshake as _, SocksAddr, SocksAuth, SocksCmd, SocksRequest, SOCKS_BUF_LEN};
26

            
27
use anyhow::{anyhow, Context, Result};
28

            
29
use crate::rpc::RpcProxySupport;
30

            
31
/// Payload to return when an HTTP request arrives on a SOCKS port.
///
/// The status line and each header field are terminated with CRLF, and the
/// header section is ended by an empty CRLF line, as HTTP requires
/// (RFC 9112, section 2.1). The HTML body itself keeps plain `\n` newlines,
/// which are fine inside an HTML document.
const WRONG_PROTOCOL_PAYLOAD: &[u8] = concat!(
    "HTTP/1.0 501 Tor is not an HTTP Proxy\r\n",
    "Content-Type: text/html; charset=utf-8\r\n",
    "\r\n",
    r#"<!DOCTYPE html>
<html>
<head>
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
</head>
<body>
<h1>This is a SOCKS proxy, not an HTTP proxy.</h1>
<p>
It appears you have configured your web browser to use this Tor port as
an HTTP proxy.
</p>
<p>
This is not correct: This port is configured as a SOCKS proxy, not
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to
add support for it in place of, or in addition to, socks_port.
Please configure your client accordingly.
</p>
<p>
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
</p>
</body>
</html>"#
)
.as_bytes();
57

            
58
/// Find out which kind of address family we can/should use for a
59
/// given `SocksRequest`.
60
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
61
fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
62
    let mut prefs = StreamPrefs::new();
63
    if addr.parse::<Ipv4Addr>().is_ok() {
64
        // If they asked for an IPv4 address correctly, nothing else will do.
65
        prefs.ipv4_only();
66
    } else if addr.parse::<Ipv6Addr>().is_ok() {
67
        // If they asked for an IPv6 address correctly, nothing else will do.
68
        prefs.ipv6_only();
69
    } else if req.version() == tor_socksproto::SocksVersion::V4 {
70
        // SOCKS4 and SOCKS4a only support IPv4
71
        prefs.ipv4_only();
72
    } else {
73
        // Otherwise, default to saying IPv4 is preferred.
74
        prefs.ipv4_preferred();
75
    }
76
    prefs
77
}
78

            
79
/// A Key used to isolate connections.
80
///
81
/// Composed of an usize (representing which listener socket accepted
82
/// the connection, the source IpAddr of the client, and the
83
/// authentication string provided by the client).
84
#[derive(Debug, Clone, PartialEq, Eq)]
85
struct SocksIsolationKey(ConnIsolation, ProvidedIsolation);
86
/// Isolation information provided through the socks connection
87
#[derive(Debug, Clone, PartialEq, Eq)]
88
enum ProvidedIsolation {
89
    /// The socks isolation itself.
90
    Legacy(SocksAuth),
91
    /// A bytestring provided as isolation with the extended Socks5 username/password protocol.
92
    Extended {
93
        /// Which format was negotiated?
94
        ///
95
        /// (At present, different format codes can't share a circuit.)
96
        format_code: u8,
97
        /// What's the isolation string?
98
        isolation: Box<[u8]>,
99
    },
100
}
101

            
102
impl arti_client::isolation::IsolationHelper for SocksIsolationKey {
    /// Two keys are compatible exactly when they are equal.
    fn compatible_same_type(&self, other: &Self) -> bool {
        *self == *other
    }

    /// Joining succeeds only for equal keys, and yields a copy of that key.
    fn join_same_type(&self, other: &Self) -> Option<Self> {
        (self == other).then(|| self.clone())
    }
}
115

            
116
/// The meaning of a SOCKS authentication field, according to our conventions.
117
struct AuthInterpretation {
118
    /// Associate this stream with a DataStream created by using a particular RPC object
119
    /// as a Tor client.
120
    #[cfg(feature = "rpc")]
121
    rpc_object: Option<rpc::ObjectId>,
122

            
123
    /// Isolate this stream from other streams that do not have the same
124
    /// value.
125
    isolation: ProvidedIsolation,
126
}
127

            
128
/// NOTE: The following documentation belongs in a spec.
/// But for now, it's our best attempt to document the design and protocol
/// implemented here
/// for integrating SOCKS with our RPC system. --nickm
///
/// Roughly speaking:
///
/// ## Key concepts
///
/// A data stream is "RPC-visible" if, when it is created via SOCKS,
/// the RPC system is told about it.
///
/// Every RPC-visible stream is associated with a given RPC object when it is created.
/// (Since the RPC object is being specified in the SOCKS protocol,
/// it must be one with an externally visible Object ID.
/// Such Object IDs are cryptographically unguessable and unforgeable,
/// and are qualified with a unique identifier for their associated RPC session.)
/// Call this RPC Object the "target" object for now.
/// This target RPC object must implement
/// the [`ConnectWithPrefs`](arti_client::rpc::ConnectWithPrefs) special method.
///
/// Right now, there are two general kinds of objects that implement this method:
/// client-like objects, and one-shot clients.
///
/// A client-like object is either a `TorClient` or an RPC `Session`.
/// It knows about, and is capable of opening, multiple data streams.
/// Using it as the target object for a SOCKS connection tells Arti
/// that the resulting data stream (if any)
/// should be built by it, and associated with its RPC session.
///
/// An application gets a TorClient by asking the session for one,
/// or by asking a TorClient to give it a new variant clone of itself.
///
/// A one-shot client is an `arti_rpcserver::stream::OneshotClient`.
/// It is created from a client-like object, but can only be used for a single data stream.
/// When created, it is not yet connected or trying to connect to anywhere:
/// the act of using it as the target Object for a SOCKS connection causes
/// it to begin connecting.
///
/// An application gets a `OneshotClient` by calling `arti:new_oneshot_client`
/// on any client-like object.
///
/// ## The SOCKS protocol
///
/// See the specification for
/// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
/// for full details.
///
/// ### Further restrictions on Object IDs and isolation
///
/// In some cases,
/// the RPC Object ID may denote an object
/// that already includes information about its intended stream isolation.
/// In such cases, the stream isolation MUST be blank.
/// Implementations MUST reject non-blank stream isolation in such cases.
///
/// In some cases, the RPC object ID may denote an object
/// that already includes information
/// about its intended destination address and port.
/// In such cases, the destination address MUST be `0.0.0.0` or `::`
/// (encoded either as an IPv4 address, an IPv6 address, or a hostname)
/// and the destination port MUST be 0.
/// Implementations MUST reject other addresses in such cases.
///
/// ### Another proposed change
///
/// We could add a new method to clients, with a name like
/// "open_stream" or "connect_stream".
/// This method would include all target and isolation information in its parameters.
/// It would actually create a DataStream immediately, tell it to begin connecting,
/// and return an externally visible object ID.
/// The RPC protocol could be used to watch the DataStream object,
/// to see when it was connected.
///
/// The resulting DataStream object could also be used as the target of a SOCKS connection.
/// We would require in such a case that no isolation be provided in the SOCKS handshake,
/// and that the target address was (e.g.) INADDR_ANY.
///
/// ## Intended use cases (examples)
///
/// (These examples assume that the application
/// already knows the SOCKS port it should use.
/// I'm leaving out the isolation strings as orthogonal.)
///
/// These are **NOT** the only possible use cases;
/// they're just the two that help understand this system best (I hope).
///
/// ### Case 1: Using a client-like object directly.
///
/// Here the application has authenticated to RPC
/// and gotten the session ID `SESSION-1`.
/// (In reality, this would be a longer ID, and full of crypto).
///
/// The application wants to open a new stream to www.example.com.
/// They don't particularly care about isolation,
/// but they do want their stream to use their RPC session.
/// They don't want an Object ID for the stream.
///
/// To do this, they make a SOCKS connection to arti,
/// with target address www.example.com.
/// They set the username to `<torS0X>0SESSION-1`,
/// and the password to the empty string.
///
/// Arti looks up the Session object via the `SESSION-1` object ID
/// and tells it (via the ConnectWithPrefs special method)
/// to connect to www.example.com.
/// The session creates a new DataStream using its internal TorClient,
/// but does not register the stream with an RPC Object ID.
/// Arti proxies the application's SOCKS connection through this DataStream.
///
///
/// ### Case 2: Creating an identifiable stream.
///
/// Here the application wants to be able to refer to its DataStream
/// after the stream is created.
/// As before, we assume that it's on an RPC session
/// where the Session ID is `SESSION-1`.
///
/// The application sends an RPC request of the form:
/// `{"id": 123, "obj": "SESSION-1", "method": "arti:new_oneshot_client", "params": {}}`
///
/// It receives a reply like:
/// `{"id": 123, "result": {"id": "STREAM-1"} }`
///
/// (In reality, `STREAM-1` would also be longer and full of crypto.)
///
/// Now the application has an object called `STREAM-1` that is not yet a connected
/// stream, but which may become one.
///
/// This time, it wants to set its isolation string to "xyzzy".
///
/// The application opens a socks connection as before,
/// again with target address www.example.com.
/// For the username it sends `<torS0X>0STREAM-1`,
/// and for the password it sends `xyzzy`.
///
/// Now Arti looks up the `RpcDataStream` object via `STREAM-1`,
/// and tells it (via the ConnectWithPrefs special method)
/// to connect to www.example.com.
/// This causes the `RpcDataStream` internally to create a new `DataStream`,
/// and to store that `DataStream` in itself.
/// The `RpcDataStream` with Object ID `STREAM-1`
/// is now an alias for the newly created `DataStream`.
/// Arti proxies the application's SOCKS connection through that `DataStream`.
///
#[cfg(feature = "rpc")]
#[allow(dead_code)]
mod socks_and_rpc {}
275

            
276
/// Given the authentication object from a socks connection, determine what it's telling
277
/// us to do.
278
///
279
/// (In no case is it actually SOCKS authentication: it can either be a message
280
/// to the stream isolation system or the RPC system.)
281
fn interpret_socks_auth(auth: &SocksAuth) -> Result<AuthInterpretation> {
282
    /// Interpretation of a SOCKS5 username according to
283
    /// the [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth)
284
    /// specification.
285
    enum Uname<'a> {
286
        /// This is a legacy username; it's just part of the
287
        /// isolation information.
288
        //
289
        // Note: We're not actually throwing away the username here;
290
        // instead we're going to use the whole SocksAuth
291
        // in a `ProvidedAuthentication::Legacy``.
292
        // TODO RPC: Find a more idiomatic way to express this data flow.
293
        Legacy,
294
        /// This is using the socks extension: contains the extension
295
        /// format code and the remaining information from the username.
296
        Extended(u8, &'a [u8]),
297
    }
298
    /// Helper: Try to interpret a SOCKS5 username field as indicating the start of a set of
299
    /// extended socks authentication information.
300
    ///
301
    /// Implements [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
302
    ///
303
    /// If it does indicate that extensions are in use,
304
    /// return a `Uname::Extended` containing
305
    /// the extension format type and the remaining information from the username.
306
    ///
307
    /// If it indicates that no extensions are in use,
308
    /// return `Uname::Legacy`.
309
    ///
310
    /// If it is badly formatted, return an error.
311
    fn interpret_socks5_username(username: &[u8]) -> Result<Uname<'_>> {
312
        /// 8-byte "magic" sequence from
313
        /// [SOCKS extended authentication](https://spec.torproject.org/socks-extensions.html#extended-auth).
314
        /// When it appears at the start of a username,
315
        /// indicates that the username/password are to be interpreted as
316
        /// as encoding SOCKS5 extended parameters,
317
        /// but the format might not be one we recognize.
318
        const SOCKS_EXT_CONST_ANY: &[u8] = b"<torS0X>";
319
        let Some(remainder) = username.strip_prefix(SOCKS_EXT_CONST_ANY) else {
320
            return Ok(Uname::Legacy);
321
        };
322
        if remainder.is_empty() {
323
            return Err(anyhow!("Extended SOCKS information without format code."));
324
        }
325
        // TODO MSRV 1.80: use split_at_checked instead.
326
        // This won't panic since we checked for an empty string above.
327
        let (format_code, remainder) = remainder.split_at(1);
328
        Ok(Uname::Extended(format_code[0], remainder))
329
    }
330

            
331
    let isolation = match auth {
332
        SocksAuth::Username(user, pass) => match interpret_socks5_username(user)? {
333
            Uname::Legacy => ProvidedIsolation::Legacy(auth.clone()),
334
            Uname::Extended(b'1', b"") => {
335
                return Err(anyhow!("Received empty RPC object ID"));
336
            }
337
            Uname::Extended(format_code @ b'1', remainder) => {
338
                #[cfg(not(feature = "rpc"))]
339
                return Err(anyhow!(
340
                    "Received RPC object ID, but not built with support for RPC"
341
                ));
342
                #[cfg(feature = "rpc")]
343
                return Ok(AuthInterpretation {
344
                    rpc_object: Some(rpc::ObjectId::from(
345
                        std::str::from_utf8(remainder).context("Rpc object ID was not utf-8")?,
346
                    )),
347
                    isolation: ProvidedIsolation::Extended {
348
                        format_code,
349
                        isolation: pass.clone().into(),
350
                    },
351
                });
352
            }
353
            Uname::Extended(format_code @ b'0', b"") => ProvidedIsolation::Extended {
354
                format_code,
355
                isolation: pass.clone().into(),
356
            },
357
            Uname::Extended(b'0', _) => {
358
                return Err(anyhow!("Extraneous information in SOCKS username field."))
359
            }
360
            _ => return Err(anyhow!("Unrecognized SOCKS format code")),
361
        },
362
        _ => ProvidedIsolation::Legacy(auth.clone()),
363
    };
364

            
365
    Ok(AuthInterpretation {
366
        #[cfg(feature = "rpc")]
367
        rpc_object: None,
368
        isolation,
369
    })
370
}
371

            
372
/// Information used to implement a SOCKS connection.
373
struct SocksConnContext<R: Runtime> {
374
    /// A TorClient to use (by default) to anonymize requests.
375
    tor_client: TorClient<R>,
376
    /// If present, an RpcMgr to use when for attaching requests to RPC
377
    /// sessions.
378
    #[cfg(feature = "rpc")]
379
    rpc_mgr: Option<Arc<arti_rpcserver::RpcMgr>>,
380
}
381

            
382
/// Isolation information for a SOCKS connection that is already known when the
/// connection is accepted, before any SOCKS negotiation takes place.
///
/// At present this pairs the index of the listener that accepted the
/// connection with the IP address of the client that connected to the SOCKS
/// port.
type ConnIsolation = (usize, IpAddr);
388

            
389
cfg_if::cfg_if! {
390
    if #[cfg(feature="rpc")] {
391
        use crate::rpc::conntarget::ConnTarget;
392
    } else {
393
        /// A type returned by get_prefs_and_session,
394
        /// and used to launch data streams or resolve attempts.
395
        ///
396
        /// TODO RPC: This is quite ugly; we should do something better.
397
        /// At least, we should never expose this outside the socks module.
398
        type ConnTarget<R> = TorClient<R>;
399
    }
400
}
401

            
402
impl<R: Runtime> SocksConnContext<R> {
403
    /// Interpret a SOCKS request and our input information to determine which
404
    /// TorClient / ClientConnectionTarget object and StreamPrefs we should use.
405
    ///
406
    /// TODO RPC: The return type here is a bit ugly.
407
    fn get_prefs_and_session(
408
        &self,
409
        request: &SocksRequest,
410
        target_addr: &str,
411
        conn_isolation: ConnIsolation,
412
    ) -> Result<(StreamPrefs, ConnTarget<R>)> {
413
        // Determine whether we want to ask for IPv4/IPv6 addresses.
414
        let mut prefs = stream_preference(request, target_addr);
415

            
416
        // Interpret socks authentication to see whether we want to connect to an RPC connector.
417
        let interp = interpret_socks_auth(request.auth())?;
418
        prefs.set_isolation(SocksIsolationKey(conn_isolation, interp.isolation));
419

            
420
        #[cfg(feature = "rpc")]
421
        if let Some(session) = interp.rpc_object {
422
            if let Some(mgr) = &self.rpc_mgr {
423
                let (context, object) = mgr
424
                    .lookup_object(&session)
425
                    .context("no such session found")?;
426
                let target = ConnTarget::Rpc { context, object };
427
                return Ok((prefs, target));
428
            } else {
429
                return Err(anyhow!("no rpc manager found!?"));
430
            }
431
        }
432

            
433
        let client = self.tor_client.clone();
434
        #[cfg(feature = "rpc")]
435
        let client = ConnTarget::Client(client);
436

            
437
        Ok((prefs, client))
438
    }
439
}
440

            
441
/// Given a just-received TCP connection `S` on a SOCKS port, handle the
442
/// SOCKS handshake and relay the connection over the Tor network.
443
///
444
/// Uses `isolation_info` to decide which circuits this connection
445
/// may use.  Requires that `isolation_info` is a pair listing the listener
446
/// id and the source address for the socks request.
447
async fn handle_socks_conn<R, S>(
448
    runtime: R,
449
    context: SocksConnContext<R>,
450
    mut socks_stream: S,
451
    isolation_info: ConnIsolation,
452
) -> Result<()>
453
where
454
    R: Runtime,
455
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
456
{
457
    // Part 1: Perform the SOCKS handshake, to learn where we are
458
    // being asked to connect, and what we're being asked to do once
459
    // we connect there.
460
    //
461
    // The SOCKS handshake can require multiple round trips (SOCKS5
462
    // always does) so we we need to run this part of the process in a
463
    // loop.
464
    let mut handshake = tor_socksproto::SocksProxyHandshake::new();
465

            
466
    let mut inbuf = tor_socksproto::Buffer::new();
467
    let request = loop {
468
        use tor_socksproto::NextStep as NS;
469

            
470
        let rv = handshake.step(&mut inbuf);
471

            
472
        let step = match rv {
473
            Err(e) => {
474
                if let tor_socksproto::Error::BadProtocol(version) = e {
475
                    // check for HTTP methods: CONNECT, DELETE, GET, HEAD, OPTION, PUT, POST, PATCH and
476
                    // TRACE.
477
                    // To do so, check the first byte of the connection, which happen to be placed
478
                    // where SOCKs version field is.
479
                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
480
                        write_all_and_close(&mut socks_stream, WRONG_PROTOCOL_PAYLOAD).await?;
481
                    }
482
                }
483
                // if there is an handshake error, don't reply with a Socks error, remote does not
484
                // seems to speak Socks.
485
                return Err(e.into());
486
            }
487
            Ok(y) => y,
488
        };
489

            
490
        match step {
491
            NS::Recv(mut recv) => {
492
                let n = socks_stream
493
                    .read(recv.buf())
494
                    .await
495
                    .context("Error while reading SOCKS handshake")?;
496
                recv.note_received(n)?;
497
            }
498
            NS::Send(data) => write_all_and_flush(&mut socks_stream, &data).await?,
499
            NS::Finished(fin) => break fin.into_output_forbid_pipelining()?,
500
        }
501
    };
502

            
503
    // Unpack the socks request and find out where we're connecting to.
504
    let addr = request.addr().to_string();
505
    let port = request.port();
506
    debug!(
507
        "Got a socks request: {} {}:{}",
508
        request.command(),
509
        sensitive(&addr),
510
        port
511
    );
512

            
513
    let (prefs, tor_client) = context.get_prefs_and_session(&request, &addr, isolation_info)?;
514

            
515
    match request.command() {
516
        SocksCmd::CONNECT => {
517
            // The SOCKS request wants us to connect to a given address.
518
            // So, launch a connection over Tor.
519
            let tor_addr = (addr.clone(), port).into_tor_addr()?;
520
            let tor_stream = tor_client.connect_with_prefs(&tor_addr, &prefs).await;
521
            let tor_stream = match tor_stream {
522
                Ok(s) => s,
523
                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
524
            };
525
            // Okay, great! We have a connection over the Tor network.
526
            debug!("Got a stream for {}:{}", sensitive(&addr), port);
527

            
528
            // Send back a SOCKS response, telling the client that it
529
            // successfully connected.
530
            let reply = request
531
                .reply(tor_socksproto::SocksStatus::SUCCEEDED, None)
532
                .context("Encoding socks reply")?;
533
            write_all_and_flush(&mut socks_stream, &reply[..]).await?;
534

            
535
            let (socks_r, socks_w) = socks_stream.split();
536
            let (tor_r, tor_w) = tor_stream.split();
537

            
538
            // Finally, spawn two background tasks to relay traffic between
539
            // the socks stream and the tor stream.
540
            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
541
            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
542
        }
543
        SocksCmd::RESOLVE => {
544
            // We've been asked to perform a regular hostname lookup.
545
            // (This is a tor-specific SOCKS extension.)
546

            
547
            let addr = if let Ok(addr) = addr.parse() {
548
                // if this is a valid ip address, just parse it and reply.
549
                Ok(addr)
550
            } else {
551
                tor_client
552
                    .resolve_with_prefs(&addr, &prefs)
553
                    .await
554
                    .map_err(|e| e.kind())
555
                    .and_then(|addrs| addrs.first().copied().ok_or(ErrorKind::Other))
556
            };
557
            match addr {
558
                Ok(addr) => {
559
                    let reply = request
560
                        .reply(
561
                            tor_socksproto::SocksStatus::SUCCEEDED,
562
                            Some(&SocksAddr::Ip(addr)),
563
                        )
564
                        .context("Encoding socks reply")?;
565
                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
566
                }
567
                Err(e) => return reply_error(&mut socks_stream, &request, e).await,
568
            }
569
        }
570
        SocksCmd::RESOLVE_PTR => {
571
            // We've been asked to perform a reverse hostname lookup.
572
            // (This is a tor-specific SOCKS extension.)
573
            let addr: IpAddr = match addr.parse() {
574
                Ok(ip) => ip,
575
                Err(e) => {
576
                    let reply = request
577
                        .reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None)
578
                        .context("Encoding socks reply")?;
579
                    write_all_and_close(&mut socks_stream, &reply[..]).await?;
580
                    return Err(anyhow!(e));
581
                }
582
            };
583
            let hosts = match tor_client.resolve_ptr_with_prefs(addr, &prefs).await {
584
                Ok(hosts) => hosts,
585
                Err(e) => return reply_error(&mut socks_stream, &request, e.kind()).await,
586
            };
587
            if let Some(host) = hosts.into_iter().next() {
588
                // this conversion should never fail, legal DNS names len must be <= 253 but Socks
589
                // names can be up to 255 chars.
590
                let hostname = SocksAddr::Hostname(host.try_into()?);
591
                let reply = request
592
                    .reply(tor_socksproto::SocksStatus::SUCCEEDED, Some(&hostname))
593
                    .context("Encoding socks reply")?;
594
                write_all_and_close(&mut socks_stream, &reply[..]).await?;
595
            }
596
        }
597
        _ => {
598
            // We don't support this SOCKS command.
599
            warn!("Dropping request; {:?} is unsupported", request.command());
600
            let reply = request
601
                .reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None)
602
                .context("Encoding socks reply")?;
603
            write_all_and_close(&mut socks_stream, &reply[..]).await?;
604
        }
605
    };
606

            
607
    // TODO: we should close the TCP stream if either task fails. Do we?
608
    // See #211 and #190.
609

            
610
    Ok(())
611
}
612

            
613
/// write_all the data to the writer & flush the writer if write_all is successful.
614
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
615
where
616
    W: AsyncWrite + Unpin,
617
{
618
    writer
619
        .write_all(buf)
620
        .await
621
        .context("Error while writing SOCKS reply")?;
622
    writer
623
        .flush()
624
        .await
625
        .context("Error while flushing SOCKS stream")
626
}
627

            
628
/// write_all the data to the writer & close the writer if write_all is successful.
629
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
630
where
631
    W: AsyncWrite + Unpin,
632
{
633
    writer
634
        .write_all(buf)
635
        .await
636
        .context("Error while writing SOCKS reply")?;
637
    writer
638
        .close()
639
        .await
640
        .context("Error while closing SOCKS stream")
641
}
642

            
643
/// Reply a Socks error based on an arti-client Error and close the stream.
644
/// Returns the error provided in parameter
645
async fn reply_error<W>(
646
    writer: &mut W,
647
    request: &SocksRequest,
648
    error: arti_client::ErrorKind,
649
) -> Result<()>
650
where
651
    W: AsyncWrite + Unpin,
652
{
653
    use {tor_socksproto::SocksStatus as S, ErrorKind as EK};
654

            
655
    // TODO: Currently we _always_ try to return extended SOCKS return values
656
    // for onion service failures from proposal 304 when they are appropriate.
657
    // But according to prop 304, this is something we should only do when it's
658
    // requested, for compatibility with SOCKS implementations that can't handle
659
    // unexpected REP codes.
660
    //
661
    // I suggest we make these extended error codes "always-on" for now, and
662
    // later add a feature to disable them if it's needed. -nickm
663

            
664
    // TODO: Perhaps we should map the extended SOCKS return values for onion
665
    // service failures unconditionally, even if we haven't compiled in onion
666
    // service client support.  We can make that change after the relevant
667
    // ErrorKinds are no longer `experimental-api` in `tor-error`.
668

            
669
    // We need to send an error. See what kind it is.
670
    let status = match error {
671
        EK::RemoteNetworkFailed => S::TTL_EXPIRED,
672

            
673
        #[cfg(feature = "onion-service-client")]
674
        EK::OnionServiceNotFound => S::HS_DESC_NOT_FOUND,
675
        #[cfg(feature = "onion-service-client")]
676
        EK::OnionServiceAddressInvalid => S::HS_BAD_ADDRESS,
677
        #[cfg(feature = "onion-service-client")]
678
        EK::OnionServiceMissingClientAuth => S::HS_MISSING_CLIENT_AUTH,
679
        #[cfg(feature = "onion-service-client")]
680
        EK::OnionServiceWrongClientAuth => S::HS_WRONG_CLIENT_AUTH,
681

            
682
        // NOTE: This is not a perfect correspondence from these ErrorKinds to
683
        // the errors we're returning here. In the longer run, we'll want to
684
        // encourage other ways to indicate failure to clients.  Those ways might
685
        // include encouraging HTTP CONNECT, or the RPC system, both of which
686
        // would give us more robust ways to report different kinds of failure.
687
        #[cfg(feature = "onion-service-client")]
688
        EK::OnionServiceNotRunning
689
        | EK::OnionServiceConnectionFailed
690
        | EK::OnionServiceProtocolViolation => S::HS_INTRO_FAILED,
691

            
692
        _ => S::GENERAL_FAILURE,
693
    };
694
    let reply = request
695
        .reply(status, None)
696
        .context("Encoding socks reply")?;
697
    // if writing back the error fail, still return the original error
698
    let _ = write_all_and_close(writer, &reply[..]).await;
699

            
700
    Err(anyhow!(error))
701
}
702

            
703
/// Copy all the data from `reader` into `writer` until we encounter an EOF or
704
/// an error.
705
///
706
/// Unlike as futures::io::copy(), this function is meant for use with
707
/// interactive readers and writers, where the reader might pause for
708
/// a while, but where we want to send data on the writer as soon as
709
/// it is available.
710
///
711
/// This function assumes that the writer might need to be flushed for
712
/// any buffered data to be sent.  It tries to minimize the number of
713
/// flushes, however, by only flushing the writer when the reader has no data.
714
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
715
where
716
    R: AsyncRead + Unpin,
717
    W: AsyncWrite + Unpin,
718
{
719
    use futures::{poll, task::Poll};
720

            
721
    let mut buf = [0_u8; SOCKS_BUF_LEN];
722

            
723
    // At this point we could just loop, calling read().await,
724
    // write_all().await, and flush().await.  But we want to be more
725
    // clever than that: we only want to flush when the reader is
726
    // stalled.  That way we can pack our data into as few cells as
727
    // possible, but flush it immediately whenever there's no more
728
    // data coming.
729
    let loop_result: IoResult<()> = loop {
730
        let mut read_future = reader.read(&mut buf[..]);
731
        match poll!(&mut read_future) {
732
            Poll::Ready(Err(e)) => break Err(e),
733
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
734
            Poll::Ready(Ok(n)) => {
735
                writer.write_all(&buf[..n]).await?;
736
                continue;
737
            }
738
            Poll::Pending => writer.flush().await?,
739
        }
740

            
741
        // The read future is pending, so we should wait on it.
742
        match read_future.await {
743
            Err(e) => break Err(e),
744
            Ok(0) => break Ok(()),
745
            Ok(n) => writer.write_all(&buf[..n]).await?,
746
        }
747
    };
748

            
749
    // Make sure that we flush any lingering data if we can.
750
    //
751
    // If there is a difference between closing and dropping, then we
752
    // only want to do a "proper" close if the reader closed cleanly.
753
    let flush_result = if loop_result.is_ok() {
754
        writer.close().await
755
    } else {
756
        writer.flush().await
757
    };
758

            
759
    loop_result.or(flush_result)
760
}
761

            
762
/// Return true if a given IoError, when received from accept, is a fatal
763
/// error.
764
fn accept_err_is_fatal(err: &IoError) -> bool {
765
    #![allow(clippy::match_like_matches_macro)]
766

            
767
    /// Re-declaration of WSAEMFILE with the right type to match
768
    /// `raw_os_error()`.
769
    #[cfg(windows)]
770
    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
771

            
772
    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
773
    // we need to use OS-specific errors. :P
774
    match err.raw_os_error() {
775
        #[cfg(unix)]
776
        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
777
        #[cfg(windows)]
778
        Some(WSAEMFILE) => false,
779
        _ => true,
780
    }
781
}
782

            
783
/// Launch a SOCKS proxy to listen on a given localhost port, and run
784
/// indefinitely.
785
///
786
/// Requires a `runtime` to use for launching tasks and handling
787
/// timeouts, and a `tor_client` to use in connecting over the Tor
788
/// network.
789
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
790
pub(crate) async fn run_socks_proxy<R: Runtime>(
791
    runtime: R,
792
    tor_client: TorClient<R>,
793
    listen: Listen,
794
    rpc_data: Option<RpcProxySupport>,
795
) -> Result<()> {
796
    #[cfg(feature = "rpc")]
797
    let (rpc_mgr, mut rpc_state_sender) = match rpc_data {
798
        Some(RpcProxySupport {
799
            rpc_mgr,
800
            rpc_state_sender,
801
        }) => (Some(rpc_mgr), Some(rpc_state_sender)),
802
        None => (None, None),
803
    };
804

            
805
    if !listen.is_localhost_only() {
806
        warn!("Configured to listen for SOCKS on non-local addresses. This is usually insecure! We recommend listening on localhost only.");
807
    }
808

            
809
    let mut listeners = Vec::new();
810
    let mut listening_on_addrs = Vec::new();
811

            
812
    // Try to bind to the SOCKS ports.
813
    match listen.ip_addrs() {
814
        Ok(addrgroups) => {
815
            for addrgroup in addrgroups {
816
                for addr in addrgroup {
817
                    match runtime.listen(&addr).await {
818
                        Ok(listener) => {
819
                            info!("Listening on {:?}.", addr);
820
                            listeners.push(listener);
821
                            listening_on_addrs.push(addr);
822
                        }
823
                        #[cfg(unix)]
824
                        Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
825
                            warn_report!(e, "Address family not supported {}", addr);
826
                        }
827
                        Err(ref e) => {
828
                            return Err(anyhow!("Can't listen on {}: {e}", addr));
829
                        }
830
                    }
831
                }
832
            }
833
        }
834
        Err(e) => warn_report!(e, "Invalid listen spec"),
835
    }
836

            
837
    // We weren't able to bind any ports: There's nothing to do.
838
    if listeners.is_empty() {
839
        error!("Couldn't open any SOCKS listeners.");
840
        return Err(anyhow!("Couldn't open SOCKS listeners"));
841
    }
842

            
843
    cfg_if::cfg_if! {
844
        if #[cfg(feature="rpc")] {
845
            if let Some(rpc_state_sender) = &mut rpc_state_sender {
846
                rpc_state_sender.set_socks_listeners(&listening_on_addrs[..]);
847
            }
848
        } else {
849
            let _ = listening_on_addrs;
850
        }
851
    }
852

            
853
    // Create a stream of (incoming socket, listener_id) pairs, selected
854
    // across all the listeners.
855
    let mut incoming = futures::stream::select_all(
856
        listeners
857
            .into_iter()
858
            .map(NetStreamListener::incoming)
859
            .enumerate()
860
            .map(|(listener_id, incoming_conns)| {
861
                incoming_conns.map(move |socket| (socket, listener_id))
862
            }),
863
    );
864

            
865
    // Loop over all incoming connections.  For each one, call
866
    // handle_socks_conn() in a new task.
867
    while let Some((stream, sock_id)) = incoming.next().await {
868
        let (stream, addr) = match stream {
869
            Ok((s, a)) => (s, a),
870
            Err(err) => {
871
                if accept_err_is_fatal(&err) {
872
                    return Err(err).context("Failed to receive incoming stream on SOCKS port");
873
                } else {
874
                    warn_report!(err, "Incoming stream failed");
875
                    continue;
876
                }
877
            }
878
        };
879
        let socks_context = SocksConnContext {
880
            tor_client: tor_client.clone(),
881
            #[cfg(feature = "rpc")]
882
            rpc_mgr: rpc_mgr.clone(),
883
        };
884
        let runtime_copy = runtime.clone();
885
        runtime.spawn(async move {
886
            let res =
887
                handle_socks_conn(runtime_copy, socks_context, stream, (sock_id, addr.ip())).await;
888
            if let Err(e) = res {
889
                // TODO: warn_report doesn't work on anyhow::Error.
890
                warn!("connection exited with error: {}", tor_error::Report(e));
891
            }
892
        })?;
893
    }
894

            
895
    Ok(())
896
}