1
//! Facilities to build circuits directly, instead of via a circuit manager.
2

            
3
use crate::path::{OwnedPath, TorPath};
4
use crate::timeouts::{self, Action};
5
use crate::{Error, Result};
6
use async_trait::async_trait;
7
use futures::task::SpawnExt;
8
use futures::Future;
9
use oneshot_fused_workaround as oneshot;
10
use std::sync::{
11
    atomic::{AtomicU32, Ordering},
12
    Arc,
13
};
14
use std::time::{Duration, Instant};
15
use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
16
use tor_error::into_internal;
17
use tor_guardmgr::GuardStatus;
18
use tor_linkspec::{ChanTarget, CircTarget, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
19
use tor_netdir::params::NetParameters;
20
use tor_proto::ccparams::{self, AlgorithmType};
21
use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
22
use tor_protover::named::FLOWCTRL_CC;
23
use tor_protover::Protocols;
24
use tor_rtcompat::{Runtime, SleepProviderExt};
25
use tor_units::Percentage;
26

            
27
#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28
use tor_guardmgr::vanguards::VanguardMgr;
29

            
30
mod guardstatus;
31

            
32
pub(crate) use guardstatus::GuardStatusHandle;
33

            
34
/// Represents an objects that can be constructed in a circuit-like way.
35
///
36
/// This is only a separate trait for testing purposes, so that we can swap
37
/// our some other type when we're testing Builder.
38
///
39
/// TODO: I'd like to have a simpler testing strategy here; this one
40
/// complicates things a bit.
41
#[async_trait]
42
pub(crate) trait Buildable: Sized {
43
    /// Launch a new one-hop circuit to a given relay, given only a
44
    /// channel target `ct` specifying that relay.
45
    ///
46
    /// (Since we don't have a CircTarget here, we can't extend the circuit
47
    /// to be multihop later on.)
48
    async fn create_chantarget<RT: Runtime>(
49
        chanmgr: &ChanMgr<RT>,
50
        rt: &RT,
51
        guard_status: &GuardStatusHandle,
52
        ct: &OwnedChanTarget,
53
        params: CircParameters,
54
        usage: ChannelUsage,
55
    ) -> Result<Arc<Self>>;
56

            
57
    /// Launch a new circuit through a given relay, given a circuit target
58
    /// `ct` specifying that relay.
59
    async fn create<RT: Runtime>(
60
        chanmgr: &ChanMgr<RT>,
61
        rt: &RT,
62
        guard_status: &GuardStatusHandle,
63
        ct: &OwnedCircTarget,
64
        params: CircParameters,
65
        usage: ChannelUsage,
66
    ) -> Result<Arc<Self>>;
67

            
68
    /// Extend this circuit-like object by one hop, to the location described
69
    /// in `ct`.
70
    async fn extend<RT: Runtime>(
71
        &self,
72
        rt: &RT,
73
        ct: &OwnedCircTarget,
74
        params: CircParameters,
75
    ) -> Result<()>;
76
}
77

            
78
/// Try to make a [`PendingClientCirc`] to a given relay, and start its
79
/// reactor.
80
///
81
/// This is common code, shared by all the first-hop functions in the
82
/// implementation of `Buildable` for `Arc<ClientCirc>`.
83
async fn create_common<RT: Runtime, CT: ChanTarget>(
84
    chanmgr: &ChanMgr<RT>,
85
    rt: &RT,
86
    target: &CT,
87
    guard_status: &GuardStatusHandle,
88
    usage: ChannelUsage,
89
) -> Result<PendingClientCirc> {
90
    // Get or construct the channel.
91
    let result = chanmgr.get_or_launch(target, usage).await;
92

            
93
    // Report the clock skew if appropriate, and exit if there has been an error.
94
    let chan = match result {
95
        Ok((chan, ChanProvenance::NewlyCreated)) => {
96
            guard_status.skew(chan.clock_skew());
97
            chan
98
        }
99
        Ok((chan, _)) => chan,
100
        Err(cause) => {
101
            if let Some(skew) = cause.clock_skew() {
102
                guard_status.skew(skew);
103
            }
104
            return Err(Error::Channel {
105
                peer: target.to_logged(),
106
                cause,
107
            });
108
        }
109
    };
110
    // Construct the (zero-hop) circuit.
111
    let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol {
112
        error,
113
        peer: None, // we don't blame the peer, because new_circ() does no networking.
114
        action: "initializing circuit",
115
        unique_id: None,
116
    })?;
117

            
118
    rt.spawn(async {
119
        let _ = reactor.run().await;
120
    })
121
    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
122

            
123
    Ok(pending_circ)
124
}
125

            
126
#[async_trait]
127
impl Buildable for ClientCirc {
128
    async fn create_chantarget<RT: Runtime>(
129
        chanmgr: &ChanMgr<RT>,
130
        rt: &RT,
131
        guard_status: &GuardStatusHandle,
132
        ct: &OwnedChanTarget,
133
        params: CircParameters,
134
        usage: ChannelUsage,
135
    ) -> Result<Arc<Self>> {
136
        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
137
        let unique_id = Some(circ.peek_unique_id());
138
        circ.create_firsthop_fast(params)
139
            .await
140
            .map_err(|error| Error::Protocol {
141
                peer: Some(ct.to_logged()),
142
                error,
143
                action: "running CREATE_FAST handshake",
144
                unique_id,
145
            })
146
    }
147
    async fn create<RT: Runtime>(
148
        chanmgr: &ChanMgr<RT>,
149
        rt: &RT,
150
        guard_status: &GuardStatusHandle,
151
        ct: &OwnedCircTarget,
152
        params: CircParameters,
153
        usage: ChannelUsage,
154
    ) -> Result<Arc<Self>> {
155
        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
156
        let unique_id = Some(circ.peek_unique_id());
157

            
158
        let handshake_res = circ.create_firsthop(ct, params).await;
159

            
160
        handshake_res.map_err(|error| Error::Protocol {
161
            peer: Some(ct.to_logged()),
162
            error,
163
            action: "creating first hop",
164
            unique_id,
165
        })
166
    }
167
    async fn extend<RT: Runtime>(
168
        &self,
169
        _rt: &RT,
170
        ct: &OwnedCircTarget,
171
        params: CircParameters,
172
    ) -> Result<()> {
173
        // use "ClientCirc::" name to avoid calling _this_ method.
174
        let res = ClientCirc::extend(self, ct, params).await;
175

            
176
        res.map_err(|error| Error::Protocol {
177
            error,
178
            // We can't know who caused the error, since it may have been
179
            // the hop we were extending from, or the hop we were extending
180
            // to.
181
            peer: None,
182
            action: "extending circuit",
183
            unique_id: Some(self.unique_id()),
184
        })
185
    }
186
}
187

            
188
/// An implementation type for [`CircuitBuilder`].
189
///
190
/// A `CircuitBuilder` holds references to all the objects that are needed
191
/// to build circuits correctly.
192
///
193
/// In general, you should not need to construct or use this object yourself,
194
/// unless you are choosing your own paths.
195
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
196
    /// The runtime used by this circuit builder.
