1
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_duration_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46

            
47
pub mod builder;
48
mod config;
49
mod err;
50
mod event;
51
pub mod factory;
52
mod mgr;
53
#[cfg(test)]
54
mod testing;
55
pub mod transport;
56
pub(crate) mod util;
57

            
58
use futures::select_biased;
59
use futures::task::SpawnExt;
60
use futures::StreamExt;
61
use std::result::Result as StdResult;
62
use std::sync::{Arc, Weak};
63
use std::time::Duration;
64
use tor_config::ReconfigureError;
65
use tor_error::error_report;
66
use tor_linkspec::{ChanTarget, OwnedChanTarget};
67
use tor_netdir::{params::NetParameters, NetDirProvider};
68
use tor_proto::channel::Channel;
69
#[cfg(feature = "experimental-api")]
70
use tor_proto::memquota::ChannelAccount;
71
use tor_proto::memquota::ToplevelAccount;
72
use tracing::debug;
73
use void::{ResultVoidErrExt, Void};
74

            
75
pub use err::Error;
76

            
77
pub use config::{ChannelConfig, ChannelConfigBuilder};
78

            
79
use tor_rtcompat::Runtime;
80

            
81
/// A Result as returned by this crate.
82
pub type Result<T> = std::result::Result<T, Error>;
83

            
84
use crate::factory::BootstrapReporter;
85
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
86
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
87

            
88
/// An object that remembers a set of live channels, and launches new ones on
89
/// request.
90
///
91
/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
92
/// get one if it exists.  (For a slightly lower-level API that does no caching,
93
/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors.  For a
94
/// much lower-level API, see [`tor_proto::channel::ChannelBuilder`].)
95
///
96
/// Each channel is kept open as long as there is a reference to it, or
97
/// something else (such as the relay or a network error) kills the channel.
98
///
99
/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
100
/// channel has been unused (that is, had no circuits attached to it) for a
101
/// certain amount of time. (Currently this interval is chosen randomly from
102
/// between 180-270 seconds, but this is an implementation detail that may change
103
/// in the future.)
104
pub struct ChanMgr<R: Runtime> {
105
    /// Internal channel manager object that does the actual work.
106
    ///
107
    /// ## How this is built
108
    ///
109
    /// This internal manager is parameterized over an
110
    /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
111
    /// The `CompoundFactory` itself holds:
112
    ///   * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
113
    ///     [`factory::ChannelFactory`] for each supported pluggable transport.
114
    ///     This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
115
    ///     The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
116
    ///     `PtMgr` currently returns `ChannelFactory` implementations that are
117
    ///     built using [`transport::proxied::ExternalProxyPlugin`], which implements
118
    ///     [`transport::TransportImplHelper`], which in turn is wrapped into a
119
    ///     `ChanBuilder` to implement `ChannelFactory`.
120
    ///   * A generic [`factory::ChannelFactory`] that it uses for everything else
121
    ///     We instantiate this with a
122
    ///     [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
123
    // This type is a bit long, but I think it's better to just state it here explicitly rather than
124
    // hiding parts of it behind a type alias to make it look nicer.
125
    mgr: mgr::AbstractChanMgr<
126
        factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
127
    >,
128

            
129
    /// Stream of [`ConnStatus`] events.
130
    bootstrap_status: event::ConnStatusEvents,
131

            
132
    /// This currently isn't actually used, but we're keeping a PhantomData here
133
    /// since probably we'll want it again, sooner or later.
134
    runtime: std::marker::PhantomData<fn(R) -> R>,
135
}
136

            
137
/// How a channel that was handed to a caller came to exist.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChanProvenance {
    /// The channel was freshly launched, or its launch was already under way
    /// and completed while we were waiting for it.
    NewlyCreated,
    /// The channel was already there at the time we asked for it.
    Preexisting,
}
147

            
148
/// The dormancy state, from the channel manager's point of view.
///
/// Higher layers usually derive this from `arti_client::DormantMode`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Dormancy {
    /// Not dormant.
    ///
    /// Channels operate normally.  This is the default state.
    #[default]
    Active,
    /// Totally dormant.
    ///
    /// Channels perform no spontaneous activity (e.g., netflow padding).
    Dormant,
}
164

            
165
/// What the caller intends to do with a channel it is requesting.
///
/// One channel may end up being used in several ways.  Every request made to
/// `ChanMgr` carries its own `ChannelUsage`, which tells the `ChanMgr` how the
/// channel is going to be used on this particular occasion.
///
/// In other words, a `ChannelUsage` is an aspect of a _request_ for a channel;
/// it is not an immutable property of the channel itself.
///
/// This type is obtained from a `tor_circmgr::usage::SupportedCircUsage` in
/// `tor_circmgr::usage`, and its set of variants is roughly the same.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ChannelUsage {
    /// The channel is wanted for BEGINDIR-based non-anonymous directory
    /// connections.
    Dir,

    /// The channel is wanted for carrying user traffic (exit traffic included)
    /// over the network.
    ///
    /// This also covers circuits that are being built preemptively, with the
    /// _plan_ of using them for user traffic later on.
    UserTraffic,

    /// The channel is one the caller does not plan to use at all, or plans to
    /// use only for testing circuits.
    UselessCircuit,
}
194

            
195
impl<R: Runtime> ChanMgr<R> {
196
    /// Construct a new channel manager.
197
    ///
198
    /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
199
    ///
200
    /// The `ChannelAccount` is used for data associated with this channel.
201
    ///
202
    /// This does *not* (currently) include downstream outbound data
203
    /// (ie, data processed by the channel implementation here,
204
    /// awaiting TLS processing and actual transmission).
205
    /// In any case we try to keep those buffers small.
206
    ///
207
    /// The ChannelAccount *does* track upstream outbound data
208
    /// (ie, data processed by a circuit, but not yet by the channel),
209
    /// even though that data relates to a specific circuit.
210
    /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
211
    ///
212
    /// # Usage note
213
    ///
214
    /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
215
44
    pub fn new(
216
44
        runtime: R,
217
44
        config: &ChannelConfig,
218
44
        dormancy: Dormancy,
219
44
        netparams: &NetParameters,
220
44
        memquota: ToplevelAccount,
221
44
    ) -> Self
222
44
    where
223
44
        R: 'static,
224
44
    {
225
44
        let (sender, receiver) = event::channel();
226
44
        let sender = Arc::new(std::sync::Mutex::new(sender));
227
44
        let reporter = BootstrapReporter(sender);
228
44
        let transport = transport::DefaultTransport::new(runtime.clone());
229
44
        let builder = builder::ChanBuilder::new(runtime, transport);
230
44
        let factory = factory::CompoundFactory::new(
231
44
            Arc::new(builder),
232
44
            #[cfg(feature = "pt-client")]
233
44
            None,
234
44
        );
235
44
        let mgr =
236
44
            mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter, memquota);
237
44
        ChanMgr {
238
44
            mgr,
239
44
            bootstrap_status: receiver,
240
44
            runtime: std::marker::PhantomData,
241
44
        }
242
44
    }
