1
//! Simple implementation for the internal map state of a ChanMgr.
2

            
3
use std::time::Duration;
4

            
5
use super::AbstractChannelFactory;
6
use super::{select, AbstractChannel, Pending, Sending};
7
use crate::{ChannelConfig, Dormancy, Error, Result};
8

            
9
use futures::FutureExt;
10
use std::result::Result as StdResult;
11
use std::sync::atomic::{AtomicU64, Ordering};
12
use std::sync::Arc;
13
use tor_async_utils::oneshot;
14
use tor_basic_utils::RngExt as _;
15
use tor_cell::chancell::msg::PaddingNegotiate;
16
use tor_config::PaddingLevel;
17
use tor_error::{error_report, internal, into_internal};
18
use tor_linkspec::{HasRelayIds, ListByRelayIds, RelayIds};
19
use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND};
20
use tor_proto::channel::kist::{KistMode, KistParams};
21
use tor_proto::channel::padding::Parameters as PaddingParameters;
22
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
23
use tor_proto::channel::ChannelPaddingInstructionsUpdates;
24
use tor_proto::ChannelPaddingInstructions;
25
use tor_units::{BoundedInt32, IntegerMilliseconds};
26
use tracing::info;
27
use void::{ResultVoidExt as _, Void};
28

            
29
#[cfg(test)]
30
mod padding_test;
31

            
32
/// All mutable state held by an `AbstractChannelMgr`.
33
///
34
/// One reason that this is an isolated type is that we want to
35
/// to limit the amount of code that can see and
36
/// lock the Mutex here.  (We're using a blocking mutex close to async
37
/// code, so we need to be careful.)
38
pub(crate) struct MgrState<C: AbstractChannelFactory> {
39
    /// The data, within a lock
40
    ///
41
    /// (Danger: this uses a blocking mutex close to async code.  This mutex
42
    /// must never be held while an await is happening.)
43
    inner: std::sync::Mutex<Inner<C>>,
44
}
45

            
46
/// Parameters for channels that we create, and that all existing channels are using
47
struct ChannelParams {
48
    /// Channel padding instructions
49
    padding: ChannelPaddingInstructions,
50

            
51
    /// KIST parameters
52
    kist: KistParams,
53
}
54

            
55
/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
56
struct Inner<C: AbstractChannelFactory> {
57
    /// The channel factory type that we store.
58
    ///
59
    /// In this module we never use this _as_ an AbstractChannelFactory: we just
60
    /// hand out clones of it when asked.
61
    builder: C,
62

            
63
    /// A map from identity to channels, or to pending channel statuses.
64
    channels: ListByRelayIds<ChannelState<C::Channel>>,
65

            
66
    /// Parameters for channels that we create, and that all existing channels are using
67
    ///
68
    /// Will be updated by a background task, which also notifies all existing
69
    /// `Open` channels via `channels`.
70
    ///
71
    /// (Must be protected by the same lock as `channels`, or a channel might be
72
    /// created using being-replaced parameters, but not get an update.)
73
    channels_params: ChannelParams,
74

            
75
    /// The configuration (from the config file or API caller)
76
    config: ChannelConfig,
77

            
78
    /// Dormancy
79
    ///
80
    /// The last dormancy information we have been told about and passed on to our channels.
81
    /// Updated via `MgrState::set_dormancy` and hence `MgrState::reconfigure_general`,
82
    /// which then uses it to calculate how to reconfigure the channels.
83
    dormancy: Dormancy,
84
}
85

            
86
/// The state of a channel (or channel build attempt) within a map.
87
///
88
/// A ChannelState can be Open (representing a fully negotiated channel) or
89
/// Building (representing a pending attempt to build a channel). Both states
90
/// have a set of RelayIds, but these RelayIds represent slightly different
91
/// things:
92
///  * On a Building channel, the set of RelayIds is all the identities that we
93
///    require the peer to have. (The peer may turn out to have _more_
94
///    identities than this.)
95
///  * On an Open channel, the set of RelayIds is all the identities that
96
///    we were able to successfully authenticate for the peer.
97
pub(crate) enum ChannelState<C> {
98
    /// An open channel.
99
    ///
100
    /// This channel might not be usable: it might be closing or
101
    /// broken.  We need to check its is_usable() method before
102
    /// yielding it to the user.
103
    Open(OpenEntry<C>),
104
    /// A channel that's getting built.
105
    Building(PendingEntry),
106
}
107

            
108
/// An open channel entry.
109
#[derive(Clone)]
110
pub(crate) struct OpenEntry<C> {
111
    /// The underlying open channel.
112
    pub(crate) channel: Arc<C>,
113
    /// The maximum unused duration allowed for this channel.
114
    pub(crate) max_unused_duration: Duration,
115
}
116

            
117
/// A unique ID for a pending ([`PendingEntry`]) channel.
118
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
119
pub(crate) struct UniqPendingChanId(u64);
120

            
121
impl UniqPendingChanId {
122
    /// Construct a new `UniqPendingChanId`.
123
54
    pub(crate) fn new() -> Self {
124
        /// The next unique ID.
125
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
126
        // Relaxed ordering is fine; we don't care about how this
127
        // is instantiated with respect to other channels.
128
54
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
129
54
        assert!(id != u64::MAX, "Exhausted the pending channel ID namespace");
130
54
        Self(id)
131
54
    }
132
}
133

            
134
impl std::fmt::Display for UniqPendingChanId {
135
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136
        write!(f, "PendingChan {}", self.0)
137
    }