197
    runtime: R,
198
    /// A channel manager that this circuit builder uses to make channels.
199
    chanmgr: Arc<ChanMgr<R>>,
200
    /// An estimator to determine the correct timeouts for circuit building.
201
    timeouts: timeouts::Estimator,
202
    /// We don't actually hold any clientcircs, so we need to put this
203
    /// type here so the compiler won't freak out.
204
    _phantom: std::marker::PhantomData<C>,
205
}
206

            
207
impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
208
    /// Construct a new [`Builder`].
209
44
    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
210
44
        Builder {
211
44
            runtime,
212
44
            chanmgr,
213
44
            timeouts,
214
44
            _phantom: std::marker::PhantomData,
215
44
        }
216
44
    }
217

            
218
    /// Helper function that takes the circuit parameters and apply any changes from the given
219
    /// subprotocol versions.
220
36
    fn apply_protovers_to_circparams(params: &mut CircParameters, protocols: &Protocols) {
221
36
        // Not supporting FlowCtrl=2 means we have to use the fallback congestion control algorithm
222
36
        // which is the FixedWindow one.
223
36
        if !protocols.supports_named_subver(FLOWCTRL_CC) {
224
36
            params.ccontrol.use_fallback_alg();
225
36
        }
226
36
    }
227

            
228
    /// Build a circuit, without performing any timeout operations.
229
    ///
230
    /// After each hop is built, increments n_hops_built.  Make sure that
231
    /// `guard_status` has its pending status set correctly to correspond
232
    /// to a circuit failure at any given stage.
233
    ///
234
    /// (TODO: Find
235
    /// a better design there.)
236
16
    async fn build_notimeout(
237
16
        self: Arc<Self>,
238
16
        path: OwnedPath,
239
16
        params: CircParameters,
240
16
        start_time: Instant,
241
16
        n_hops_built: Arc<AtomicU32>,
242
16
        guard_status: Arc<GuardStatusHandle>,
243
16
        usage: ChannelUsage,
244
16
    ) -> Result<Arc<C>> {
245
16
        match path {
246
4
            OwnedPath::ChannelOnly(target) => {
247
4
                // If we fail now, it's the guard's fault.
248
4
                guard_status.pending(GuardStatus::Failure);
249
4
                let circ = C::create_chantarget(
250
4
                    &self.chanmgr,
251
4
                    &self.runtime,
252
4
                    &guard_status,
253
4
                    &target,
254
4
                    params,
255
4
                    usage,
256
4
                )
257
4
                .await?;
258
4
                self.timeouts
259
4
                    .note_hop_completed(0, self.runtime.now() - start_time, true);
260
4
                n_hops_built.fetch_add(1, Ordering::SeqCst);
261
4
                Ok(circ)
262
            }
263
12
            OwnedPath::Normal(p) => {
264
12
                assert!(!p.is_empty());
265
12
                let n_hops = p.len() as u8;
266
12
                // If we fail now, it's the guard's fault.
267
12
                guard_status.pending(GuardStatus::Failure);
268
12
                // Each hop has its own circ parameters. This is for the first hop (CREATE).
269
12
                let mut first_hop_params = params.clone();
270
12
                Self::apply_protovers_to_circparams(&mut first_hop_params, p[0].protovers());
271
12
                let circ = C::create(
272
12
                    &self.chanmgr,
273
12
                    &self.runtime,
274
12
                    &guard_status,
275
12
                    &p[0],
276
12
                    first_hop_params,
277
12
                    usage,
278
12
                )
279
12
                .await?;
280
12
                self.timeouts
281
12
                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
282
12
                // If we fail after this point, we can't tell whether it's
283
12
                // the fault of the guard or some later relay.
284
12
                guard_status.pending(GuardStatus::Indeterminate);
285
12
                n_hops_built.fetch_add(1, Ordering::SeqCst);
286
12
                let mut hop_num = 1;
287
24
                for relay in p[1..].iter() {
288
                    // Get the params per subsequent hop (EXTEND).
289
24
                    let mut hop_params = params.clone();
290
24
                    Self::apply_protovers_to_circparams(&mut hop_params, relay.protovers());
291
24
                    circ.extend(&self.runtime, relay, hop_params).await?;
292
20
                    n_hops_built.fetch_add(1, Ordering::SeqCst);
293
20
                    self.timeouts.note_hop_completed(
294
20
                        hop_num,
295
20
                        self.runtime.now() - start_time,
296
20
                        hop_num == (n_hops - 1),
297
20
                    );
298
20
                    hop_num += 1;
299
                }
300
8
                Ok(circ)
301
            }
302
        }
303
12
    }