243

            
244
    /// Launch the periodic daemon tasks required by the manager to function properly.
245
    ///
246
    /// Returns a [`TaskHandle`] that can be used to manage
247
    /// those daemon tasks that poll periodically.
248
8
    pub fn launch_background_tasks(
249
8
        self: &Arc<Self>,
250
8
        runtime: &R,
251
8
        netdir: Arc<dyn NetDirProvider>,
252
8
    ) -> Result<Vec<TaskHandle>> {
253
8
        runtime
254
8
            .spawn(Self::continually_update_channels_config(
255
8
                Arc::downgrade(self),
256
8
                netdir,
257
8
            ))
258
8
            .map_err(|e| Error::from_spawn("channels config task", e))?;
259

            
260
8
        let (sched, handle) = TaskSchedule::new(runtime.clone());
261
8
        runtime
262
8
            .spawn(Self::continually_expire_channels(
263
8
                sched,
264
8
                Arc::downgrade(self),
265
8
            ))
266
8
            .map_err(|e| Error::from_spawn("channel expiration task", e))?;
267
8
        Ok(vec![handle])
268
8
    }
269

            
270
    /// Build a channel for an incoming stream.
271
    ///
272
    /// The channel may or may not be authenticated.
273
    /// This method will wait until the channel is usable,
274
    /// and may return an error if we already have an existing channel to this peer,
275
    /// or if there are already too many open connections with this
276
    /// peer or subnet (as a dos defence).
277
    #[cfg(feature = "relay")]
278
    pub async fn handle_incoming(
279
        &self,
280
        src: std::net::SocketAddr,
281
        stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
282
    ) -> Result<Arc<Channel>> {
283
        self.mgr.handle_incoming(src, stream).await
284
    }
285

            
286
    /// Try to get a suitable channel to the provided `target`,
287
    /// launching one if one does not exist.
288
    ///
289
    /// If there is already a channel launch attempt in progress, this
290
    /// function will wait until that launch is complete, and succeed
291
    /// or fail depending on its outcome.
292
    pub async fn get_or_launch<T: ChanTarget + ?Sized>(
293
        &self,
294
        target: &T,
295
        usage: ChannelUsage,
296
    ) -> Result<(Arc<Channel>, ChanProvenance)> {
297
        let targetinfo = OwnedChanTarget::from_chan_target(target);
298

            
299
        let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
300
        // Double-check the match to make sure that the RSA identity is
301
        // what we wanted too.
302
        chan.check_match(target)
303
            .map_err(|e| Error::from_proto_no_skew(e, target))?;
304
        Ok((chan, provenance))
305
    }
306

            
307
    /// Return a stream of [`ConnStatus`] events to tell us about changes
308
    /// in our ability to connect to the internet.
309
    ///
310
    /// Note that this stream can be lossy: the caller will not necessarily
311
    /// observe every event on the stream
312
8
    pub fn bootstrap_events(&self) -> ConnStatusEvents {
313
8
        self.bootstrap_status.clone()
314
8
    }
315

            
316
    /// Expire all channels that have been unused for too long.