138
}
139

            
140
/// An entry for a not-yet-build channel
141
#[derive(Clone)]
142
pub(crate) struct PendingEntry {
143
    /// The keys of the relay to which we're trying to open a channel.
144
    pub(crate) ids: RelayIds,
145

            
146
    /// A future we can clone and listen on to learn when this channel attempt
147
    /// is successful or failed.
148
    ///
149
    /// This entry will be removed from the map (and possibly replaced with an
150
    /// `OpenEntry`) _before_ this future becomes ready.
151
    pub(crate) pending: Pending,
152

            
153
    /// A unique ID that allows us to find this exact pending entry later.
154
    pub(crate) unique_id: UniqPendingChanId,
155
}
156

            
157
impl<C> HasRelayIds for ChannelState<C>
158
where
159
    C: HasRelayIds,
160
{
161
456
    fn identity(
162
456
        &self,
163
456
        key_type: tor_linkspec::RelayIdType,
164
456
    ) -> Option<tor_linkspec::RelayIdRef<'_>> {
165
456
        match self {
166
244
            ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
167
212
            ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
168
        }
169
456
    }
170
}
171

            
172
impl<C: Clone> ChannelState<C> {
173
    /// For testing: either give the Open channel inside this state,
174
    /// or panic if there is none.
175
    #[cfg(test)]
176
4
    fn unwrap_open(&self) -> &C {
177
4
        match self {
178
4
            ChannelState::Open(ent) => &ent.channel,
179
            _ => panic!("Not an open channel"),
180
        }
181
4
    }
182
}
183

            
184
/// Type of the `nf_ito_*` netdir parameters, convenience alias
185
type NfIto = IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>>;
186

            
187
/// Extract from a `NetParameters` which we need, conveniently organized for our processing
188
///
189
/// This type serves two functions at once:
190
///
191
///  1. Being a subset of the parameters, we can copy it out of
192
///     the netdir, before we do more complex processing - and, in particular,
193
///     before we obtain the lock on `inner` (which we need to actually handle the update,
194
///     because we need to combine information from the config with that from the netdir).
195
///
196
///  2. Rather than four separate named fields for the padding options,
197
///     it has arrays, so that it is easy to
198
///     select the values without error-prone recapitulation of field names.
199
#[derive(Debug, Clone)]
200
struct NetParamsExtract {
201
    /// `nf_ito_*`, the padding timeout parameters from the netdir consensus
202
    ///
203
    /// `nf_ito[ 0=normal, 1=reduced ][ 0=low, 1=high ]`
204
    /// are `nf_ito_{low,high}{,_reduced` from `NetParameters`.
205
    // TODO we could use some enum or IndexVec or something to make this less `0` and `1`
206
    nf_ito: [[NfIto; 2]; 2],
207

            
208
    /// The KIST parameters.
209
    kist: KistParams,
210
}
211

            
212
impl From<&NetParameters> for NetParamsExtract {
213
1180
    fn from(p: &NetParameters) -> Self {
214
1180
        let kist_enabled = kist_mode_from_net_parameter(p.kist_enabled);
215
1180
        // NOTE: in theory, this cast shouldn't be needed
216
1180
        // (kist_tcp_notsent_lowat is supposed to be a u32, not an i32).
217
1180
        // In practice, however, the type conversion is needed
218
1180
        // because consensus params are i32s.
219
1180
        //
220
1180
        // See the `NetParamaters::kist_tcp_notsent_lowat docs for more details.
221
1180
        let tcp_notsent_lowat = u32::from(p.kist_tcp_notsent_lowat);
222
1180
        let kist = KistParams::new(kist_enabled, tcp_notsent_lowat);
223
1180

            
224
1180
        NetParamsExtract {
225
1180
            nf_ito: [
226
1180
                [p.nf_ito_low, p.nf_ito_high],
227
1180
                [p.nf_ito_low_reduced, p.nf_ito_high_reduced],
228
1180
            ],
229
1180
            kist,
230
1180
        }
231
1180
    }
232
}
233

            
234
/// Build a `KistMode` from [`NetParameters`].
235
///
236
/// Used for converting [`kist_enabled`](NetParameters::kist_enabled)
237
/// to a corresponding `KistMode`.
238
1180
fn kist_mode_from_net_parameter(val: BoundedInt32<0, 1>) -> KistMode {
239
1180
    caret::caret_int! {
240
1180
        /// KIST flavor, defined by a numerical value read from the consensus.
241
1180
        struct KistType(i32) {
242
1180
            /// KIST disabled
243
1180
            DISABLED = 0,
244
1180
            /// KIST using TCP_NOTSENT_LOWAT.
245
1180
            TCP_NOTSENT_LOWAT = 1,
246
1180
        }
247
1180
    }
248
1180

            
249
1180
    match val.get().into() {
250
1180
        KistType::DISABLED => KistMode::Disabled,
251
        KistType::TCP_NOTSENT_LOWAT => KistMode::TcpNotSentLowat,
252
        _ => unreachable!("BoundedInt32 was not bounded?!"),
253
    }
254
1180
}
255

            
256
impl NetParamsExtract {
257
    /// Return the padding timer parameter low end, for reduced-ness `reduced`, as a `u32`
258
2354
    fn pad_low(&self, reduced: bool) -> IntegerMilliseconds<u32> {
259
2354
        self.pad_get(reduced, 0)
260
2354
    }
261
    /// Return the padding timer parameter high end, for reduced-ness `reduced`, as a `u32`
262
2354
    fn pad_high(&self, reduced: bool) -> IntegerMilliseconds<u32> {
263
2354
        self.pad_get(reduced, 1)
264
2354
    }
265

            
266
    /// Return and converts one padding parameter timer
267
    ///
268
    /// Internal function.
269
4708
    fn pad_get(&self, reduced: bool, low_or_high: usize) -> IntegerMilliseconds<u32> {
270
4708
        self.nf_ito[usize::from(reduced)][low_or_high]
271
4902
            .try_map(|v| Ok::<_, Void>(v.into()))
272
4708
            .void_unwrap()
273
4708
    }
274
}
275

            
276
impl<C: AbstractChannel> ChannelState<C> {
277
    /// Return true if a channel is ready to expire.
278
    /// Update `expire_after` if a smaller duration than
279
    /// the given value is required to expire this channel.
280
10
    fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
281
10
        let ChannelState::Open(ent) = self else {
282
            return false;
283
        };
284
10
        let Some(unused_duration) = ent.channel.duration_unused() else {
285
            // still in use
286
2
            return false;
287
        };
288
8
        let max_unused_duration = ent.max_unused_duration;
289
8
        let Some(remaining) = max_unused_duration.checked_sub(unused_duration) else {
290
            // no time remaining; drop now.
291
4
            return true;
292
        };
293
4
        if remaining.is_zero() {
294
            // Ignoring this edge case would result in a fairly benign race
295
            // condition outside of Shadow, but deadlock in Shadow.
296
            return true;
297
4
        }
298
4
        *expire_after = std::cmp::min(*expire_after, remaining);
299
4
        false
300
10
    }
