1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] pub mod config;
47pub mod err;
48
49#[cfg(feature = "managed-pts")]
50pub mod ipc;
51
52#[cfg(feature = "managed-pts")]
53mod managed;
54
55use crate::config::{TransportConfig, TransportOptions};
56use crate::err::PtError;
57use oneshot_fused_workaround as oneshot;
58use std::collections::HashMap;
59use std::net::SocketAddr;
60use std::path::PathBuf;
61use std::sync::{Arc, RwLock};
62use tor_config_path::CfgPathResolver;
63use tor_linkspec::PtTransportName;
64use tor_rtcompat::Runtime;
65use tor_socksproto::SocksVersion;
66#[cfg(any(feature = "tor-channel-factory", feature = "managed-pts"))]
67use tracing::info;
68use tracing::warn;
69#[cfg(feature = "managed-pts")]
70use {
71 crate::managed::{PtReactor, PtReactorMessage},
72 futures::channel::mpsc::{self, UnboundedSender},
73 futures::task::SpawnExt,
74 tor_error::error_report,
75};
76#[cfg(feature = "tor-channel-factory")]
77use {
78 async_trait::async_trait,
79 tor_chanmgr::{
80 builder::ChanBuilder,
81 factory::{AbstractPtError, ChannelFactory},
82 transport::ExternalProxyPlugin,
83 },
84 tracing::trace,
85};
86
87#[derive(Default, Debug)]
89struct PtSharedState {
90 #[allow(dead_code)]
94 managed_cmethods: HashMap<PtTransportName, PtClientMethod>,
95 configured: HashMap<PtTransportName, TransportOptions>,
97}
98
99pub struct PtMgr<R> {
102 #[allow(dead_code)]
104 runtime: R,
105 state: Arc<RwLock<PtSharedState>>,
107 #[cfg(feature = "managed-pts")]
109 tx: UnboundedSender<PtReactorMessage>,
110}
111
112impl<R: Runtime> PtMgr<R> {
113 fn transform_config(
115 binaries: Vec<TransportConfig>,
116 ) -> Result<HashMap<PtTransportName, TransportOptions>, tor_error::Bug> {
117 let mut ret = HashMap::new();
118 for thing in binaries {
123 for tn in thing.protocols.iter() {
124 ret.insert(tn.clone(), thing.clone().try_into()?);
125 }
126 }
127 for opt in ret.values() {
128 if let TransportOptions::Unmanaged(u) = opt {
129 if !u.is_localhost() {
130 warn!("Configured to connect to a PT on a non-local addresses. This is usually insecure! We recommend running PTs on localhost only.");
131 }
132 }
133 }
134 Ok(ret)
135 }
136
137 pub fn new(
140 transports: Vec<TransportConfig>,
141 #[allow(unused)] state_dir: PathBuf,
142 path_resolver: Arc<CfgPathResolver>,
143 rt: R,
144 ) -> Result<Self, PtError> {
145 let state = PtSharedState {
146 managed_cmethods: Default::default(),
147 configured: Self::transform_config(transports)?,
148 };
149 let state = Arc::new(RwLock::new(state));
150
151 #[cfg(feature = "managed-pts")]
153 let tx = {
154 let (tx, rx) = mpsc::unbounded();
155
156 let mut reactor =
157 PtReactor::new(rt.clone(), state.clone(), rx, state_dir, path_resolver);
158 rt.spawn(async move {
159 loop {
160 match reactor.run_one_step().await {
161 Ok(true) => return,
162 Ok(false) => {}
163 Err(e) => {
164 error_report!(e, "PtReactor failed");
165 return;
166 }
167 }
168 }
169 })
170 .map_err(|e| PtError::Spawn { cause: Arc::new(e) })?;
171
172 tx
173 };
174
175 Ok(Self {
176 runtime: rt,
177 state,
178 #[cfg(feature = "managed-pts")]
179 tx,
180 })
181 }
182
183 pub fn reconfigure(
185 &self,
186 how: tor_config::Reconfigure,
187 transports: Vec<TransportConfig>,
188 ) -> Result<(), tor_config::ReconfigureError> {
189 let configured = Self::transform_config(transports)?;
190 if how == tor_config::Reconfigure::CheckAllOrNothing {
191 return Ok(());
192 }
193 {
194 let mut inner = self.state.write().expect("ptmgr poisoned");
195 inner.configured = configured;
196 }
197 #[cfg(feature = "managed-pts")]
200 let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
201 Ok(())
202 }
203
204 #[cfg(feature = "tor-channel-factory")]
210 async fn get_cmethod_for_transport(
211 &self,
212 transport: &PtTransportName,
213 ) -> Result<Option<PtClientMethod>, PtError> {
214 #[allow(unused)]
215 let (cfg, managed_cmethod) = {
216 let inner = self.state.read().expect("ptmgr poisoned");
220 let cfg = inner.configured.get(transport);
221 let managed_cmethod = inner.managed_cmethods.get(transport);
222 (cfg.cloned(), managed_cmethod.cloned())
223 };
224
225 match cfg {
226 Some(TransportOptions::Unmanaged(cfg)) => {
227 let cmethod = cfg.cmethod();
228 trace!(
229 "Found configured unmanaged transport {transport} accessible via {cmethod:?}"
230 );
231 Ok(Some(cmethod))
232 }
233 #[cfg(feature = "managed-pts")]
234 Some(TransportOptions::Managed(_cfg)) => {
235 match managed_cmethod {
236 Some(cmethod) => {
238 trace!("Found configured managed transport {transport} accessible via {cmethod:?}");
239 Ok(Some(cmethod))
240 }
241 None => {
243 Ok(Some(self.spawn_transport(transport).await?))
265 }
266 }
267 }
268 None => {
270 trace!("Got a request for transport {transport}, which is not configured.");
271 Ok(None)
272 }
273 }
274 }
275
276 #[cfg(all(feature = "tor-channel-factory", feature = "managed-pts"))]
278 async fn spawn_transport(
279 &self,
280 transport: &PtTransportName,
281 ) -> Result<PtClientMethod, PtError> {
282 info!("Got a request for transport {transport}, which is not currently running. Launching it.");
285
286 let (tx, rx) = oneshot::channel();
287 self.tx
288 .unbounded_send(PtReactorMessage::Spawn {
289 pt: transport.clone(),
290 result: tx,
291 })
292 .map_err(|_| {
293 PtError::Internal(tor_error::internal!("PT reactor closed unexpectedly"))
294 })?;
295
296 let method = match rx.await {
297 Err(_) => {
298 return Err(PtError::Internal(tor_error::internal!(
299 "PT reactor closed unexpectedly"
300 )));
301 }
302 Ok(Err(e)) => {
303 warn!("PT for {transport} failed to launch: {e}");
304 return Err(e);
305 }
306 Ok(Ok(method)) => method,
307 };
308
309 info!("Successfully launched PT for {transport} at {method:?}.");
310 Ok(method)
311 }
312}
313
314#[derive(Debug, Clone, PartialEq, Eq)]
316pub struct PtClientMethod {
317 pub(crate) kind: SocksVersion,
319 pub(crate) endpoint: SocketAddr,
321}
322
323impl PtClientMethod {
324 pub fn kind(&self) -> SocksVersion {
326 self.kind
327 }
328
329 pub fn endpoint(&self) -> SocketAddr {
331 self.endpoint
332 }
333}
334
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
    /// Build a channel factory that connects through the transport called `transport`.
    ///
    /// Returns `Ok(None)` if no client method is available for that transport,
    /// and wraps any lookup failure in an `Arc`.
    async fn factory_for_transport(
        &self,
        transport: &PtTransportName,
    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
        // First deal with a failed lookup...
        let lookup = match self.get_cmethod_for_transport(transport).await {
            Ok(found) => found,
            Err(e) => return Err(Arc::new(e)),
        };
        // ...then with a transport for which there is no client method.
        let PtClientMethod { kind, endpoint } = match lookup {
            Some(cmethod) => cmethod,
            None => return Ok(None),
        };

        // Channels are built on top of an external proxy at the client method's endpoint.
        let plugin = ExternalProxyPlugin::new(self.runtime.clone(), endpoint, kind);
        let builder = ChanBuilder::new(self.runtime.clone(), plugin);
        Ok(Some(Arc::new(builder)))
    }
}