317
    ///
318
    /// Return the duration from now until next channel expires.
319
14
    pub fn expire_channels(&self) -> Duration {
320
14
        self.mgr.expire_channels()
321
14
    }
322

            
323
    /// Notifies the chanmgr to be dormant like dormancy
324
8
    pub fn set_dormancy(
325
8
        &self,
326
8
        dormancy: Dormancy,
327
8
        netparams: Arc<dyn AsRef<NetParameters>>,
328
8
    ) -> StdResult<(), tor_error::Bug> {
329
8
        self.mgr.set_dormancy(dormancy, netparams)
330
8
    }
331

            
332
    /// Reconfigure all channels
333
4
    pub fn reconfigure(
334
4
        &self,
335
4
        config: &ChannelConfig,
336
4
        how: tor_config::Reconfigure,
337
4
        netparams: Arc<dyn AsRef<NetParameters>>,
338
4
    ) -> StdResult<(), ReconfigureError> {
339
4
        if how == tor_config::Reconfigure::CheckAllOrNothing {
340
            // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
341
            // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
342
            // This means we can return `Ok` here since there should never be an error with the
343
            // provided `config` values.
344
2
            return Ok(());
345
2
        }
346
2

            
347
2
        let r = self.mgr.reconfigure(config, netparams);
348
2

            
349
2
        // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
350
2
        let _: Option<&tor_error::Bug> = r.as_ref().err();
351
2

            
352
2
        Ok(r?)
353
4
    }
354

            
355
    /// Replace the transport registry with one that may know about
356
    /// more transports.
357
    ///
358
    /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
359
    /// required to time-out channels that take too long to build.  You'll get
360
    /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
361
    /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
362
    #[cfg(feature = "pt-client")]
363
8
    pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
364
8
        self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
365
8
    }
366

            
367
    /// Try to create a new, unmanaged channel to `target`.
368
    ///
369
    /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
370
    /// creates a new channel, never retries transient failure, and does not
371
    /// register this channel with the `ChanMgr`.  
372
    ///
373
    /// Generally you should not use this function; `get_or_launch` is usually a
374
    /// better choice.  This function is the right choice if, for whatever
375
    /// reason, you need to manage the lifetime of the channel you create, and
376
    /// make sure that no other code with access to this `ChanMgr` will be able
377
    /// to use the channel.
378
    #[cfg(feature = "experimental-api")]
379
    pub async fn build_unmanaged_channel(
380
        &self,
381
        target: impl tor_linkspec::IntoOwnedChanTarget,
382
        memquota: ChannelAccount,
383
    ) -> Result<Arc<Channel>> {
384
        use factory::ChannelFactory as _;
385
        let target = target.to_owned();
386

            
387
        self.mgr
388
            .channels
389
            .builder()
390
            .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
391
            .await
392
    }
393

            
394
    /// Watch for things that ought to change the configuration of all channels in the client
395
    ///
396
    /// Currently this handles enabling and disabling channel padding.
397
    ///
398
    /// This is a daemon task that runs indefinitely in the background,
399
    /// and exits when we find that `chanmgr` is dropped.
400
8
    async fn continually_update_channels_config(
401
8
        self_: Weak<Self>,
402
8
        netdir: Arc<dyn NetDirProvider>,
403
8
    ) {
404
        use tor_netdir::DirEvent as DE;
405
8
        let mut netdir_stream = netdir.events().fuse();
406
8
        let netdir = {
407
8
            let weak = Arc::downgrade(&netdir);
408
8
            drop(netdir);
409
8
            weak
410
        };
411
8
        let termination_reason: std::result::Result<Void, &str> = async move {
412
            loop {
413
8
                select_biased! {
414
8
                    direvent = netdir_stream.next() => {
415
                        let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
416
                        if ! matches!(direvent, DE::NewConsensus) { continue };
417
                        let self_ = self_.upgrade().ok_or("channel manager gone away")?;
418
                        let netdir = netdir.upgrade().ok_or("netdir gone away")?;
419
                        let netparams = netdir.params();
420
                        self_.mgr.update_netparams(netparams).map_err(|e| {
421
                            error_report!(e, "continually_update_channels_config: failed to process!");
422
                            "error processing netdir"
423
                        })?;
424
                    }
425
                }
426
            }
427
8
        }
428
8
        .await;
429
        debug!(
430
            "continually_update_channels_config: shutting down: {}",
431
            termination_reason.void_unwrap_err()
432
        );
433
    }
434

            
435
    /// Periodically expire any channels that have been unused beyond
436
    /// the maximum duration allowed.
437
    ///
438
    /// Exist when we find that `chanmgr` is dropped
439
    ///
440
    /// This is a daemon task that runs indefinitely in the background
441
8
    async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
442
22
        while sched.next().await.is_some() {
443
14
            let Some(cm) = Weak::upgrade(&chanmgr) else {
444
                // channel manager is closed.
445
                return;
446
            };
447
14
            let delay = cm.expire_channels();
448
14
            // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
449
14
            sched.fire_in(delay);
450
        }
451
8
    }
452
}