1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![allow(mismatched_lifetime_syntaxes)] pub mod config;
48pub mod err;
49
50#[cfg(feature = "managed-pts")]
51pub mod ipc;
52
53#[cfg(feature = "managed-pts")]
54mod managed;
55
56use crate::config::{TransportConfig, TransportOptions};
57use crate::err::PtError;
58use oneshot_fused_workaround as oneshot;
59use std::collections::HashMap;
60use std::net::SocketAddr;
61use std::path::PathBuf;
62use std::sync::{Arc, RwLock};
63use tor_config_path::CfgPathResolver;
64use tor_linkspec::PtTransportName;
65use tor_rtcompat::Runtime;
66use tor_socksproto::SocksVersion;
67#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
68use tracing::info;
69use tracing::warn;
70#[cfg(feature = "managed-pts")]
71use {
72 crate::managed::{PtReactor, PtReactorMessage},
73 futures::channel::mpsc::{self, UnboundedSender},
74 futures::task::SpawnExt,
75 tor_error::error_report,
76};
77#[cfg(feature = "tor-channel-factory")]
78use {
79 async_trait::async_trait,
80 tor_chanmgr::{
81 builder::ChanBuilder,
82 factory::{AbstractPtError, ChannelFactory},
83 transport::ExternalProxyPlugin,
84 },
85 tracing::trace,
86};
87
88#[derive(Default, Debug)]
90struct PtSharedState {
91 #[allow(dead_code)]
95 managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
96 configured: HashMap<PtTransportName, TransportOptions>,
98}
99
100pub struct PtMgr<R> {
103 #[allow(dead_code)]
105 runtime: R,
106 state: Arc<RwLock<PtSharedState>>,
108 #[cfg(feature = "managed-pts")]
110 tx: UnboundedSender<PtReactorMessage>,
111}
112
113impl<R: Runtime> PtMgr<R> {
114 fn transform_config(
116 binaries: Vec<TransportConfig>,
117 ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
118 let mut ret = HashMap::new();
119 for thing in binaries {
124 for tn in thing.protocols.iter() {
125 ret.insert(tn.clone(), thing.clone().try_into()?);
126 }
127 }
128 for opt in ret.values() {
129 if let TransportOptions::Unmanaged(u) = opt {
130 if !u.is_localhost() {
131 warn!(
132 "Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only."
133 );
134 }
135 }
136 }
137 Ok(ret)
138 }
139
140 pub fn new(
143 transports: Vec<TransportConfig>,
144 #[allow(unused)] state_dir: PathBuf,
145 path_resolver: Arc<CfgPathResolver>,
146 rt: R,
147 ) -> Result<Self, PtError> {
148 let state = PtSharedState {
149 managed_cmethods: Default::default(),
150 configured: Self::transform_config(transports)?,
151 };
152 let state = Arc::new(RwLock::new(state));
153
154 #[cfg(feature = "managed-pts")]
156 let tx = {
157 let (tx, rx) = mpsc::unbounded();
158
159 let mut reactor =
160 PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
161 rt.spawn(async move {
162 loop {
163 match reactor.run_one_step().await {
164 Ok(true) => return,
165 Ok(false) => {}
166 Err(e) => {
167 error_report!(e, "PtReactor failed");
168 return;
169 }
170 }
171 }
172 })
173 .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
174
175 tx
176 };
177
178 Ok(Self {
179 runtime: rt,
180 state,
181 #[cfg(feature = "managed-pts")]
182 tx,
183 })
184 }
185
186 pub fn reconfigure(
188 &self,
189 how: tor_config::Reconfigure,
190 transports: Vec<TransportConfig>,
191 ) -> Result<(), tor_config::ReconfigureError> {
192 let configured = Self::transform_config(transports)?;
193 if how == tor_config::Reconfigure::CheckAllOrNothing {
194 return Ok(());
195 }
196 {
197 let mut inner = self.state.write().expect("ptmgr poisoned");
198 inner.configured = configured;
199 }
200 #[cfg(feature = "managed-pts")]
203 let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
204 Ok(())
205 }
206
207 #[cfg(feature = "tor-channel-factory")]
213 async fn get_cmethod_for_transport(
214 &self,
215 transport: &PtTransportName,
216 ) -> Result<Option<PtClientMethod>, PtError> {
217 #[allow(unused)]
218 let (cfg, managed_cmethod) = {
219 let inner = self.state.read().expect("ptmgr poisoned");
223 let cfg = inner.configured.get(transport);
224 let managed_cmethod = inner.managed_cmethods.get(transport);
225 (cfg.cloned(), managed_cmethod.cloned())
226 };
227
228 match cfg {
229 Some(TransportOptions::Unmanaged(cfg)) => {
230 let cmethod = cfg.cmethod();
231 trace!(
232 "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
233 );
234 Ok(Some(cmethod))
235 }
236 #[cfg(feature = "managed-pts")]
237 Some(TransportOptions::Managed(_cfg)) => {
238 match managed_cmethod {
239 Some(cmethod) => {
241 trace!(
242 "Found configured managed transport {transport} accessible via {cmethod:?}"
243 );
244 Ok(Some(cmethod))
245 }
246 None => {
248 Ok(Some(self.spawn_transport(transport).await?))
270 }
271 }
272 }
273 None => {
275 trace!("Got a request for transport {transport}, which is not configured.");
276 Ok(None)
277 }
278 }
279 }
280
281 #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
283 async fn spawn_transport(
284 &self,
285 transport: &PtTransportName,
286 ) -> Result<PtClientMethod, PtError> {
287 info!(
290 "Got a request for transport {transport}, which is not currently running. Launching it."
291 );
292
293 let (tx, rx) = oneshot::channel();
294 self.tx
295 .unbounded_send(PtReactorMessage::Spawn {
296 pt: transport.clone(),
297 result: tx,
298 })
299 .map_err(|_| {
300 PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
301 })?;
302
303 let method = match rx.await {
304 Err(_) => {
305 return Err(PtError::Internal(tor_error::internal!(
306 "PT reactor closed unexpectedly"
307 )));
308 }
309 Ok(Err(e)) => {
310 warn!("PT for {transport} failed to launch: {e}");
311 return Err(e);
312 }
313 Ok(Ok(method)) => method,
314 };
315
316 info!("Successfully launched PT for {transport} at {method:?}.");
317 Ok(method)
318 }
319}
320
321#[derive(Debug, Clone, PartialEq, Eq)]
323pub struct PtClientMethod {
324 pub(crate) kind: SocksVersion,
326 pub(crate) endpoint: SocketAddr,
328}
329
330impl PtClientMethod {
331 pub fn kind(&self) -> SocksVersion {
333 self.kind
334 }
335
336 pub fn endpoint(&self) -> SocketAddr {
338 self.endpoint
339 }
340}
341
342#[cfg(feature = "tor-channel-factory")]
343#[async_trait]
344impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
345 async fn factory_for_transport(
346 &self,
347 transport: &PtTransportName,
348 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
349 let cmethod = match self.get_cmethod_for_transport(transport).await {
350 Err(e) => return Err(Arc::new(e)),
351 Ok(None) => return Ok(None),
352 Ok(Some(m)) => m,
353 };
354
355 let proxy = ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind);
356 let factory = ChanBuilder::new(self.runtime.clone(), proxy, None);
357 Ok(Some(Arc::new(factory)))
360 }
361}