tor_chanmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
45
46pub mod builder;
47mod config;
48mod err;
49mod event;
50pub mod factory;
51mod mgr;
52#[cfg(test)]
53mod testing;
54pub mod transport;
55pub(crate) mod util;
56
57use futures::select_biased;
58use futures::task::SpawnExt;
59use futures::StreamExt;
60use std::result::Result as StdResult;
61use std::sync::{Arc, Weak};
62use std::time::Duration;
63use tor_config::ReconfigureError;
64use tor_error::error_report;
65use tor_linkspec::{ChanTarget, OwnedChanTarget};
66use tor_netdir::{params::NetParameters, NetDirProvider};
67use tor_proto::channel::Channel;
68#[cfg(feature = "experimental-api")]
69use tor_proto::memquota::ChannelAccount;
70use tor_proto::memquota::ToplevelAccount;
71use tracing::debug;
72use void::{ResultVoidErrExt, Void};
73
74pub use err::Error;
75
76pub use config::{ChannelConfig, ChannelConfigBuilder};
77
78use tor_rtcompat::Runtime;
79
80/// A Result as returned by this crate.
81pub type Result<T> = std::result::Result<T, Error>;
82
83use crate::factory::BootstrapReporter;
84pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
85use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
86
87/// An object that remembers a set of live channels, and launches new ones on
88/// request.
89///
90/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
91/// get one if it exists.  (For a slightly lower-level API that does no caching,
92/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors.  For a
93/// much lower-level API, see [`tor_proto::channel::ChannelBuilder`].)
94///
95/// Each channel is kept open as long as there is a reference to it, or
96/// something else (such as the relay or a network error) kills the channel.
97///
98/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
99/// channel has been unused (that is, had no circuits attached to it) for a
100/// certain amount of time. (Currently this interval is chosen randomly from
101/// between 180-270 seconds, but this is an implementation detail that may change
102/// in the future.)
103pub struct ChanMgr<R: Runtime> {
104    /// Internal channel manager object that does the actual work.
105    ///
106    /// ## How this is built
107    ///
108    /// This internal manager is parameterized over an
109    /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
110    /// The `CompoundFactory` itself holds:
111    ///   * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
112    ///     [`factory::ChannelFactory`] for each supported pluggable transport.
113    ///     This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
114    ///     The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
115    ///     `PtMgr` currently returns `ChannelFactory` implementations that are
116    ///     built using [`transport::proxied::ExternalProxyPlugin`], which implements
117    ///     [`transport::TransportImplHelper`], which in turn is wrapped into a
118    ///     `ChanBuilder` to implement `ChannelFactory`.
119    ///   * A generic [`factory::ChannelFactory`] that it uses for everything else
120    ///     We instantiate this with a
121    ///     [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
122    // This type is a bit long, but I think it's better to just state it here explicitly rather than
123    // hiding parts of it behind a type alias to make it look nicer.
124    mgr: mgr::AbstractChanMgr<
125        factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
126    >,
127
128    /// Stream of [`ConnStatus`] events.
129    bootstrap_status: event::ConnStatusEvents,
130
131    /// This currently isn't actually used, but we're keeping a PhantomData here
132    /// since probably we'll want it again, sooner or later.
133    runtime: std::marker::PhantomData<fn(R) -> R>,
134}
135
136/// Description of how we got a channel.
137#[non_exhaustive]
138#[derive(Debug, Copy, Clone, Eq, PartialEq)]
139pub enum ChanProvenance {
140    /// This channel was newly launched, or was in progress and finished while
141    /// we were waiting.
142    NewlyCreated,
143    /// This channel already existed when we asked for it.
144    Preexisting,
145}
146
147/// Dormancy state, as far as the channel manager is concerned
148///
149/// This is usually derived in higher layers from `arti_client::DormantMode`.
150#[non_exhaustive]
151#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
152pub enum Dormancy {
153    /// Not dormant
154    ///
155    /// Channels will operate normally.
156    #[default]
157    Active,
158    /// Totally dormant
159    ///
160    /// Channels will not perform any spontaneous activity (eg, netflow padding)
161    Dormant,
162}
163
164/// The usage that we have in mind when requesting a channel.
165///
166/// A channel may be used in multiple ways.  Each time a channel is requested
167/// from `ChanMgr` a separate `ChannelUsage` is passed in to tell the `ChanMgr`
168/// how the channel will be used this time.
169///
170/// To be clear, the `ChannelUsage` is aspect of a _request_ for a channel, and
171/// is not an immutable property of the channel itself.
172///
173/// This type is obtained from a `tor_circmgr::usage::SupportedCircUsage` in
174/// `tor_circmgr::usage`, and it has roughly the same set of variants.
175#[derive(Clone, Debug, Copy, Eq, PartialEq)]
176#[non_exhaustive]
177pub enum ChannelUsage {
178    /// Requesting a channel to use for BEGINDIR-based non-anonymous directory
179    /// connections.
180    Dir,
181
182    /// Requesting a channel to transmit user traffic (including exit traffic)
183    /// over the network.
184    ///
185    /// This includes the case where we are constructing a circuit preemptively,
186    /// and _planning_ to use it for user traffic later on.
187    UserTraffic,
188
189    /// Requesting a channel that the caller does not plan to used at all, or
190    /// which it plans to use only for testing circuits.
191    UselessCircuit,
192}
193
194impl<R: Runtime> ChanMgr<R> {
195    /// Construct a new channel manager.
196    ///
197    /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
198    ///
199    /// The `ChannelAccount` is used for data associated with this channel.
200    ///
201    /// This does *not* (currently) include downstream outbound data
202    /// (ie, data processed by the channel implementation here,
203    /// awaiting TLS processing and actual transmission).
204    /// In any case we try to keep those buffers small.
205    ///
206    /// The ChannelAccount *does* track upstream outbound data
207    /// (ie, data processed by a circuit, but not yet by the channel),
208    /// even though that data relates to a specific circuit.
209    /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
210    ///
211    /// # Usage note
212    ///
213    /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
214    pub fn new(
215        runtime: R,
216        config: &ChannelConfig,
217        dormancy: Dormancy,
218        netparams: &NetParameters,
219        memquota: ToplevelAccount,
220    ) -> Self
221    where
222        R: 'static,
223    {
224        let (sender, receiver) = event::channel();
225        let sender = Arc::new(std::sync::Mutex::new(sender));
226        let reporter = BootstrapReporter(sender);
227        let transport = transport::DefaultTransport::new(runtime.clone());
228        let builder = builder::ChanBuilder::new(runtime, transport);
229        let factory = factory::CompoundFactory::new(
230            Arc::new(builder),
231            #[cfg(feature = "pt-client")]
232            None,
233        );
234        let mgr =
235            mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter, memquota);
236        ChanMgr {
237            mgr,
238            bootstrap_status: receiver,
239            runtime: std::marker::PhantomData,
240        }
241    }
242
243    /// Launch the periodic daemon tasks required by the manager to function properly.
244    ///
245    /// Returns a [`TaskHandle`] that can be used to manage
246    /// those daemon tasks that poll periodically.
247    pub fn launch_background_tasks(
248        self: &Arc<Self>,
249        runtime: &R,
250        netdir: Arc<dyn NetDirProvider>,
251    ) -> Result<Vec<TaskHandle>> {
252        runtime
253            .spawn(Self::continually_update_channels_config(
254                Arc::downgrade(self),
255                netdir,
256            ))
257            .map_err(|e| Error::from_spawn("channels config task", e))?;
258
259        let (sched, handle) = TaskSchedule::new(runtime.clone());
260        runtime
261            .spawn(Self::continually_expire_channels(
262                sched,
263                Arc::downgrade(self),
264            ))
265            .map_err(|e| Error::from_spawn("channel expiration task", e))?;
266        Ok(vec![handle])
267    }
268
269    /// Build a channel for an incoming stream.
270    ///
271    /// The channel may or may not be authenticated.
272    /// This method will wait until the channel is usable,
273    /// and may return an error if we already have an existing channel to this peer,
274    /// or if there are already too many open connections with this
275    /// peer or subnet (as a dos defence).
276    #[cfg(feature = "relay")]
277    pub async fn handle_incoming(
278        &self,
279        src: std::net::SocketAddr,
280        stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
281    ) -> Result<Arc<Channel>> {
282        self.mgr.handle_incoming(src, stream).await
283    }
284
285    /// Try to get a suitable channel to the provided `target`,
286    /// launching one if one does not exist.
287    ///
288    /// If there is already a channel launch attempt in progress, this
289    /// function will wait until that launch is complete, and succeed
290    /// or fail depending on its outcome.
291    pub async fn get_or_launch<T: ChanTarget + ?Sized>(
292        &self,
293        target: &T,
294        usage: ChannelUsage,
295    ) -> Result<(Arc<Channel>, ChanProvenance)> {
296        let targetinfo = OwnedChanTarget::from_chan_target(target);
297
298        let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
299        // Double-check the match to make sure that the RSA identity is
300        // what we wanted too.
301        chan.check_match(target)
302            .map_err(|e| Error::from_proto_no_skew(e, target))?;
303        Ok((chan, provenance))
304    }
305
306    /// Return a stream of [`ConnStatus`] events to tell us about changes
307    /// in our ability to connect to the internet.
308    ///
309    /// Note that this stream can be lossy: the caller will not necessarily
310    /// observe every event on the stream
311    pub fn bootstrap_events(&self) -> ConnStatusEvents {
312        self.bootstrap_status.clone()
313    }
314
315    /// Expire all channels that have been unused for too long.
316    ///
317    /// Return the duration from now until next channel expires.
318    pub fn expire_channels(&self) -> Duration {
319        self.mgr.expire_channels()
320    }
321
322    /// Notifies the chanmgr to be dormant like dormancy
323    pub fn set_dormancy(
324        &self,
325        dormancy: Dormancy,
326        netparams: Arc<dyn AsRef<NetParameters>>,
327    ) -> StdResult<(), tor_error::Bug> {
328        self.mgr.set_dormancy(dormancy, netparams)
329    }
330
331    /// Reconfigure all channels
332    pub fn reconfigure(
333        &self,
334        config: &ChannelConfig,
335        how: tor_config::Reconfigure,
336        netparams: Arc<dyn AsRef<NetParameters>>,
337    ) -> StdResult<(), ReconfigureError> {
338        if how == tor_config::Reconfigure::CheckAllOrNothing {
339            // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
340            // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
341            // This means we can return `Ok` here since there should never be an error with the
342            // provided `config` values.
343            return Ok(());
344        }
345
346        let r = self.mgr.reconfigure(config, netparams);
347
348        // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
349        let _: Option<&tor_error::Bug> = r.as_ref().err();
350
351        Ok(r?)
352    }
353
354    /// Replace the transport registry with one that may know about
355    /// more transports.
356    ///
357    /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
358    /// required to time-out channels that take too long to build.  You'll get
359    /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
360    /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
361    #[cfg(feature = "pt-client")]
362    pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
363        self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
364    }
365
366    /// Try to create a new, unmanaged channel to `target`.
367    ///
368    /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
369    /// creates a new channel, never retries transient failure, and does not
370    /// register this channel with the `ChanMgr`.  
371    ///
372    /// Generally you should not use this function; `get_or_launch` is usually a
373    /// better choice.  This function is the right choice if, for whatever
374    /// reason, you need to manage the lifetime of the channel you create, and
375    /// make sure that no other code with access to this `ChanMgr` will be able
376    /// to use the channel.
377    #[cfg(feature = "experimental-api")]
378    pub async fn build_unmanaged_channel(
379        &self,
380        target: impl tor_linkspec::IntoOwnedChanTarget,
381        memquota: ChannelAccount,
382    ) -> Result<Arc<Channel>> {
383        use factory::ChannelFactory as _;
384        let target = target.to_owned();
385
386        self.mgr
387            .channels
388            .builder()
389            .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
390            .await
391    }
392
393    /// Watch for things that ought to change the configuration of all channels in the client
394    ///
395    /// Currently this handles enabling and disabling channel padding.
396    ///
397    /// This is a daemon task that runs indefinitely in the background,
398    /// and exits when we find that `chanmgr` is dropped.
399    async fn continually_update_channels_config(
400        self_: Weak<Self>,
401        netdir: Arc<dyn NetDirProvider>,
402    ) {
403        use tor_netdir::DirEvent as DE;
404        let mut netdir_stream = netdir.events().fuse();
405        let netdir = {
406            let weak = Arc::downgrade(&netdir);
407            drop(netdir);
408            weak
409        };
410        let termination_reason: std::result::Result<Void, &str> = async move {
411            loop {
412                select_biased! {
413                    direvent = netdir_stream.next() => {
414                        let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
415                        if ! matches!(direvent, DE::NewConsensus) { continue };
416                        let self_ = self_.upgrade().ok_or("channel manager gone away")?;
417                        let netdir = netdir.upgrade().ok_or("netdir gone away")?;
418                        let netparams = netdir.params();
419                        self_.mgr.update_netparams(netparams).map_err(|e| {
420                            error_report!(e, "continually_update_channels_config: failed to process!");
421                            "error processing netdir"
422                        })?;
423                    }
424                }
425            }
426        }
427        .await;
428        debug!(
429            "continually_update_channels_config: shutting down: {}",
430            termination_reason.void_unwrap_err()
431        );
432    }
433
434    /// Periodically expire any channels that have been unused beyond
435    /// the maximum duration allowed.
436    ///
437    /// Exist when we find that `chanmgr` is dropped
438    ///
439    /// This is a daemon task that runs indefinitely in the background
440    async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
441        while sched.next().await.is_some() {
442            let Some(cm) = Weak::upgrade(&chanmgr) else {
443                // channel manager is closed.
444                return;
445            };
446            let delay = cm.expire_channels();
447            // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
448            sched.fire_in(delay);
449        }
450    }
451}