304

            
305
    /// Build a circuit from an [`OwnedPath`].
306
16
    async fn build_owned(
307
16
        self: &Arc<Self>,
308
16
        path: OwnedPath,
309
16
        params: &CircParameters,
310
16
        guard_status: Arc<GuardStatusHandle>,
311
16
        usage: ChannelUsage,
312
16
    ) -> Result<Arc<C>> {
313
16
        let action = Action::BuildCircuit { length: path.len() };
314
16
        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
315
16
        let start_time = self.runtime.now();
316
16

            
317
16
        // TODO: This is probably not the best way for build_notimeout to
318
16
        // tell us how many hops it managed to build, but at least it is
319
16
        // isolated here.
320
16
        let hops_built = Arc::new(AtomicU32::new(0));
321
16

            
322
16
        let self_clone = Arc::clone(self);
323
16
        let params = params.clone();
324
16

            
325
16
        let circuit_future = self_clone.build_notimeout(
326
16
            path,
327
16
            params,
328
16
            start_time,
329
16
            Arc::clone(&hops_built),
330
16
            guard_status,
331
16
            usage,
332
16
        );
333
16

            
334
16
        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
335
8
            Ok(circuit) => Ok(circuit),
336
8
            Err(Error::CircTimeout(unique_id)) => {
337
8
                let n_built = hops_built.load(Ordering::SeqCst);
338
8
                self.timeouts
339
8
                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
340
8
                Err(Error::CircTimeout(unique_id))
341
            }
342
            Err(e) => Err(e),
343
        }
344
16
    }
345

            
346
    /// Return a reference to this Builder runtime.
347
    pub(crate) fn runtime(&self) -> &R {
348
        &self.runtime
349
    }
350

            
351
    /// Return a reference to this Builder's timeout estimator.
352
    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
353
        &self.timeouts
354
    }
355
}
356

            
357
/// A factory object to build circuits.
358
///
359
/// A `CircuitBuilder` holds references to all the objects that are needed
360
/// to build circuits correctly.
361
///
362
/// In general, you should not need to construct or use this object yourself,
363
/// unless you are choosing your own paths.
364
pub struct CircuitBuilder<R: Runtime> {
365
    /// The underlying [`Builder`] object
366
    builder: Arc<Builder<R, ClientCirc>>,
367
    /// Configuration for how to choose paths for circuits.
368
    path_config: tor_config::MutCfg<crate::PathConfig>,
369
    /// State-manager object to use in storing current state.
370
    storage: crate::TimeoutStateHandle,
371
    /// Guard manager to tell us which guards nodes to use for the circuits
372
    /// we build.
373
    guardmgr: tor_guardmgr::GuardMgr<R>,
374
    /// The vanguard manager object used for HS circuits.
375
    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
376
    vanguardmgr: Arc<VanguardMgr<R>>,
377
}
378

            
379
impl<R: Runtime> CircuitBuilder<R> {
380
    /// Construct a new [`CircuitBuilder`].
381
    // TODO: eventually I'd like to make this a public function, but
382
    // TimeoutStateHandle is private.
383
28
    pub(crate) fn new(
384
28
        runtime: R,
385
28
        chanmgr: Arc<ChanMgr<R>>,
386
28
        path_config: crate::PathConfig,
387
28
        storage: crate::TimeoutStateHandle,
388
28
        guardmgr: tor_guardmgr::GuardMgr<R>,
389
28
        #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
390
28
    ) -> Self {
391
28
        let timeouts = timeouts::Estimator::from_storage(&storage);
392
28

            
393
28
        CircuitBuilder {
394
28
            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
395
28
            path_config: path_config.into(),
396
28
            storage,
397
28
            guardmgr,
398
28
            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
399
28
            vanguardmgr: Arc::new(vanguardmgr),
400
28
        }
401
28
    }
402

            
403
    /// Return this builder's [`PathConfig`](crate::PathConfig).
404
4
    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
405
4
        self.path_config.get()
406
4
    }
407

            
408
    /// Replace this builder's [`PathConfig`](crate::PathConfig).
409
2
    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
410
2
        self.path_config.replace(new_config);
411
2
    }
412

            
413
    /// Flush state to the state manager if we own the lock.
414
    ///
415
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
416
24
    pub(crate) fn save_state(&self) -> Result<bool> {
417
24
        if !self.storage.can_store() {
418
12
            return Ok(false);
419
12
        }
420
12
        // TODO: someday we'll want to only do this if there is something
421
12
        // changed.
422
12
        self.builder.timeouts.save_state(&self.storage)?;
423
12
        self.guardmgr.store_persistent_state()?;
424
12
        Ok(true)
425
24
    }
426

            
427
    /// Replace our state with a new owning state, assuming we have
428
    /// storage permission.
429
    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
430
        self.builder
431
            .timeouts
432
            .upgrade_to_owning_storage(&self.storage);
433
        self.guardmgr.upgrade_to_owned_persistent_state()?;
434
        Ok(())
435
    }
436

            
437
    /// Reload persistent state from disk, if we don't have storage permission.
438
    pub(crate) fn reload_state(&self) -> Result<()> {
439
        if !self.storage.can_store() {
440
            self.builder
441
                .timeouts
442
                .reload_readonly_from_storage(&self.storage);
443
        }
444
        self.guardmgr.reload_persistent_state()?;
445
        Ok(())
446
    }
447

            
448
    /// Reconfigure this builder using the latest set of network parameters.
449
    ///
450
    /// (NOTE: for now, this only affects circuit timeout estimation.)
451
    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
452
        self.builder.timeouts.update_params(p);
453
    }
454

            
455
    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
