1
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_duration_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46

            
47
pub mod config;
48
pub mod err;
49

            
50
#[cfg(feature = "managed-pts")]
51
pub mod ipc;
52

            
53
#[cfg(feature = "managed-pts")]
54
mod managed;
55

            
56
use crate::config::{TransportConfig, TransportOptions};
57
use crate::err::PtError;
58
use oneshot_fused_workaround as oneshot;
59
use std::collections::HashMap;
60
use std::net::SocketAddr;
61
use std::path::PathBuf;
62
use std::sync::{Arc, RwLock};
63
use tor_config_path::CfgPathResolver;
64
use tor_linkspec::PtTransportName;
65
use tor_rtcompat::Runtime;
66
use tor_socksproto::SocksVersion;
67
#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
68
use tracing::info;
69
use tracing::warn;
70
#[cfg(feature = "managed-pts")]
71
use {
72
    crate::managed::{PtReactor, PtReactorMessage},
73
    futures::channel::mpsc::{self, UnboundedSender},
74
    futures::task::SpawnExt,
75
    tor_error::error_report,
76
};
77
#[cfg(feature = "tor-channel-factory")]
78
use {
79
    async_trait::async_trait,
80
    tor_chanmgr::{
81
        builder::ChanBuilder,
82
        factory::{AbstractPtError, ChannelFactory},
83
        transport::ExternalProxyPlugin,
84
    },
85
    tracing::trace,
86
};
87

            
88
/// Shared mutable state between the `PtReactor` and `PtMgr`.
89
#[derive(Default, Debug)]
90
struct PtSharedState {
91
    /// Connection information for pluggable transports from currently running binaries.
92
    ///
93
    /// Unmanaged pluggable transports are not included in this map.
94
    #[allow(dead_code)]
95
    managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
96
    /// Current configured set of pluggable transports.
97
    configured: HashMap<PtTransportName, TransportOptions>,
98
}
99

            
100
/// A pluggable transport manager knows how to make different
101
/// kinds of connections to the Tor network, for censorship avoidance.
102
pub struct PtMgr<R> {
103
    /// An underlying `Runtime`, used to spawn background tasks.
104
    #[allow(dead_code)]
105
    runtime: R,
106
    /// State for this `PtMgr` that's shared with the `PtReactor`.
107
    state: Arc<RwLock<PtSharedState>>,
108
    /// PtReactor channel when the `managed-pts` feature is enabled.
109
    #[cfg(feature = "managed-pts")]
110
    tx: UnboundedSender<PtReactorMessage>,
111
}
112

            
113
impl<R: Runtime> PtMgr<R> {
114
    /// Transform the config into a more useful representation indexed by transport name.
115
12
    fn transform_config(
116
12
        binaries: Vec<TransportConfig>,
117
12
    ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
118
12
        let mut ret = HashMap::new();
119
        // FIXME(eta): You can currently specify overlapping protocols, and it'll
120
        //             just use the last transport specified.
121
        //             I attempted to fix this, but decided I didn't want to stare into the list
122
        //             builder macro void after trying it for 15 minutes.
123
12
        for thing in binaries {
124
            for tn in thing.protocols.iter() {
125
                ret.insert(tn.clone(), thing.clone().try_into()?);
126
            }
127
        }
128
12
        for opt in ret.values() {
129
            if let TransportOptions::Unmanaged(u) = opt {
130
                if !u.is_localhost() {
131
                    warn!("Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only.");
132
                }
133
            }
134
        }
135
12
        Ok(ret)
136
12
    }
137

            
138
    /// Create a new PtMgr.
139
    // TODO: maybe don't have the Vec directly exposed?
140
8
    pub fn new(
141
8
        transports: Vec<TransportConfig>,
142
8
        #[allow(unused)] state_dir: PathBuf,
143
8
        path_resolver: Arc<CfgPathResolver>,
144
8
        rt: R,
145
8
    ) -> Result<Self, PtError> {
146
8
        let state = PtSharedState {
147
8
            managed_cmethods: Default::default(),
148
8
            configured: Self::transform_config(transports)?,
149
        };
150
8
        let state = Arc::new(RwLock::new(state));
151

            
152
        // reactor is only needed if we support managed pts
153
        #[cfg(feature = "managed-pts")]
154
8
        let tx = {
155
8
            let (tx, rx) = mpsc::unbounded();
156
8

            
157
8
            let mut reactor =
158
8
                PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
159
8
            rt.spawn(async move {
160
                loop {
161
10
                    match reactor.run_one_step().await {
162
                        Ok(true) => return,
163
2
                        Ok(false) => {}
164
                        Err(e) => {
165
                            error_report!(e, "PtReactor failed");
166
                            return;
167
                        }
168
                    }
169
                }
170
8
            })
171
8
            .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
172

            
173
8
            tx
174
8
        };
175
8

            
176
8
        Ok(Self {
177
8
            runtime: rt,
178
8
            state,
179
8
            #[cfg(feature = "managed-pts")]
180
8
            tx,
181
8
        })
182
8
    }
183

            
184
    /// Reload the configuration
185
4
    pub fn reconfigure(
186
4
        &self,
187
4
        how: tor_config::Reconfigure,
188
4
        transports: Vec<TransportConfig>,
189
4
    ) -> Result<(), tor_config::ReconfigureError> {
190
4
        let configured = Self::transform_config(transports)?;
191
4
        if how == tor_config::Reconfigure::CheckAllOrNothing {
192
2
            return Ok(());
193
2
        }
194
2
        {
195
2
            let mut inner = self.state.write().expect("ptmgr poisoned");
196
2
            inner.configured = configured;
197
2
        }
198
2
        // We don't have any way of propagating this sanely; the caller will find out the reactor
199
2
        // has died later on anyway.
200
2
        #[cfg(feature = "managed-pts")]
201
2
        let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
202
2
        Ok(())
203
4
    }
204

            
205
    /// Look up how to connect to the transport called `transport`.
    ///
    /// * An unmanaged transport yields the client method from its configuration.
    /// * A managed transport that is already running yields its recorded client method.
    /// * A managed transport that is not running is launched first (via the reactor).
    ///
    /// Returns `Ok(None)` if the transport is not configured at all.
    #[cfg(feature = "tor-channel-factory")]
    async fn get_cmethod_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<PtClientMethod>, PtError> {
        let (options, running) = {
            // NOTE(eta): A RwLock is taken inside async code here, but never held across
            //            an await point.  Arguably that is fine: it is a small read, and
            //            nothing should ever hold this lock for very long.
            let shared = self.state.read().expect("ptmgr poisoned");
            (
                shared.configured.get(transport).cloned(),
                shared.managed_cmethods.get(transport).cloned(),
            )
        };

        match (options, running) {
            // No configuration for this transport.
            (None, _) => {
                trace!("Got a request for transport {transport}, which is not configured.");
                Ok(None)
            }
            (Some(TransportOptions::Unmanaged(unmanaged)), _) => {
                let cmethod = unmanaged.cmethod();
                trace!(
                    "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
                );
                Ok(Some(cmethod))
            }
            // A configured-and-running managed transport.
            #[cfg(feature = "managed-pts")]
            (Some(TransportOptions::Managed(_)), Some(cmethod)) => {
                trace!("Found configured managed transport {transport} accessible via {cmethod:?}");
                Ok(Some(cmethod))
            }
            // A configured-but-not-running managed transport.
            #[cfg(feature = "managed-pts")]
            (Some(TransportOptions::Managed(_)), None) => {
                // A lot happens "under the hood" at this point.
                //
                // To hand out a ChannelFactory for a given connection, we need to:
                //    - launch the binary for that transport if it is not already running*;
                //    - if we launched it, talk to it and learn which ports it listens on;
                //    - return a ChannelFactory that connects via one of those ports, using
                //      the appropriate version of SOCKS, with K=V parameters encoded properly.
                //
                // * As in other managers, two concurrent requests must not launch the same
                //   transport twice.
                //
                // If the binary crashes later, we should detect that and relaunch it on demand.
                //
                // On reconfigure, we should shut down any no-longer-used transports.
                //
                // Maybe we should also shut down transports that have been idle for a long time.
                Ok(Some(self.spawn_transport(transport).await?))
            }
        }
    }
276

            
277
    /// Ask the PT reactor to launch the managed transport `transport`, and wait for the outcome.
    ///
    /// Returns the client method reported by the reactor, the launch error it reported,
    /// or an internal error if the reactor can no longer be reached.
    #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
    async fn spawn_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<PtClientMethod, PtError> {
        // The reactor does the actual spawning (and coalesces duplicate requests);
        // we only send the request and await the reply.
        info!("Got a request for transport {transport}, which is not currently running. Launching it.");

        let (reply_tx, reply_rx) = oneshot::channel();
        let request = PtReactorMessage::Spawn {
            pt: transport.clone(),
            result: reply_tx,
        };
        if self.tx.unbounded_send(request).is_err() {
            return Err(PtError::Internal(tor_error::internal!(
                "PT reactor closed unexpectedly"
            )));
        }

        match reply_rx.await {
            Ok(Ok(method)) => {
                info!("Successfully launched PT for {transport} at {method:?}.");
                Ok(method)
            }
            Ok(Err(e)) => {
                warn!("PT for {transport} failed to launch: {e}");
                Err(e)
            }
            // The reply channel was dropped without an answer.
            Err(_) => Err(PtError::Internal(tor_error::internal!(
                "PT reactor closed unexpectedly"
            ))),
        }
    }
313
}
314

            
315
/// A SOCKS endpoint to connect through a pluggable transport.
316
#[derive(Debug, Clone, PartialEq, Eq)]
317
pub struct PtClientMethod {
318
    /// The SOCKS protocol version to use.
319
    pub(crate) kind: SocksVersion,
320
    /// The socket address to connect to.
321
    pub(crate) endpoint: SocketAddr,
322
}
323

            
324
impl PtClientMethod {
325
    /// Get the SOCKS protocol version to use.
326
    pub fn kind(&self) -> SocksVersion {
327
        self.kind
328
    }
329

            
330
    /// Get the socket address to connect to.
331
    pub fn endpoint(&self) -> SocketAddr {
332
        self.endpoint
333
    }
334
}
335

            
336
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    // Builds a channel factory that connects through the SOCKS endpoint of `transport`.
    // Yields `Ok(None)` when the transport is not configured, and wraps any lookup
    // (or launch) failure in an `Arc`.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        match self.get_cmethod_for_transport(transport).await {
            Ok(Some(cmethod)) => {
                let proxy =
                    ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind);
                let factory = ChanBuilder::new(self.runtime.clone(), proxy);
                // FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc?
                // FIXME(eta): Should we track what transports are live somehow, so we can shut them down?
                Ok(Some(Arc::new(factory)))
            }
            Ok(None) => Ok(None),
            Err(e) => Err(Arc::new(e)),
        }
    }
}