301
}
302

            
303
impl<C: AbstractChannelFactory> MgrState<C> {
304
    /// Create a new empty `MgrState`.
305
68
    pub(crate) fn new(
306
68
        builder: C,
307
68
        config: ChannelConfig,
308
68
        dormancy: Dormancy,
309
68
        netparams: &NetParameters,
310
68
    ) -> Self {
311
68
        let mut padding_params = ChannelPaddingInstructions::default();
312
68
        let netparams = NetParamsExtract::from(netparams);
313
68
        let kist_params = netparams.kist;
314
68
        let update = parameterize(&mut padding_params, &config, dormancy, &netparams)
315
68
            .unwrap_or_else(|e: tor_error::Bug| panic!("bug detected on startup: {:?}", e));
316
68
        let _: Option<_> = update; // there are no channels yet, that would need to be told
317
68

            
318
68
        let channels_params = ChannelParams {
319
68
            padding: padding_params,
320
68
            kist: kist_params,
321
68
        };
322
68

            
323
68
        MgrState {
324
68
            inner: std::sync::Mutex::new(Inner {
325
68
                builder,
326
68
                channels: ListByRelayIds::new(),
327
68
                config,
328
68
                channels_params,
329
68
                dormancy,
330
68
            }),
331
68
        }
332
68
    }
333

            
334
    /// Run a function on the [`ListByRelayIds`] that implements the map in this `MgrState`.
335
    ///
336
    /// This function grabs a mutex: do not provide a slow function.
337
    ///
338
    /// We provide this function rather than exposing the channels set directly,
339
    /// to make sure that the calling code doesn't await while holding the lock.
340
    ///
341
    /// This is only `cfg(test)` since it can deadlock.
342
    ///
343
    /// # Deadlock
344
    ///
345
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
346
    #[cfg(test)]
347
24
    pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
348
24
    where
349
24
        F: FnOnce(&mut ListByRelayIds<ChannelState<C::Channel>>) -> T,
350
24
    {
351
24
        let mut inner = self.inner.lock()?;
352
24
        Ok(func(&mut inner.channels))
353
24
    }
354

            
355
    /// Return a copy of the builder stored in this state.
356
30
    pub(crate) fn builder(&self) -> C
357
30
    where
358
30
        C: Clone,
359
30
    {
360
30
        let inner = self.inner.lock().expect("lock poisoned");
361
30
        inner.builder.clone()
362
30
    }
363

            
364
    /// Run a function to modify the builder stored in this state.
365
    ///
366
    /// # Deadlock
367
    ///
368
    /// Calling a method on [`MgrState`] from within `func` may cause a deadlock.
369
    #[allow(dead_code)]
370
8
    pub(crate) fn with_mut_builder<F>(&self, func: F)
371
8
    where
372
8
        F: FnOnce(&mut C),
373
8
    {
374
8
        let mut inner = self.inner.lock().expect("lock poisoned");
375
8
        func(&mut inner.builder);
376
8
    }
377

            
378
    /// Remove every unusable state from the map in this state.
379
    #[cfg(test)]
380
4
    pub(crate) fn remove_unusable(&self) -> Result<()> {
381
4
        let mut inner = self.inner.lock()?;
382
20
        inner.channels.retain(|state| match state {
383
20
            ChannelState::Open(ent) => ent.channel.is_usable(),
384
            ChannelState::Building(_) => true,
385
20
        });
386
4
        Ok(())
387
4
    }
388

            
389
    /// Request an open or pending channel to `target`. If `add_new_entry_if_not_found` is true and
390
    /// an open or pending channel isn't found, a new pending entry will be added and
391
    /// [`ChannelForTarget::NewEntry`] will be returned. This is all done as part of the same method
392
    /// so that all operations are performed under the same lock acquisition.
393
44
    pub(crate) fn request_channel(
394
44
        &self,
395
44
        target: &C::BuildSpec,
396
44
        add_new_entry_if_not_found: bool,
397
44
    ) -> Result<Option<ChannelForTarget<C>>> {
398
        use ChannelState::*;
399

            
400
44
        let mut inner = self.inner.lock()?;
401

            
402
        // The idea here is to choose the channel in two steps:
403
        //
404
        // - Eligibility: Get channels from the channel map and filter them down to only channels
405
        //   which are eligible to be returned.
406
        // - Ranking: From the eligible channels, choose the best channel.
407
        //
408
        // Another way to choose the channel could be something like: first try all canonical open
409
        // channels, then all non-canonical open channels, then all pending channels with all
410
        // matching relay ids, then remaining pending channels, etc. But this ends up being hard to
411
        // follow and inflexible (what if you want to prioritize pending channels over non-canonical
412
        // open channels?).
413

            
414
        // Open channels which are allowed for requests to `target`.
415
44
        let open_channels = inner
416
44
            .channels
417
44
            // channels with all target relay identifiers
418
44
            .by_all_ids(target)
419
44
            .filter(|entry| match entry {
420
8
                Open(x) => select::open_channel_is_allowed(x, target),
421
8
                Building(_) => false,
422
44
            });
423
44

            
424
44
        // Pending channels which will *probably* be allowed for requests to `target` once they
425
44
        // complete.
426
44
        let pending_channels = inner
427
44
            .channels
428
44
            // channels that have a subset of the relay ids of `target`
429
44
            .all_subset(target)
430
44
            .into_iter()
431
44
            .filter(|entry| match entry {
432
8
                Open(_) => false,
433
8
                Building(x) => select::pending_channel_maybe_allowed(x, target),
434
44
            });
435
44

            
436
44
        match select::choose_best_channel(open_channels.chain(pending_channels), target) {
437
6
            Some(Open(OpenEntry { channel, .. })) => {
438
6
                // This entry is a perfect match for the target keys: we'll return the open
439
6
                // entry.
440
6
                return Ok(Some(ChannelForTarget::Open(Arc::clone(channel))));
441
            }
442
8
            Some(Building(PendingEntry { pending, .. })) => {
443
8
                // This entry is potentially a match for the target identities: we'll return the
444
8
                // pending entry. (We don't know for sure if it will match once it completes,
445
8
                // since we might discover additional keys beyond those listed for this pending
446
8
                // entry.)
447
8
                return Ok(Some(ChannelForTarget::Pending(pending.clone())));
448
            }
449
30
            None => {}
450
30
        }
451
30

            
452
30
        // It's possible we know ahead of time that building a channel would be unsuccessful.
453
30
        if inner
454
30
            .channels
455
30
            // channels with at least one id in common with `target`
456
30
            .all_overlapping(target)
457
30
            .into_iter()
458
30
            // but not channels which completely satisfy the id requirements of `target`
459
30
            .filter(|entry| !entry.has_all_relay_ids_from(target))
460
30
            .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
461
        {
462
            // At least one *open, usable* channel has been negotiated that overlaps only
463
            // partially with our target: it has proven itself to have _one_ of our target
464
            // identities, but not all.
465
            //
466
            // Because this channel exists, we know that our target cannot succeed, since relays
467
            // are not allowed to share _any_ identities.
468
            //return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
469
            return Err(Error::IdentityConflict);
470
30
        }
471
30

            
472
30
        if !add_new_entry_if_not_found {
473
            return Ok(None);
474
30
        }
475

            
476
        // Great, nothing interfered at all.
477
30
        let any_relay_id = target
478
30
            .identities()
479
30
            .next()
480
30
            .ok_or(internal!("relay target had no id"))?
481
30
            .to_owned();
482
30
        let (new_state, send, unique_id) = setup_launch(RelayIds::from_relay_ids(target));
483
30
        inner
484
30
            .channels
485
30
            .try_insert(ChannelState::Building(new_state))?;
486
30
        let handle = PendingChannelHandle::new(any_relay_id, unique_id);
487
30
        Ok(Some(ChannelForTarget::NewEntry((handle, send))))
488
44
    }
489

            
490
    /// Remove the pending channel identified by its `handle`.
491
8
    pub(crate) fn remove_pending_channel(&self, handle: PendingChannelHandle) -> Result<()> {
492
8
        let mut inner = self.inner.lock()?;
493
8
        remove_pending(&mut inner.channels, handle);
494
8
        Ok(())
495
8
    }
496

            
497
    /// Upgrade the pending channel identified by its `handle` by replacing it with a new open
498
    /// `channel`.
499
22
    pub(crate) fn upgrade_pending_channel_to_open(
500
22
        &self,
501
22
        handle: PendingChannelHandle,
502
22
        channel: Arc<C::Channel>,
503
22
    ) -> Result<()> {
504
        // Do all operations under the same lock acquisition.
505
22
        let mut inner = self.inner.lock()?;
506

            
507
22
        remove_pending(&mut inner.channels, handle);
508
22

            
509
22
        // This isn't great.  We context switch to the newly-created
510
22
        // channel just to tell it how and whether to do padding.  Ideally
511
22
        // we would pass the params at some suitable point during
512
22
        // building.  However, that would involve the channel taking a
513
22
        // copy of the params, and that must happen in the same channel
514
22
        // manager lock acquisition span as the one where we insert the
515
22
        // channel into the table so it will receive updates.  I.e.,
516
22
        // here.
517
22
        let update = inner.channels_params.padding.initial_update();
518
22
        if let Some(update) = update {
519
22
            channel
520
22
                .reparameterize(update.into())
521
22
                .map_err(|_| internal!("failure on new channel"))?;
522
        }
523
22
        let new_entry = ChannelState::Open(OpenEntry {
524
22
            channel,
525
22
            max_unused_duration: Duration::from_secs(
526
22
                rand::rng()
527
22
                    .gen_range_checked(180..270)
528
22
                    .expect("not 180 < 270 !"),
529
22
            ),
530
22
        });
531
22
        inner.channels.insert(new_entry);
532
22

            
533
22
        Ok(())
534
22
    }
535

            
536
    /// Reconfigure all channels as necessary
537
    ///
538
    /// (By reparameterizing channels as needed)
539
    /// This function will handle
540
    ///   - netdir update
541
    ///   - a reconfiguration
542
    ///   - dormancy
543
    ///
544
    /// For `new_config` and `new_dormancy`, `None` means "no change to previous info".
545
26
    pub(super) fn reconfigure_general(
546
26
        &self,
547
26
        new_config: Option<&ChannelConfig>,
548
26
        new_dormancy: Option<Dormancy>,
549
26
        netparams: Arc<dyn AsRef<NetParameters>>,
550
26
    ) -> StdResult<(), tor_error::Bug> {
551
        use ChannelState as CS;
552

            
553
        // TODO when we support operation as a relay, inter-relay channels ought
554
        // not to get padding.
555
26
        let netdir = {
556
26
            let extract = NetParamsExtract::from((*netparams).as_ref());
557
26
            drop(netparams);
558
26
            extract
559
        };
560

            
561
26
        let mut inner = self
562
26
            .inner
563
26
            .lock()
564
26
            .map_err(|_| internal!("poisoned channel manager"))?;
565
26
        let inner = &mut *inner;
566

            
567
26
        if let Some(new_config) = new_config {
568
6
            inner.config = new_config.clone();
569
20
        }
570
26
        if let Some(new_dormancy) = new_dormancy {
571
12
            inner.dormancy = new_dormancy;
572
20
        }
573

            
574
26
        let update = parameterize(
575
26
            &mut inner.channels_params.padding,
576
26
            &inner.config,
577
26
            inner.dormancy,
578
26
            &netdir,
579
26
        )?;
580

            
581
26
        let update = update.map(Arc::new);
582
26

            
583
26
        let new_kist_params = netdir.kist;
584
26
        let kist_params = if new_kist_params != inner.channels_params.kist {
585
            // The KIST params have changed: remember their value,
586
            // and reparameterize_kist()
587
            inner.channels_params.kist = new_kist_params;
588
            Some(new_kist_params)
589
        } else {
590
            // If the new KIST params are identical to the previous ones,
591
            // we don't need to call reparameterize_kist()
592
26
            None
593
        };
594

            
595
26
        if update.is_none() && kist_params.is_none() {
596
            // Return early, nothing to reconfigure
597
14
            return Ok(());
598
12
        }
599

            
600
12
        for channel in inner.channels.values() {
601
12
            let channel = match channel {
602
12
                CS::Open(OpenEntry { channel, .. }) => channel,
603
                CS::Building(_) => continue,
604
            };
605

            
606
12
            if let Some(ref update) = update {
607
12
                // Ignore error (which simply means the channel is closed or gone)
608
12
                let _ = channel.reparameterize(Arc::clone(update));
609
12
            }
610

            
611
12
            if let Some(kist) = kist_params {
612
                // Ignore error (which simply means the channel is closed or gone)
613
                let _ = channel.reparameterize_kist(kist);
614
12
            }
615
        }
616
12
        Ok(())
617
26
    }
618

            
619
    /// Expire all channels that have been unused for too long.
620
    ///
621
    /// Return a Duration until the next time at which
622
    /// a channel _could_ expire.
623
20
    pub(crate) fn expire_channels(&self) -> Duration {
624
20
        let mut ret = Duration::from_secs(180);
625
20
        self.inner
626
20
            .lock()
627
20
            .expect("Poisoned lock")
628
20
            .channels
629
26
            .retain(|chan| !chan.ready_to_expire(&mut ret));
630
20
        ret
631
20
    }
632
}
633

            
634
/// A channel for a given target relay.
635
pub(crate) enum ChannelForTarget<CF: AbstractChannelFactory> {
636
    /// A channel that is open.
637
    Open(Arc<CF::Channel>),
638
    /// A channel that is building.
639
    Pending(Pending),
640
    /// Information about a new pending channel entry.
641
    NewEntry((PendingChannelHandle, Sending)),
642
}
643

            
644
/// A handle for a pending channel.
645
///
646
/// WARNING: This handle should never be dropped, and should always be passed back into
647
/// [`MgrState::remove_pending_channel`] or [`MgrState::upgrade_pending_channel_to_open`], otherwise
648
/// the pending channel may be left in the channel map forever.
649
///
650
/// This handle must only be used with the `MgrState` from which it was given.
651
pub(crate) struct PendingChannelHandle {
652
    /// Any relay ID for this pending channel.
653
    relay_id: tor_linkspec::RelayId,
654
    /// The unique ID for this pending channel.
655
    unique_id: UniqPendingChanId,
656
    /// The pending channel has been removed from the channel map.
657
    chan_has_been_removed: bool,
658
}
659

            
660
impl PendingChannelHandle {
661
    /// Create a new [`PendingChannelHandle`].
662
30
    fn new(relay_id: tor_linkspec::RelayId, unique_id: UniqPendingChanId) -> Self {
663
30
        Self {
664
30
            relay_id,
665
30
            unique_id,
666
30
            chan_has_been_removed: false,
667
30
        }
668
30
    }
669

            
670
    /// This should be called when the pending channel has been removed from the pending channel
671
    /// map. Not calling this will result in an error log message (and panic in debug builds) when
672
    /// this handle is dropped.
673
30
    fn chan_has_been_removed(mut self) {
674
30
        self.chan_has_been_removed = true;
675
30
    }
676
}
677

            
678
impl std::ops::Drop for PendingChannelHandle {
679
30
    fn drop(&mut self) {
680
30
        if !self.chan_has_been_removed {
681
            #[allow(clippy::missing_docs_in_private_items)]
682
            const MSG: &str = "Dropped the 'PendingChannelHandle' without removing the channel";
683
            error_report!(
684
                internal!("{MSG}"),
685
                "'PendingChannelHandle' dropped unexpectedly",
686
            );
687
30
        }
688
30
    }
689
}
690

            
691
/// Helper: return the objects used to inform pending tasks about a newly open or failed channel.
692
30
fn setup_launch(ids: RelayIds) -> (PendingEntry, Sending, UniqPendingChanId) {
693
30
    let (snd, rcv) = oneshot::channel();
694
30
    let pending = rcv.shared();
695
30
    let unique_id = UniqPendingChanId::new();
696
30
    let entry = PendingEntry {
697
30
        ids,
698
30
        pending,
699
30
        unique_id,
700
30
    };
701
30

            
702
30
    (entry, snd, unique_id)
703
30
}
704

            
705
/// Helper: remove the pending channel identified by `handle` from `channel_map`.
706
30
fn remove_pending<C: AbstractChannel>(
707
30
    channel_map: &mut tor_linkspec::ListByRelayIds<ChannelState<C>>,
708
30
    handle: PendingChannelHandle,
709
30
) {
710
30
    // we need only one relay id to locate it, even if it has multiple relay ids
711
32
    let removed = channel_map.remove_by_id(&handle.relay_id, |c| {
712
32
        let ChannelState::Building(c) = c else {
713
2
            return false;
714
        };
715
30
        c.unique_id == handle.unique_id
716
32
    });
717
30
    debug_assert_eq!(removed.len(), 1, "expected to remove exactly one channel");
718

            
719
30
    handle.chan_has_been_removed();
720
30
}
721

            
722
/// Converts config, dormancy, and netdir, into parameter updates
723
///
724
/// Calculates new parameters, updating `channels_params` as appropriate.
725
/// If anything changed, the corresponding update instruction is returned.
726
///
727
/// `channels_params` is updated with the new parameters,
728
/// and the update message, if one is needed, is returned.
729
///
730
/// This is called in two places:
731
///
732
///  1. During chanmgr creation, it is called once to analyze the initial state
733
///     and construct a corresponding ChannelPaddingInstructions.
734
///
735
///  2. During reconfiguration.
736
1174
fn parameterize(
737
1174
    channels_params: &mut ChannelPaddingInstructions,
738
1174
    config: &ChannelConfig,
739
1174
    dormancy: Dormancy,
740
1174
    netdir: &NetParamsExtract,
741
1174
) -> StdResult<Option<ChannelPaddingInstructionsUpdates>, tor_error::Bug> {
742
1174
    // Everything in this calculation applies to *all* channels, disregarding
743
1174
    // channel usage.  Usage is handled downstream, in the channel frontend.
744
1174
    // See the module doc in `crates/tor-proto/src/channel/padding.rs`.
745
1174

            
746
2395
    let padding_of_level = |level| padding_parameters(level, netdir);
747
1174
    let send_padding = padding_of_level(config.padding)?;
748
1174
    let padding_default = padding_of_level(PaddingLevel::default())?;
749

            
750
1174
    let send_padding = match dormancy {
751
748
        Dormancy::Active => send_padding,
752
426
        Dormancy::Dormant => None,
753
    };
754

            
755
1174
    let recv_padding = match config.padding {
756
8
        PaddingLevel::Reduced => None,
757
1166
        PaddingLevel::Normal => send_padding,
758
        PaddingLevel::None => None,
759
    };
760

            
761
    // Whether the inbound padding approach we are to use, is the same as the default
762
    // derived from the netdir (disregarding our config and dormancy).
763
    //
764
    // Ie, whether the parameters we want are precisely those that a peer would
765
    // use by default (assuming they have the same view of the netdir as us).
766
1174
    let recv_equals_default = recv_padding == padding_default;
767

            
768
1174
    let padding_negotiate = if recv_equals_default {
769
        // Our padding approach is the same as peers' defaults.  So the PADDING_NEGOTIATE
770
        // message we need to send is the START(0,0).  (The channel frontend elides an
771
        // initial message of this form, - see crates/tor-proto/src/channel.rs::note_usage.)
772
        //
773
        // If the netdir default is no padding, and we previously negotiated
774
        // padding being enabled, and now want to disable it, we would send
775
        // START(0,0) rather than STOP.  That is OK (even, arguably, right).
776
742
        PaddingNegotiate::start_default()
777
    } else {
778
432
        match recv_padding {
779
432
            None => PaddingNegotiate::stop(),
780
            Some(params) => params.padding_negotiate_cell()?,
781
        }
782
    };
783

            
784
1174
    let mut update = channels_params
785
1174
        .start_update()
786
1174
        .padding_enable(send_padding.is_some())
787
1174
        .padding_negotiate(padding_negotiate);
788
1174
    if let Some(params) = send_padding {
789
746
        update = update.padding_parameters(params);
790
746
    }
791
1174
    let update = update.finish();
792
1174

            
793
1174
    Ok(update)
794
1174
}
795

            
796
/// Given a `NetDirExtract` and whether we're reducing padding, return a `PaddingParameters`
797
///
798
/// With `PaddingLevel::None`, or the consensus specifies no padding, will return `None`;
799
/// but does not account for other reasons why padding might be enabled/disabled.
800
2354
fn padding_parameters(
801
2354
    config: PaddingLevel,
802
2354
    netdir: &NetParamsExtract,
803
2354
) -> StdResult<Option<PaddingParameters>, tor_error::Bug> {
804
2354
    let reduced = match config {
805
10
        PaddingLevel::Reduced => true,
806
2344
        PaddingLevel::Normal => false,
807
        PaddingLevel::None => return Ok(None),
808
    };
809

            
810
2354
    padding_parameters_builder(reduced, netdir)
811
2355
        .unwrap_or_else(|e: &str| {
812
2
            info!(
813
                "consensus channel padding parameters wrong, using defaults: {}",
814
                &e,
815
            );
816
2
            Some(PaddingParametersBuilder::default())
817
2355
        })
818
2449
        .map(|p| {
819
2350
            p.build()
820
2350
                .map_err(into_internal!("failed to build padding parameters"))
821
2449
        })
822
2354
        .transpose()
823
2354
}
824

            
825
/// Given a `NetDirExtract` and whether we're reducing padding,
826
/// return a `PaddingParametersBuilder`
827
///
828
/// If the consensus specifies no padding, will return `None`;
829
/// but does not account for other reasons why padding might be enabled/disabled.
830
///
831
/// If `Err`, the string is a description of what is wrong with the parameters;
832
/// the caller should use `PaddingParameters::Default`.
833
2354
fn padding_parameters_builder(
834
2354
    reduced: bool,
835
2354
    netdir: &NetParamsExtract,
836
2354
) -> StdResult<Option<PaddingParametersBuilder>, &'static str> {
837
2354
    let mut p = PaddingParametersBuilder::default();
