tor_ptmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47pub mod config;
48pub mod err;
49
50#[cfg(feature = "managed-pts")]
51pub mod ipc;
52
53#[cfg(feature = "managed-pts")]
54mod managed;
55
56use crate::config::{TransportConfig, TransportOptions};
57use crate::err::PtError;
58use oneshot_fused_workaround as oneshot;
59use std::collections::HashMap;
60use std::net::SocketAddr;
61use std::path::PathBuf;
62use std::sync::{Arc, RwLock};
63use tor_config_path::CfgPathResolver;
64use tor_linkspec::PtTransportName;
65use tor_rtcompat::Runtime;
66use tor_socksproto::SocksVersion;
67#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
68use tracing::info;
69use tracing::warn;
70#[cfg(feature = "managed-pts")]
71use {
72    crate::managed::{PtReactor, PtReactorMessage},
73    futures::channel::mpsc::{self, UnboundedSender},
74    futures::task::SpawnExt,
75    tor_error::error_report,
76};
77#[cfg(feature = "tor-channel-factory")]
78use {
79    async_trait::async_trait,
80    tor_chanmgr::{
81        builder::ChanBuilder,
82        factory::{AbstractPtError, ChannelFactory},
83        transport::ExternalProxyPlugin,
84    },
85    tracing::trace,
86};
87
/// Shared mutable state between the `PtReactor` and `PtMgr`.
///
/// `PtMgr` keeps this behind an `Arc<RwLock<_>>`; when the `managed-pts` feature
/// is enabled, a clone of that handle is passed to the reactor at construction.
#[derive(Default, Debug)]
struct PtSharedState {
    /// Connection information for pluggable transports from currently running binaries.
    ///
    /// Unmanaged pluggable transports are not included in this map.
    // In this file the map is only read under `tor-channel-factory`, so it can
    // look unused in other feature combinations.
    #[allow(dead_code)]
    managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
    /// Current configured set of pluggable transports.
    ///
    /// Indexed by transport name, and replaced wholesale by `PtMgr::reconfigure`.
    configured: HashMap<PtTransportName, TransportOptions>,
}
99
/// A pluggable transport manager knows how to make different
/// kinds of connections to the Tor network, for censorship avoidance.
///
/// The type parameter `R` is the runtime; every method on this type requires
/// `R: Runtime`.
pub struct PtMgr<R> {
    /// An underlying `Runtime`, used to spawn background tasks.
    // This field is only read when building channel factories, which requires
    // the `tor-channel-factory` feature.
    #[allow(dead_code)]
    runtime: R,
    /// State for this `PtMgr` that's shared with the `PtReactor`.
    state: Arc<RwLock<PtSharedState>>,
    /// PtReactor channel when the `managed-pts` feature is enabled.
    ///
    /// Used to ask the reactor to spawn a transport, and to tell it that the
    /// configuration has changed.
    #[cfg(feature = "managed-pts")]
    tx: UnboundedSender<PtReactorMessage>,
}
112
113impl<R: Runtime> PtMgr<R> {
114    /// Transform the config into a more useful representation indexed by transport name.
115    fn transform_config(
116        binaries: Vec<TransportConfig>,
117    ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
118        let mut ret = HashMap::new();
119        // FIXME(eta): You can currently specify overlapping protocols, and it'll
120        //             just use the last transport specified.
121        //             I attempted to fix this, but decided I didn't want to stare into the list
122        //             builder macro void after trying it for 15 minutes.
123        for thing in binaries {
124            for tn in thing.protocols.iter() {
125                ret.insert(tn.clone(), thing.clone().try_into()?);
126            }
127        }
128        for opt in ret.values() {
129            if let TransportOptions::Unmanaged(u) = opt {
130                if !u.is_localhost() {
131                    warn!(
132                        "Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
133                    );
134                }
135            }
136        }
137        Ok(ret)
138    }
139
140    /// Create a new PtMgr.
141    // TODO: maybe don't have the Vec directly exposed?
142    pub fn new(
143        transports: Vec<TransportConfig>,
144        #[allow(unused)] state_dir: PathBuf,
145        path_resolver: Arc<CfgPathResolver>,
146        rt: R,
147    ) -> Result<Self, PtError> {
148        let state = PtSharedState {
149            managed_cmethods: Default::default(),
150            configured: Self::transform_config(transports)?,
151        };
152        let state = Arc::new(RwLock::new(state));
153
154        // reactor is only needed if we support managed pts
155        #[cfg(feature = "managed-pts")]
156        let tx = {
157            let (tx, rx) = mpsc::unbounded();
158
159            let mut reactor =
160                PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
161            rt.spawn(async move {
162                loop {
163                    match reactor.run_one_step().await {
164                        Ok(true) => return,
165                        Ok(false) => {}
166                        Err(e) => {
167                            error_report!(e, "PtReactor failed");
168                            return;
169                        }
170                    }
171                }
172            })
173            .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
174
175            tx
176        };
177
178        Ok(Self {
179            runtime: rt,
180            state,
181            #[cfg(feature = "managed-pts")]
182            tx,
183        })
184    }
185
186    /// Reload the configuration
187    pub fn reconfigure(
188        &self,
189        how: tor_config::Reconfigure,
190        transports: Vec<TransportConfig>,
191    ) -> Result<(), tor_config::ReconfigureError> {
192        let configured = Self::transform_config(transports)?;
193        if how == tor_config::Reconfigure::CheckAllOrNothing {
194            return Ok(());
195        }
196        {
197            let mut inner = self.state.write().expect("ptmgr poisoned");
198            inner.configured = configured;
199        }
200        // We don't have any way of propagating this sanely; the caller will find out the reactor
201        // has died later on anyway.
202        #[cfg(feature = "managed-pts")]
203        let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
204        Ok(())
205    }
206
207    /// Given a transport name, return a method that we can use to contact that transport.
208    ///
209    /// May have to launch a managed transport as needed.
210    ///
211    /// Returns Ok(None) if no such transport exists.
212    #[cfg(feature = "tor-channel-factory")]
213    async fn get_cmethod_for_transport(
214        &self,
215        transport: &PtTransportName,
216    ) -> Result<Option<PtClientMethod>, PtError> {
217        #[allow(unused)]
218        let (cfg, managed_cmethod) = {
219            // NOTE(eta): This is using a RwLock inside async code (but not across an await point).
220            //            Arguably this is fine since it's just a small read, and nothing should ever
221            //            hold this lock for very long.
222            let inner = self.state.read().expect("ptmgr poisoned");
223            let cfg = inner.configured.get(transport);
224            let managed_cmethod = inner.managed_cmethods.get(transport);
225            (cfg.cloned(), managed_cmethod.cloned())
226        };
227
228        match cfg {
229            Some(TransportOptions::Unmanaged(cfg)) => {
230                let cmethod = cfg.cmethod();
231                trace!(
232                    "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
233                );
234                Ok(Some(cmethod))
235            }
236            #[cfg(feature = "managed-pts")]
237            Some(TransportOptions::Managed(_cfg)) => {
238                match managed_cmethod {
239                    // A configured-and-running cmethod.
240                    Some(cmethod) => {
241                        trace!(
242                            "Found configured managed transport {transport} accessible via {cmethod:?}"
243                        );
244                        Ok(Some(cmethod))
245                    }
246                    // A configured-but-not-running cmethod.
247                    None => {
248                        // There is going to be a lot happening "under the hood" here.
249                        //
250                        // When we are asked to get a ChannelFactory for a given
251                        // connection, we will need to:
252                        //    - launch the binary for that transport if it is not already running*.
253                        //    - If we launched the binary, talk to it and see which ports it
254                        //      is listening on.
255                        //    - Return a ChannelFactory that connects via one of those ports,
256                        //      using the appropriate version of SOCKS, passing K=V parameters
257                        //      encoded properly.
258                        //
259                        // * As in other managers, we'll need to avoid trying to launch the same
260                        //   transport twice if we get two concurrent requests.
261                        //
262                        // Later if the binary crashes, we should detect that.  We should relaunch
263                        // it on demand.
264                        //
265                        // On reconfigure, we should shut down any no-longer-used transports.
266                        //
267                        // Maybe, we should shut down transports that haven't been used
268                        // for a long time.
269                        Ok(Some(self.spawn_transport(transport).await?))
270                    }
271                }
272            }
273            // No configuration for this transport.
274            None => {
275                trace!("Got a request for transport {transport}, which is not configured.");
276                Ok(None)
277            }
278        }
279    }
280
281    /// Communicate with the PT reactor to launch a managed transport.
282    #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
283    async fn spawn_transport(
284        &self,
285        transport: &PtTransportName,
286    ) -> Result<PtClientMethod, PtError> {
287        // Tell the reactor to spawn the PT, and wait for it.
288        // (The reactor will handle coalescing multiple requests.)
289        info!(
290            "Got a request for transport {transport}, which is not currently running. Launching it."
291        );
292
293        let (tx, rx) = oneshot::channel();
294        self.tx
295            .unbounded_send(PtReactorMessage::Spawn {
296                pt: transport.clone(),
297                result: tx,
298            })
299            .map_err(|_| {
300                PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
301            })?;
302
303        let method = match rx.await {
304            Err(_) => {
305                return Err(PtError::Internal(tor_error::internal!(
306                    "PT reactor closed unexpectedly"
307                )));
308            }
309            Ok(Err(e)) => {
310                warn!("PT for {transport} failed to launch: {e}");
311                return Err(e);
312            }
313            Ok(Ok(method)) => method,
314        };
315
316        info!("Successfully launched PT for {transport} at {method:?}.");
317        Ok(method)
318    }
319}
320
/// A SOCKS endpoint to connect through a pluggable transport.
///
/// The fields are crate-private; use [`PtClientMethod::kind`] and
/// [`PtClientMethod::endpoint`] to read them from outside this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PtClientMethod {
    /// The SOCKS protocol version to use.
    pub(crate) kind: SocksVersion,
    /// The socket address to connect to.
    pub(crate) endpoint: SocketAddr,
}
329
330impl PtClientMethod {
331    /// Get the SOCKS protocol version to use.
332    pub fn kind(&self) -> SocksVersion {
333        self.kind
334    }
335
336    /// Get the socket address to connect to.
337    pub fn endpoint(&self) -> SocketAddr {
338        self.endpoint
339    }
340}
341
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    /// Build a channel factory that connects through the transport named `transport`.
    ///
    /// Returns `Ok(None)` if the transport is not configured. Any error from
    /// locating or launching the transport is returned wrapped in an `Arc`.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        match self.get_cmethod_for_transport(transport).await {
            Ok(Some(PtClientMethod { kind, endpoint })) => {
                let proxy = ExternalProxyPlugin::new(self.runtime.clone(), endpoint, kind);
                let factory = ChanBuilder::new(self.runtime.clone(), proxy, None);
                // FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc?
                // FIXME(eta): Should we track what transports are live somehow, so we can shut them down?
                Ok(Some(Arc::new(factory)))
            }
            Ok(None) => Ok(None),
            Err(e) => Err(Arc::new(e)),
        }
    }
}