456
    pub(crate) async fn build_owned(
457
        &self,
458
        path: OwnedPath,
459
        params: &CircParameters,
460
        guard_status: Arc<GuardStatusHandle>,
461
        usage: ChannelUsage,
462
    ) -> Result<Arc<ClientCirc>> {
463
        self.builder
464
            .build_owned(path, params, guard_status, usage)
465
            .await
466
    }
467

            
468
    /// Try to construct a new circuit from a given path, using appropriate
469
    /// timeouts.
470
    ///
471
    /// This circuit is _not_ automatically registered with any
472
    /// circuit manager; if you don't hang on it it, it will
473
    /// automatically go away when the last reference is dropped.
474
    pub async fn build(
475
        &self,
476
        path: &TorPath<'_>,
477
        params: &CircParameters,
478
        usage: ChannelUsage,
479
    ) -> Result<Arc<ClientCirc>> {
480
        let owned = path.try_into()?;
481
        self.build_owned(owned, params, Arc::new(None.into()), usage)
482
            .await
483
    }
484

            
485
    /// Return true if this builder is currently learning timeout info.
486
    pub(crate) fn learning_timeouts(&self) -> bool {
487
        self.builder.timeouts.learning_timeouts()
488
    }
489

            
490
    /// Return a reference to this builder's `GuardMgr`.
491
18
    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
492
18
        &self.guardmgr
493
18
    }
494

            
495
    /// Return a reference to this builder's `VanguardMgr`.
496
    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
497
22
    pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
498
22
        &self.vanguardmgr
499
22
    }
500

            
501
    /// Return a reference to this builder's runtime
502
    pub(crate) fn runtime(&self) -> &R {
503
        self.builder.runtime()
504
    }
505

            
506
    /// Return a reference to this builder's timeout estimator.
507
    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
508
        self.builder.estimator()
509
    }
510
}
511

            
512
/// Build the Vegas congestion control algorithm parameters from the network
/// parameters `inp` and the given queue parameters.
///
/// # Panics
///
/// Panics if the Vegas parameter builder rejects the values.
#[cfg(feature = "flowctl-cc")]
fn build_cc_vegas(
    inp: &NetParameters,
    vegas_queue_params: ccparams::VegasQueueParams,
) -> ccparams::Algorithm {
    let full_min_pct = Percentage::new(inp.cc_cwnd_full_minpct.as_percent().get() as u32);

    let vegas = ccparams::VegasParamsBuilder::default()
        .cell_in_queue_params(vegas_queue_params)
        .ss_cwnd_max(inp.cc_ss_max.into())
        .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
        .cwnd_full_min_pct(full_min_pct)
        .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
        .build()
        .expect("Unable to build Vegas params from NetParams");

    ccparams::Algorithm::Vegas(vegas)
}
531

            
532
/// Return the congestion control FixedWindow algorithm using the given network parameters.
533
44
fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
534
44
    ccparams::Algorithm::FixedWindow(
535
44
        ccparams::FixedWindowParamsBuilder::default()
536
44
            .circ_window_start(inp.circuit_window.get() as u16)
537
44
            .circ_window_min(inp.circuit_window.lower() as u16)
538
44
            .circ_window_max(inp.circuit_window.upper() as u16)
539
44
            .build()
540
44
            .expect("Unable to build FixedWindow params from NetParams"),
541
44
    )
542
44
}
543

            
544
/// Return a new circuit parameter struct using the given network parameters and algorithm to use.
545
22
fn circparameters_from_netparameters(
546
22
    inp: &NetParameters,
547
22
    alg: ccparams::Algorithm,
548
22
) -> Result<CircParameters> {
549
22
    let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
550
22
        .cwnd_init(inp.cc_cwnd_init.into())
551
22
        .cwnd_inc_pct_ss(Percentage::new(
552
22
            inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
553
22
        ))
554
22
        .cwnd_inc(inp.cc_cwnd_inc.into())
555
22
        .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
556
22
        .cwnd_min(inp.cc_cwnd_min.into())
557
22
        .cwnd_max(inp.cc_cwnd_max.into())
558
22
        .sendme_inc(inp.cc_sendme_inc.into())
559
22
        .build()
560
22
        .map_err(into_internal!(
561
22
            "Unable to build CongestionWindow params from NetParams"
562
22
        ))?;
563
22
    let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
564
22
        .ewma_cwnd_pct(Percentage::new(
565
22
            inp.cc_ewma_cwnd_pct.as_percent().get() as u32
566
22
        ))
567
22
        .ewma_max(inp.cc_ewma_max.into())
568
22
        .ewma_ss_max(inp.cc_ewma_ss.into())
569
22
        .rtt_reset_pct(Percentage::new(
570
22
            inp.cc_rtt_reset_pct.as_percent().get() as u32
571
22
        ))
572
22
        .build()
573
22
        .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
574
22
    let ccontrol = ccparams::CongestionControlParamsBuilder::default()
575
22
        .alg(alg)
576
22
        .fallback_alg(build_cc_fixedwindow(inp))
577
22
        .cwnd_params(cwnd_params)
578
22
        .rtt_params(rtt_params)
579
22
        .build()
580
22
        .map_err(into_internal!(
581
22
            "Unable to build CongestionControl params from NetParams"
582
22
        ))?;
583
22
    Ok(CircParameters::new(
584
22
        inp.extend_by_ed25519_id.into(),
585
22
        ccontrol,
586
22
    ))
587
22
}
588

            
589
/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an exit circuit or
590
/// single onion service (when implemented).
591
22
pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
592
22
    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
593
        #[cfg(feature = "flowctl-cc")]