838
2354

            
839
2354
    let low = netdir.pad_low(reduced);
840
2354
    let high = netdir.pad_high(reduced);
841
2354
    if low > high {
842
2
        return Err("low > high");
843
2352
    }
844
2352
    if low.as_millis() == 0 && high.as_millis() == 0 {
845
        // Zeroes for both channel padding consensus parameters means "don't send padding".
846
        // padding-spec.txt s2.6, see description of `nf_ito_high`.
847
4
        return Ok(None);
848
2348
    }
849
2348
    p.low(low);
850
2348
    p.high(high);
851
2348
    Ok::<_, &'static str>(Some(p))
852
2354
}
853

            
854
#[cfg(test)]
855
mod test {
856
    // @@ begin test lint list maintained by maint/add_warning @@
857
    #![allow(clippy::bool_assert_comparison)]
858
    #![allow(clippy::clone_on_copy)]
859
    #![allow(clippy::dbg_macro)]
860
    #![allow(clippy::mixed_attributes_style)]
861
    #![allow(clippy::print_stderr)]
862
    #![allow(clippy::print_stdout)]
863
    #![allow(clippy::single_char_pattern)]
864
    #![allow(clippy::unwrap_used)]
865
    #![allow(clippy::unchecked_duration_subtraction)]
866
    #![allow(clippy::useless_vec)]
867
    #![allow(clippy::needless_pass_by_value)]
868
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
869

            
870
    use super::*;
871
    use crate::factory::BootstrapReporter;
872
    use async_trait::async_trait;
873
    use std::sync::{Arc, Mutex};
874
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
875
    use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
876
    use tor_proto::memquota::ChannelAccount;
877

            
878
    fn new_test_state() -> MgrState<FakeChannelFactory> {
879
        MgrState::new(
880
            FakeChannelFactory::default(),
881
            ChannelConfig::default(),
882
            Default::default(),
883
            &Default::default(),
884
        )
885
    }
886

            
887
    #[derive(Clone, Debug, Default)]
888
    struct FakeChannelFactory {}
889

            
890
    #[allow(clippy::diverging_sub_expression)] // for unimplemented!() + async_trait
891
    #[async_trait]
892
    impl AbstractChannelFactory for FakeChannelFactory {
893
        type Channel = FakeChannel;
894

            
895
        type BuildSpec = tor_linkspec::OwnedChanTarget;
896

            
897
        type Stream = ();
898

            
899
        async fn build_channel(
900
            &self,
901
            _target: &Self::BuildSpec,
902
            _reporter: BootstrapReporter,
903
            _memquota: ChannelAccount,
904
        ) -> Result<Arc<FakeChannel>> {
905
            unimplemented!()
906
        }
907

            
908
        #[cfg(feature = "relay")]
909
        async fn build_channel_using_incoming(
910
            &self,
911
            _peer: std::net::SocketAddr,
912
            _stream: Self::Stream,
913
            _memquota: ChannelAccount,
914
        ) -> Result<Arc<Self::Channel>> {
915
            unimplemented!()
916
        }
917
    }
918

            
919
    #[derive(Clone, Debug)]
920
    struct FakeChannel {
921
        ed_ident: Ed25519Identity,
922
        usable: bool,
923
        unused_duration: Option<u64>,
924
        params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
925
    }
926
    impl AbstractChannel for FakeChannel {
927
        fn is_usable(&self) -> bool {
928
            self.usable
929
        }
930
        fn duration_unused(&self) -> Option<Duration> {
931
            self.unused_duration.map(Duration::from_secs)
932
        }
933
        fn reparameterize(
934
            &self,
935
            update: Arc<ChannelPaddingInstructionsUpdates>,
936
        ) -> tor_proto::Result<()> {
937
            *self.params_update.lock().unwrap() = Some(update);
938
            Ok(())
939
        }
940
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
941
            Ok(())
942
        }
943
        fn engage_padding_activities(&self) {}
944
    }
