//! A simple reverse-proxy implementation for onion services.

use std::sync::{Arc, Mutex};

use futures::{
    select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future,
    FutureExt as _, Stream, StreamExt as _,
};
use oneshot_fused_workaround as oneshot;
use safelog::sensitive as sv;
use std::io::{Error as IoError, Result as IoResult};
use tor_cell::relaycell::msg as relaymsg;
use tor_error::{debug_report, ErrorKind, HasKind};
use tor_hsservice::{HsNickname, RendRequest, StreamRequest};
use tor_log_ratelim::log_ratelim;
use tor_proto::stream::{DataStream, IncomingStreamRequest};
use tor_rtcompat::Runtime;

use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr};

/// A reverse proxy that handles connections from an `OnionService` by routing
/// them to local addresses.
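///
/// # Example
///
/// A minimal lifecycle sketch (`ignore`d here, since it needs a running onion
/// service; `proxy_config` is an assumed, already-built [`ProxyConfig`]):
///
/// ```ignore
/// let proxy = OnionServiceReverseProxy::new(proxy_config);
/// // Drive the proxy with a stream of rendezvous requests
/// // (see handle_requests()), typically in its own task.
/// // Later, to stop every handler spawned for this proxy:
/// proxy.shutdown();
/// ```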
#[derive(Debug)]
pub struct OnionServiceReverseProxy {
    /// Mutable state held by this reverse proxy.
    state: Mutex<State>,
}

/// Mutable part of an `OnionServiceReverseProxy`.
#[derive(Debug)]
struct State {
    /// The current configuration for this reverse proxy.
    config: ProxyConfig,
    /// A sender that we'll drop when it's time to shut down this proxy.
    shutdown_tx: Option<oneshot::Sender<void::Void>>,
    /// A receiver that we'll use to monitor for shutdown signals.
    shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
}

/// An error that prevents further progress while processing requests.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum HandleRequestsError {
    /// The runtime says it was unable to spawn a task.
    #[error("Unable to spawn a task")]
    Spawn(#[source] Arc<futures::task::SpawnError>),
}

impl HasKind for HandleRequestsError {
    fn kind(&self) -> ErrorKind {
        match self {
            HandleRequestsError::Spawn(e) => e.kind(),
        }
    }
}

impl OnionServiceReverseProxy {
    /// Create a new proxy with a given configuration.
    pub fn new(config: ProxyConfig) -> Arc<Self> {
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        Arc::new(Self {
            state: Mutex::new(State {
                config,
                shutdown_tx: Some(shutdown_tx),
                shutdown_rx: shutdown_rx.shared(),
            }),
        })
    }

    /// Try to change the configuration of this proxy.
    ///
    /// This change applies only to new connections through the proxy; existing
    /// connections are not affected.
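    ///
    /// # Example
    ///
    /// A sketch of applying a new configuration (assumes `proxy` and
    /// `new_config` already exist; not compiled here):
    ///
    /// ```ignore
    /// proxy.reconfigure(new_config, tor_config::Reconfigure::AllOrNothing)?;
    /// ```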
    pub fn reconfigure(
        &self,
        config: ProxyConfig,
        how: tor_config::Reconfigure,
    ) -> Result<(), tor_config::ReconfigureError> {
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            // Every possible reconfiguration is allowed.
            return Ok(());
        }
        let mut state = self.state.lock().expect("poisoned lock");
        state.config = config;
        // Note: we don't need to use a postage::watch here, since we just want
        // to lock this configuration whenever we get a request.  We could use a
        // Mutex<Arc<>> instead, but the performance shouldn't matter.
        Ok(())
    }

    /// Shut down all request-handlers running with this proxy.
    pub fn shutdown(&self) {
        let mut state = self.state.lock().expect("poisoned lock");
        // Dropping the sender causes every (cloned) shutdown_rx to resolve,
        // which in turn makes each pending handle_requests() future return.
        let _ = state.shutdown_tx.take();
    }

    /// Use this proxy to handle a stream of [`RendRequest`]s.
    ///
    /// The future returned by this function runs until the proxy is shut down
    /// or the request stream is exhausted, so you may want to spawn a separate
    /// task for it.
    ///
    /// The provided nickname is used for logging.
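    ///
    /// # Example
    ///
    /// A sketch (not compiled here) assuming `rend_requests` is the stream of
    /// [`RendRequest`]s for a launched onion service, and `runtime` is a
    /// [`Runtime`]:
    ///
    /// ```ignore
    /// let proxy = OnionServiceReverseProxy::new(proxy_config);
    /// proxy.handle_requests(runtime, nickname, rend_requests).await?;
    /// ```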
    pub async fn handle_requests<R, S>(
        &self,
        runtime: R,
        nickname: HsNickname,
        requests: S,
    ) -> Result<(), HandleRequestsError>
    where
        R: Runtime,
        S: Stream<Item = RendRequest> + Unpin,
    {
        // Accept rendezvous requests, turning each accepted request into a
        // stream of StreamRequests.
        let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse();
        let mut shutdown_rx = self
            .state
            .lock()
            .expect("poisoned lock")
            .shutdown_rx
            .clone()
            .fuse();
        let nickname = Arc::new(nickname);

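        // Loop until shutdown or until the request stream is exhausted.
        // `select_biased!` polls the shutdown receiver first, so shutdown
        // takes precedence over pending requests.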
        loop {
            let stream_request = select_biased! {
                _ = shutdown_rx => return Ok(()),
                stream_request = stream_requests.next() => match stream_request {
                    None => return Ok(()),
                    Some(s) => s,
                }
            };

            let action = self.choose_action(stream_request.request());

            // Clone everything that the per-request task will need, so that
            // the `async move` block below can take ownership.
            let a_clone = action.clone();
            let rt_clone = runtime.clone();
            let nn_clone = Arc::clone(&nickname);
            let req = stream_request.request().clone();

            runtime
                .spawn(async move {
                    let outcome =
                        run_action(rt_clone, nn_clone.as_ref(), action, stream_request).await;

                    log_ratelim!(
                        "Performing action on {}", nn_clone;
                        outcome;
                        Err(_) => WARN, "Unable to take action {:?} for request {:?}", sv(a_clone), sv(req)
                    );
                })
                .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?;
        }
    }

    /// Choose the configured action that we should take in response to a
    /// [`StreamRequest`], based on our current configuration.
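    ///
    /// For example, if the configuration forwards port 80 to a local target
    /// and lists no other ports, a BEGIN request for port 80 resolves to that
    /// rule's `ProxyAction::Forward`, while a BEGIN request for port 22 falls
    /// through to the default, `ProxyAction::DestroyCircuit`.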
    fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction {
        let port: u16 = match stream_request {
            IncomingStreamRequest::Begin(begin) => {
                // The C tor implementation deliberately ignores the address and
                // flags on the BEGIN message, so we do too.
                begin.port()
            }
            other => {
                tracing::warn!(
                    "Rejecting onion service request for invalid command {:?}. Internal error.",
                    other
                );
                return ProxyAction::DestroyCircuit;
            }
        };

        self.state
            .lock()
            .expect("poisoned lock")
            .config
            .resolve_port_for_begin(port)
            .cloned()
            // The default action is "destroy the circuit."
            .unwrap_or(ProxyAction::DestroyCircuit)
    }
}

/// Take the configured action from `action` on the incoming request `request`.
async fn run_action<R: Runtime>(
    runtime: R,
    nickname: &HsNickname,
    action: ProxyAction,
    request: StreamRequest,
) -> Result<(), RequestFailed> {
    match action {
        ProxyAction::DestroyCircuit => {
            request
                .shutdown_circuit()
                .map_err(RequestFailed::CantDestroy)?;
        }
        ProxyAction::Forward(encap, target) => match (encap, target) {
            (Encapsulation::Simple, ref addr @ TargetAddr::Inet(a)) => {
                let rt_clone = runtime.clone();
                forward_connection(rt_clone, request, runtime.connect(&a), nickname, addr).await?;
            } /* TODO (#1246)
                (Encapsulation::Simple, TargetAddr::Unix(_)) => {
                    // TODO: We need to implement unix connections.
                }
              */
        },
        ProxyAction::RejectStream => {
            // C tor sends DONE in this case, so we do too.
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);

            request
                .reject(end)
                .await
                .map_err(RequestFailed::CantReject)?;
        }
        ProxyAction::IgnoreStream => drop(request),
    };
    Ok(())
}

/// An error from a single attempt to handle an onion service request.
#[derive(thiserror::Error, Debug, Clone)]
enum RequestFailed {
    /// Encountered an error trying to destroy a circuit.
    #[error("Unable to destroy onion service circuit")]
    CantDestroy(#[source] tor_error::Bug),

    /// Encountered an error trying to reject a single stream request.
    #[error("Unable to reject onion service request")]
    CantReject(#[source] tor_hsservice::ClientError),

    /// Encountered an error trying to tell the remote onion service client that
    /// we have accepted their connection.
    #[error("Unable to accept onion service connection")]
    AcceptRemote(#[source] tor_hsservice::ClientError),

    /// The runtime refused to spawn a task for us.
    #[error("Unable to spawn task")]
    Spawn(#[source] Arc<futures::task::SpawnError>),
}

impl HasKind for RequestFailed {
    fn kind(&self) -> ErrorKind {
        match self {
            RequestFailed::CantDestroy(e) => e.kind(),
            RequestFailed::CantReject(e) => e.kind(),
            RequestFailed::AcceptRemote(e) => e.kind(),
            RequestFailed::Spawn(e) => e.kind(),
        }
    }
}

/// Try to open a connection to an appropriate local target using
/// `target_stream_future`.  If successful, try to report success on `request`
/// and transmit data between the two streams indefinitely.  On failure, close
/// `request`.
///
/// Only return an error if we were unable to behave as intended due to a
/// problem we did not already report.
async fn forward_connection<R, FUT, TS>(
    runtime: R,
    request: StreamRequest,
    target_stream_future: FUT,
    nickname: &HsNickname,
    addr: &TargetAddr,
) -> Result<(), RequestFailed>
where
    R: Runtime,
    FUT: Future<Output = Result<TS, IoError>>,
    TS: AsyncRead + AsyncWrite + Send + 'static,
{
    let local_stream = target_stream_future.await.map_err(Arc::new);

    // TODO: change this to "log_ratelim!(nickname=%nickname, ..." when log_ratelim can do that
    // (we should search for HSS log messages and make them all be in the same form)
    log_ratelim!(
        "Connecting to {} for onion service {}", sv(addr), nickname;
        local_stream
    );

    let local_stream = match local_stream {
        Ok(s) => s,
        Err(_) => {
            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
            if let Err(e_rejecting) = request.reject(end).await {
                debug_report!(
                    &e_rejecting,
                    "Unable to reject onion service request from client"
                );
                return Err(RequestFailed::CantReject(e_rejecting));
            }
            // We reported the (rate-limited) error from local_stream in
            // the log_ratelim! call above.
            return Ok(());
        }
    };

    let onion_service_stream: DataStream = {
        let connected = relaymsg::Connected::new_empty();
        request
            .accept(connected)
            .await
            .map_err(RequestFailed::AcceptRemote)?
    };

    let (svc_r, svc_w) = onion_service_stream.split();
    let (local_r, local_w) = local_stream.split();

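    // Forward traffic in both directions as two detached tasks.  Each task
    // runs until it sees EOF or an error; copy_interactive flushes or closes
    // its writer before it finishes.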
    runtime
        .spawn(copy_interactive(local_r, svc_w).map(|_| ()))
        .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
    runtime
        .spawn(copy_interactive(svc_r, local_w).map(|_| ()))
        .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;

    Ok(())
}

/// Copy all the data from `reader` into `writer` until we encounter an EOF or
/// an error.
///
/// Unlike `futures::io::copy()`, this function is meant for use with
/// interactive readers and writers, where the reader might pause for
/// a while, but where we want to send data on the writer as soon as
/// it is available.
///
/// This function assumes that the writer might need to be flushed for
/// any buffered data to be sent.  It tries to minimize the number of
/// flushes, however, by only flushing the writer when the reader has no data.
///
/// NOTE: This is duplicate code from `arti::socks`.  But instead of
/// deduplicating it, we should change the behavior in `DataStream` that makes
/// it necessary. See arti#786 for a fuller discussion.
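///
/// # Example
///
/// Typical use, as in `forward_connection` above: one copy task per direction
/// of a proxied connection (sketch; not compiled here):
///
/// ```ignore
/// runtime.spawn(copy_interactive(local_r, svc_w).map(|_| ()))?;
/// runtime.spawn(copy_interactive(svc_r, local_w).map(|_| ()))?;
/// ```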
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    use futures::{poll, task::Poll};

    let mut buf = [0_u8; 1024];

    // At this point we could just loop, calling read().await,
    // write_all().await, and flush().await.  But we want to be more
    // clever than that: we only want to flush when the reader is
    // stalled.  That way we can pack our data into as few cells as
    // possible, but flush it immediately whenever there's no more
    // data coming.
    let loop_result: IoResult<()> = loop {
        let mut read_future = reader.read(&mut buf[..]);
        match poll!(&mut read_future) {
            Poll::Ready(Err(e)) => break Err(e),
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
            Poll::Ready(Ok(n)) => {
                writer.write_all(&buf[..n]).await?;
                continue;
            }
            Poll::Pending => writer.flush().await?,
        }

        // The read future is pending, so we should wait on it.
        match read_future.await {
            Err(e) => break Err(e),
            Ok(0) => break Ok(()),
            Ok(n) => writer.write_all(&buf[..n]).await?,
        }
    };

    // Make sure that we flush any lingering data if we can.
    //
    // If there is a difference between closing and dropping, then we
    // only want to do a "proper" close if the reader closed cleanly.
    let flush_result = if loop_result.is_ok() {
        writer.close().await
    } else {
        writer.flush().await
    };

    loop_result.or(flush_result)
}