594
        AlgorithmType::VEGAS => {
595
            // TODO(arti#88): We always use fixed window for now,
596
            // even with the "flowctl-cc" feature enabled:
597
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
598
            // We use `if false` so that the vegas cc code is still type checked.
599
            if false {
600
                build_cc_vegas(
601
                    inp,
602
                    (
603
                        inp.cc_vegas_alpha_exit.into(),
604
                        inp.cc_vegas_beta_exit.into(),
605
                        inp.cc_vegas_delta_exit.into(),
606
                        inp.cc_vegas_gamma_exit.into(),
607
                        inp.cc_vegas_sscap_exit.into(),
608
                    )
609
                        .into(),
610
                )
611
            } else {
612
                build_cc_fixedwindow(inp)
613
            }
614
        }
615
        // Unrecognized, fallback to fixed window as in SENDME v0.
616
22
        _ => build_cc_fixedwindow(inp),
617
    };
618
22
    circparameters_from_netparameters(inp, alg)
619
22
}
620

            
621
/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an onion circuit
622
/// which also includes an onion service with Vanguard.
623
pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
624
    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
625
        #[cfg(feature = "flowctl-cc")]
626
        AlgorithmType::VEGAS => {
627
            // TODO(arti#88): We always use fixed window for now,
628
            // even with the "flowctl-cc" feature enabled:
629
            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
630
            // We use `if false` so that the vegas cc code is still type checked.
631
            if false {
632
                build_cc_vegas(
633
                    inp,
634
                    (
635
                        inp.cc_vegas_alpha_onion.into(),
636
                        inp.cc_vegas_beta_onion.into(),
637
                        inp.cc_vegas_delta_onion.into(),
638
                        inp.cc_vegas_gamma_onion.into(),
639
                        inp.cc_vegas_sscap_onion.into(),
640
                    )
641
                        .into(),
642
                )
643
            } else {
644
                build_cc_fixedwindow(inp)
645
            }
646
        }
647
        // Unrecognized, fallback to fixed window as in SENDME v0.
648
        _ => build_cc_fixedwindow(inp),
649
    };
650
    circparameters_from_netparameters(inp, alg)
651
}
652

            
653
/// Helper function: spawn a future as a background task, and run it with
654
/// two separate timeouts.
655
///
656
/// If the future does not complete by `timeout`, then return a
657
/// timeout error immediately, but keep running the future in the
658
/// background.
659
///
660
/// If the future does not complete by `abandon`, then abandon the
661
/// future completely.
662
32
async fn double_timeout<R, F, T>(
663
32
    runtime: &R,
664
32
    fut: F,
665
32
    timeout: Duration,
666
32
    abandon: Duration,
667
32
) -> Result<T>
668
32
where
669
32
    R: Runtime,
670
32
    F: Future<Output = Result<T>> + Send + 'static,
671
32
    T: Send + 'static,
672
32
{
673
32
    let (snd, rcv) = oneshot::channel();
674
32
    let rt = runtime.clone();
675
32
    // We create these futures now, since we want them to look at the current
676
32
    // time when they decide when to expire.
677
32
    let inner_timeout_future = rt.timeout(abandon, fut);
678
32
    let outer_timeout_future = rt.timeout(timeout, rcv);
679
32

            
680
32
    runtime
681
32
        .spawn(async move {
682
32
            let result = inner_timeout_future.await;
683
32
            let _ignore_cancelled_error = snd.send(result);
684
32
        })
685
32
        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
686

            
687
32
    let outcome = outer_timeout_future.await;
688
    // 4 layers of error to collapse:
689
    //     One from the receiver being cancelled.
690
    //     One from the outer timeout.
691
    //     One from the inner timeout.
692
    //     One from the actual future's result.
693
    //
694
    // (Technically, we could refrain from unwrapping the future's result,
695
    // but doing it this way helps make it more certain that we really are
696
    // collapsing all the layers into one.)
697
32
    outcome
698
32
        .map_err(|_| Error::CircTimeout(None))??
699
16
        .map_err(|_| Error::CircTimeout(None))?
700
32
}
701

            
702
#[cfg(test)]
703
mod test {
704
    // @@ begin test lint list maintained by maint/add_warning @@
705
    #![allow(clippy::bool_assert_comparison)]
706
    #![allow(clippy::clone_on_copy)]
707
    #![allow(clippy::dbg_macro)]
708
    #![allow(clippy::mixed_attributes_style)]
709
    #![allow(clippy::print_stderr)]
710
    #![allow(clippy::print_stdout)]
711
    #![allow(clippy::single_char_pattern)]
712
    #![allow(clippy::unwrap_used)]
713
    #![allow(clippy::unchecked_duration_subtraction)]
714
    #![allow(clippy::useless_vec)]
715
    #![allow(clippy::needless_pass_by_value)]
716
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
717
    use super::*;
718
    use crate::timeouts::TimeoutEstimator;
719
    use futures::FutureExt;
720
    use std::sync::Mutex;
721
    use tor_chanmgr::ChannelConfig;
722
    use tor_chanmgr::ChannelUsage as CU;
723
    use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
724
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
725
    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
726
    use tor_proto::memquota::ToplevelAccount;
727
    use tor_rtcompat::SleepProvider;
728
    use tracing::trace;
729

            
730
    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
731
    fn gs() -> Arc<GuardStatusHandle> {
732
        Arc::new(None.into())
733
    }
734

            
735
    #[test]
    // Re-enabled after work from eta, discussed in arti#149
    fn test_double_timeout() {
        /// Report whether `d1` lies in the closed range `[d2, d2 + 0.5sec]`.
        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
            let tolerance = Duration::from_millis(500);
            (d2..=(d2 + tolerance)).contains(&d1)
        }

        // First deadline ("timeout") and second deadline ("abandon") used throughout.
        let soft_limit = Duration::from_secs(1);
        let hard_limit = Duration::from_secs(10);

        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
            // Scenario 1: a future that is ready immediately.
            let immediate =
                double_timeout(&rto, async { Ok(3_u32) }, soft_limit, hard_limit).await;
            assert!(immediate.is_ok());
            assert_eq!(immediate.unwrap(), 3_u32);

