Skip to main content

tor_chanmgr/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49pub mod builder;
50mod config;
51mod err;
52mod event;
53pub mod factory;
54mod mgr;
55#[cfg(test)]
56mod testing;
57pub mod transport;
58pub(crate) mod util;
59
60use futures::StreamExt;
61use futures::select_biased;
62use std::result::Result as StdResult;
63use std::sync::{Arc, Weak};
64use std::time::Duration;
65use tor_config::ReconfigureError;
66use tor_error::error_report;
67use tor_linkspec::{ChanTarget, OwnedChanTarget};
68use tor_netdir::{NetDirProvider, params::NetParameters};
69use tor_proto::channel::Channel;
70#[cfg(feature = "experimental-api")]
71use tor_proto::memquota::ChannelAccount;
72use tor_proto::memquota::ToplevelAccount;
73use tor_rtcompat::SpawnExt;
74use tracing::debug;
75use tracing::instrument;
76use void::{ResultVoidErrExt, Void};
77
78#[cfg(feature = "relay")]
79use {
80    async_trait::async_trait, safelog::Sensitive, tor_proto::relay::CreateRequestHandler,
81    tor_proto::relay::channel_provider::ChannelProvider,
82};
83
84pub use err::Error;
85
86pub use config::{ChannelConfig, ChannelConfigBuilder};
87pub use mgr::ChanMgrConfig;
88
89use tor_rtcompat::Runtime;
90
91/// A Result as returned by this crate.
92pub type Result<T> = std::result::Result<T, Error>;
93
94use crate::factory::BootstrapReporter;
95pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
96use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
97
98/// An object that remembers a set of live channels, and launches new ones on
99/// request.
100///
101/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
102/// get one if it exists.  (For a slightly lower-level API that does no caching,
103/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors.
104///
105/// Each channel is kept open as long as there is a reference to it, or
106/// something else (such as the relay or a network error) kills the channel.
107///
108/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
109/// channel has been unused (that is, had no circuits attached to it) for a
110/// certain amount of time. (Currently this interval is chosen randomly from
111/// between 180-270 seconds, but this is an implementation detail that may change
112/// in the future.)
113pub struct ChanMgr<R: Runtime> {
114    /// Internal channel manager object that does the actual work.
115    ///
116    /// ## How this is built
117    ///
118    /// This internal manager is parameterized over an
119    /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
120    /// The `CompoundFactory` itself holds:
121    ///   * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
122    ///     [`factory::ChannelFactory`] for each supported pluggable transport.
123    ///     This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
124    ///     The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
125    ///     `PtMgr` currently returns `ChannelFactory` implementations that are
126    ///     built using [`transport::proxied::ExternalProxyPlugin`], which implements
127    ///     [`transport::TransportImplHelper`], which in turn is wrapped into a
128    ///     `ChanBuilder` to implement `ChannelFactory`.
129    ///   * A generic [`factory::ChannelFactory`] that it uses for everything else
130    ///     We instantiate this with a
131    ///     [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
132    // This type is a bit long, but I think it's better to just state it here explicitly rather than
133    // hiding parts of it behind a type alias to make it look nicer.
134    mgr: mgr::AbstractChanMgr<
135        factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
136    >,
137
138    /// Stream of [`ConnStatus`] events.
139    bootstrap_status: event::ConnStatusEvents,
140
141    /// The runtime. Needed to possibly spawn tasks.
142    #[allow(unused)] // Relay use this, not client yet. Keep it here instead of gating.
143    runtime: R,
144}
145
/// How a channel handed back by the channel manager came to exist.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ChanProvenance {
    /// The channel was launched for this request, or a launch that was already
    /// underway completed while this request was waiting on it.
    NewlyCreated,
    /// The channel was already open at the moment it was requested.
    Preexisting,
}
156
/// The channel manager's view of whether we are dormant.
///
/// Higher layers usually compute this from `arti_client::DormantMode`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum Dormancy {
    /// Awake (the default).
    ///
    /// Channels behave normally.
    #[default]
    Active,
    /// Fully dormant.
    ///
    /// Channels do no spontaneous work (for example, no netflow padding).
    Dormant,
}
173
/// The usage that we have in mind when requesting a channel.
///
/// A channel may be used in multiple ways.  Each time a channel is requested
/// from `ChanMgr` a separate `ChannelUsage` is passed in to tell the `ChanMgr`
/// how the channel will be used this time.
///
/// To be clear, the `ChannelUsage` is an aspect of a _request_ for a channel, and
/// is not an immutable property of the channel itself.
///
/// This type is obtained from a `tor_circmgr::usage::SupportedCircUsage` in
/// `tor_circmgr::usage`, and it has roughly the same set of variants.
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChannelUsage {
    /// Requesting a channel to use for BEGINDIR-based non-anonymous directory
    /// connections.
    Dir,

    /// Requesting a channel to transmit user traffic (including exit traffic)
    /// over the network.
    ///
    /// This includes the case where we are constructing a circuit preemptively,
    /// and _planning_ to use it for user traffic later on.
    UserTraffic,

    /// Requesting a channel that the caller does not plan to use at all, or
    /// which it plans to use only for testing circuits.
    UselessCircuit,
}
203
204impl<R: Runtime> ChanMgr<R> {
205    /// Construct a new channel manager.
206    ///
207    /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
208    ///
209    /// The `ChannelAccount` is used for data associated with this channel.
210    ///
211    /// This does *not* (currently) include downstream outbound data
212    /// (ie, data processed by the channel implementation here,
213    /// awaiting TLS processing and actual transmission).
214    /// In any case we try to keep those buffers small.
215    ///
216    /// The ChannelAccount *does* track upstream outbound data
217    /// (ie, data processed by a circuit, but not yet by the channel),
218    /// even though that data relates to a specific circuit.
219    /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
220    ///
221    /// # Usage note
222    ///
223    /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
224    ///
225    /// The `keymgr` is only needed for a relay which is used for authenticating its channel to
226    /// other relays. Pass `None` for a client.
227    pub fn new(
228        runtime: R,
229        config: ChanMgrConfig,
230        dormancy: Dormancy,
231        netparams: &NetParameters,
232        memquota: ToplevelAccount,
233    ) -> Result<Self>
234    where
235        R: 'static,
236    {
237        let (sender, receiver) = event::channel();
238        let sender = Arc::new(std::sync::Mutex::new(sender));
239        let reporter = BootstrapReporter(sender);
240        let transport =
241            transport::DefaultTransport::new(runtime.clone(), config.cfg.outbound_proxy.clone());
242        cfg_if::cfg_if! {
243            if #[cfg(feature = "relay")] {
244                let builder = if let Some(auth_material) = &config.auth_material {
245                    builder::ChanBuilder::new_relay(runtime.clone(), transport, auth_material.clone(), config.my_addrs, None)?
246                } else {
247                    // Yes, clients can have the "relay" feature enabled (unit tests).
248                    builder::ChanBuilder::new_client(runtime.clone(), transport)
249                };
250            } else {
251                let builder =  builder::ChanBuilder::new_client(runtime.clone(), transport);
252            }
253        };
254
255        let factory = factory::CompoundFactory::new(
256            Arc::new(builder),
257            #[cfg(feature = "pt-client")]
258            None,
259        );
260
261        // Warn if outbound_proxy is configured to a non-loopback address
262        if let Some(ref proxy) = config.cfg.outbound_proxy {
263            if !proxy.is_loopback() {
264                tracing::warn!(
265                    proxy_addr = %proxy,
266                    "outbound_proxy is configured to a non-loopback address; \
267                     this may expose Tor traffic to an untrusted intermediate"
268                );
269            }
270        }
271
272        let mgr =
273            mgr::AbstractChanMgr::new(factory, config.cfg, dormancy, netparams, reporter, memquota);
274
275        Ok(ChanMgr {
276            mgr,
277            bootstrap_status: receiver,
278            runtime,
279        })
280    }
281
282    /// Launch the periodic daemon tasks required by the manager to function properly.
283    ///
284    /// Returns a [`TaskHandle`] that can be used to manage
285    /// those daemon tasks that poll periodically.
286    #[instrument(level = "trace", skip_all)]
287    pub fn launch_background_tasks(
288        self: &Arc<Self>,
289        runtime: &R,
290        netdir: Arc<dyn NetDirProvider>,
291    ) -> Result<Vec<TaskHandle>> {
292        runtime
293            .spawn(Self::continually_update_channels_config(
294                Arc::downgrade(self),
295                netdir,
296            ))
297            .map_err(|e| Error::from_spawn("channels config task", e))?;
298
299        let (sched, handle) = TaskSchedule::new(runtime.clone());
300        runtime
301            .spawn(Self::continually_expire_channels(
302                sched,
303                Arc::downgrade(self),
304            ))
305            .map_err(|e| Error::from_spawn("channel expiration task", e))?;
306        Ok(vec![handle])
307    }
308
309    /// Build a channel for an incoming stream.
310    ///
311    /// The `my_addrs` are the IP address(es) that are advertised by the relay in the consensus. We
312    /// need to pass them so they can be sent in the NETINFO cell.
313    ///
314    /// The channel may or may not be authenticated. This method will wait until the channel is
315    /// usable, and may return an error if we already have an existing channel to this peer, or if
316    /// there are already too many open connections with this peer or subnet (as a dos defence).
317    #[cfg(feature = "relay")]
318    pub async fn handle_incoming(
319        &self,
320        src: Sensitive<std::net::SocketAddr>,
321        stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
322    ) -> Result<Arc<Channel>> {
323        self.mgr.handle_incoming(src, stream).await
324    }
325
326    /// Try to get a suitable channel to the provided `target`,
327    /// launching one if one does not exist.
328    ///
329    /// This function does not guarantee that the returned channel
330    /// satisfies all of the properties of `target`. For example if an
331    /// existing channel is returned, it might not be connected to any
332    /// of the addresses specified in `target`.
333    // ^ see https://gitlab.torproject.org/tpo/core/arti/-/issues/2344
334    ///
335    /// If there is already a channel launch attempt in progress, this
336    /// function will wait until that launch is complete, and succeed
337    /// or fail depending on its outcome.
338    #[instrument(level = "trace", skip_all)]
339    pub async fn get_or_launch<T: ChanTarget + ?Sized>(
340        &self,
341        target: &T,
342        usage: ChannelUsage,
343    ) -> Result<(Arc<Channel>, ChanProvenance)> {
344        let targetinfo = OwnedChanTarget::from_chan_target(target);
345
346        let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
347        // Double-check the match to make sure that the RSA identity is
348        // what we wanted too.
349        chan.check_match(target)
350            .map_err(|e| Error::from_proto_no_skew(e, target))?;
351        Ok((chan, provenance))
352    }
353
354    /// Return a stream of [`ConnStatus`] events to tell us about changes
355    /// in our ability to connect to the internet.
356    ///
357    /// Note that this stream can be lossy: the caller will not necessarily
358    /// observe every event on the stream
359    pub fn bootstrap_events(&self) -> ConnStatusEvents {
360        self.bootstrap_status.clone()
361    }
362
363    /// Expire all channels that have been unused for too long.
364    ///
365    /// Return the duration from now until next channel expires.
366    pub fn expire_channels(&self) -> Duration {
367        self.mgr.expire_channels()
368    }
369
370    /// Notifies the chanmgr to be dormant like dormancy
371    pub fn set_dormancy(
372        &self,
373        dormancy: Dormancy,
374        netparams: Arc<dyn AsRef<NetParameters>>,
375    ) -> StdResult<(), tor_error::Bug> {
376        self.mgr.set_dormancy(dormancy, netparams)
377    }
378
379    /// Reconfigure all channels
380    pub fn reconfigure(
381        &self,
382        config: &ChannelConfig,
383        how: tor_config::Reconfigure,
384        netparams: Arc<dyn AsRef<NetParameters>>,
385    ) -> StdResult<(), ReconfigureError> {
386        if how == tor_config::Reconfigure::CheckAllOrNothing {
387            // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
388            // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
389            // This means we can return `Ok` here since there should never be an error with the
390            // provided `config` values.
391            return Ok(());
392        }
393
394        let r = self.mgr.reconfigure(config, netparams);
395
396        // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
397        let _: Option<&tor_error::Bug> = r.as_ref().err();
398
399        Ok(r?)
400    }
401
402    /// Replace the transport registry with one that may know about
403    /// more transports.
404    ///
405    /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
406    /// required to time-out channels that take too long to build.  You'll get
407    /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
408    /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
409    #[cfg(feature = "pt-client")]
410    pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
411        self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
412    }
413
414    /// Replace the relay auth material used for building new channels.
415    ///
416    /// This rebuilds the internal channel builder with the provided `auth_material`, which includes a
417    /// new TLS cert and key. Existing channels are not affected, only newly created channels will
418    /// use the new keys.
419    #[cfg(feature = "relay")]
420    pub fn set_relay_auth_material(
421        &self,
422        auth_material: Arc<tor_proto::RelayChannelAuthMaterial>,
423    ) -> Result<()> {
424        let mut result = Ok(());
425        self.mgr.with_mut_builder(|f| {
426            match f
427                .default_factory()
428                .rebuild_with_auth_material(auth_material)
429            {
430                Ok(b) => f.replace_default_factory(Arc::new(b)),
431                Err(e) => result = Err(e),
432            }
433        });
434        result
435    }
436
437    /// This will be used to handle CREATE* requests on channels.
438    ///
439    /// This handler will only be used for new channels, not existing channels.
440    ///
441    /// This will *not* be updated in any way by the channel manager,
442    /// for example by a netdir update or when any keys change.
443    /// The caller must handle this.
444    /// The idea is that the channel manager shouldn't need to deal with circuit-specific stuff.
445    ///
446    /// It's expected to only ever call this once.
447    /// Ideally it would be an `Option` in the constructor,
448    /// but we don't want conditionally-compiled constructor arguments,
449    /// and the [`CreateRequestHandler`] requires a [`dyn ChannelProvider`]
450    /// which is typically this [`ChanMgr`] itself.
451    #[cfg(feature = "relay")]
452    pub fn set_create_request_handler(&self, handler: Arc<CreateRequestHandler>) -> Result<()> {
453        let mut result = Ok(());
454        self.mgr.with_mut_builder(|f| {
455            match f
456                .default_factory()
457                .rebuild_with_create_request_handler(handler)
458            {
459                Ok(b) => f.replace_default_factory(Arc::new(b)),
460                Err(e) => result = Err(e),
461            }
462        });
463        result
464    }
465
466    /// Try to create a new, unmanaged channel to `target`.
467    ///
468    /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
469    /// creates a new channel, never retries transient failure, and does not
470    /// register this channel with the `ChanMgr`.
471    ///
472    /// Generally you should not use this function; `get_or_launch` is usually a
473    /// better choice.  This function is the right choice if, for whatever
474    /// reason, you need to manage the lifetime of the channel you create, and
475    /// make sure that no other code with access to this `ChanMgr` will be able
476    /// to use the channel.
477    #[cfg(feature = "experimental-api")]
478    #[instrument(level = "trace", skip_all)]
479    pub async fn build_unmanaged_channel(
480        &self,
481        target: impl tor_linkspec::IntoOwnedChanTarget,
482        memquota: ChannelAccount,
483    ) -> Result<Arc<Channel>> {
484        use factory::ChannelFactory as _;
485        let target = target.to_owned();
486
487        self.mgr
488            .channels
489            .builder()
490            .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
491            .await
492    }
493
    /// Watch for things that ought to change the configuration of all channels in the client.
    ///
    /// Currently this handles enabling and disabling channel padding: whenever
    /// `netdir` reports a new consensus, that netdir's parameters are handed to
    /// the inner manager via `update_netparams`.
    ///
    /// This is a daemon task that runs in the background until one of these happens:
    ///  * the `netdir` event stream ends;
    ///  * on a new-consensus event, the channel manager (`self_`) or the netdir
    ///    provider turns out to have been dropped (this is only noticed when
    ///    such an event arrives);
    ///  * `update_netparams` fails, in which case the error is reported first.
    ///
    /// The reason for stopping is logged at debug level.
    #[instrument(level = "trace", skip_all)]
    async fn continually_update_channels_config(
        self_: Weak<Self>,
        netdir: Arc<dyn NetDirProvider>,
    ) {
        use tor_netdir::DirEvent as DE;
        let mut netdir_stream = netdir.events().fuse();
        // Downgrade to a weak reference so this long-lived task does not keep the
        // netdir provider alive; it is re-upgraded only when a consensus arrives.
        let netdir = {
            let weak = Arc::downgrade(&netdir);
            drop(netdir);
            weak
        };
        // The loop below never finishes normally; it can only exit through `?`.
        // So the success type is the uninhabited `Void`, and the error value is a
        // static description of why the task stopped.
        let termination_reason: std::result::Result<Void, &str> = async move {
            loop {
                // NOTE(review): there is only one arm at present, so this behaves like
                // awaiting `netdir_stream.next()` directly; presumably written as a
                // `select_biased!` so more event sources can be added later.
                select_biased! {
                    direvent = netdir_stream.next() => {
                        let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
                        if ! matches!(direvent, DE::NewConsensus) { continue };
                        let self_ = self_.upgrade().ok_or("channel manager gone away")?;
                        let netdir = netdir.upgrade().ok_or("netdir gone away")?;
                        let netparams = netdir.params();
                        self_.mgr.update_netparams(netparams).map_err(|e| {
                            error_report!(e, "continually_update_channels_config: failed to process!");
                            "error processing netdir"
                        })?;
                    }
                }
            }
        }
        .await;
        // `void_unwrap_err` cannot fail here, because the `Ok` type is `Void`.
        debug!(
            "continually_update_channels_config: shutting down: {}",
            termination_reason.void_unwrap_err()
        );
    }
