tor_hsrproxy/
proxy.rs

1//! A simple reverse-proxy implementation for onion services.
2
3use std::sync::{Arc, Mutex};
4
5use futures::{
6    select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future,
7    FutureExt as _, Stream, StreamExt as _,
8};
9use itertools::iproduct;
10use oneshot_fused_workaround as oneshot;
11use safelog::sensitive as sv;
12use std::collections::HashMap;
13use std::io::{Error as IoError, Result as IoResult};
14use strum::IntoEnumIterator;
15use tor_cell::relaycell::msg as relaymsg;
16use tor_error::{debug_report, ErrorKind, HasKind};
17use tor_hsservice::{HsNickname, RendRequest, StreamRequest};
18use tor_log_ratelim::log_ratelim;
19use tor_proto::stream::{DataStream, IncomingStreamRequest};
20use tor_rtcompat::Runtime;
21
22use crate::config::{
23    Encapsulation, ProxyAction, ProxyActionDiscriminants, ProxyConfig, TargetAddr,
24};
25
26/// A reverse proxy that handles connections from an `OnionService` by routing
27/// them to local addresses.
28#[derive(Debug)]
29pub struct OnionServiceReverseProxy {
30    /// Mutable state held by this reverse proxy.
31    state: Mutex<State>,
32}
33
34/// Mutable part of an RProxy
35#[derive(Debug)]
36struct State {
37    /// The current configuration for this reverse proxy.
38    config: ProxyConfig,
39    /// A sender that we'll drop when it's time to shut down this proxy.
40    shutdown_tx: Option<oneshot::Sender<void::Void>>,
41    /// A receiver that we'll use to monitor for shutdown signals.
42    shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>,
43}
44
45/// An error that prevents further progress while processing requests.
46#[derive(Clone, Debug, thiserror::Error)]
47#[non_exhaustive]
48pub enum HandleRequestsError {
49    /// The runtime says it was unable to spawn a task.
50    #[error("Unable to spawn a task")]
51    Spawn(#[source] Arc<futures::task::SpawnError>),
52}
53
54impl HasKind for HandleRequestsError {
55    fn kind(&self) -> ErrorKind {
56        match self {
57            HandleRequestsError::Spawn(e) => e.kind(),
58        }
59    }
60}
61
62impl OnionServiceReverseProxy {
63    /// Create a new proxy with a given configuration.
64    pub fn new(config: ProxyConfig) -> Arc<Self> {
65        let (shutdown_tx, shutdown_rx) = oneshot::channel();
66        Arc::new(Self {
67            state: Mutex::new(State {
68                config,
69                shutdown_tx: Some(shutdown_tx),
70                shutdown_rx: shutdown_rx.shared(),
71            }),
72        })
73    }
74
75    /// Try to change the configuration of this proxy.
76    ///
77    /// This change applies only to new connections through the proxy; existing
78    /// connections are not affected.
79    pub fn reconfigure(
80        &self,
81        config: ProxyConfig,
82        how: tor_config::Reconfigure,
83    ) -> Result<(), tor_config::ReconfigureError> {
84        if how == tor_config::Reconfigure::CheckAllOrNothing {
85            // Every possible reconfiguration is allowed.
86            return Ok(());
87        }
88        let mut state = self.state.lock().expect("poisoned lock");
89        state.config = config;
90        // Note: we don't need to use a postage::watch here, since we just want
91        // to lock this configuration whenever we get a request.  We could use a
92        // Mutex<Arc<>> instead, but the performance shouldn't matter.
93        //
94        Ok(())
95    }
96
97    /// Shut down all request-handlers running using with this proxy.
98    pub fn shutdown(&self) {
99        let mut state = self.state.lock().expect("poisoned lock");
100        let _ = state.shutdown_tx.take();
101    }
102
103    /// Use this proxy to handle a stream of [`RendRequest`]s.
104    ///
105    /// The future returned by this function blocks indefinitely, so you may
106    /// want to spawn a separate task for it.
107    ///
108    /// The provided nickname is used for logging.
109    pub async fn handle_requests<R, S>(
110        &self,
111        runtime: R,
112        nickname: HsNickname,
113        requests: S,
114    ) -> Result<(), HandleRequestsError>
115    where
116        R: Runtime,
117        S: Stream<Item = RendRequest> + Unpin,
118    {
119        let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse();
120        let mut shutdown_rx = self
121            .state
122            .lock()
123            .expect("poisoned lock")
124            .shutdown_rx
125            .clone()
126            .fuse();
127        let nickname = Arc::new(nickname);
128
129        /// Which of the three counters for each action
130        #[cfg(feature = "metrics")]
131        #[derive(Clone, Copy, Eq, PartialEq, Hash)]
132        enum CounterSelector {
133            /// Two counters, one for successes, one for failures
134            Ret(Result<(), ()>),
135            /// One counter for the total
136            Total,
137        }
138
139        #[cfg(feature = "metrics")]
140        let metrics_counters = {
141            use CounterSelector as CS;
142
143            let counters = iproduct!(
144                ProxyActionDiscriminants::iter(),
145                [
146                    (CS::Total, "arti_hss_proxy_connections_total"),
147                    (CS::Ret(Ok(())), "arti_hss_proxy_connections_ok_total"),
148                    (CS::Ret(Err(())), "arti_hss_proxy_connections_failed_total"),
149                ],
150            )
151            .map(|(action, (outcome, name))| {
152                let k = (action, outcome);
153                let nickname = nickname.to_string();
154                let action: &str = action.into();
155                let v = metrics::counter!(name, "nickname" => nickname, "action" => action);
156                (k, v)
157            })
158            .collect::<HashMap<(ProxyActionDiscriminants, CounterSelector), _>>();
159
160            Arc::new(counters)
161        };
162
163        loop {
164            let stream_request = select_biased! {
165                _ = shutdown_rx => return Ok(()),
166                stream_request = stream_requests.next() => match stream_request {
167                    None => return Ok(()),
168                    Some(s) => s,
169                }
170            };
171
172            runtime.spawn({
173                let action = self.choose_action(stream_request.request());
174                let runtime = runtime.clone();
175                let nickname = nickname.clone();
176                let req = stream_request.request().clone();
177
178                #[cfg(feature = "metrics")]
179                let metrics_counters = metrics_counters.clone();
180
181                async move {
182                    let outcome =
183                        run_action(runtime, nickname.as_ref(), action.clone(), stream_request).await;
184
185                    #[cfg(feature = "metrics")]
186                    {
187                        use CounterSelector as CS;
188
189                        let action = ProxyActionDiscriminants::from(&action);
190                        let outcome = outcome.as_ref().map(|_|()).map_err(|_|());
191                        for outcome in [CS::Total, CS::Ret(outcome)] {
192                            if let Some(counter) = metrics_counters.get(&(action, outcome)) {
193                                counter.increment(1);
194                            } else {
195                                // statically be impossible, but let's not panic
196                            }
197                        }
198                    }
199
200                    log_ratelim!(
201                        "Performing action on {}", nickname;
202                        outcome;
203                        Err(_) => WARN, "Unable to take action {:?} for request {:?}", sv(action), sv(req)
204                    );
205                }
206            })
207                .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?;
208        }
209    }
210
211    /// Choose the configured action that we should take in response to a
212    /// [`StreamRequest`], based on our current configuration.
213    fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction {
214        let port: u16 = match stream_request {
215            IncomingStreamRequest::Begin(begin) => {
216                // The C tor implementation deliberately ignores the address and
217                // flags on the BEGIN message, so we do too.
218                begin.port()
219            }
220            other => {
221                tracing::warn!(
222                    "Rejecting onion service request for invalid command {:?}. Internal error.",
223                    other
224                );
225                return ProxyAction::DestroyCircuit;
226            }
227        };
228
229        self.state
230            .lock()
231            .expect("poisoned lock")
232            .config
233            .resolve_port_for_begin(port)
234            .cloned()
235            // The default action is "destroy the circuit."
236            .unwrap_or(ProxyAction::DestroyCircuit)
237    }
238}
239
240/// Take the configured action from `action` on the incoming request `request`.
241async fn run_action<R: Runtime>(
242    runtime: R,
243    nickname: &HsNickname,
244    action: ProxyAction,
245    request: StreamRequest,
246) -> Result<(), RequestFailed> {
247    match action {
248        ProxyAction::DestroyCircuit => {
249            request
250                .shutdown_circuit()
251                .map_err(RequestFailed::CantDestroy)?;
252        }
253        ProxyAction::Forward(encap, target) => match (encap, target) {
254            (Encapsulation::Simple, ref addr @ TargetAddr::Inet(a)) => {
255                let rt_clone = runtime.clone();
256                forward_connection(rt_clone, request, runtime.connect(&a), nickname, addr).await?;
257            } /* TODO (#1246)
258                (Encapsulation::Simple, TargetAddr::Unix(_)) => {
259                    // TODO: We need to implement unix connections.
260                }
261              */
262        },
263        ProxyAction::RejectStream => {
264            // C tor sends DONE in this case, so we do too.
265            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
266
267            request
268                .reject(end)
269                .await
270                .map_err(RequestFailed::CantReject)?;
271        }
272        ProxyAction::IgnoreStream => drop(request),
273    };
274    Ok(())
275}
276
277/// An error from a single attempt to handle an onion service request.
278#[derive(thiserror::Error, Debug, Clone)]
279enum RequestFailed {
280    /// Encountered an error trying to destroy a circuit.
281    #[error("Unable to destroy onion service circuit")]
282    CantDestroy(#[source] tor_error::Bug),
283
284    /// Encountered an error trying to reject a single stream request.
285    #[error("Unable to reject onion service request")]
286    CantReject(#[source] tor_hsservice::ClientError),
287
288    /// Encountered an error trying to tell the remote onion service client that
289    /// we have accepted their connection.
290    #[error("Unable to accept onion service connection")]
291    AcceptRemote(#[source] tor_hsservice::ClientError),
292
293    /// The runtime refused to spawn a task for us.
294    #[error("Unable to spawn task")]
295    Spawn(#[source] Arc<futures::task::SpawnError>),
296}
297
298impl HasKind for RequestFailed {
299    fn kind(&self) -> ErrorKind {
300        match self {
301            RequestFailed::CantDestroy(e) => e.kind(),
302            RequestFailed::CantReject(e) => e.kind(),
303            RequestFailed::AcceptRemote(e) => e.kind(),
304            RequestFailed::Spawn(e) => e.kind(),
305        }
306    }
307}
308
309/// Try to open a connection to an appropriate local target using
310/// `target_stream_future`.  If successful, try to report success on `request`
311/// and transmit data between the two stream indefinitely.  On failure, close
312/// `request`.
313///
314/// Only return an error if we were unable to behave as intended due to a
315/// problem we did not already report.
316async fn forward_connection<R, FUT, TS>(
317    runtime: R,
318    request: StreamRequest,
319    target_stream_future: FUT,
320    nickname: &HsNickname,
321    addr: &TargetAddr,
322) -> Result<(), RequestFailed>
323where
324    R: Runtime,
325    FUT: Future<Output = Result<TS, IoError>>,
326    TS: AsyncRead + AsyncWrite + Send + 'static,
327{
328    let local_stream = target_stream_future.await.map_err(Arc::new);
329
330    // TODO: change this to "log_ratelim!(nickname=%nickname, ..." when log_ratelim can do that
331    // (we should search for HSS log messages and make them all be in the same form)
332    log_ratelim!(
333        "Connecting to {} for onion service {}", sv(addr), nickname;
334        local_stream
335    );
336
337    let local_stream = match local_stream {
338        Ok(s) => s,
339        Err(_) => {
340            let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE);
341            if let Err(e_rejecting) = request.reject(end).await {
342                debug_report!(
343                    &e_rejecting,
344                    "Unable to reject onion service request from client"
345                );
346                return Err(RequestFailed::CantReject(e_rejecting));
347            }
348            // We reported the (rate-limited) error from local_stream in
349            // DEBUG_REPORT above.
350            return Ok(());
351        }
352    };
353
354    let onion_service_stream: DataStream = {
355        let connected = relaymsg::Connected::new_empty();
356        request
357            .accept(connected)
358            .await
359            .map_err(RequestFailed::AcceptRemote)?
360    };
361
362    let (svc_r, svc_w) = onion_service_stream.split();
363    let (local_r, local_w) = local_stream.split();
364
365    runtime
366        .spawn(copy_interactive(local_r, svc_w).map(|_| ()))
367        .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
368    runtime
369        .spawn(copy_interactive(svc_r, local_w).map(|_| ()))
370        .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?;
371
372    Ok(())
373}
374
375/// Copy all the data from `reader` into `writer` until we encounter an EOF or
376/// an error.
377///
378/// Unlike as futures::io::copy(), this function is meant for use with
379/// interactive readers and writers, where the reader might pause for
380/// a while, but where we want to send data on the writer as soon as
381/// it is available.
382///
383/// This function assumes that the writer might need to be flushed for
384/// any buffered data to be sent.  It tries to minimize the number of
385/// flushes, however, by only flushing the writer when the reader has no data.
386///
387/// NOTE: This is duplicate code from `arti::socks`.  But instead of
388/// deduplicating it, we should change the behavior in `DataStream` that makes
389/// it necessary. See arti#786 for a fuller discussion.
390async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
391where
392    R: AsyncRead + Unpin,
393    W: AsyncWrite + Unpin,
394{
395    use futures::{poll, task::Poll};
396
397    let mut buf = [0_u8; 1024];
398
399    // At this point we could just loop, calling read().await,
400    // write_all().await, and flush().await.  But we want to be more
401    // clever than that: we only want to flush when the reader is
402    // stalled.  That way we can pack our data into as few cells as
403    // possible, but flush it immediately whenever there's no more
404    // data coming.
405    let loop_result: IoResult<()> = loop {
406        let mut read_future = reader.read(&mut buf[..]);
407        match poll!(&mut read_future) {
408            Poll::Ready(Err(e)) => break Err(e),
409            Poll::Ready(Ok(0)) => break Ok(()), // EOF
410            Poll::Ready(Ok(n)) => {
411                writer.write_all(&buf[..n]).await?;
412                continue;
413            }
414            Poll::Pending => writer.flush().await?,
415        }
416
417        // The read future is pending, so we should wait on it.
418        match read_future.await {
419            Err(e) => break Err(e),
420            Ok(0) => break Ok(()),
421            Ok(n) => writer.write_all(&buf[..n]).await?,
422        }
423    };
424
425    // Make sure that we flush any lingering data if we can.
426    //
427    // If there is a difference between closing and dropping, then we
428    // only want to do a "proper" close if the reader closed cleanly.
429    let flush_result = if loop_result.is_ok() {
430        writer.close().await
431    } else {
432        writer.flush().await
433    };
434
435    loop_result.or(flush_result)
436}