            trace!("acquiesce after test1");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Scenario 2: a future that becomes ready after a short delay.
            let inner_rt = rt.clone();
            // (Only the short delay should fire, none of the other timeouts.)
            inner_rt.block_advance("manually controlling advances");
            let short_nap = async move {
                let nap = inner_rt.sleep(Duration::from_millis(100));
                inner_rt.allow_one_advance(Duration::from_millis(100));
                nap.await;
                Ok(4_u32)
            };
            let delayed = rt
                .wait_for(double_timeout(&rt, short_nap, soft_limit, hard_limit))
                .await;
            assert!(delayed.is_ok());
            assert_eq!(delayed.unwrap(), 4_u32);

            trace!("acquiesce after test2");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Scenario 3: a future that misses the first deadline; check that
            // it nevertheless keeps running after the timeout was reported.
            let inner_rt = rt.clone();
            let (done_tx, done_rx) = oneshot::channel();
            let started = rt.now();
            rt.block_advance("manually controlling advances");
            let slow_but_finishing = async move {
                let nap = inner_rt.sleep(Duration::from_secs(2));
                inner_rt.allow_one_advance(Duration::from_secs(2));
                nap.await;
                done_tx.send(()).unwrap();
                Ok(4_u32)
            };
            let timed_out = rt
                .wait_for(double_timeout(
                    &rt,
                    slow_but_finishing,
                    soft_limit,
                    hard_limit,
                ))
                .await;
            assert!(matches!(timed_out, Err(Error::CircTimeout(_))));
            let reported_at = rt.now();
            assert!(duration_close_to(
                reported_at - started,
                Duration::from_secs(1)
            ));
            let completion = rt.wait_for(done_rx).await;
            assert_eq!(completion, Ok(()));

            trace!("acquiesce after test3");
            #[allow(clippy::clone_on_copy)]
            #[allow(deprecated)] // TODO #1885
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Scenario 4: a future that times out and is then abandoned.
            let inner_rt = rt.clone();
            rt.block_advance("manually controlling advances");
            let (done_tx, done_rx) = oneshot::channel();
            let started = rt.now();
            // Let it reach the first deadline...
            rt.allow_one_advance(Duration::from_secs(1));
            let never_finishing = async move {
                inner_rt.sleep(Duration::from_secs(30)).await;
                done_tx.send(()).unwrap();
                Ok(4_u32)
            };
            let timed_out = rt
                .wait_for(double_timeout(
                    &rt,
                    never_finishing,
                    soft_limit,
                    hard_limit,
                ))
                .await;
            assert!(matches!(timed_out, Err(Error::CircTimeout(_))));
            let reported_at = rt.now();
            // ...and then the second one as well.
            rt.allow_one_advance(Duration::from_secs(9));
            let completion = rt.wait_for(done_rx).await;
            assert!(completion.is_err());
            let abandoned_at = rt.now();
            assert!(duration_close_to(
                reported_at - started,
                Duration::from_secs(1)
            ));
            assert!(duration_close_to(
                abandoned_at - started,
                Duration::from_secs(10)
            ));
        });
    }
842

            
843
    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
844
    ///
845
    /// In our FakeCircuit code below, the first timeout is the amount of
846
    /// time that we should sleep while building a hop to this key,
847
    /// and the second timeout is the length of time-advance we should allow
848
    /// after the hop is built.
849
    ///
850
    /// (This is pretty silly, but it's good enough for testing.)
851
    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
852
        let mut be = [0; 8];
853
        be[..].copy_from_slice(&id.as_bytes()[0..8]);
854
        let dur = u64::from_be_bytes(be);
855
        be[..].copy_from_slice(&id.as_bytes()[8..16]);
856
        let dur2 = u64::from_be_bytes(be);
857
        (Duration::from_millis(dur), Duration::from_millis(dur2))
858
    }
859
    /// Encode a pair of timeouts as an Ed25519 identity.
860
    ///
861
    /// In our FakeCircuit code below, the first timeout is the amount of
862
    /// time that we should sleep while building a hop to this key,
863
    /// and the second timeout is the length of time-advance we should allow
864
    /// after the hop is built.
865
    ///
866
    /// (This is pretty silly but it's good enough for testing.)
867
    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
868
        let mut bytes = [0; 32];
869
        let dur = (d1.as_millis() as u64).to_be_bytes();
870
        bytes[0..8].copy_from_slice(&dur);
871
        let dur = (d2.as_millis() as u64).to_be_bytes();
872
        bytes[8..16].copy_from_slice(&dur);
873
        bytes.into()
874
    }
875

            
876
    /// As [`timeouts_from_key`], but first extract the relevant key from the
877
    /// OwnedChanTarget.
878
    fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
879
        // Extracting the Ed25519 identity should always succeed in this case:
880
        // we put it there ourselves!
881
        let ed_id = ct
882
            .identity(RelayIdType::Ed25519)
883
            .expect("No ed25519 key was present for fake ChanTarget‽")
884
            .try_into()
885
            .expect("ChanTarget provided wrong key type");
886
        timeouts_from_key(ed_id)
887
    }
888

            
889
    /// Replacement type for circuit, to implement buildable.
890
    #[derive(Debug, Clone)]
891
    struct FakeCirc {
892
        hops: Vec<RelayIds>,
893
        onehop: bool,
894
    }
895
    #[async_trait]