945
    impl tor_linkspec::HasRelayIds for FakeChannel {
946
        fn identity(
947
            &self,
948
            key_type: tor_linkspec::RelayIdType,
949
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
950
            match key_type {
951
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
952
                _ => None,
953
            }
954
        }
955
    }
956
    /// Get a fake ed25519 identity from the first byte of a string.
957
    fn str_to_ed(s: &str) -> Ed25519Identity {
958
        let byte = s.as_bytes()[0];
959
        [byte; 32].into()
960
    }
961
    fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
962
        let channel = FakeChannel {
963
            ed_ident: str_to_ed(ident),
964
            usable: true,
965
            unused_duration: None,
966
            params_update: Arc::new(Mutex::new(None)),
967
        };
968
        ChannelState::Open(OpenEntry {
969
            channel: Arc::new(channel),
970
            max_unused_duration: Duration::from_secs(180),
971
        })
972
    }
973
    fn ch_with_details(
974
        ident: &'static str,
975
        max_unused_duration: Duration,
976
        unused_duration: Option<u64>,
977
    ) -> ChannelState<FakeChannel> {
978
        let channel = FakeChannel {
979
            ed_ident: str_to_ed(ident),
980
            usable: true,
981
            unused_duration,
982
            params_update: Arc::new(Mutex::new(None)),
983
        };
984
        ChannelState::Open(OpenEntry {
985
            channel: Arc::new(channel),
986
            max_unused_duration,
987
        })
988
    }
