tor_chanmgr/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_time_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45#![allow(clippy::collapsible_if)] // See arti#2342
46#![deny(clippy::unused_async)]
47//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
48
49pub mod builder;
50mod config;
51mod err;
52mod event;
53pub mod factory;
54mod mgr;
55#[cfg(test)]
56mod testing;
57pub mod transport;
58pub(crate) mod util;
59
60use futures::StreamExt;
61use futures::select_biased;
62use std::result::Result as StdResult;
63use std::sync::{Arc, Weak};
64use std::time::Duration;
65use tor_config::ReconfigureError;
66use tor_error::error_report;
67use tor_linkspec::{ChanTarget, OwnedChanTarget};
68use tor_netdir::{NetDirProvider, params::NetParameters};
69use tor_proto::channel::Channel;
70#[cfg(feature = "experimental-api")]
71use tor_proto::memquota::ChannelAccount;
72use tor_proto::memquota::ToplevelAccount;
73use tor_rtcompat::SpawnExt;
74use tracing::debug;
75use tracing::instrument;
76use void::{ResultVoidErrExt, Void};
77
78#[cfg(feature = "relay")]
79use {
80 async_trait::async_trait, safelog::Sensitive, tor_proto::relay::CreateRequestHandler,
81 tor_proto::relay::channel_provider::ChannelProvider,
82};
83
84pub use err::Error;
85
86pub use config::{ChannelConfig, ChannelConfigBuilder};
87pub use mgr::ChanMgrConfig;
88
89use tor_rtcompat::Runtime;
90
91/// A Result as returned by this crate.
92pub type Result<T> = std::result::Result<T, Error>;
93
94use crate::factory::BootstrapReporter;
95pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
96use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
97
98/// An object that remembers a set of live channels, and launches new ones on
99/// request.
100///
101/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
102/// get one if it exists. (For a slightly lower-level API that does no caching,
103/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors.
104///
105/// Each channel is kept open as long as there is a reference to it, or
106/// something else (such as the relay or a network error) kills the channel.
107///
108/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
109/// channel has been unused (that is, had no circuits attached to it) for a
110/// certain amount of time. (Currently this interval is chosen randomly from
111/// between 180-270 seconds, but this is an implementation detail that may change
112/// in the future.)
113pub struct ChanMgr<R: Runtime> {
114 /// Internal channel manager object that does the actual work.
115 ///
116 /// ## How this is built
117 ///
118 /// This internal manager is parameterized over an
119 /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
120 /// The `CompoundFactory` itself holds:
121 /// * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
122 /// [`factory::ChannelFactory`] for each supported pluggable transport.
123 /// This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
124 /// The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
125 /// `PtMgr` currently returns `ChannelFactory` implementations that are
126 /// built using [`transport::proxied::ExternalProxyPlugin`], which implements
127 /// [`transport::TransportImplHelper`], which in turn is wrapped into a
128 /// `ChanBuilder` to implement `ChannelFactory`.
129 /// * A generic [`factory::ChannelFactory`] that it uses for everything else
130 /// We instantiate this with a
131 /// [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
132 // This type is a bit long, but I think it's better to just state it here explicitly rather than
133 // hiding parts of it behind a type alias to make it look nicer.
134 mgr: mgr::AbstractChanMgr<
135 factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
136 >,
137
138 /// Stream of [`ConnStatus`] events.
139 bootstrap_status: event::ConnStatusEvents,
140
141 /// The runtime. Needed to possibly spawn tasks.
142 #[allow(unused)] // Relay use this, not client yet. Keep it here instead of gating.
143 runtime: R,
144}
145
/// Tells the caller where a channel it was handed came from.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChanProvenance {
    /// The channel was freshly launched, or its launch was already under way
    /// and completed while we were waiting for it.
    NewlyCreated,
    /// The channel was already there at the time we asked for it.
    Preexisting,
}
156
/// The dormancy state, from the channel manager's point of view.
///
/// Higher layers usually derive this from `arti_client::DormantMode`.
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum Dormancy {
    /// Not dormant.
    ///
    /// Channels operate normally.  This is the default.
    #[default]
    Active,
    /// Completely dormant.
    ///
    /// Channels perform no spontaneous activity (eg, netflow padding).
    Dormant,
}
173
/// What the caller intends to do with a channel it is requesting.
///
/// One channel can be put to several uses.  Every request made to a `ChanMgr`
/// comes with its own `ChannelUsage`, telling the `ChanMgr` how the channel is
/// going to be used on this particular occasion.
///
/// In other words, a `ChannelUsage` is an aspect of a _request_ for a channel;
/// it is not an immutable property of the channel itself.
///
/// This type is derived from a `tor_circmgr::usage::SupportedCircUsage` in
/// `tor_circmgr::usage`, and has roughly the same set of variants.
#[non_exhaustive]
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub enum ChannelUsage {
    /// The channel is wanted for BEGINDIR-based, non-anonymous directory
    /// connections.
    Dir,

    /// The channel is wanted for carrying user traffic (exit traffic
    /// included) over the network.
    ///
    /// This also covers circuits that are built preemptively, where we only
    /// _plan_ to send user traffic over them later.
    UserTraffic,

    /// The channel is wanted by a caller that does not plan to use it at all,
    /// or plans to use it only for testing circuits.
    UselessCircuit,
}
203
204impl<R: Runtime> ChanMgr<R> {
205 /// Construct a new channel manager.
206 ///
207 /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
208 ///
209 /// The `ChannelAccount` is used for data associated with this channel.
210 ///
211 /// This does *not* (currently) include downstream outbound data
212 /// (ie, data processed by the channel implementation here,
213 /// awaiting TLS processing and actual transmission).
214 /// In any case we try to keep those buffers small.
215 ///
216 /// The ChannelAccount *does* track upstream outbound data
217 /// (ie, data processed by a circuit, but not yet by the channel),
218 /// even though that data relates to a specific circuit.
219 /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
220 ///
221 /// # Usage note
222 ///
223 /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
224 ///
225 /// The `keymgr` is only needed for a relay which is used for authenticating its channel to
226 /// other relays. Pass `None` for a client.
227 pub fn new(
228 runtime: R,
229 config: ChanMgrConfig,
230 dormancy: Dormancy,
231 netparams: &NetParameters,
232 memquota: ToplevelAccount,
233 ) -> Result<Self>
234 where
235 R: 'static,
236 {
237 let (sender, receiver) = event::channel();
238 let sender = Arc::new(std::sync::Mutex::new(sender));
239 let reporter = BootstrapReporter(sender);
240 let transport =
241 transport::DefaultTransport::new(runtime.clone(), config.cfg.outbound_proxy.clone());
242 cfg_if::cfg_if! {
243 if #[cfg(feature = "relay")] {
244 let builder = if let Some(auth_material) = &config.auth_material {
245 builder::ChanBuilder::new_relay(runtime.clone(), transport, auth_material.clone(), config.my_addrs, None)?
246 } else {
247 // Yes, clients can have the "relay" feature enabled (unit tests).
248 builder::ChanBuilder::new_client(runtime.clone(), transport)
249 };
250 } else {
251 let builder = builder::ChanBuilder::new_client(runtime.clone(), transport);
252 }
253 };
254
255 let factory = factory::CompoundFactory::new(
256 Arc::new(builder),
257 #[cfg(feature = "pt-client")]
258 None,
259 );
260
261 // Warn if outbound_proxy is configured to a non-loopback address
262 if let Some(ref proxy) = config.cfg.outbound_proxy {
263 if !proxy.is_loopback() {
264 tracing::warn!(
265 proxy_addr = %proxy,
266 "outbound_proxy is configured to a non-loopback address; \
267 this may expose Tor traffic to an untrusted intermediate"
268 );
269 }
270 }
271
272 let mgr =
273 mgr::AbstractChanMgr::new(factory, config.cfg, dormancy, netparams, reporter, memquota);
274
275 Ok(ChanMgr {
276 mgr,
277 bootstrap_status: receiver,
278 runtime,
279 })
280 }
281
282 /// Launch the periodic daemon tasks required by the manager to function properly.
283 ///
284 /// Returns a [`TaskHandle`] that can be used to manage
285 /// those daemon tasks that poll periodically.
286 #[instrument(level = "trace", skip_all)]
287 pub fn launch_background_tasks(
288 self: &Arc<Self>,
289 runtime: &R,
290 netdir: Arc<dyn NetDirProvider>,
291 ) -> Result<Vec<TaskHandle>> {
292 runtime
293 .spawn(Self::continually_update_channels_config(
294 Arc::downgrade(self),
295 netdir,
296 ))
297 .map_err(|e| Error::from_spawn("channels config task", e))?;
298
299 let (sched, handle) = TaskSchedule::new(runtime.clone());
300 runtime
301 .spawn(Self::continually_expire_channels(
302 sched,
303 Arc::downgrade(self),
304 ))
305 .map_err(|e| Error::from_spawn("channel expiration task", e))?;
306 Ok(vec![handle])
307 }
308
309 /// Build a channel for an incoming stream.
310 ///
311 /// The `my_addrs` are the IP address(es) that are advertised by the relay in the consensus. We
312 /// need to pass them so they can be sent in the NETINFO cell.
313 ///
314 /// The channel may or may not be authenticated. This method will wait until the channel is
315 /// usable, and may return an error if we already have an existing channel to this peer, or if
316 /// there are already too many open connections with this peer or subnet (as a dos defence).
317 #[cfg(feature = "relay")]
318 pub async fn handle_incoming(
319 &self,
320 src: Sensitive<std::net::SocketAddr>,
321 stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
322 ) -> Result<Arc<Channel>> {
323 self.mgr.handle_incoming(src, stream).await
324 }
325
326 /// Try to get a suitable channel to the provided `target`,
327 /// launching one if one does not exist.
328 ///
329 /// This function does not guarantee that the returned channel
330 /// satisfies all of the properties of `target`. For example if an
331 /// existing channel is returned, it might not be connected to any
332 /// of the addresses specified in `target`.
333 // ^ see https://gitlab.torproject.org/tpo/core/arti/-/issues/2344
334 ///
335 /// If there is already a channel launch attempt in progress, this
336 /// function will wait until that launch is complete, and succeed
337 /// or fail depending on its outcome.
338 #[instrument(level = "trace", skip_all)]
339 pub async fn get_or_launch<T: ChanTarget + ?Sized>(
340 &self,
341 target: &T,
342 usage: ChannelUsage,
343 ) -> Result<(Arc<Channel>, ChanProvenance)> {
344 let targetinfo = OwnedChanTarget::from_chan_target(target);
345
346 let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
347 // Double-check the match to make sure that the RSA identity is
348 // what we wanted too.
349 chan.check_match(target)
350 .map_err(|e| Error::from_proto_no_skew(e, target))?;
351 Ok((chan, provenance))
352 }
353
354 /// Return a stream of [`ConnStatus`] events to tell us about changes
355 /// in our ability to connect to the internet.
356 ///
357 /// Note that this stream can be lossy: the caller will not necessarily
358 /// observe every event on the stream
359 pub fn bootstrap_events(&self) -> ConnStatusEvents {
360 self.bootstrap_status.clone()
361 }
362
363 /// Expire all channels that have been unused for too long.
364 ///
365 /// Return the duration from now until next channel expires.
366 pub fn expire_channels(&self) -> Duration {
367 self.mgr.expire_channels()
368 }
369
370 /// Notifies the chanmgr to be dormant like dormancy
371 pub fn set_dormancy(
372 &self,
373 dormancy: Dormancy,
374 netparams: Arc<dyn AsRef<NetParameters>>,
375 ) -> StdResult<(), tor_error::Bug> {
376 self.mgr.set_dormancy(dormancy, netparams)
377 }
378
379 /// Reconfigure all channels
380 pub fn reconfigure(
381 &self,
382 config: &ChannelConfig,
383 how: tor_config::Reconfigure,
384 netparams: Arc<dyn AsRef<NetParameters>>,
385 ) -> StdResult<(), ReconfigureError> {
386 if how == tor_config::Reconfigure::CheckAllOrNothing {
387 // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
388 // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
389 // This means we can return `Ok` here since there should never be an error with the
390 // provided `config` values.
391 return Ok(());
392 }
393
394 let r = self.mgr.reconfigure(config, netparams);
395
396 // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
397 let _: Option<&tor_error::Bug> = r.as_ref().err();
398
399 Ok(r?)
400 }
401
402 /// Replace the transport registry with one that may know about
403 /// more transports.
404 ///
405 /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
406 /// required to time-out channels that take too long to build. You'll get
407 /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
408 /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
409 #[cfg(feature = "pt-client")]
410 pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
411 self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
412 }
413
414 /// Replace the relay auth material used for building new channels.
415 ///
416 /// This rebuilds the internal channel builder with the provided `auth_material`, which includes a
417 /// new TLS cert and key. Existing channels are not affected, only newly created channels will
418 /// use the new keys.
419 #[cfg(feature = "relay")]
420 pub fn set_relay_auth_material(
421 &self,
422 auth_material: Arc<tor_proto::RelayChannelAuthMaterial>,
423 ) -> Result<()> {
424 let mut result = Ok(());
425 self.mgr.with_mut_builder(|f| {
426 match f
427 .default_factory()
428 .rebuild_with_auth_material(auth_material)
429 {
430 Ok(b) => f.replace_default_factory(Arc::new(b)),
431 Err(e) => result = Err(e),
432 }
433 });
434 result
435 }
436
437 /// This will be used to handle CREATE* requests on channels.
438 ///
439 /// This handler will only be used for new channels, not existing channels.
440 ///
441 /// This will *not* be updated in any way by the channel manager,
442 /// for example by a netdir update or when any keys change.
443 /// The caller must handle this.
444 /// The idea is that the channel manager shouldn't need to deal with circuit-specific stuff.
445 ///
446 /// It's expected to only ever call this once.
447 /// Ideally it would be an `Option` in the constructor,
448 /// but we don't want conditionally-compiled constructor arguments,
449 /// and the [`CreateRequestHandler`] requires a [`dyn ChannelProvider`]
450 /// which is typically this [`ChanMgr`] itself.
451 #[cfg(feature = "relay")]
452 pub fn set_create_request_handler(&self, handler: Arc<CreateRequestHandler>) -> Result<()> {
453 let mut result = Ok(());
454 self.mgr.with_mut_builder(|f| {
455 match f
456 .default_factory()
457 .rebuild_with_create_request_handler(handler)
458 {
459 Ok(b) => f.replace_default_factory(Arc::new(b)),
460 Err(e) => result = Err(e),
461 }
462 });
463 result
464 }
465
466 /// Try to create a new, unmanaged channel to `target`.
467 ///
468 /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
469 /// creates a new channel, never retries transient failure, and does not
470 /// register this channel with the `ChanMgr`.
471 ///
472 /// Generally you should not use this function; `get_or_launch` is usually a
473 /// better choice. This function is the right choice if, for whatever
474 /// reason, you need to manage the lifetime of the channel you create, and
475 /// make sure that no other code with access to this `ChanMgr` will be able
476 /// to use the channel.
477 #[cfg(feature = "experimental-api")]
478 #[instrument(level = "trace", skip_all)]
479 pub async fn build_unmanaged_channel(
480 &self,
481 target: impl tor_linkspec::IntoOwnedChanTarget,
482 memquota: ChannelAccount,
483 ) -> Result<Arc<Channel>> {
484 use factory::ChannelFactory as _;
485 let target = target.to_owned();
486
487 self.mgr
488 .channels
489 .builder()
490 .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
491 .await
492 }
493
494 /// Watch for things that ought to change the configuration of all channels in the client
495 ///
496 /// Currently this handles enabling and disabling channel padding.
497 ///
498 /// This is a daemon task that runs indefinitely in the background,
499 /// and exits when we find that `chanmgr` is dropped.
500 #[instrument(level = "trace", skip_all)]
501 async fn continually_update_channels_config(
502 self_: Weak<Self>,
503 netdir: Arc<dyn NetDirProvider>,
504 ) {
505 use tor_netdir::DirEvent as DE;
506 let mut netdir_stream = netdir.events().fuse();
507 let netdir = {
508 let weak = Arc::downgrade(&netdir);
509 drop(netdir);
510 weak
511 };
512 let termination_reason: std::result::Result<Void, &str> = async move {
513 loop {
514 select_biased! {
515 direvent = netdir_stream.next() => {
516 let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
517 if ! matches!(direvent, DE::NewConsensus) { continue };
518 let self_ = self_.upgrade().ok_or("channel manager gone away")?;
519 let netdir = netdir.upgrade().ok_or("netdir gone away")?;
520 let netparams = netdir.params();
521 self_.mgr.update_netparams(netparams).map_err(|e| {
522 error_report!(e, "continually_update_channels_config: failed to process!");
523 "error processing netdir"
524 })?;
525 }
526 }
527 }
528 }
529 .await;
530 debug!(
531 "continually_update_channels_config: shutting down: {}",
532 termination_reason.void_unwrap_err()
533 );
534 }
535
536 /// Periodically expire any channels that have been unused beyond
537 /// the maximum duration allowed.
538 ///
539 /// Exist when we find that `chanmgr` is dropped
540 ///
541 /// This is a daemon task that runs indefinitely in the background
542 #[instrument(level = "trace", skip_all)]
543 async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
544 while sched.next().await.is_some() {
545 let Some(cm) = Weak::upgrade(&chanmgr) else {
546 // channel manager is closed.
547 return;
548 };
549 let delay = cm.expire_channels();
550 // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
551 sched.fire_in(delay);
552 }
553 }
554}
555
#[cfg(feature = "relay")]
#[async_trait]
impl<R: Runtime> ChannelProvider for ChanMgr<R> {
    type BuildSpec = OwnedChanTarget;

