tor_chanmgr/mgr/
state.rs

1//! Simple implementation for the internal map state of a ChanMgr.
2
3use std::time::Duration;
4
5use super::AbstractChannelFactory;
6use super::{select, AbstractChannel, Pending, Sending};
7use crate::{ChannelConfig, Dormancy, Error, Result};
8
9use futures::FutureExt;
10use std::result::Result as StdResult;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use tor_async_utils::oneshot;
14use tor_basic_utils::RngExt as _;
15use tor_cell::chancell::msg::PaddingNegotiate;
16use tor_config::PaddingLevel;
17use tor_error::{error_report, internal, into_internal};
18use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
19use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND};
20use tor_proto::channel::kist::{KistMode, KistParams};
21use tor_proto::channel::padding::Parameters as PaddingParameters;
22use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
23use tor_proto::channel::ChannelPaddingInstructionsUpdates;
24use tor_proto::ChannelPaddingInstructions;
25use tor_units::{BoundedInt32, IntegerMilliseconds};
26use tracing::info;
27use void::{ResultVoidExt as _, Void};
28
29#[cfg(test)]
30mod padding_test;
31
32/// All mutable state held by an `AbstractChannelMgr`.
33///
34/// One reason that this is an isolated type is that we want to
35/// to limit the amount of code that can see and
36/// lock the Mutex here.  (We're using a blocking mutex close to async
37/// code, so we need to be careful.)
38pub(crate) struct MgrState<C: AbstractChannelFactory> {
39    /// The data, within a lock
40    ///
41    /// (Danger: this uses a blocking mutex close to async code.  This mutex
42    /// must never be held while an await is happening.)
43    inner: std::sync::Mutex<Inner<C>>,
44}
45
46/// Parameters for channels that we create, and that all existing channels are using
47struct ChannelParams {
48    /// Channel padding instructions
49    padding: ChannelPaddingInstructions,
50
51    /// KIST parameters
52    kist: KistParams,
53}
54
55/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
56struct Inner<C: AbstractChannelFactory> {
57    /// The channel factory type that we store.
58    ///
59    /// In this module we never use this _as_ an AbstractChannelFactory: we just
60    /// hand out clones of it when asked.
61    builder: C,
62
63    /// A map from identity to channels, or to pending channel statuses.
64    channels: ListByRelayIds<ChannelState<C::Channel>>,
65
66    /// Parameters for channels that we create, and that all existing channels are using
67    ///
68    /// Will be updated by a background task, which also notifies all existing
69    /// `Open` channels via `channels`.
70    ///
71    /// (Must be protected by the same lock as `channels`, or a channel might be
72    /// created using being-replaced parameters, but not get an update.)
73    channels_params: ChannelParams,
74
75    /// The configuration (from the config file or API caller)
76    config: ChannelConfig,
77
78    /// Dormancy
79    ///
80    /// The last dormancy information we have been told about and passed on to our channels.
81    /// Updated via `MgrState::set_dormancy` and hence `MgrState::reconfigure_general`,
82    /// which then uses it to calculate how to reconfigure the channels.
83    dormancy: Dormancy,
84}
85
86/// The state of a channel (or channel build attempt) within a map.
87///
88/// A ChannelState can be Open (representing a fully negotiated channel) or
89/// Building (representing a pending attempt to build a channel). Both states
90/// have a set of RelayIds, but these RelayIds represent slightly different
91/// things:
92///  * On a Building channel, the set of RelayIds is all the identities that we
93///    require the peer to have. (The peer may turn out to have _more_
94///    identities than this.)
95///  * On an Open channel, the set of RelayIds is all the identities that
96///    we were able to successfully authenticate for the peer.
97pub(crate) enum ChannelState<C> {
98    /// An open channel.
99    ///
100    /// This channel might not be usable: it might be closing or
101    /// broken.  We need to check its is_usable() method before
102    /// yielding it to the user.
103    Open(OpenEntry<C>),
104    /// A channel that's getting built.
105    Building(PendingEntry),
106}
107
108/// An open channel entry.
109#[derive(Clone)]
110pub(crate) struct OpenEntry<C> {
111    /// The underlying open channel.
112    pub(crate) channel: Arc<C>,
113    /// The maximum unused duration allowed for this channel.
114    pub(crate) max_unused_duration: Duration,
115}
116
117/// A unique ID for a pending ([`PendingEntry`]) channel.
118#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
119pub(crate) struct UniqPendingChanId(u64);
120
121impl UniqPendingChanId {
122    /// Construct a new `UniqPendingChanId`.
123    pub(crate) fn new() -> Self {
124        /// The next unique ID.
125        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
126        // Relaxed ordering is fine; we don't care about how this
127        // is instantiated with respect to other channels.
128        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
129        assert!(id != u64::MAX, "Exhausted the pending channel ID namespace");
130        Self(id)
131    }
132}
133
134impl std::fmt::Display for UniqPendingChanId {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        write!(f, "PendingChan {}", self.0)
137    }
138}
139
140/// An entry for a not-yet-build channel
141#[derive(Clone)]
142pub(crate) struct PendingEntry {
143    /// The keys of the relay to which we're trying to open a channel.
144    pub(crate) ids: RelayIds,
145
146    /// A future we can clone and listen on to learn when this channel attempt
147    /// is successful or failed.
148    ///
149    /// This entry will be removed from the map (and possibly replaced with an
150    /// `OpenEntry`) _before_ this future becomes ready.
151    pub(crate) pending: Pending,
152
153    /// A unique ID that allows us to find this exact pending entry later.
154    pub(crate) unique_id: UniqPendingChanId,
155}
156
157impl<C> HasRelayIds for ChannelState<C>
158where
159    C: HasRelayIds,
160{
161    fn identity(
162        &self,
163        key_type: tor_linkspec::RelayIdType,
164    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
165        match self {
166            ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
167            ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
168        }
169    }
170}
171
172impl<C: Clone> ChannelState<C> {
173    /// For testing: either give the Open channel inside this state,
174    /// or panic if there is none.
175    #[cfg(test)]
176    fn unwrap_open(&self) -> &C {
177        match self {
178            ChannelState::Open(ent) => &ent.channel,
179            _ => panic!("Not an open channel"),
180        }
181    }
182}
183
184/// Type of the `nf_ito_*` netdir parameters, convenience alias
185type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
186
187/// Extract from a `NetParameters` which we need, conveniently organized for our processing
188///
189/// This type serves two functions at once:
190///
191///  1. Being a subset of the parameters, we can copy it out of
192///     the netdir, before we do more complex processing - and, in particular,
193///     before we obtain the lock on `inner` (which we need to actually handle the update,
194///     because we need to combine information from the config with that from the netdir).
195///
196///  2. Rather than four separate named fields for the padding options,
197///     it has arrays, so that it is easy to
198///     select the values without error-prone recapitulation of field names.
199#[derive(Debug, Clone)]
200struct NetParamsExtract {
201    /// `nf_ito_*`, the padding timeout parameters from the netdir consensus
202    ///
203    /// `nf_ito[ 0=normal, 1=reduced ][ 0=low, 1=high ]`
204    /// are `nf_ito_{low,high}{,_reduced` from `NetParameters`.
205    // TODO we could use some enum or IndexVec or something to make this less `0` and `1`
206    nf_ito: [[NfIto; 2]; 2],
207
208    /// The KIST parameters.
209    kist: KistParams,
210}
211
212impl From<&NetParameters> for NetParamsExtract {
213    fn from(p: &NetParameters) -> Self {
214        let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
215        // NOTE: in theory, this cast shouldn't be needed
216        // (kist_tcp_notsent_lowat is supposed to be a u32, not an i32).
217        // In practice, however, the type conversion is needed
218        // because consensus params are i32s.
219        //
220        // See the `NetParamaters::kist_tcp_notsent_lowat docs for more details.
221        let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
222        let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
223
224        NetParamsExtract {
225            nf_ito: [
226                [p.nf_ito_low, p.nf_ito_high],
227                [p.nf_ito_low_reduced, p.nf_ito_high_reduced],
228            ],
229            kist,
230        }
231    }
232}
233
234/// Build a `KistMode` from [`NetParameters`].
235///
236/// Used for converting [`kist_enabled`](NetParameters::kist_enabled)
237/// to a corresponding `KistMode`.
238fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
239    caret::caret_int! {
240        /// KIST flavor, defined by a numerical value read from the consensus.
241        struct KistType(i32) {
242            /// KIST disabled
243            DISABLED = 0,
244            /// KIST using TCP_NOTSENT_LOWAT.
245            TCP_NOTSENT_LOWAT = 1,
246        }
247    }
248
249    match val.get().into() {
250        KistType::DISABLED => KistMode::Disabled,
251        KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
252        _ => unreachable!("BoundedInt32 was not bounded?!"),
253    }
254}
255
256impl NetParamsExtract {
257    /// Return the padding timer parameter low end, for reduced-ness `reduced`, as a `u32`
258    fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
259        self.pad_get(reduced, 0)
260    }
261    /// Return the padding timer parameter high end, for reduced-ness `reduced`, as a `u32`
262    fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
263        self.pad_get(reduced, 1)
264    }
265
266    /// Return and converts one padding parameter timer
267    ///
268    /// Internal function.
269    fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
270        self.nf_ito[usize::from(reduced)][low_or_high]
271            .try_map(|v| Ok::<_, Void>(v.into()))
272            .void_unwrap()
273    }
274}
275
276impl<C: AbstractChannel> ChannelState<C> {
277    /// Return true if a channel is ready to expire.
278    /// Update `expire_after` if a smaller duration than
279    /// the given value is required to expire this channel.
280    fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
281        let ChannelState::Open(ent) = self else {
282            return false;
283        };
284        let Some(unused_duration) = ent.channel.duration_unused() else {
285            // still in use
286            return false;
287        };
288        let max_unused_duration = ent.max_unused_duration;
289        let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
290            // no time remaining; drop now.
291            return true;
292        };
293        if remaining.is_zero() {
294            // Ignoring this edge case would result in a fairly benign race
295            // condition outside of Shadow, but deadlock in Shadow.
296            return true;
297        }
298        *expire_after = std::cmp::min(*expire_after, remaining);
299        false
300    }
301}
302
303impl<C: AbstractChannelFactory> MgrState<C> {
304    /// Create a new empty `MgrState`.
305    pub(crate) fn new(
306        builder: C,
307        config: ChannelConfig,
308        dormancy: Dormancy,
309        netparams: &NetParameters,
310    ) -> Self {
311        let mut padding_params = ChannelPaddingInstructions::default();
312        let netparams = NetParamsExtract::from(netparams);
313        let kist_params = netparams.kist;
314        let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
315            .unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
316        let _: Option<_> = update; // there are no channels yet, that would need to be told
317
318        let channels_params = ChannelParams {
319            padding: padding_params,
320            kist: kist_params,
321        };
322
323        MgrState {
324            inner: std::sync::Mutex::new(Inner {
325                builder,
326                channels: ListByRelayIds::new(),
327                config,
328                channels_params,
329                dormancy,
330            }),
331        }
332    }
333
334    /// Run a function on the [`ListByRelayIds`] that implements the map in this `MgrState`.
335    ///
336    /// This function grabs a mutex: do not provide a slow function.
337    ///
338    /// We provide this function rather than exposing the channels set directly,
339    /// to make sure that the calling code doesn't await while holding the lock.
340    ///
341    /// This is only `cfg(test)` since it can deadlock.
342    ///
343    /// # Deadlock
344    ///
345    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
346    #[cfg(test)]
347    pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
348    where
349        F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
350    {
351        let mut inner = self.inner.lock()?;
352        Ok(func(&mut inner.channels))
353    }
354
355    /// Return a copy of the builder stored in this state.
356    pub(crate) fn builder(&self) -> C
357    where
358        C: Clone,
359    {
360        let inner = self.inner.lock().expect("lock poisoned");
361        inner.builder.clone()
362    }
363
364    /// Run a function to modify the builder stored in this state.
365    ///
366    /// # Deadlock
367    ///
368    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
369    #[allow(dead_code)]
370    pub(crate) fn with_mut_builder<F>(&self, func: F)
371    where
372        F: FnOnce(&mut C),
373    {
374        let mut inner = self.inner.lock().expect("lock poisoned");
375        func(&mut inner.builder);
376    }
377
378    /// Remove every unusable state from the map in this state.
379    #[cfg(test)]
380    pub(crate) fn remove_unusable(&self) -> Result<()> {
381        let mut inner = self.inner.lock()?;
382        inner.channels.retain(|state| match state {
383            ChannelState::Open(ent) => ent.channel.is_usable(),
384            ChannelState::Building(_) => true,
385        });
386        Ok(())
387    }
388
389    /// Request an open or pending channel to `target`. If `add_new_entry_if_not_found` is true and
390    /// an open or pending channel isn't found, a new pending entry will be added and
391    /// [`ChannelForTarget::NewEntry`] will be returned. This is all done as part of the same method
392    /// so that all operations are performed under the same lock acquisition.
393    pub(crate) fn request_channel(
394        &self,
395        target: &C::BuildSpec,
396        add_new_entry_if_not_found: bool,
397    ) -> Result<Option<ChannelForTarget<C>>> {
398        use ChannelState::*;
399
400        let mut inner = self.inner.lock()?;
401
402        // The idea here is to choose the channel in two steps:
403        //
404        // - Eligibility: Get channels from the channel map and filter them down to only channels
405        //   which are eligible to be returned.
406        // - Ranking: From the eligible channels, choose the best channel.
407        //
408        // Another way to choose the channel could be something like: first try all canonical open
409        // channels, then all non-canonical open channels, then all pending channels with all
410        // matching relay ids, then remaining pending channels, etc. But this ends up being hard to
411        // follow and inflexible (what if you want to prioritize pending channels over non-canonical
412        // open channels?).
413
414        // Open channels which are allowed for requests to `target`.
415        let open_channels = inner
416            .channels
417            // channels with all target relay identifiers
418            .by_all_ids(target)
419            .filter(|entry| match entry {
420                Open(x) => select::open_channel_is_allowed(x, target),
421                Building(_) => false,
422            });
423
424        // Pending channels which will *probably* be allowed for requests to `target` once they
425        // complete.
426        let pending_channels = inner
427            .channels
428            // channels that have a subset of the relay ids of `target`
429            .all_subset(target)
430            .into_iter()
431            .filter(|entry| match entry {
432                Open(_) => false,
433                Building(x) => select::pending_channel_maybe_allowed(x, target),
434            });
435
436        match select::choose_best_channel(open_channels.chain(pending_channels), target) {
437            Some(Open(OpenEntry { channel, .. })) => {
438                // This entry is a perfect match for the target keys: we'll return the open
439                // entry.
440                return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
441            }
442            Some(Building(PendingEntry { pending, .. })) => {
443                // This entry is potentially a match for the target identities: we'll return the
444                // pending entry. (We don't know for sure if it will match once it completes,
445                // since we might discover additional keys beyond those listed for this pending
446                // entry.)
447                return Ok(Some(ChannelForTarget::Pending(pending.clone())));
448            }
449            None => {}
450        }
451
452        // It's possible we know ahead of time that building a channel would be unsuccessful.
453        if inner
454            .channels
455            // channels with at least one id in common with `target`
456            .all_overlapping(target)
457            .into_iter()
458            // but not channels which completely satisfy the id requirements of `target`
459            .filter(|entry| !entry.has_all_relay_ids_from(target))
460            .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
461        {
462            // At least one *open, usable* channel has been negotiated that overlaps only
463            // partially with our target: it has proven itself to have _one_ of our target
464            // identities, but not all.
465            //
466            // Because this channel exists, we know that our target cannot succeed, since relays
467            // are not allowed to share _any_ identities.
468            //return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
469            return Err(Error::IdentityConflict);
470        }
471
472        if !add_new_entry_if_not_found {
473            return Ok(None);
474        }
475
476        // Great, nothing interfered at all.
477        let any_relay_id = target
478            .identities()
479            .next()
480            .ok_or(internal!("relay target had no id"))?
481            .to_owned();
482        let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
483        inner
484            .channels
485            .try_insert(ChannelState::Building(new_state))?;
486        let handle = PendingChannelHandle::new(any_relay_id, unique_id);
487        Ok(Some(ChannelForTarget::NewEntry((handle, send))))
488    }
489
490    /// Remove the pending channel identified by its `handle`.
491    pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
492        let mut inner = self.inner.lock()?;
493        remove_pending(&mut inner.channels, handle);
494        Ok(())
495    }
496
497    /// Upgrade the pending channel identified by its `handle` by replacing it with a new open
498    /// `channel`.
499    pub(crate) fn upgrade_pending_channel_to_open(
500        &self,
501        handle: PendingChannelHandle,
502        channel: Arc<C::Channel>,
503    ) -> Result<()> {
504        // Do all operations under the same lock acquisition.
505        let mut inner = self.inner.lock()?;
506
507        remove_pending(&mut inner.channels, handle);
508
509        // This isn't great.  We context switch to the newly-created
510        // channel just to tell it how and whether to do padding.  Ideally
511        // we would pass the params at some suitable point during
512        // building.  However, that would involve the channel taking a
513        // copy of the params, and that must happen in the same channel
514        // manager lock acquisition span as the one where we insert the
515        // channel into the table so it will receive updates.  I.e.,
516        // here.
517        let update = inner.channels_params.padding.initial_update();
518        if let Some(update) = update {
519            channel
520                .reparameterize(update.into())
521                .map_err(|_| internal!("failure on new channel"))?;
522        }
523        let new_entry = ChannelState::Open(OpenEntry {
524            channel,
525            max_unused_duration: Duration::from_secs(
526                rand::rng()
527                    .gen_range_checked(180..270)
528                    .expect("not 180 < 270 !"),
529            ),
530        });
531        inner.channels.insert(new_entry);
532
533        Ok(())
534    }
535
536    /// Reconfigure all channels as necessary
537    ///
538    /// (By reparameterizing channels as needed)
539    /// This function will handle
540    ///   - netdir update
541    ///   - a reconfiguration
542    ///   - dormancy
543    ///
544    /// For `new_config` and `new_dormancy`, `None` means "no change to previous info".
545    pub(super) fn reconfigure_general(
546        &self,
547        new_config: Option<&ChannelConfig>,
548        new_dormancy: Option<Dormancy>,
549        netparams: Arc<dyn AsRef<NetParameters>>,
550    ) -> StdResult<(), tor_error::Bug> {
551        use ChannelState as CS;
552
553        // TODO when we support operation as a relay, inter-relay channels ought
554        // not to get padding.
555        let netdir = {
556            let extract = NetParamsExtract::from((*netparams).as_ref());
557            drop(netparams);
558            extract
559        };
560
561        let mut inner = self
562            .inner
563            .lock()
564            .map_err(|_| internal!("poisoned channel manager"))?;
565        let inner = &mut *inner;
566
567        if let Some(new_config) = new_config {
568            inner.config = new_config.clone();
569        }
570        if let Some(new_dormancy) = new_dormancy {
571            inner.dormancy = new_dormancy;
572        }
573
574        let update = parameterize(
575            &mut inner.channels_params.padding,
576            &inner.config,
577            inner.dormancy,
578            &netdir,
579        )?;
580
581        let update = update.map(Arc::new);
582
583        let new_kist_params = netdir.kist;
584        let kist_params = if new_kist_params != inner.channels_params.kist {
585            // The KIST params have changed: remember their value,
586            // and reparameterize_kist()
587            inner.channels_params.kist = new_kist_params;
588            Some(new_kist_params)
589        } else {
590            // If the new KIST params are identical to the previous ones,
591            // we don't need to call reparameterize_kist()
592            None
593        };
594
595        if update.is_none() && kist_params.is_none() {
596            // Return early, nothing to reconfigure
597            return Ok(());
598        }
599
600        for channel in inner.channels.values() {
601            let channel = match channel {
602                CS::Open(OpenEntry { channel, .. }) => channel,
603                CS::Building(_) => continue,
604            };
605
606            if let Some(ref update) = update {
607                // Ignore error (which simply means the channel is closed or gone)
608                let _ = channel.reparameterize(Arc::clone(update));
609            }
610
611            if let Some(kist) = kist_params {
612                // Ignore error (which simply means the channel is closed or gone)
613                let _ = channel.reparameterize_kist(kist);
614            }
615        }
616        Ok(())
617    }
618
619    /// Expire all channels that have been unused for too long.
620    ///
621    /// Return a Duration until the next time at which
622    /// a channel _could_ expire.
623    pub(crate) fn expire_channels(&self) -> Duration {
624        let mut ret = Duration::from_secs(180);
625        self.inner
626            .lock()
627            .expect("Poisoned lock")
628            .channels
629            .retain(|chan| !chan.ready_to_expire(&mut ret));
630        ret
631    }
632}
633
634/// A channel for a given target relay.
635pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
636    /// A channel that is open.
637    Open(Arc<CF::Channel>),
638    /// A channel that is building.
639    Pending(Pending),
640    /// Information about a new pending channel entry.
641    NewEntry((PendingChannelHandle, Sending)),
642}
643
644/// A handle for a pending channel.
645///
646/// WARNING: This handle should never be dropped, and should always be passed back into
647/// [`MgrState::remove_pending_channel`] or [`MgrState::upgrade_pending_channel_to_open`], otherwise
648/// the pending channel may be left in the channel map forever.
649///
650/// This handle must only be used with the `MgrState` from which it was given.
651pub(crate) struct PendingChannelHandle {
652    /// Any relay ID for this pending channel.
653    relay_id: tor_linkspec::RelayId,
654    /// The unique ID for this pending channel.
655    unique_id: UniqPendingChanId,
656    /// The pending channel has been removed from the channel map.
657    chan_has_been_removed: bool,
658}
659
660impl PendingChannelHandle {
661    /// Create a new [`PendingChannelHandle`].
662    fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
663        Self {
664            relay_id,
665            unique_id,
666            chan_has_been_removed: false,
667        }
668    }
669
670    /// This should be called when the pending channel has been removed from the pending channel
671    /// map. Not calling this will result in an error log message (and panic in debug builds) when
672    /// this handle is dropped.
673    fn chan_has_been_removed(mut self) {
674        self.chan_has_been_removed = true;
675    }
676}
677
678impl std::ops::Drop for PendingChannelHandle {
679    fn drop(&mut self) {
680        if !self.chan_has_been_removed {
681            #[allow(clippy::missing_docs_in_private_items)]
682            const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
683            error_report!(
684                internal!("{MSG}"),
685                "'PendingChannelHandle' dropped unexpectedly",
686            );
687        }
688    }
689}
690
691/// Helper: return the objects used to inform pending tasks about a newly open or failed channel.
692fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
693    let (snd, rcv) = oneshot::channel();
694    let pending = rcv.shared();
695    let unique_id = UniqPendingChanId::new();
696    let entry = PendingEntry {
697        ids,
698        pending,
699        unique_id,
700    };
701
702    (entry, snd, unique_id)
703}
704
705/// Helper: remove the pending channel identified by `handle` from `channel_map`.
706fn remove_pending<C: AbstractChannel>(
707    channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
708    handle: PendingChannelHandle,
709) {
710    // we need only one relay id to locate it, even if it has multiple relay ids
711    let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
712        let ChannelState::Building(c) = c else {
713            return false;
714        };
715        c.unique_id == handle.unique_id
716    });
717    debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
718
719    handle.chan_has_been_removed();
720}
721
722/// Converts config, dormancy, and netdir, into parameter updates
723///
724/// Calculates new parameters, updating `channels_params` as appropriate.
725/// If anything changed, the corresponding update instruction is returned.
726///
727/// `channels_params` is updated with the new parameters,
728/// and the update message, if one is needed, is returned.
729///
730/// This is called in two places:
731///
732///  1. During chanmgr creation, it is called once to analyze the initial state
733///     and construct a corresponding ChannelPaddingInstructions.
734///
735///  2. During reconfiguration.
736fn parameterize(
737    channels_params: &mut ChannelPaddingInstructions,
738    config: &ChannelConfig,
739    dormancy: Dormancy,
740    netdir: &NetParamsExtract,
741) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
742    // Everything in this calculation applies to *all* channels, disregarding
743    // channel usage.  Usage is handled downstream, in the channel frontend.
744    // See the module doc in `crates/tor-proto/src/channel/padding.rs`.
745
746    let padding_of_level = |level| padding_parameters(level, netdir);
747    let send_padding = padding_of_level(config.padding)?;
748    let padding_default = padding_of_level(PaddingLevel::default())?;
749
750    let send_padding = match dormancy {
751        Dormancy::Active => send_padding,
752        Dormancy::Dormant => None,
753    };
754
755    let recv_padding = match config.padding {
756        PaddingLevel::Reduced => None,
757        PaddingLevel::Normal => send_padding,
758        PaddingLevel::None => None,
759    };
760
761    // Whether the inbound padding approach we are to use, is the same as the default
762    // derived from the netdir (disregarding our config and dormancy).
763    //
764    // Ie, whether the parameters we want are precisely those that a peer would
765    // use by default (assuming they have the same view of the netdir as us).
766    let recv_equals_default = recv_padding == padding_default;
767
768    let padding_negotiate = if recv_equals_default {
769        // Our padding approach is the same as peers' defaults.  So the PADDING_NEGOTIATE
770        // message we need to send is the START(0,0).  (The channel frontend elides an
771        // initial message of this form, - see crates/tor-proto/src/channel.rs::note_usage.)
772        //
773        // If the netdir default is no padding, and we previously negotiated
774        // padding being enabled, and now want to disable it, we would send
775        // START(0,0) rather than STOP.  That is OK (even, arguably, right).
776        PaddingNegotiate::start_default()
777    } else {
778        match recv_padding {
779            None => PaddingNegotiate::stop(),
780            Some(params) => params.padding_negotiate_cell()?,
781        }
782    };
783
784    let mut update = channels_params
785        .start_update()
786        .padding_enable(send_padding.is_some())
787        .padding_negotiate(padding_negotiate);
788    if let Some(params) = send_padding {
789        update = update.padding_parameters(params);
790    }
791    let update = update.finish();
792
793    Ok(update)
794}
795
796/// Given a `NetDirExtract` and whether we're reducing padding, return a `PaddingParameters`
797///
798/// With `PaddingLevel::None`, or the consensus specifies no padding, will return `None`;
799/// but does not account for other reasons why padding might be enabled/disabled.
800fn padding_parameters(
801    config: PaddingLevel,
802    netdir: &NetParamsExtract,
803) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
804    let reduced = match config {
805        PaddingLevel::Reduced => true,
806        PaddingLevel::Normal => false,
807        PaddingLevel::None => return Ok(None),
808    };
809
810    padding_parameters_builder(reduced, netdir)
811        .unwrap_or_else(|e: &str| {
812            info!(
813                "consensus channel padding parameters wrong, using defaults: {}",
814                &e,
815            );
816            Some(PaddingParametersBuilder::default())
817        })
818        .map(|p| {
819            p.build()
820                .map_err(into_internal!("failed to build padding parameters"))
821        })
822        .transpose()
823}
824
825/// Given a `NetDirExtract` and whether we're reducing padding,
826/// return a `PaddingParametersBuilder`
827///
828/// If the consensus specifies no padding, will return `None`;
829/// but does not account for other reasons why padding might be enabled/disabled.
830///
831/// If `Err`, the string is a description of what is wrong with the parameters;
832/// the caller should use `PaddingParameters::Default`.
833fn padding_parameters_builder(
834    reduced: bool,
835    netdir: &NetParamsExtract,
836) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
837    let mut p = PaddingParametersBuilder::default();
838
839    let low = netdir.pad_low(reduced);
840    let high = netdir.pad_high(reduced);
841    if low > high {
842        return Err("low > high");
843    }
844    if low.as_millis() == 0 && high.as_millis() == 0 {
845        // Zeroes for both channel padding consensus parameters means "don't send padding".
846        // padding-spec.txt s2.6, see description of `nf_ito_high`.
847        return Ok(None);
848    }
849    p.low(low);
850    p.high(high);
851    Ok::<_, &'static str>(Some(p))
852}
853
854#[cfg(test)]
855mod test {
856    // @@ begin test lint list maintained by maint/add_warning @@
857    #![allow(clippy::bool_assert_comparison)]
858    #![allow(clippy::clone_on_copy)]
859    #![allow(clippy::dbg_macro)]
860    #![allow(clippy::mixed_attributes_style)]
861    #![allow(clippy::print_stderr)]
862    #![allow(clippy::print_stdout)]
863    #![allow(clippy::single_char_pattern)]
864    #![allow(clippy::unwrap_used)]
865    #![allow(clippy::unchecked_duration_subtraction)]
866    #![allow(clippy::useless_vec)]
867    #![allow(clippy::needless_pass_by_value)]
868    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
869
870    use super::*;
871    use crate::factory::BootstrapReporter;
872    use async_trait::async_trait;
873    use std::sync::{Arc, Mutex};
874    use tor_llcrypto::pk::ed25519::Ed25519Identity;
875    use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
876    use tor_proto::memquota::ChannelAccount;
877
878    fn new_test_state() -> MgrState<FakeChannelFactory> {
879        MgrState::new(
880            FakeChannelFactory::default(),
881            ChannelConfig::default(),
882            Default::default(),
883            &Default::default(),
884        )
885    }
886
887    #[derive(Clone, Debug, Default)]
888    struct FakeChannelFactory {}
889
890    #[allow(clippy::diverging_sub_expression)] // for unimplemented!() + async_trait
891    #[async_trait]
892    impl AbstractChannelFactory for FakeChannelFactory {
893        type Channel = FakeChannel;
894
895        type BuildSpec = tor_linkspec::OwnedChanTarget;
896
897        type Stream = ();
898
899        async fn build_channel(
900            &self,
901            _target: &Self::BuildSpec,
902            _reporter: BootstrapReporter,
903            _memquota: ChannelAccount,
904        ) -> Result<Arc<FakeChannel>> {
905            unimplemented!()
906        }
907
908        #[cfg(feature = "relay")]
909        async fn build_channel_using_incoming(
910            &self,
911            _peer: std::net::SocketAddr,
912            _stream: Self::Stream,
913            _memquota: ChannelAccount,
914        ) -> Result<Arc<Self::Channel>> {
915            unimplemented!()
916        }
917    }
918
919    #[derive(Clone, Debug)]
920    struct FakeChannel {
921        ed_ident: Ed25519Identity,
922        usable: bool,
923        unused_duration: Option<u64>,
924        params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
925    }
926    impl AbstractChannel for FakeChannel {
927        fn is_usable(&self) -> bool {
928            self.usable
929        }
930        fn duration_unused(&self) -> Option<Duration> {
931            self.unused_duration.map(Duration::from_secs)
932        }
933        fn reparameterize(
934            &self,
935            update: Arc<ChannelPaddingInstructionsUpdates>,
936        ) -> tor_proto::Result<()> {
937            *self.params_update.lock().unwrap() = Some(update);
938            Ok(())
939        }
940        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
941            Ok(())
942        }
943        fn engage_padding_activities(&self) {}
944    }
945    impl tor_linkspec::HasRelayIds for FakeChannel {
946        fn identity(
947            &self,
948            key_type: tor_linkspec::RelayIdType,
949        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
950            match key_type {
951                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
952                _ => None,
953            }
954        }
955    }
956    /// Get a fake ed25519 identity from the first byte of a string.
957    fn str_to_ed(s: &str) -> Ed25519Identity {
958        let byte = s.as_bytes()[0];
959        [byte; 32].into()
960    }
961    fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
962        let channel = FakeChannel {
963            ed_ident: str_to_ed(ident),
964            usable: true,
965            unused_duration: None,
966            params_update: Arc::new(Mutex::new(None)),
967        };
968        ChannelState::Open(OpenEntry {
969            channel: Arc::new(channel),
970            max_unused_duration: Duration::from_secs(180),
971        })
972    }
973    fn ch_with_details(
974        ident: &'static str,
975        max_unused_duration: Duration,
976        unused_duration: Option<u64>,
977    ) -> ChannelState<FakeChannel> {
978        let channel = FakeChannel {
979            ed_ident: str_to_ed(ident),
980            usable: true,
981            unused_duration,
982            params_update: Arc::new(Mutex::new(None)),
983        };
984        ChannelState::Open(OpenEntry {
985            channel: Arc::new(channel),
986            max_unused_duration,
987        })
988    }
989    fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
990        let channel = FakeChannel {
991            ed_ident: str_to_ed(ident),
992            usable: false,
993            unused_duration: None,
994            params_update: Arc::new(Mutex::new(None)),
995        };
996        ChannelState::Open(OpenEntry {
997            channel: Arc::new(channel),
998            max_unused_duration: Duration::from_secs(180),
999        })
1000    }
1001
1002    #[test]
1003    fn rmv_unusable() -> Result<()> {
1004        let map = new_test_state();
1005
1006        map.with_channels(|map| {
1007            map.insert(closed("machen"));
1008            map.insert(closed("wir"));
1009            map.insert(ch("wir"));
1010            map.insert(ch("feinen"));
1011            map.insert(ch("Fug"));
1012            map.insert(ch("Fug"));
1013        })?;
1014
1015        map.remove_unusable().unwrap();
1016
1017        map.with_channels(|map| {
1018            assert_eq!(map.by_id(&str_to_ed("m")).len(), 0);
1019            assert_eq!(map.by_id(&str_to_ed("w")).len(), 1);
1020            assert_eq!(map.by_id(&str_to_ed("f")).len(), 1);
1021            assert_eq!(map.by_id(&str_to_ed("F")).len(), 2);
1022        })?;
1023
1024        Ok(())
1025    }
1026
1027    #[test]
1028    fn reparameterize_via_netdir() -> Result<()> {
1029        let map = new_test_state();
1030
1031        // Set some non-default parameters so that we can tell when an update happens
1032        let _ = map
1033            .inner
1034            .lock()
1035            .unwrap()
1036            .channels_params
1037            .padding
1038            .start_update()
1039            .padding_parameters(
1040                PaddingParametersBuilder::default()
1041                    .low(1234.into())
1042                    .build()
1043                    .unwrap(),
1044            )
1045            .finish();
1046
1047        map.with_channels(|map| {
1048            map.insert(ch("track"));
1049        })?;
1050
1051        let netdir = tor_netdir::testnet::construct_netdir()
1052            .unwrap_if_sufficient()
1053            .unwrap();
1054        let netdir = Arc::new(netdir);
1055
1056        let with_ch = |f: &dyn Fn(&FakeChannel)| {
1057            let inner = map.inner.lock().unwrap();
1058            let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
1059            let ch = ch.next().unwrap().unwrap_open();
1060            f(ch);
1061        };
1062
1063        eprintln!("-- process a default netdir, which should send an update --");
1064        map.reconfigure_general(None, None, netdir.clone()).unwrap();
1065        with_ch(&|ch| {
1066            assert_eq!(
1067                format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
1068                // evade field visibility by (ab)using Debug impl
1069                "ChannelPaddingInstructionsUpdates { padding_enable: None, \
1070                    padding_parameters: Some(Parameters { \
1071                        low: IntegerMilliseconds { value: 1500 }, \
1072                        high: IntegerMilliseconds { value: 9500 } }), \
1073                    padding_negotiate: None }"
1074            );
1075        });
1076        eprintln!();
1077
1078        eprintln!("-- process a default netdir again, which should *not* send an update --");
1079        map.reconfigure_general(None, None, netdir).unwrap();
1080        with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
1081
1082        Ok(())
1083    }
1084
1085    #[test]
1086    fn expire_channels() -> Result<()> {
1087        let map = new_test_state();
1088
1089        // Channel that has been unused beyond max duration allowed is expired
1090        map.with_channels(|map| {
1091            map.insert(ch_with_details(
1092                "wello",
1093                Duration::from_secs(180),
1094                Some(181),
1095            ));
1096        })?;
1097
1098        // Minimum value of max unused duration is 180 seconds
1099        assert_eq!(180, map.expire_channels().as_secs());
1100        map.with_channels(|map| {
1101            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 0);
1102        })?;
1103
1104        let map = new_test_state();
1105
1106        // Channel that has been unused for shorter than max unused duration
1107        map.with_channels(|map| {
1108            map.insert(ch_with_details(
1109                "wello",
1110                Duration::from_secs(180),
1111                Some(120),
1112            ));
1113
1114            map.insert(ch_with_details(
1115                "yello",
1116                Duration::from_secs(180),
1117                Some(170),
1118            ));
1119
1120            // Channel that has been unused beyond max duration allowed is expired
1121            map.insert(ch_with_details(
1122                "gello",
1123                Duration::from_secs(180),
1124                Some(181),
1125            ));
1126
1127            // Closed channel should be retained
1128            map.insert(closed("hello"));
1129        })?;
1130
1131        // Return duration until next channel expires
1132        assert_eq!(10, map.expire_channels().as_secs());
1133        map.with_channels(|map| {
1134            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 1);
1135            assert_eq!(map.by_ed25519(&str_to_ed("y")).len(), 1);
1136            assert_eq!(map.by_ed25519(&str_to_ed("h")).len(), 1);
1137            assert_eq!(map.by_ed25519(&str_to_ed("g")).len(), 0);
1138        })?;
1139        Ok(())
1140    }
1141}