1use crate::config::{ManagedTransportOptions, TransportOptions};
4use crate::err;
5use crate::err::PtError;
6use crate::ipc::{
7 sealed::PluggableTransportPrivate, PluggableClientTransport, PluggableTransport,
8 PtClientParameters, PtCommonParameters,
9};
10use crate::{PtClientMethod, PtSharedState};
11use futures::channel::mpsc::UnboundedReceiver;
12use futures::stream::FuturesUnordered;
13use futures::{select, FutureExt, StreamExt};
14use oneshot_fused_workaround as oneshot;
15use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::path::{Path, PathBuf};
18use std::pin::Pin;
19use std::sync::{Arc, RwLock};
20use tor_config_path::CfgPathResolver;
21use tor_error::internal;
22use tor_linkspec::PtTransportName;
23use tor_rtcompat::Runtime;
24use tracing::{debug, warn};
25
26pub(crate) enum PtReactorMessage {
28 Reconfigured,
30 Spawn {
32 pt: PtTransportName,
34 result: oneshot::Sender<err::Result<PtClientMethod>>,
36 },
37}
38
39type SpawnResult = (Vec<PtTransportName>, err::Result<PluggableClientTransport>);
41
42pub(crate) struct PtReactor<R> {
44 rt: R,
46 running: Vec<PluggableClientTransport>,
48 requests: HashMap<PtTransportName, Vec<oneshot::Sender<err::Result<PtClientMethod>>>>,
53 spawning: FuturesUnordered<Pin<Box<dyn Future<Output = SpawnResult> + Send>>>,
57 state: Arc<RwLock<PtSharedState>>,
59 rx: UnboundedReceiver<PtReactorMessage>,
63 state_dir: PathBuf,
65 path_resolver: Arc<CfgPathResolver>,
67}
68
69impl<R: Runtime> PtReactor<R> {
70 pub(crate) fn new(
72 rt: R,
73 state: Arc<RwLock<PtSharedState>>,
74 rx: UnboundedReceiver<PtReactorMessage>,
75 state_dir: PathBuf,
76 path_resolver: Arc<CfgPathResolver>,
77 ) -> Self {
78 let spawning = FuturesUnordered::new();
79 spawning.push(Box::pin(futures::future::pending::<SpawnResult>())
80 as Pin<Box<dyn Future<Output = _> + Send>>);
81 Self {
82 rt,
83 running: vec![],
84 requests: Default::default(),
85 spawning,
86 state,
87 rx,
88 state_dir,
89 path_resolver,
90 }
91 }
92
93 #[allow(clippy::needless_pass_by_value)]
95 fn handle_spawned(
96 &mut self,
97 covers: Vec<PtTransportName>,
98 result: err::Result<PluggableClientTransport>,
99 ) {
100 match result {
101 Err(e) => {
102 warn!("Spawning PT for {:?} failed: {}", covers, e);
103 let senders = covers
105 .iter()
106 .flat_map(|x| self.requests.remove(x))
107 .flatten();
108 for sender in senders {
109 let _ = sender.send(Err(e.clone()));
111 }
112 }
113 Ok(pt) => {
114 let mut state = self.state.write().expect("ptmgr state poisoned");
115 for (transport, method) in pt.transport_methods() {
116 state
117 .managed_cmethods
118 .insert(transport.clone(), method.clone());
119 for sender in self.requests.remove(transport).into_iter().flatten() {
120 let _ = sender.send(Ok(method.clone()));
121 }
122 }
123
124 let requested: HashSet<_> = covers.iter().collect();
125 let found: HashSet<_> = pt.transport_methods().iter().map(|(t, _)| t).collect();
126 if requested != found {
127 warn!("Bug: PT {} succeeded, but did not give the same transports we asked for. ({:?} vs {:?})",
128 pt.identifier(), found, requested);
129 }
130 self.running.push(pt);
131 }
132 }
133 }
134
135 fn remove_pt(&self, pt: PluggableClientTransport) {
137 let mut state = self.state.write().expect("ptmgr state poisoned");
138 for transport in pt.transport_methods().keys() {
139 state.managed_cmethods.remove(transport);
140 }
141 drop(pt);
144 }
145
146 pub(crate) async fn run_one_step(&mut self) -> err::Result<bool> {
148 use futures::future::Either;
149
150 let mut all_next_messages = self
153 .running
154 .iter_mut()
155 .map(|pt| Box::pin(pt.next_message()))
158 .collect::<Vec<_>>();
159
160 let mut next_message = if all_next_messages.is_empty() {
162 Either::Left(futures::future::pending())
163 } else {
164 Either::Right(futures::future::select_all(all_next_messages.iter_mut()).fuse())
165 };
166
167 select! {
168 (result, idx, _) = next_message => {
169 drop(all_next_messages); match result {
172 Ok(m) => {
173 debug!("PT {} message: {:?}", self.running[idx].identifier(), m);
175 },
176 Err(e) => {
177 warn!("PT {} quit: {:?}", self.running[idx].identifier(), e);
178 let pt = self.running.remove(idx);
179 self.remove_pt(pt);
180 }
181 }
182 },
183 spawn_result = self.spawning.next() => {
184 drop(all_next_messages);
185 let (covers, result) = spawn_result.expect("self.spawning should never dry up");
187 self.handle_spawned(covers, result);
188 }
189 internal = self.rx.next() => {
190 drop(all_next_messages);
191
192 match internal {
193 Some(PtReactorMessage::Reconfigured) => {},
194 Some(PtReactorMessage::Spawn { pt, result }) => {
195 if let Some(requests) = self.requests.get_mut(&pt) {
197 requests.push(result);
198 return Ok(false);
199 }
200 for rpt in self.running.iter() {
202 if let Some(cmethod) = rpt.transport_methods().get(&pt) {
203 let _ = result.send(Ok(cmethod.clone()));
204 return Ok(false);
205 }
206 }
207 let config = {
209 let state = self.state.read().expect("ptmgr state poisoned");
210 state.configured.get(&pt).cloned()
211 };
212
213 let Some(config) = config else {
214 let _ = result.send(Err(PtError::UnconfiguredTransportDueToConcurrentReconfiguration));
215 return Ok(false);
216 };
217
218 let TransportOptions::Managed(config) = config else {
219 let _ = result.send(Err(internal!("Tried to spawn an unmanaged transport").into()));
220 return Ok(false);
221 };
222
223 self.requests.entry(pt).or_default().push(result);
226 for proto in config.protocols.iter() {
227 self.requests.entry(proto.clone()).or_default();
228 }
229
230 let spawn_fut = Box::pin(
232 spawn_from_config(
233 self.rt.clone(),
234 self.state_dir.clone(),
235 config.clone(),
236 Arc::clone(&self.path_resolver)
237 )
238 .map(|result| (config.protocols, result))
239 );
240 self.spawning.push(spawn_fut);
241 },
242 None => return Ok(true)
243 }
244 }
245 }
246 Ok(false)
247 }
248}
249
250async fn spawn_from_config<R: Runtime>(
252 rt: R,
253 state_dir: PathBuf,
254 cfg: ManagedTransportOptions,
255 path_resolver: Arc<CfgPathResolver>,
256) -> Result<PluggableClientTransport, PtError> {
257 let cfg_path = cfg.path;
260
261 let binary_path = cfg_path
262 .path(&path_resolver)
263 .map_err(|e| PtError::PathExpansionFailed {
264 path: cfg_path.clone(),
265 error: e,
266 })?;
267
268 let filename = pt_identifier_as_path(&binary_path)?;
269
270 let new_state_dir = state_dir.join(filename);
273 std::fs::create_dir_all(&new_state_dir).map_err(|e| PtError::StatedirCreateFailed {
274 path: new_state_dir.clone(),
275 error: Arc::new(e),
276 })?;
277
278 let pt_common_params = PtCommonParameters::builder()
280 .state_location(new_state_dir)
281 .build()
282 .expect("PtCommonParameters constructed incorrectly");
283
284 let pt_client_params = PtClientParameters::builder()
285 .transports(cfg.protocols)
286 .build()
287 .expect("PtClientParameters constructed incorrectly");
288
289 let mut pt = PluggableClientTransport::new(
290 binary_path,
291 cfg.arguments,
292 pt_common_params,
293 pt_client_params,
294 );
295 pt.launch(rt).await?;
296 Ok(pt)
297}
298
299fn pt_identifier_as_path(binary_path: impl AsRef<Path>) -> Result<PathBuf, PtError> {
302 let mut filename =
304 PathBuf::from(
305 binary_path
306 .as_ref()
307 .file_name()
308 .ok_or_else(|| PtError::NotAFile {
309 path: binary_path.as_ref().to_path_buf(),
310 })?,
311 );
312
313 if let Some(ext) = filename.extension() {
315 if ext.eq_ignore_ascii_case(std::env::consts::EXE_EXTENSION) {
316 filename.set_extension("");
317 }
318 }
319
320 Ok(filename)
321}
322
323pub(crate) fn pt_identifier(binary_path: impl AsRef<Path>) -> Result<String, PtError> {
326 Ok(pt_identifier_as_path(binary_path)?
327 .to_string_lossy()
328 .to_string())
329}