896
    impl Buildable for Mutex<FakeCirc> {
897
        async fn create_chantarget<RT: Runtime>(
898
            _: &ChanMgr<RT>,
899
            rt: &RT,
900
            _guard_status: &GuardStatusHandle,
901
            ct: &OwnedChanTarget,
902
            _: CircParameters,
903
            _usage: ChannelUsage,
904
        ) -> Result<Arc<Self>> {
905
            let (d1, d2) = timeouts_from_chantarget(ct);
906
            rt.sleep(d1).await;
907
            if !d2.is_zero() {
908
                rt.allow_one_advance(d2);
909
            }
910

            
911
            let c = FakeCirc {
912
                hops: vec![RelayIds::from_relay_ids(ct)],
913
                onehop: true,
914
            };
915
            Ok(Arc::new(Mutex::new(c)))
916
        }
917
        async fn create<RT: Runtime>(
918
            _: &ChanMgr<RT>,
919
            rt: &RT,
920
            _guard_status: &GuardStatusHandle,
921
            ct: &OwnedCircTarget,
922
            _: CircParameters,
923
            _usage: ChannelUsage,
924
        ) -> Result<Arc<Self>> {
925
            let (d1, d2) = timeouts_from_chantarget(ct);
926
            rt.sleep(d1).await;
927
            if !d2.is_zero() {
928
                rt.allow_one_advance(d2);
929
            }
930

            
931
            let c = FakeCirc {
932
                hops: vec![RelayIds::from_relay_ids(ct)],
933
                onehop: false,
934
            };
935
            Ok(Arc::new(Mutex::new(c)))
936
        }
937
        async fn extend<RT: Runtime>(
938
            &self,
939
            rt: &RT,
940
            ct: &OwnedCircTarget,
941
            _: CircParameters,
942
        ) -> Result<()> {
943
            let (d1, d2) = timeouts_from_chantarget(ct);
944
            rt.sleep(d1).await;
945
            if !d2.is_zero() {
946
                rt.allow_one_advance(d2);
947
            }
948

            
949
            {
950
                let mut c = self.lock().unwrap();
951
                c.hops.push(RelayIds::from_relay_ids(ct));
952
            }
953
            Ok(())
954
        }
955
    }
956

            
957
    /// Fake implementation of TimeoutEstimator that just records its inputs.
958
    struct TimeoutRecorder<R> {
959
        runtime: R,
960
        hist: Vec<(bool, u8, Duration)>,
961
        // How much advance to permit after being told of a timeout?
962
        on_timeout: Duration,
963
        // How much advance to permit after being told of a success?
964
        on_success: Duration,
965

            
966
        snd_success: Option<oneshot::Sender<()>>,
967
        rcv_success: Option<oneshot::Receiver<()>>,
968
    }
969

            
970
    impl<R> TimeoutRecorder<R> {
971
        fn new(runtime: R) -> Self {
972
            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
973
        }
974

            
975
        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
976
            let (snd_success, rcv_success) = oneshot::channel();
977
            Self {
978
                runtime,
979
                hist: Vec::new(),
980
                on_timeout,
981
                on_success,
982
                rcv_success: Some(rcv_success),
983
                snd_success: Some(snd_success),
984
            }
985
        }
986
    }
987
    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
988
        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
989
            if !is_last {
990
                return;
991
            }
992
            let (rt, advance) = {
993
                let mut this = self.lock().unwrap();
994
                this.hist.push((true, hop, delay));
995
                let _ = this.snd_success.take().unwrap().send(());
996
                (this.runtime.clone(), this.on_success)
997
            };
