1
//! Infrastructure required to support managed PTs.
2

            
3
use crate::config::{ManagedTransportOptions, TransportOptions};
4
use crate::err;
5
use crate::err::PtError;
6
use crate::ipc::{
7
    sealed::PluggableTransportPrivate, PluggableClientTransport, PluggableTransport,
8
    PtClientParameters, PtCommonParameters,
9
};
10
use crate::{PtClientMethod, PtSharedState};
11
use futures::channel::mpsc::UnboundedReceiver;
12
use futures::stream::FuturesUnordered;
13
use futures::{select, FutureExt, StreamExt};
14
use oneshot_fused_workaround as oneshot;
15
use std::collections::{HashMap, HashSet};
16
use std::future::Future;
17
use std::path::{Path, PathBuf};
18
use std::pin::Pin;
19
use std::sync::{Arc, RwLock};
20
use tor_config_path::CfgPathResolver;
21
use tor_error::internal;
22
use tor_linkspec::PtTransportName;
23
use tor_rtcompat::Runtime;
24
use tracing::{debug, warn};
25

            
26
/// A message to the `PtReactor`.
27
pub(crate) enum PtReactorMessage {
28
    /// Notify the reactor that the currently configured set of PTs has changed.
29
    Reconfigured,
30
    /// Ask the reactor to spawn a pluggable transport binary.
31
    Spawn {
32
        /// Spawn a binary to provide this PT.
33
        pt: PtTransportName,
34
        /// Notify the result via this channel.
35
        result: oneshot::Sender<err::Result<PtClientMethod>>,
36
    },
37
}
38

            
39
/// The result of a spawn attempt: the list of transports the spawned binary covers, and the result.
40
type SpawnResult = (Vec<PtTransportName>, err::Result<PluggableClientTransport>);
41

            
42
/// Background reactor to handle managing pluggable transport binaries.
43
pub(crate) struct PtReactor<R> {
44
    /// Runtime.
45
    rt: R,
46
    /// Currently running pluggable transport binaries.
47
    running: Vec<PluggableClientTransport>,
48
    /// A map of asked-for transports.
49
    ///
50
    /// If a transport name has an entry, we will append any additional requests for that entry.
51
    /// If no entry is present, we will start a request.
52
    requests: HashMap<PtTransportName, Vec<oneshot::Sender<err::Result<PtClientMethod>>>>,
53
    /// FuturesUnordered that spawned tasks get pushed on to.
54
    ///
55
    /// WARNING: This MUST always contain one "will never resolve" future!
56
    spawning: FuturesUnordered<Pin<Box<dyn Future<Output = SpawnResult> + Send>>>,
57
    /// State for the corresponding PtMgr.
58
    state: Arc<RwLock<PtSharedState>>,
59
    /// PtMgr channel.
60
    /// (Unbounded so that we can reconfigure without blocking: we're unlikely to have the reactor
61
    /// get behind.)
62
    rx: UnboundedReceiver<PtReactorMessage>,
63
    /// State directory.
64
    state_dir: PathBuf,
65
    /// Path resolver for configuration files.
66
    path_resolver: Arc<CfgPathResolver>,
67
}
68

            
69
impl<R: Runtime> PtReactor<R> {
70
    /// Make a new reactor.
71
8
    pub(crate) fn new(
72
8
        rt: R,
73
8
        state: Arc<RwLock<PtSharedState>>,
74
8
        rx: UnboundedReceiver<PtReactorMessage>,
75
8
        state_dir: PathBuf,
76
8
        path_resolver: Arc<CfgPathResolver>,
77
8
    ) -> Self {
78
8
        let spawning = FuturesUnordered::new();
79
8
        spawning.push(Box::pin(futures::future::pending::<SpawnResult>())
80
8
            as Pin<Box<dyn Future<Output = _> + Send>>);
81
8
        Self {
82
8
            rt,
83
8
            running: vec![],
84
8
            requests: Default::default(),
85
8
            spawning,
86
8
            state,
87
8
            rx,
88
8
            state_dir,
89
8
            path_resolver,
90
8
        }
91
8
    }
92

            
93
    /// Called when a spawn request completes.
94
    #[allow(clippy::needless_pass_by_value)]
95
    fn handle_spawned(
96
        &mut self,
97
        covers: Vec<PtTransportName>,
98
        result: err::Result<PluggableClientTransport>,
99
    ) {
100
        match result {
101
            Err(e) => {
102
                warn!("Spawning PT for {:?} failed: {}", covers, e);
103
                // Go and tell all the transports about the bad news.
104
                let senders = covers
105
                    .iter()
106
                    .flat_map(|x| self.requests.remove(x))
107
                    .flatten();
108
                for sender in senders {
109
                    // We don't really care if the sender went away.
110
                    let _ = sender.send(Err(e.clone()));
111
                }
112
            }
113
            Ok(pt) => {
114
                let mut state = self.state.write().expect("ptmgr state poisoned");
115
                for (transport, method) in pt.transport_methods() {
116
                    state
117
                        .managed_cmethods
118
                        .insert(transport.clone(), method.clone());
119
                    for sender in self.requests.remove(transport).into_iter().flatten() {
120
                        let _ = sender.send(Ok(method.clone()));
121
                    }
122
                }
123

            
124
                let requested: HashSet<_> = covers.iter().collect();
125
                let found: HashSet<_> = pt.transport_methods().keys().collect();
126
                if requested != found {
127
                    warn!("Bug: PT {} succeeded, but did not give the same transports we asked for. ({:?} vs {:?})",
128
                          pt.identifier(), found, requested);
129
                }
130
                self.running.push(pt);
131
            }
132
        }
133
    }
134

            
135
    /// Called to remove a pluggable transport from the shared state.
136
    fn remove_pt(&self, pt: PluggableClientTransport) {
137
        let mut state = self.state.write().expect("ptmgr state poisoned");
138
        for transport in pt.transport_methods().keys() {
139
            state.managed_cmethods.remove(transport);
140
        }
141
        // to satisfy clippy, and make it clear that this is a desired side-effect: doing this
142
        // shuts down the PT (asynchronously).
143
        drop(pt);
144
    }
145

            
146
    /// Run one step of the reactor. Returns true if the reactor should terminate.
147
10
    pub(crate) async fn run_one_step(&mut self) -> err::Result<bool> {
148
        use futures::future::Either;
149

            
150
        // FIXME(eta): This allocates a lot, which is technically unnecessary but requires careful
151
        //             engineering to get right. It's not really in the hot path, at least.
152
10
        let mut all_next_messages = self
153
10
            .running
154
10
            .iter_mut()
155
10
            // We could avoid the Box, but that'd require using unsafe to replicate what tokio::pin!
156
10
            // does under the hood.
157
10
            .map(|pt| Box::pin(pt.next_message()))
158
10
            .collect::<Vec<_>>();
159

            
160
        // We can't construct a select_all if all_next_messages is empty.
161
10
        let mut next_message = if all_next_messages.is_empty() {
162
10
            Either::Left(futures::future::pending())
163
        } else {
164
            Either::Right(futures::future::select_all(all_next_messages.iter_mut()).fuse())
165
        };
166

            
167
10
        select! {
168
            (result, idx, _) = next_message => {
169
                drop(all_next_messages); // no idea why NLL doesn't just infer this but sure
170

            
171
                match result {
172
                    Ok(m) => {
173
                        // FIXME(eta): We should forward the Status messages onto API consumers.
174
                        debug!("PT {} message: {:?}", self.running[idx].identifier(), m);
175
                    },
176
                    Err(e) => {
177
                        warn!("PT {} quit: {:?}", self.running[idx].identifier(), e);
178
                        let pt = self.running.remove(idx);
179
                        self.remove_pt(pt);
180
                    }
181
                }
182
            },
183
10
            spawn_result = self.spawning.next() => {
184
                drop(all_next_messages);
185
                // See the Warning in this field's documentation.
186
                let (covers, result) = spawn_result.expect("self.spawning should never dry up");
187
                self.handle_spawned(covers, result);
188
            }
189
10
            internal = self.rx.next() => {
190
2
                drop(all_next_messages);
191

            
192
2
                match internal {
193
2
                    Some(PtReactorMessage::Reconfigured) => {},
194
                    Some(PtReactorMessage::Spawn { pt, result }) => {
195
                        // Make sure we don't already have a running request.
196
                        if let Some(requests) = self.requests.get_mut(&pt) {
197
                            requests.push(result);
198
                            return Ok(false);
199
                        }
200
                        // Make sure we don't already have a binary for this PT.
201
                        for rpt in self.running.iter() {
202
                            if let Some(cmethod) = rpt.transport_methods().get(&pt) {
203
                                let _ = result.send(Ok(cmethod.clone()));
204
                                return Ok(false);
205
                            }
206
                        }
207
                        // We don't, so time to spawn one.
208
                        let config = {
209
                            let state = self.state.read().expect("ptmgr state poisoned");
210
                            state.configured.get(&pt).cloned()
211
                        };
212

            
213
                        let Some(config) = config else {
214
                            let _ = result.send(Err(PtError::UnconfiguredTransportDueToConcurrentReconfiguration));
215
                            return Ok(false);
216
                        };
217

            
218
                        let TransportOptions::Managed(config) = config else {
219
                            let _ = result.send(Err(internal!("Tried to spawn an unmanaged transport").into()));
220
                            return Ok(false);
221
                        };
222

            
223
                        // Keep track of the request, and also fill holes in other protocols so
224
                        // we don't try and run another spawn request for those.
225
                        self.requests.entry(pt).or_default().push(result);
226
                        for proto in config.protocols.iter() {
227
                            self.requests.entry(proto.clone()).or_default();
228
                        }
229

            
230
                        // Add the spawn future to our pile of them.
231
                        let spawn_fut = Box::pin(
232
                            spawn_from_config(
233
                                self.rt.clone(),
234
                                self.state_dir.clone(),
235
                                config.clone(),
236
                                Arc::clone(&self.path_resolver)
237
                            )
238
                            .map(|result| (config.protocols, result))
239
                        );
240
                        self.spawning.push(spawn_fut);
241
                    },
242
                    None => return Ok(true)
243
                }
244
            }
245
        }
246
2
        Ok(false)
247
2
    }
248
}
249

            
250
/// Spawn a managed `PluggableTransport` using a `ManagedTransportOptions`.
251
async fn spawn_from_config<R: Runtime>(
252
    rt: R,
253
    state_dir: PathBuf,
254
    cfg: ManagedTransportOptions,
255
    path_resolver: Arc<CfgPathResolver>,
256
) -> Result<PluggableClientTransport, PtError> {
257
    // FIXME(eta): I really think this expansion should happen at builder validation time...
258

            
259
    let cfg_path = cfg.path;
260

            
261
    let binary_path = cfg_path
262
        .path(&path_resolver)
263
        .map_err(|e| PtError::PathExpansionFailed {
264
            path: cfg_path.clone(),
265
            error: e,
266
        })?;
267

            
268
    let filename = pt_identifier_as_path(&binary_path)?;
269

            
270
    // HACK(eta): Currently the state directory is named after the PT binary name. Maybe we should
271
    //            invent a better way of doing this?
272
    let new_state_dir = state_dir.join(filename);
273
    std::fs::create_dir_all(&new_state_dir).map_err(|e| PtError::StatedirCreateFailed {
274
        path: new_state_dir.clone(),
275
        error: Arc::new(e),
276
    })?;
277

            
278
    // FIXME(eta): make the rest of these parameters configurable
279
    let pt_common_params = PtCommonParameters::builder()
280
        .state_location(new_state_dir)
281
        .build()
282
        .expect("PtCommonParameters constructed incorrectly");
283

            
284
    let pt_client_params = PtClientParameters::builder()
285
        .transports(cfg.protocols)
286
        .build()
287
        .expect("PtClientParameters constructed incorrectly");
288

            
289
    let mut pt = PluggableClientTransport::new(
290
        binary_path,
291
        cfg.arguments,
292
        pt_common_params,
293
        pt_client_params,
294
    );
295
    pt.launch(rt).await?;
296
    Ok(pt)
297
}
298

            
299
/// Given a path to a binary for a pluggable transport, return an identifier for
300
/// that binary in a format that can be used as a path component.
301
fn pt_identifier_as_path(binary_path: impl AsRef<Path>) -> Result<PathBuf, PtError> {
302
    // Extract the final component.
303
    let mut filename =
304
        PathBuf::from(
305
            binary_path
306
                .as_ref()
307
                .file_name()
308
                .ok_or_else(|| PtError::NotAFile {
309
                    path: binary_path.as_ref().to_path_buf(),
310
                })?,
311
        );
312

            
313
    // Strip an "exe" off the end, if appropriate.
314
    if let Some(ext) = filename.extension() {
315
        if ext.eq_ignore_ascii_case(std::env::consts::EXE_EXTENSION) {
316
            filename.set_extension("");
317
        }
318
    }
319

            
320
    Ok(filename)
321
}
322

            
323
/// Given a path to a binary for a pluggable transport, return an identifier for
324
/// that binary in human-readable form.
325
pub(crate) fn pt_identifier(binary_path: impl AsRef<Path>) -> Result<String, PtError> {
326
    Ok(pt_identifier_as_path(binary_path)?
327
        .to_string_lossy()
328
        .to_string())
329
}