989
    fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
990
        let channel = FakeChannel {
991
            ed_ident: str_to_ed(ident),
992
            usable: false,
993
            unused_duration: None,
994
            params_update: Arc::new(Mutex::new(None)),
995
        };
996
        ChannelState::Open(OpenEntry {
997
            channel: Arc::new(channel),
998
            max_unused_duration: Duration::from_secs(180),
999
        })
    }
    #[test]
    fn rmv_unusable() -> Result<()> {
        let map = new_test_state();
        map.with_channels(|map| {
            map.insert(closed("machen"));
            map.insert(closed("wir"));
            map.insert(ch("wir"));
            map.insert(ch("feinen"));
            map.insert(ch("Fug"));
            map.insert(ch("Fug"));
        })?;
        map.remove_unusable().unwrap();
        map.with_channels(|map| {
            assert_eq!(map.by_id(&str_to_ed("m")).len(), 0);
            assert_eq!(map.by_id(&str_to_ed("w")).len(), 1);
            assert_eq!(map.by_id(&str_to_ed("f")).len(), 1);
            assert_eq!(map.by_id(&str_to_ed("F")).len(), 2);
        })?;
        Ok(())
    }
    #[test]
    fn reparameterize_via_netdir() -> Result<()> {
        let map = new_test_state();
        // Set some non-default parameters so that we can tell when an update happens
        let _ = map
            .inner
            .lock()
            .unwrap()
            .channels_params
            .padding
            .start_update()
            .padding_parameters(
                PaddingParametersBuilder::default()
                    .low(1234.into())
                    .build()
                    .unwrap(),
            )
            .finish();
        map.with_channels(|map| {
            map.insert(ch("track"));
        })?;
        let netdir = tor_netdir::testnet::construct_netdir()
            .unwrap_if_sufficient()
            .unwrap();
        let netdir = Arc::new(netdir);
        let with_ch = |f: &dyn Fn(&FakeChannel)| {
            let inner = map.inner.lock().unwrap();
            let mut ch = inner.channels.by_ed25519(&str_to_ed("t"));
            let ch = ch.next().unwrap().unwrap_open();
            f(ch);
        };
        eprintln!("-- process a default netdir, which should send an update --");
        map.reconfigure_general(None, None, netdir.clone()).unwrap();
        with_ch(&|ch| {
            assert_eq!(
                format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
                // evade field visibility by (ab)using Debug impl
                "ChannelPaddingInstructionsUpdates { padding_enable: None, \
                    padding_parameters: Some(Parameters { \
                        low: IntegerMilliseconds { value: 1500 }, \
                        high: IntegerMilliseconds { value: 9500 } }), \
                    padding_negotiate: None }"
            );
        });
        eprintln!();
        eprintln!("-- process a default netdir again, which should *not* send an update --");
        map.reconfigure_general(None, None, netdir).unwrap();
        with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
        Ok(())
    }
    #[test]
    fn expire_channels() -> Result<()> {
        let map = new_test_state();
        // Channel that has been unused beyond max duration allowed is expired
        map.with_channels(|map| {
            map.insert(ch_with_details(
                "wello",
                Duration::from_secs(180),
                Some(181),
            ));
        })?;
        // Minimum value of max unused duration is 180 seconds
        assert_eq!(180, map.expire_channels().as_secs());
        map.with_channels(|map| {
            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 0);
        })?;
        let map = new_test_state();
        // Channel that has been unused for shorter than max unused duration
        map.with_channels(|map| {
            map.insert(ch_with_details(
                "wello",
                Duration::from_secs(180),
                Some(120),
            ));
            map.insert(ch_with_details(
                "yello",
                Duration::from_secs(180),
                Some(170),
            ));
            // Channel that has been unused beyond max duration allowed is expired
            map.insert(ch_with_details(
                "gello",
                Duration::from_secs(180),
                Some(181),
            ));
            // Closed channel should be retained
            map.insert(closed("hello"));
        })?;
        // Return duration until next channel expires
        assert_eq!(10, map.expire_channels().as_secs());
        map.with_channels(|map| {
            assert_eq!(map.by_ed25519(&str_to_ed("w")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("y")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("h")).len(), 1);
            assert_eq!(map.by_ed25519(&str_to_ed("g")).len(), 0);
        })?;
        Ok(())
    }
}