535
536    /// Periodically expire any channels that have been unused beyond
537    /// the maximum duration allowed.
538    ///
539    /// Exist when we find that `chanmgr` is dropped
540    ///
541    /// This is a daemon task that runs indefinitely in the background
542    #[instrument(level = "trace", skip_all)]
543    async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
544        while sched.next().await.is_some() {
545            let Some(cm) = Weak::upgrade(&chanmgr) else {
546                // channel manager is closed.
547                return;
548            };
549            let delay = cm.expire_channels();
550            // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
551            sched.fire_in(delay);
552        }
553    }
554}
555
#[cfg(feature = "relay")]
#[async_trait]
impl<R: Runtime> ChannelProvider for ChanMgr<R> {
    type BuildSpec = OwnedChanTarget;

    /// Get or launch a channel to `target` on behalf of circuit reactor
    /// `reactor_id`, delivering the outcome through `tx`.
    ///
    /// The work happens on a task spawned on our runtime; this method returns
    /// as soon as that task has been spawned.
    ///
    /// # Errors
    ///
    /// Returns an internal error only if the task cannot be spawned. Failures to
    /// obtain a channel are sent through `tx` instead.
    fn get_or_launch(
        self: Arc<Self>,
        reactor_id: tor_proto::circuit::UniqId,
        target: Self::BuildSpec,
        tx: tor_proto::relay::channel_provider::OutboundChanSender,
    ) -> tor_proto::Result<()> {
        use tor_error::into_internal;

        debug!("Get or launch channel to {target} for circuit reactor {reactor_id}");

        let chanmgr = Arc::clone(&self);
        self.runtime
            .spawn(async move {
                // Call the inherent `ChanMgr::get_or_launch` (not `self.mgr` directly)
                // so that the returned channel's identities, including the RSA
                // identity, are checked against `target`, just as for every other
                // caller. The explicit type-qualified path selects the inherent
                // method rather than this trait method of the same name.
                let r = ChanMgr::<R>::get_or_launch(&*chanmgr, &target, ChannelUsage::UserTraffic)
                    .await
                    .map(|(chan, _provenance)| chan)
                    .map_err(|e| tor_proto::Error::ChanProto(e.to_string())); // Is it a ChanProto?
                // Send back the channel.
                tx.send(r);
            })
            .map_err(into_internal!("Failed to launch channel provider task"))?;

        Ok(())
    }
}