    /// Spawn a task that gets or launches a channel to `target` (with
    /// [`ChannelUsage::UserTraffic`]) and sends the outcome over `tx`.
    ///
    /// This function itself returns as soon as the task has been spawned; it
    /// fails only if spawning fails.  A failure to obtain the channel is
    /// delivered through `tx`, as a `tor_proto::Error::ChanProto` carrying the
    /// text of the original error.
    fn get_or_launch(
        self: Arc<Self>,
        reactor_id: tor_proto::circuit::UniqId,
        target: Self::BuildSpec,
        tx: tor_proto::relay::channel_provider::OutboundChanSender,
    ) -> tor_proto::Result<()> {
        use tor_error::into_internal;

        debug!("Get or launch channel to {target} for circuit reactor {reactor_id}");

        // The task needs its own handle on the manager; `self` is still
        // needed below to reach the runtime.
        let chanmgr = Arc::clone(&self);
        let task = async move {
            let launched = chanmgr
                .mgr
                .get_or_launch(target, ChannelUsage::UserTraffic)
                .await;
            let outcome = match launched {
                Ok((chan, _provenance)) => Ok(chan),
                // Is it a ChanProto?
                Err(e) => Err(tor_proto::Error::ChanProto(e.to_string())),
            };
            // Send back the channel.
            tx.send(outcome);
        };

        self.runtime
            .spawn(task)
            .map_err(into_internal!("Failed to launch channel provider task"))?;

        Ok(())
    }
}