998
            if !advance.is_zero() {
999
                rt.allow_one_advance(advance);
            }
        }
        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
            let (rt, advance) = {
                let mut this = self.lock().unwrap();
                this.hist.push((false, hop, delay));
                (this.runtime.clone(), this.on_timeout)
            };
            if !advance.is_zero() {
                rt.allow_one_advance(advance);
            }
        }
        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
            (Duration::from_secs(3), Duration::from_secs(100))
        }
        fn learning_timeouts(&self) -> bool {
            false
        }
        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
            None
        }
    }
    /// Testing only: make a bogus circuit target whose Ed25519 identity is `id`.
    ///
    /// The RSA identity, ntor onion key and protocol list are fixed dummy values.
    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
        let mut target = OwnedCircTarget::builder();

        // Fill in the channel-level part first...
        let chan = target.chan_target();
        chan.ed_identity(id).rsa_identity([0x20; 20].into());

        // ...then the circuit-level part, and build.
        target
            .ntor_onion_key([0x33; 32].into())
            .protocols("".parse().unwrap())
            .build()
            .unwrap()
    }
    /// Testing only: make a bogus channel target whose Ed25519 identity is `id`.
    ///
    /// The RSA identity is a fixed dummy value.
    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
        let mut target = OwnedChanTarget::builder();
        target.ed_identity(id).rsa_identity([0x20; 20].into());
        target.build().unwrap()
    }
    /// Shared driver for the `build_*` tests: build `path` with a `Builder`
    /// over `Mutex<FakeCirc>` on the mock runtime `rt`, and report what happened.
    ///
    /// * `advance_initial`: length of the single time-advance permitted before
    ///   the build task is spawned.
    /// * `path`: the path to hand to `build_owned`.
    /// * `advance_on_timeout`: if `Some((d1, d2))`, the recorder is made with
    ///   `TimeoutRecorder::with_delays(rt, d1, d2)`, so `d1` is the advance
    ///   permitted after a reported timeout and `d2` the one after a reported
    ///   success; a task waiting for the success notification is spawned as
    ///   well. If `None`, the recorder permits no advances of its own.
    /// * `usage`: passed through to `build_owned`.
    ///
    /// Returns the build result (as a clone of the `FakeCirc`) together with a
    /// clone of the recorder's history of `(was_success, hop, delay)` entries.
    async fn run_builder_test(
        rt: tor_rtmock::MockRuntime,
        advance_initial: Duration,
        path: OwnedPath,
        advance_on_timeout: Option<(Duration, Duration)>,
        usage: ChannelUsage,
    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
        // A channel manager is needed to construct the Builder; the FakeCirc
        // implementation of Buildable ignores the one it is handed.
        let chanmgr = Arc::new(ChanMgr::new(
            rt.clone(),
            &ChannelConfig::default(),
            Default::default(),
            &Default::default(),
            ToplevelAccount::new_noop(),
        ));
        // The recorder always reports a 3 second timeout and a 100 second
        // abandon time, whichever constructor is used.
        let timeouts = match advance_on_timeout {
            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
            None => TimeoutRecorder::new(rt.clone()),
        };
        // Shared, so that the history can still be read after the Builder has
        // been given its own handle.
        let timeouts = Arc::new(Mutex::new(timeouts));
        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
            rt.clone(),
            chanmgr,
            timeouts::Estimator::new(Arc::clone(&timeouts)),
        );

        // From here on, mock time moves only when an advance is explicitly permitted.
        rt.block_advance("manually controlling advances");
        rt.allow_one_advance(advance_initial);
        let outcome = rt.spawn_join("build-owned", async move {
            let arcbuilder = Arc::new(builder);
            let params = exit_circparams_from_netparams(&NetParameters::default())?;
            arcbuilder.build_owned(path, &params, gs(), usage).await
        });

        // When the recorder drives extra advances, also spawn a task that
        // waits until a success is finally reported.
        if advance_on_timeout.is_some() {
            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
            rt.spawn_identified("receiver", async move {
                receiver.await.unwrap();
            });
        }
        rt.advance_until_stalled().await;

        // Turn the built `Arc<Mutex<FakeCirc>>` into a plain `FakeCirc` clone.
        let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
        let timeouts = timeouts.lock().unwrap().hist.clone();
        (circ, timeouts)
    }
    /// A one-hop (channel-only) build succeeds and is reported once, as a
    /// success at hop 0 after the hop's 100ms delay.
    #[test]
    fn build_onehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let hop_delay = Duration::from_millis(100);
            let relay = key_from_timeouts(hop_delay, Duration::from_millis(0));
            let path = OwnedPath::ChannelOnly(chan_t(relay));

            let (outcome, history) =
                run_builder_test(rt, hop_delay, path, None, CU::UserTraffic).await;

            let circ = outcome.unwrap();
            assert!(circ.onehop);
            assert_eq!(circ.hops.len(), 1);
            assert!(circ.hops[0].same_relay_ids(&chan_t(relay)));

            assert_eq!(history.len(), 1);
            let (succeeded, last_hop, elapsed) = history[0];
            assert!(succeeded); // success
            assert_eq!(last_hop, 0); // one-hop
            assert_eq!(elapsed, hop_delay);
        });
    }
    /// A three-hop build succeeds and is reported once, as a success at hop 2
    /// after the sum of the three hop delays (600ms).
    #[test]
    fn build_threehop() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            // Each key encodes (hop build delay, advance to allow afterwards).
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(300));
            let third = key_from_timeouts(ms(300), ms(0));
            let path = OwnedPath::Normal(vec![circ_t(first), circ_t(second), circ_t(third)]);

            let (outcome, history) =
                run_builder_test(rt, ms(100), path, None, CU::UserTraffic).await;

            let circ = outcome.unwrap();
            assert!(!circ.onehop);
            assert_eq!(circ.hops.len(), 3);
            for (hop, expected) in circ.hops.iter().zip([first, second, third]) {
                assert!(hop.same_relay_ids(&chan_t(expected)));
            }

            assert_eq!(history.len(), 1);
            let (succeeded, last_hop, elapsed) = history[0];
            assert!(succeeded); // success
            assert_eq!(last_hop, 2); // three-hop
            assert_eq!(elapsed, ms(600));
        });
    }
    /// A build whose third hop takes an hour fails with a circuit timeout,
    /// reported once after the 3 second timeout.
    #[test]
    fn build_huge_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            // Each key encodes (hop build delay, advance to allow afterwards).
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(2700));
            let stuck = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
            let path = OwnedPath::Normal(vec![circ_t(first), circ_t(second), circ_t(stuck)]);

            let (outcome, history) =
                run_builder_test(rt, ms(100), path, None, CU::UserTraffic).await;

            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
            assert_eq!(history.len(), 1);
            let (succeeded, _hop, elapsed) = history[0];
            assert!(!succeeded); // timeout
            // BUG: The reported hop is sometimes 1 and sometimes 2, so it is
            // not asserted: it would be `assert_eq!(_hop, 2)` (at third hop).
            assert_eq!(elapsed, ms(3000));
        });
    }
    /// A build whose third hop takes exactly the 3 second timeout is reported
    /// to the caller as a circuit timeout, but the recorder later also sees it
    /// succeed at hop 2.
    #[test]
    fn build_modest_timeout() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let ms = Duration::from_millis;
            // Each key encodes (hop build delay, advance to allow afterwards).
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(2700));
            let slow = key_from_timeouts(ms(3000), ms(0));
            // (advance after a reported timeout, advance after a reported success)
            let recorder_advances = (ms(4000), Duration::from_secs(0));
            let path = OwnedPath::Normal(vec![circ_t(first), circ_t(second), circ_t(slow)]);

            let (outcome, history) = run_builder_test(
                rt.clone(),
                ms(100),
                path,
                Some(recorder_advances),
                CU::UserTraffic,
            )
            .await;

            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
            assert_eq!(history.len(), 2);

            let (first_succeeded, _first_hop, first_elapsed) = history[0];
            assert!(!first_succeeded); // timeout
            // BUG: The reported hop is sometimes 1 and sometimes 2, so it is
            // not asserted: it would be `assert_eq!(_first_hop, 2)` (at third hop).
            assert_eq!(first_elapsed, ms(3000));

            let (second_succeeded, second_hop, _second_elapsed) = history[1];
            assert!(second_succeeded); // success
            assert_eq!(second_hop, 2); // three-hop
            // BUG: This timer is not always reliable, due to races, so it is
            // not asserted: it would be `assert_eq!(_second_elapsed, ms(3300))`.
        });
    }
}