tor_chanmgr/lib.rs
1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
46
47pub mod builder;
48mod config;
49mod err;
50mod event;
51pub mod factory;
52mod mgr;
53#[cfg(test)]
54mod testing;
55pub mod transport;
56pub(crate) mod util;
57
58use futures::select_biased;
59use futures::task::SpawnExt;
60use futures::StreamExt;
61use std::result::Result as StdResult;
62use std::sync::{Arc, Weak};
63use std::time::Duration;
64use tor_config::ReconfigureError;
65use tor_error::error_report;
66use tor_linkspec::{ChanTarget, OwnedChanTarget};
67use tor_netdir::{params::NetParameters, NetDirProvider};
68use tor_proto::channel::Channel;
69#[cfg(feature = "experimental-api")]
70use tor_proto::memquota::ChannelAccount;
71use tor_proto::memquota::ToplevelAccount;
72use tracing::debug;
73use void::{ResultVoidErrExt, Void};
74
75pub use err::Error;
76
77pub use config::{ChannelConfig, ChannelConfigBuilder};
78
79use tor_rtcompat::Runtime;
80
81/// A Result as returned by this crate.
82pub type Result<T> = std::result::Result<T, Error>;
83
84use crate::factory::BootstrapReporter;
85pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
86use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
87
88/// An object that remembers a set of live channels, and launches new ones on
89/// request.
90///
91/// Use the [`ChanMgr::get_or_launch`] function to create a new [`Channel`], or
92/// get one if it exists. (For a slightly lower-level API that does no caching,
93/// see [`ChannelFactory`](factory::ChannelFactory) and its implementors. For a
94/// much lower-level API, see [`tor_proto::channel::ChannelBuilder`].)
95///
96/// Each channel is kept open as long as there is a reference to it, or
97/// something else (such as the relay or a network error) kills the channel.
98///
99/// After a `ChanMgr` launches a channel, it keeps a reference to it until that
100/// channel has been unused (that is, had no circuits attached to it) for a
101/// certain amount of time. (Currently this interval is chosen randomly from
102/// between 180-270 seconds, but this is an implementation detail that may change
103/// in the future.)
104pub struct ChanMgr<R: Runtime> {
105 /// Internal channel manager object that does the actual work.
106 ///
107 /// ## How this is built
108 ///
109 /// This internal manager is parameterized over an
110 /// [`mgr::AbstractChannelFactory`], which here is instantiated with a [`factory::CompoundFactory`].
111 /// The `CompoundFactory` itself holds:
112 /// * A `dyn` [`factory::AbstractPtMgr`] that can provide a `dyn`
113 /// [`factory::ChannelFactory`] for each supported pluggable transport.
114 /// This starts out as `None`, but can be replaced with [`ChanMgr::set_pt_mgr`].
115 /// The `TorClient` code currently sets this using `tor_ptmgr::PtMgr`.
116 /// `PtMgr` currently returns `ChannelFactory` implementations that are
117 /// built using [`transport::proxied::ExternalProxyPlugin`], which implements
118 /// [`transport::TransportImplHelper`], which in turn is wrapped into a
119 /// `ChanBuilder` to implement `ChannelFactory`.
120 /// * A generic [`factory::ChannelFactory`] that it uses for everything else
121 /// We instantiate this with a
122 /// [`builder::ChanBuilder`] using a [`transport::default::DefaultTransport`].
123 // This type is a bit long, but I think it's better to just state it here explicitly rather than
124 // hiding parts of it behind a type alias to make it look nicer.
125 mgr: mgr::AbstractChanMgr<
126 factory::CompoundFactory<builder::ChanBuilder<R, transport::DefaultTransport<R>>>,
127 >,
128
129 /// Stream of [`ConnStatus`] events.
130 bootstrap_status: event::ConnStatusEvents,
131
132 /// This currently isn't actually used, but we're keeping a PhantomData here
133 /// since probably we'll want it again, sooner or later.
134 runtime: std::marker::PhantomData<fn(R) -> R>,
135}
136
137/// Description of how we got a channel.
138#[non_exhaustive]
139#[derive(Debug, Copy, Clone, Eq, PartialEq)]
140pub enum ChanProvenance {
141 /// This channel was newly launched, or was in progress and finished while
142 /// we were waiting.
143 NewlyCreated,
144 /// This channel already existed when we asked for it.
145 Preexisting,
146}
147
148/// Dormancy state, as far as the channel manager is concerned
149///
150/// This is usually derived in higher layers from `arti_client::DormantMode`.
151#[non_exhaustive]
152#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
153pub enum Dormancy {
154 /// Not dormant
155 ///
156 /// Channels will operate normally.
157 #[default]
158 Active,
159 /// Totally dormant
160 ///
161 /// Channels will not perform any spontaneous activity (eg, netflow padding)
162 Dormant,
163}
164
165/// The usage that we have in mind when requesting a channel.
166///
167/// A channel may be used in multiple ways. Each time a channel is requested
168/// from `ChanMgr` a separate `ChannelUsage` is passed in to tell the `ChanMgr`
169/// how the channel will be used this time.
170///
171/// To be clear, the `ChannelUsage` is aspect of a _request_ for a channel, and
172/// is not an immutable property of the channel itself.
173///
174/// This type is obtained from a `tor_circmgr::usage::SupportedCircUsage` in
175/// `tor_circmgr::usage`, and it has roughly the same set of variants.
176#[derive(Clone, Debug, Copy, Eq, PartialEq)]
177#[non_exhaustive]
178pub enum ChannelUsage {
179 /// Requesting a channel to use for BEGINDIR-based non-anonymous directory
180 /// connections.
181 Dir,
182
183 /// Requesting a channel to transmit user traffic (including exit traffic)
184 /// over the network.
185 ///
186 /// This includes the case where we are constructing a circuit preemptively,
187 /// and _planning_ to use it for user traffic later on.
188 UserTraffic,
189
190 /// Requesting a channel that the caller does not plan to used at all, or
191 /// which it plans to use only for testing circuits.
192 UselessCircuit,
193}
194
195impl<R: Runtime> ChanMgr<R> {
196 /// Construct a new channel manager.
197 ///
198 /// A new `ChannelAccount` will be made from `memquota`, for each Channel.
199 ///
200 /// The `ChannelAccount` is used for data associated with this channel.
201 ///
202 /// This does *not* (currently) include downstream outbound data
203 /// (ie, data processed by the channel implementation here,
204 /// awaiting TLS processing and actual transmission).
205 /// In any case we try to keep those buffers small.
206 ///
207 /// The ChannelAccount *does* track upstream outbound data
208 /// (ie, data processed by a circuit, but not yet by the channel),
209 /// even though that data relates to a specific circuit.
210 /// TODO #1652 use `CircuitAccount` for circuit->channel queue.
211 ///
212 /// # Usage note
213 ///
214 /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
215 pub fn new(
216 runtime: R,
217 config: &ChannelConfig,
218 dormancy: Dormancy,
219 netparams: &NetParameters,
220 memquota: ToplevelAccount,
221 ) -> Self
222 where
223 R: 'static,
224 {
225 let (sender, receiver) = event::channel();
226 let sender = Arc::new(std::sync::Mutex::new(sender));
227 let reporter = BootstrapReporter(sender);
228 let transport = transport::DefaultTransport::new(runtime.clone());
229 let builder = builder::ChanBuilder::new(runtime, transport);
230 let factory = factory::CompoundFactory::new(
231 Arc::new(builder),
232 #[cfg(feature = "pt-client")]
233 None,
234 );
235 let mgr =
236 mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter, memquota);
237 ChanMgr {
238 mgr,
239 bootstrap_status: receiver,
240 runtime: std::marker::PhantomData,
241 }
242 }
243
244 /// Launch the periodic daemon tasks required by the manager to function properly.
245 ///
246 /// Returns a [`TaskHandle`] that can be used to manage
247 /// those daemon tasks that poll periodically.
248 pub fn launch_background_tasks(
249 self: &Arc<Self>,
250 runtime: &R,
251 netdir: Arc<dyn NetDirProvider>,
252 ) -> Result<Vec<TaskHandle>> {
253 runtime
254 .spawn(Self::continually_update_channels_config(
255 Arc::downgrade(self),
256 netdir,
257 ))
258 .map_err(|e| Error::from_spawn("channels config task", e))?;
259
260 let (sched, handle) = TaskSchedule::new(runtime.clone());
261 runtime
262 .spawn(Self::continually_expire_channels(
263 sched,
264 Arc::downgrade(self),
265 ))
266 .map_err(|e| Error::from_spawn("channel expiration task", e))?;
267 Ok(vec![handle])
268 }
269
270 /// Build a channel for an incoming stream.
271 ///
272 /// The channel may or may not be authenticated.
273 /// This method will wait until the channel is usable,
274 /// and may return an error if we already have an existing channel to this peer,
275 /// or if there are already too many open connections with this
276 /// peer or subnet (as a dos defence).
277 #[cfg(feature = "relay")]
278 pub async fn handle_incoming(
279 &self,
280 src: std::net::SocketAddr,
281 stream: <R as tor_rtcompat::NetStreamProvider>::Stream,
282 ) -> Result<Arc<Channel>> {
283 self.mgr.handle_incoming(src, stream).await
284 }
285
286 /// Try to get a suitable channel to the provided `target`,
287 /// launching one if one does not exist.
288 ///
289 /// If there is already a channel launch attempt in progress, this
290 /// function will wait until that launch is complete, and succeed
291 /// or fail depending on its outcome.
292 pub async fn get_or_launch<T: ChanTarget + ?Sized>(
293 &self,
294 target: &T,
295 usage: ChannelUsage,
296 ) -> Result<(Arc<Channel>, ChanProvenance)> {
297 let targetinfo = OwnedChanTarget::from_chan_target(target);
298
299 let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
300 // Double-check the match to make sure that the RSA identity is
301 // what we wanted too.
302 chan.check_match(target)
303 .map_err(|e| Error::from_proto_no_skew(e, target))?;
304 Ok((chan, provenance))
305 }
306
307 /// Return a stream of [`ConnStatus`] events to tell us about changes
308 /// in our ability to connect to the internet.
309 ///
310 /// Note that this stream can be lossy: the caller will not necessarily
311 /// observe every event on the stream
312 pub fn bootstrap_events(&self) -> ConnStatusEvents {
313 self.bootstrap_status.clone()
314 }
315
316 /// Expire all channels that have been unused for too long.
317 ///
318 /// Return the duration from now until next channel expires.
319 pub fn expire_channels(&self) -> Duration {
320 self.mgr.expire_channels()
321 }
322
323 /// Notifies the chanmgr to be dormant like dormancy
324 pub fn set_dormancy(
325 &self,
326 dormancy: Dormancy,
327 netparams: Arc<dyn AsRef<NetParameters>>,
328 ) -> StdResult<(), tor_error::Bug> {
329 self.mgr.set_dormancy(dormancy, netparams)
330 }
331
332 /// Reconfigure all channels
333 pub fn reconfigure(
334 &self,
335 config: &ChannelConfig,
336 how: tor_config::Reconfigure,
337 netparams: Arc<dyn AsRef<NetParameters>>,
338 ) -> StdResult<(), ReconfigureError> {
339 if how == tor_config::Reconfigure::CheckAllOrNothing {
340 // Since `self.mgr.reconfigure` returns an error type of `Bug` and not
341 // `ReconfigureError` (see check below), the reconfigure should only fail due to bugs.
342 // This means we can return `Ok` here since there should never be an error with the
343 // provided `config` values.
344 return Ok(());
345 }
346
347 let r = self.mgr.reconfigure(config, netparams);
348
349 // Check that `self.mgr.reconfigure` returns an error type of `Bug` (see comment above).
350 let _: Option<&tor_error::Bug> = r.as_ref().err();
351
352 Ok(r?)
353 }
354
355 /// Replace the transport registry with one that may know about
356 /// more transports.
357 ///
358 /// Note that the [`ChannelFactory`](factory::ChannelFactory) instances returned by `ptmgr` are
359 /// required to time-out channels that take too long to build. You'll get
360 /// this behavior by default if the factories implement [`ChannelFactory`](factory::ChannelFactory) using
361 /// [`transport::proxied::ExternalProxyPlugin`], which `tor-ptmgr` does.
362 #[cfg(feature = "pt-client")]
363 pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
364 self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
365 }
366
367 /// Try to create a new, unmanaged channel to `target`.
368 ///
369 /// Unlike [`get_or_launch`](ChanMgr::get_or_launch), this function always
370 /// creates a new channel, never retries transient failure, and does not
371 /// register this channel with the `ChanMgr`.
372 ///
373 /// Generally you should not use this function; `get_or_launch` is usually a
374 /// better choice. This function is the right choice if, for whatever
375 /// reason, you need to manage the lifetime of the channel you create, and
376 /// make sure that no other code with access to this `ChanMgr` will be able
377 /// to use the channel.
378 #[cfg(feature = "experimental-api")]
379 pub async fn build_unmanaged_channel(
380 &self,
381 target: impl tor_linkspec::IntoOwnedChanTarget,
382 memquota: ChannelAccount,
383 ) -> Result<Arc<Channel>> {
384 use factory::ChannelFactory as _;
385 let target = target.to_owned();
386
387 self.mgr
388 .channels
389 .builder()
390 .connect_via_transport(&target, self.mgr.reporter.clone(), memquota)
391 .await
392 }
393
394 /// Watch for things that ought to change the configuration of all channels in the client
395 ///
396 /// Currently this handles enabling and disabling channel padding.
397 ///
398 /// This is a daemon task that runs indefinitely in the background,
399 /// and exits when we find that `chanmgr` is dropped.
400 async fn continually_update_channels_config(
401 self_: Weak<Self>,
402 netdir: Arc<dyn NetDirProvider>,
403 ) {
404 use tor_netdir::DirEvent as DE;
405 let mut netdir_stream = netdir.events().fuse();
406 let netdir = {
407 let weak = Arc::downgrade(&netdir);
408 drop(netdir);
409 weak
410 };
411 let termination_reason: std::result::Result<Void, &str> = async move {
412 loop {
413 select_biased! {
414 direvent = netdir_stream.next() => {
415 let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
416 if ! matches!(direvent, DE::NewConsensus) { continue };
417 let self_ = self_.upgrade().ok_or("channel manager gone away")?;
418 let netdir = netdir.upgrade().ok_or("netdir gone away")?;
419 let netparams = netdir.params();
420 self_.mgr.update_netparams(netparams).map_err(|e| {
421 error_report!(e, "continually_update_channels_config: failed to process!");
422 "error processing netdir"
423 })?;
424 }
425 }
426 }
427 }
428 .await;
429 debug!(
430 "continually_update_channels_config: shutting down: {}",
431 termination_reason.void_unwrap_err()
432 );
433 }
434
435 /// Periodically expire any channels that have been unused beyond
436 /// the maximum duration allowed.
437 ///
438 /// Exist when we find that `chanmgr` is dropped
439 ///
440 /// This is a daemon task that runs indefinitely in the background
441 async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
442 while sched.next().await.is_some() {
443 let Some(cm) = Weak::upgrade(&chanmgr) else {
444 // channel manager is closed.
445 return;
446 };
447 let delay = cm.expire_channels();
448 // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
449 sched.fire_in(delay);
450 }
451 }
452}