tor_circmgr/
build.rs

1//! Facilities to build circuits directly, instead of via a circuit manager.
2
3use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::task::SpawnExt;
8use futures::Future;
9use oneshot_fused_workaround as oneshot;
10use std::sync::{
11    atomic::{AtomicU32, Ordering},
12    Arc,
13};
14use std::time::{Duration, Instant};
15use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
16use tor_error::into_internal;
17use tor_guardmgr::GuardStatus;
18use tor_linkspec::{ChanTarget, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
19use tor_netdir::params::NetParameters;
20use tor_proto::ccparams::{self, AlgorithmType};
21use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
22use tor_rtcompat::{Runtime, SleepProviderExt};
23use tor_units::Percentage;
24
25#[cfg(all(feature = "vanguards", feature = "hs-common"))]
26use tor_guardmgr::vanguards::VanguardMgr;
27
28mod guardstatus;
29
30pub(crate) use guardstatus::GuardStatusHandle;
31
32/// Represents an objects that can be constructed in a circuit-like way.
33///
34/// This is only a separate trait for testing purposes, so that we can swap
35/// our some other type when we're testing Builder.
36///
37/// TODO: I'd like to have a simpler testing strategy here; this one
38/// complicates things a bit.
39#[async_trait]
40pub(crate) trait Buildable: Sized {
41    /// Launch a new one-hop circuit to a given relay, given only a
42    /// channel target `ct` specifying that relay.
43    ///
44    /// (Since we don't have a CircTarget here, we can't extend the circuit
45    /// to be multihop later on.)
46    async fn create_chantarget<RT: Runtime>(
47        chanmgr: &ChanMgr<RT>,
48        rt: &RT,
49        guard_status: &GuardStatusHandle,
50        ct: &OwnedChanTarget,
51        params: CircParameters,
52        usage: ChannelUsage,
53    ) -> Result<Arc<Self>>;
54
55    /// Launch a new circuit through a given relay, given a circuit target
56    /// `ct` specifying that relay.
57    async fn create<RT: Runtime>(
58        chanmgr: &ChanMgr<RT>,
59        rt: &RT,
60        guard_status: &GuardStatusHandle,
61        ct: &OwnedCircTarget,
62        params: CircParameters,
63        usage: ChannelUsage,
64    ) -> Result<Arc<Self>>;
65
66    /// Extend this circuit-like object by one hop, to the location described
67    /// in `ct`.
68    async fn extend<RT: Runtime>(
69        &self,
70        rt: &RT,
71        ct: &OwnedCircTarget,
72        params: CircParameters,
73    ) -> Result<()>;
74}
75
76/// Try to make a [`PendingClientCirc`] to a given relay, and start its
77/// reactor.
78///
79/// This is common code, shared by all the first-hop functions in the
80/// implementation of `Buildable` for `Arc<ClientCirc>`.
81async fn create_common<RT: Runtime, CT: ChanTarget>(
82    chanmgr: &ChanMgr<RT>,
83    rt: &RT,
84    target: &CT,
85    guard_status: &GuardStatusHandle,
86    usage: ChannelUsage,
87) -> Result<PendingClientCirc> {
88    // Get or construct the channel.
89    let result = chanmgr.get_or_launch(target, usage).await;
90
91    // Report the clock skew if appropriate, and exit if there has been an error.
92    let chan = match result {
93        Ok((chan, ChanProvenance::NewlyCreated)) => {
94            guard_status.skew(chan.clock_skew());
95            chan
96        }
97        Ok((chan, _)) => chan,
98        Err(cause) => {
99            if let Some(skew) = cause.clock_skew() {
100                guard_status.skew(skew);
101            }
102            return Err(Error::Channel {
103                peer: target.to_logged(),
104                cause,
105            });
106        }
107    };
108    // Construct the (zero-hop) circuit.
109    let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol {
110        error,
111        peer: None, // we don't blame the peer, because new_circ() does no networking.
112        action: "initializing circuit",
113        unique_id: None,
114    })?;
115
116    rt.spawn(async {
117        let _ = reactor.run().await;
118    })
119    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
120
121    Ok(pending_circ)
122}
123
124#[async_trait]
125impl Buildable for ClientCirc {
126    async fn create_chantarget<RT: Runtime>(
127        chanmgr: &ChanMgr<RT>,
128        rt: &RT,
129        guard_status: &GuardStatusHandle,
130        ct: &OwnedChanTarget,
131        params: CircParameters,
132        usage: ChannelUsage,
133    ) -> Result<Arc<Self>> {
134        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
135        let unique_id = Some(circ.peek_unique_id());
136        circ.create_firsthop_fast(params)
137            .await
138            .map_err(|error| Error::Protocol {
139                peer: Some(ct.to_logged()),
140                error,
141                action: "running CREATE_FAST handshake",
142                unique_id,
143            })
144    }
145    async fn create<RT: Runtime>(
146        chanmgr: &ChanMgr<RT>,
147        rt: &RT,
148        guard_status: &GuardStatusHandle,
149        ct: &OwnedCircTarget,
150        params: CircParameters,
151        usage: ChannelUsage,
152    ) -> Result<Arc<Self>> {
153        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
154        let unique_id = Some(circ.peek_unique_id());
155
156        let handshake_res = circ.create_firsthop(ct, params).await;
157
158        handshake_res.map_err(|error| Error::Protocol {
159            peer: Some(ct.to_logged()),
160            error,
161            action: "creating first hop",
162            unique_id,
163        })
164    }
165    async fn extend<RT: Runtime>(
166        &self,
167        _rt: &RT,
168        ct: &OwnedCircTarget,
169        params: CircParameters,
170    ) -> Result<()> {
171        // use "ClientCirc::" name to avoid calling _this_ method.
172        let res = ClientCirc::extend(self, ct, params).await;
173
174        res.map_err(|error| Error::Protocol {
175            error,
176            // We can't know who caused the error, since it may have been
177            // the hop we were extending from, or the hop we were extending
178            // to.
179            peer: None,
180            action: "extending circuit",
181            unique_id: Some(self.unique_id()),
182        })
183    }
184}
185
186/// An implementation type for [`CircuitBuilder`].
187///
188/// A `CircuitBuilder` holds references to all the objects that are needed
189/// to build circuits correctly.
190///
191/// In general, you should not need to construct or use this object yourself,
192/// unless you are choosing your own paths.
193struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
194    /// The runtime used by this circuit builder.
195    runtime: R,
196    /// A channel manager that this circuit builder uses to make channels.
197    chanmgr: Arc<ChanMgr<R>>,
198    /// An estimator to determine the correct timeouts for circuit building.
199    timeouts: timeouts::Estimator,
200    /// We don't actually hold any clientcircs, so we need to put this
201    /// type here so the compiler won't freak out.
202    _phantom: std::marker::PhantomData<C>,
203}
204
205impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
206    /// Construct a new [`Builder`].
207    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
208        Builder {
209            runtime,
210            chanmgr,
211            timeouts,
212            _phantom: std::marker::PhantomData,
213        }
214    }
215
216    /// Build a circuit, without performing any timeout operations.
217    ///
218    /// After each hop is built, increments n_hops_built.  Make sure that
219    /// `guard_status` has its pending status set correctly to correspond
220    /// to a circuit failure at any given stage.
221    ///
222    /// (TODO: Find
223    /// a better design there.)
224    async fn build_notimeout(
225        self: Arc<Self>,
226        path: OwnedPath,
227        params: CircParameters,
228        start_time: Instant,
229        n_hops_built: Arc<AtomicU32>,
230        guard_status: Arc<GuardStatusHandle>,
231        usage: ChannelUsage,
232    ) -> Result<Arc<C>> {
233        match path {
234            OwnedPath::ChannelOnly(target) => {
235                // If we fail now, it's the guard's fault.
236                guard_status.pending(GuardStatus::Failure);
237                let circ = C::create_chantarget(
238                    &self.chanmgr,
239                    &self.runtime,
240                    &guard_status,
241                    &target,
242                    params,
243                    usage,
244                )
245                .await?;
246                self.timeouts
247                    .note_hop_completed(0, self.runtime.now() - start_time, true);
248                n_hops_built.fetch_add(1, Ordering::SeqCst);
249                Ok(circ)
250            }
251            OwnedPath::Normal(p) => {
252                assert!(!p.is_empty());
253                let n_hops = p.len() as u8;
254                // If we fail now, it's the guard's fault.
255                guard_status.pending(GuardStatus::Failure);
256                // Each hop has its own circ parameters. This is for the first hop (CREATE).
257                let circ = C::create(
258                    &self.chanmgr,
259                    &self.runtime,
260                    &guard_status,
261                    &p[0],
262                    params.clone(),
263                    usage,
264                )
265                .await?;
266                self.timeouts
267                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
268                // If we fail after this point, we can't tell whether it's
269                // the fault of the guard or some later relay.
270                guard_status.pending(GuardStatus::Indeterminate);
271                n_hops_built.fetch_add(1, Ordering::SeqCst);
272                let mut hop_num = 1;
273                for relay in p[1..].iter() {
274                    // Get the params per subsequent hop (EXTEND).
275                    circ.extend(&self.runtime, relay, params.clone()).await?;
276                    n_hops_built.fetch_add(1, Ordering::SeqCst);
277                    self.timeouts.note_hop_completed(
278                        hop_num,
279                        self.runtime.now() - start_time,
280                        hop_num == (n_hops - 1),
281                    );
282                    hop_num += 1;
283                }
284                Ok(circ)
285            }
286        }
287    }
288
289    /// Build a circuit from an [`OwnedPath`].
290    async fn build_owned(
291        self: &Arc<Self>,
292        path: OwnedPath,
293        params: &CircParameters,
294        guard_status: Arc<GuardStatusHandle>,
295        usage: ChannelUsage,
296    ) -> Result<Arc<C>> {
297        let action = Action::BuildCircuit { length: path.len() };
298        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
299        let start_time = self.runtime.now();
300
301        // TODO: This is probably not the best way for build_notimeout to
302        // tell us how many hops it managed to build, but at least it is
303        // isolated here.
304        let hops_built = Arc::new(AtomicU32::new(0));
305
306        let self_clone = Arc::clone(self);
307        let params = params.clone();
308
309        let circuit_future = self_clone.build_notimeout(
310            path,
311            params,
312            start_time,
313            Arc::clone(&hops_built),
314            guard_status,
315            usage,
316        );
317
318        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
319            Ok(circuit) => Ok(circuit),
320            Err(Error::CircTimeout(unique_id)) => {
321                let n_built = hops_built.load(Ordering::SeqCst);
322                self.timeouts
323                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
324                Err(Error::CircTimeout(unique_id))
325            }
326            Err(e) => Err(e),
327        }
328    }
329
330    /// Return a reference to this Builder runtime.
331    pub(crate) fn runtime(&self) -> &R {
332        &self.runtime
333    }
334
335    /// Return a reference to this Builder's timeout estimator.
336    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
337        &self.timeouts
338    }
339}
340
341/// A factory object to build circuits.
342///
343/// A `CircuitBuilder` holds references to all the objects that are needed
344/// to build circuits correctly.
345///
346/// In general, you should not need to construct or use this object yourself,
347/// unless you are choosing your own paths.
348pub struct CircuitBuilder<R: Runtime> {
349    /// The underlying [`Builder`] object
350    builder: Arc<Builder<R, ClientCirc>>,
351    /// Configuration for how to choose paths for circuits.
352    path_config: tor_config::MutCfg<crate::PathConfig>,
353    /// State-manager object to use in storing current state.
354    storage: crate::TimeoutStateHandle,
355    /// Guard manager to tell us which guards nodes to use for the circuits
356    /// we build.
357    guardmgr: tor_guardmgr::GuardMgr<R>,
358    /// The vanguard manager object used for HS circuits.
359    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
360    vanguardmgr: Arc<VanguardMgr<R>>,
361}
362
363impl<R: Runtime> CircuitBuilder<R> {
364    /// Construct a new [`CircuitBuilder`].
365    // TODO: eventually I'd like to make this a public function, but
366    // TimeoutStateHandle is private.
367    pub(crate) fn new(
368        runtime: R,
369        chanmgr: Arc<ChanMgr<R>>,
370        path_config: crate::PathConfig,
371        storage: crate::TimeoutStateHandle,
372        guardmgr: tor_guardmgr::GuardMgr<R>,
373        #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
374    ) -> Self {
375        let timeouts = timeouts::Estimator::from_storage(&storage);
376
377        CircuitBuilder {
378            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
379            path_config: path_config.into(),
380            storage,
381            guardmgr,
382            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
383            vanguardmgr: Arc::new(vanguardmgr),
384        }
385    }
386
387    /// Return this builder's [`PathConfig`](crate::PathConfig).
388    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
389        self.path_config.get()
390    }
391
392    /// Replace this builder's [`PathConfig`](crate::PathConfig).
393    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
394        self.path_config.replace(new_config);
395    }
396
397    /// Flush state to the state manager if we own the lock.
398    ///
399    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
400    pub(crate) fn save_state(&self) -> Result<bool> {
401        if !self.storage.can_store() {
402            return Ok(false);
403        }
404        // TODO: someday we'll want to only do this if there is something
405        // changed.
406        self.builder.timeouts.save_state(&self.storage)?;
407        self.guardmgr.store_persistent_state()?;
408        Ok(true)
409    }
410
411    /// Replace our state with a new owning state, assuming we have
412    /// storage permission.
413    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
414        self.builder
415            .timeouts
416            .upgrade_to_owning_storage(&self.storage);
417        self.guardmgr.upgrade_to_owned_persistent_state()?;
418        Ok(())
419    }
420
421    /// Reload persistent state from disk, if we don't have storage permission.
422    pub(crate) fn reload_state(&self) -> Result<()> {
423        if !self.storage.can_store() {
424            self.builder
425                .timeouts
426                .reload_readonly_from_storage(&self.storage);
427        }
428        self.guardmgr.reload_persistent_state()?;
429        Ok(())
430    }
431
432    /// Reconfigure this builder using the latest set of network parameters.
433    ///
434    /// (NOTE: for now, this only affects circuit timeout estimation.)
435    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
436        self.builder.timeouts.update_params(p);
437    }
438
439    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
440    pub(crate) async fn build_owned(
441        &self,
442        path: OwnedPath,
443        params: &CircParameters,
444        guard_status: Arc<GuardStatusHandle>,
445        usage: ChannelUsage,
446    ) -> Result<Arc<ClientCirc>> {
447        self.builder
448            .build_owned(path, params, guard_status, usage)
449            .await
450    }
451
452    /// Try to construct a new circuit from a given path, using appropriate
453    /// timeouts.
454    ///
455    /// This circuit is _not_ automatically registered with any
456    /// circuit manager; if you don't hang on it it, it will
457    /// automatically go away when the last reference is dropped.
458    pub async fn build(
459        &self,
460        path: &TorPath<'_>,
461        params: &CircParameters,
462        usage: ChannelUsage,
463    ) -> Result<Arc<ClientCirc>> {
464        let owned = path.try_into()?;
465        self.build_owned(owned, params, Arc::new(None.into()), usage)
466            .await
467    }
468
469    /// Return true if this builder is currently learning timeout info.
470    pub(crate) fn learning_timeouts(&self) -> bool {
471        self.builder.timeouts.learning_timeouts()
472    }
473
474    /// Return a reference to this builder's `GuardMgr`.
475    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
476        &self.guardmgr
477    }
478
479    /// Return a reference to this builder's `VanguardMgr`.
480    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
481    pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
482        &self.vanguardmgr
483    }
484
485    /// Return a reference to this builder's runtime
486    pub(crate) fn runtime(&self) -> &R {
487        self.builder.runtime()
488    }
489
490    /// Return a reference to this builder's timeout estimator.
491    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
492        self.builder.estimator()
493    }
494}
495
496/// Return the congestion control Vegas algorithm using the given network parameters.
497#[cfg(feature = "flowctl-cc")]
498fn build_cc_vegas(
499    inp: &NetParameters,
500    vegas_queue_params: ccparams::VegasQueueParams,
501) -> ccparams::Algorithm {
502    ccparams::Algorithm::Vegas(
503        ccparams::VegasParamsBuilder::default()
504            .cell_in_queue_params(vegas_queue_params)
505            .ss_cwnd_max(inp.cc_ss_max.into())
506            .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
507            .cwnd_full_min_pct(Percentage::new(
508                inp.cc_cwnd_full_minpct.as_percent().get() as u32
509            ))
510            .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
511            .build()
512            .expect("Unable to build Vegas params from NetParams"),
513    )
514}
515
516/// Return the congestion control FixedWindow algorithm using the given network parameters.
517fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
518    ccparams::Algorithm::FixedWindow(build_cc_fixedwindow_params(inp))
519}
520
521/// Return the parameters for the congestion control FixedWindow algorithm
522/// using the given network parameters.
523fn build_cc_fixedwindow_params(inp: &NetParameters) -> ccparams::FixedWindowParams {
524    ccparams::FixedWindowParamsBuilder::default()
525        .circ_window_start(inp.circuit_window.get() as u16)
526        .circ_window_min(inp.circuit_window.lower() as u16)
527        .circ_window_max(inp.circuit_window.upper() as u16)
528        .build()
529        .expect("Unable to build FixedWindow params from NetParams")
530}
531
532/// Return a new circuit parameter struct using the given network parameters and algorithm to use.
533fn circparameters_from_netparameters(
534    inp: &NetParameters,
535    alg: ccparams::Algorithm,
536) -> Result<CircParameters> {
537    let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
538        .cwnd_init(inp.cc_cwnd_init.into())
539        .cwnd_inc_pct_ss(Percentage::new(
540            inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
541        ))
542        .cwnd_inc(inp.cc_cwnd_inc.into())
543        .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
544        .cwnd_min(inp.cc_cwnd_min.into())
545        .cwnd_max(inp.cc_cwnd_max.into())
546        .sendme_inc(inp.cc_sendme_inc.into())
547        .build()
548        .map_err(into_internal!(
549            "Unable to build CongestionWindow params from NetParams"
550        ))?;
551    let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
552        .ewma_cwnd_pct(Percentage::new(
553            inp.cc_ewma_cwnd_pct.as_percent().get() as u32
554        ))
555        .ewma_max(inp.cc_ewma_max.into())
556        .ewma_ss_max(inp.cc_ewma_ss.into())
557        .rtt_reset_pct(Percentage::new(
558            inp.cc_rtt_reset_pct.as_percent().get() as u32
559        ))
560        .build()
561        .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
562    let ccontrol = ccparams::CongestionControlParamsBuilder::default()
563        .alg(alg)
564        .fixed_window_params(build_cc_fixedwindow_params(inp))
565        .cwnd_params(cwnd_params)
566        .rtt_params(rtt_params)
567        .build()
568        .map_err(into_internal!(
569            "Unable to build CongestionControl params from NetParams"
570        ))?;
571    Ok(CircParameters::new(
572        inp.extend_by_ed25519_id.into(),
573        ccontrol,
574    ))
575}
576
577/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an exit circuit or
578/// single onion service (when implemented).
579pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
580    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
581        #[cfg(feature = "flowctl-cc")]
582        AlgorithmType::VEGAS => {
583            // TODO(arti#88): We always use fixed window for now,
584            // even with the "flowctl-cc" feature enabled:
585            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
586            // We use `if false` so that the vegas cc code is still type checked.
587            if false {
588                build_cc_vegas(
589                    inp,
590                    (
591                        inp.cc_vegas_alpha_exit.into(),
592                        inp.cc_vegas_beta_exit.into(),
593                        inp.cc_vegas_delta_exit.into(),
594                        inp.cc_vegas_gamma_exit.into(),
595                        inp.cc_vegas_sscap_exit.into(),
596                    )
597                        .into(),
598                )
599            } else {
600                build_cc_fixedwindow(inp)
601            }
602        }
603        // Unrecognized, fallback to fixed window as in SENDME v0.
604        _ => build_cc_fixedwindow(inp),
605    };
606    circparameters_from_netparameters(inp, alg)
607}
608
609/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an onion circuit
610/// which also includes an onion service with Vanguard.
611pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
612    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
613        #[cfg(feature = "flowctl-cc")]
614        AlgorithmType::VEGAS => {
615            // TODO(arti#88): We always use fixed window for now,
616            // even with the "flowctl-cc" feature enabled:
617            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
618            // We use `if false` so that the vegas cc code is still type checked.
619            if false {
620                build_cc_vegas(
621                    inp,
622                    (
623                        inp.cc_vegas_alpha_onion.into(),
624                        inp.cc_vegas_beta_onion.into(),
625                        inp.cc_vegas_delta_onion.into(),
626                        inp.cc_vegas_gamma_onion.into(),
627                        inp.cc_vegas_sscap_onion.into(),
628                    )
629                        .into(),
630                )
631            } else {
632                build_cc_fixedwindow(inp)
633            }
634        }
635        // Unrecognized, fallback to fixed window as in SENDME v0.
636        _ => build_cc_fixedwindow(inp),
637    };
638    circparameters_from_netparameters(inp, alg)
639}
640
641/// Helper function: spawn a future as a background task, and run it with
642/// two separate timeouts.
643///
644/// If the future does not complete by `timeout`, then return a
645/// timeout error immediately, but keep running the future in the
646/// background.
647///
648/// If the future does not complete by `abandon`, then abandon the
649/// future completely.
650async fn double_timeout<R, F, T>(
651    runtime: &R,
652    fut: F,
653    timeout: Duration,
654    abandon: Duration,
655) -> Result<T>
656where
657    R: Runtime,
658    F: Future<Output = Result<T>> + Send + 'static,
659    T: Send + 'static,
660{
661    let (snd, rcv) = oneshot::channel();
662    let rt = runtime.clone();
663    // We create these futures now, since we want them to look at the current
664    // time when they decide when to expire.
665    let inner_timeout_future = rt.timeout(abandon, fut);
666    let outer_timeout_future = rt.timeout(timeout, rcv);
667
668    runtime
669        .spawn(async move {
670            let result = inner_timeout_future.await;
671            let _ignore_cancelled_error = snd.send(result);
672        })
673        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
674
675    let outcome = outer_timeout_future.await;
676    // 4 layers of error to collapse:
677    //     One from the receiver being cancelled.
678    //     One from the outer timeout.
679    //     One from the inner timeout.
680    //     One from the actual future's result.
681    //
682    // (Technically, we could refrain from unwrapping the future's result,
683    // but doing it this way helps make it more certain that we really are
684    // collapsing all the layers into one.)
685    outcome
686        .map_err(|_| Error::CircTimeout(None))??
687        .map_err(|_| Error::CircTimeout(None))?
688}
689
690#[cfg(test)]
691mod test {
692    // @@ begin test lint list maintained by maint/add_warning @@
693    #![allow(clippy::bool_assert_comparison)]
694    #![allow(clippy::clone_on_copy)]
695    #![allow(clippy::dbg_macro)]
696    #![allow(clippy::mixed_attributes_style)]
697    #![allow(clippy::print_stderr)]
698    #![allow(clippy::print_stdout)]
699    #![allow(clippy::single_char_pattern)]
700    #![allow(clippy::unwrap_used)]
701    #![allow(clippy::unchecked_duration_subtraction)]
702    #![allow(clippy::useless_vec)]
703    #![allow(clippy::needless_pass_by_value)]
704    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
705    use super::*;
706    use crate::timeouts::TimeoutEstimator;
707    use futures::FutureExt;
708    use std::sync::Mutex;
709    use tor_chanmgr::ChannelConfig;
710    use tor_chanmgr::ChannelUsage as CU;
711    use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
712    use tor_llcrypto::pk::ed25519::Ed25519Identity;
713    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
714    use tor_proto::memquota::ToplevelAccount;
715    use tor_rtcompat::SleepProvider;
716    use tracing::trace;
717
718    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
719    fn gs() -> Arc<GuardStatusHandle> {
720        Arc::new(None.into())
721    }
722
723    #[test]
724    // Re-enabled after work from eta, discussed in arti#149
725    fn test_double_timeout() {
726        let t1 = Duration::from_secs(1);
727        let t10 = Duration::from_secs(10);
728        /// Return true if d1 is in range [d2...d2 + 0.5sec]
729        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
730            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
731        }
732
733        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
734            // Try a future that's ready immediately.
735            let x = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
736            assert!(x.is_ok());
737            assert_eq!(x.unwrap(), 3_u32);
738
739            trace!("acquiesce after test1");
740            #[allow(clippy::clone_on_copy)]
741            #[allow(deprecated)] // TODO #1885
742            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
743
744            // Try a future that's ready after a short delay.
745            let rt_clone = rt.clone();
746            // (We only want the short delay to fire, not any of the other timeouts.)
747            rt_clone.block_advance("manually controlling advances");
748            let x = rt
749                .wait_for(double_timeout(
750                    &rt,
751                    async move {
752                        let sl = rt_clone.sleep(Duration::from_millis(100));
753                        rt_clone.allow_one_advance(Duration::from_millis(100));
754                        sl.await;
755                        Ok(4_u32)
756                    },
757                    t1,
758                    t10,
759                ))
760                .await;
761            assert!(x.is_ok());
762            assert_eq!(x.unwrap(), 4_u32);
763
764            trace!("acquiesce after test2");
765            #[allow(clippy::clone_on_copy)]
766            #[allow(deprecated)] // TODO #1885
767            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
768
769            // Try a future that passes the first timeout, and make sure that
770            // it keeps running after it times out.
771            let rt_clone = rt.clone();
772            let (snd, rcv) = oneshot::channel();
773            let start = rt.now();
774            rt.block_advance("manually controlling advances");
775            let x = rt
776                .wait_for(double_timeout(
777                    &rt,
778                    async move {
779                        let sl = rt_clone.sleep(Duration::from_secs(2));
780                        rt_clone.allow_one_advance(Duration::from_secs(2));
781                        sl.await;
782                        snd.send(()).unwrap();
783                        Ok(4_u32)
784                    },
785                    t1,
786                    t10,
787                ))
788                .await;
789            assert!(matches!(x, Err(Error::CircTimeout(_))));
790            let end = rt.now();
791            assert!(duration_close_to(end - start, Duration::from_secs(1)));
792            let waited = rt.wait_for(rcv).await;
793            assert_eq!(waited, Ok(()));
794
795            trace!("acquiesce after test3");
796            #[allow(clippy::clone_on_copy)]
797            #[allow(deprecated)] // TODO #1885
798            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
799
800            // Try a future that times out and gets abandoned.
801            let rt_clone = rt.clone();
802            rt.block_advance("manually controlling advances");
803            let (snd, rcv) = oneshot::channel();
804            let start = rt.now();
805            // Let it hit the first timeout...
806            rt.allow_one_advance(Duration::from_secs(1));
807            let x = rt
808                .wait_for(double_timeout(
809                    &rt,
810                    async move {
811                        rt_clone.sleep(Duration::from_secs(30)).await;
812                        snd.send(()).unwrap();
813                        Ok(4_u32)
814                    },
815                    t1,
816                    t10,
817                ))
818                .await;
819            assert!(matches!(x, Err(Error::CircTimeout(_))));
820            let end = rt.now();
821            // ...and let it hit the second, too.
822            rt.allow_one_advance(Duration::from_secs(9));
823            let waited = rt.wait_for(rcv).await;
824            assert!(waited.is_err());
825            let end2 = rt.now();
826            assert!(duration_close_to(end - start, Duration::from_secs(1)));
827            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
828        });
829    }
830
831    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
832    ///
833    /// In our FakeCircuit code below, the first timeout is the amount of
834    /// time that we should sleep while building a hop to this key,
835    /// and the second timeout is the length of time-advance we should allow
836    /// after the hop is built.
837    ///
838    /// (This is pretty silly, but it's good enough for testing.)
839    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
840        let mut be = [0; 8];
841        be[..].copy_from_slice(&id.as_bytes()[0..8]);
842        let dur = u64::from_be_bytes(be);
843        be[..].copy_from_slice(&id.as_bytes()[8..16]);
844        let dur2 = u64::from_be_bytes(be);
845        (Duration::from_millis(dur), Duration::from_millis(dur2))
846    }
847    /// Encode a pair of timeouts as an Ed25519 identity.
848    ///
849    /// In our FakeCircuit code below, the first timeout is the amount of
850    /// time that we should sleep while building a hop to this key,
851    /// and the second timeout is the length of time-advance we should allow
852    /// after the hop is built.
853    ///
854    /// (This is pretty silly but it's good enough for testing.)
855    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
856        let mut bytes = [0; 32];
857        let dur = (d1.as_millis() as u64).to_be_bytes();
858        bytes[0..8].copy_from_slice(&dur);
859        let dur = (d2.as_millis() as u64).to_be_bytes();
860        bytes[8..16].copy_from_slice(&dur);
861        bytes.into()
862    }
863
864    /// As [`timeouts_from_key`], but first extract the relevant key from the
865    /// OwnedChanTarget.
866    fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
867        // Extracting the Ed25519 identity should always succeed in this case:
868        // we put it there ourselves!
869        let ed_id = ct
870            .identity(RelayIdType::Ed25519)
871            .expect("No ed25519 key was present for fake ChanTarget‽")
872            .try_into()
873            .expect("ChanTarget provided wrong key type");
874        timeouts_from_key(ed_id)
875    }
876
877    /// Replacement type for circuit, to implement buildable.
878    #[derive(Debug, Clone)]
879    struct FakeCirc {
880        hops: Vec<RelayIds>,
881        onehop: bool,
882    }
883    #[async_trait]
884    impl Buildable for Mutex<FakeCirc> {
885        async fn create_chantarget<RT: Runtime>(
886            _: &ChanMgr<RT>,
887            rt: &RT,
888            _guard_status: &GuardStatusHandle,
889            ct: &OwnedChanTarget,
890            _: CircParameters,
891            _usage: ChannelUsage,
892        ) -> Result<Arc<Self>> {
893            let (d1, d2) = timeouts_from_chantarget(ct);
894            rt.sleep(d1).await;
895            if !d2.is_zero() {
896                rt.allow_one_advance(d2);
897            }
898
899            let c = FakeCirc {
900                hops: vec![RelayIds::from_relay_ids(ct)],
901                onehop: true,
902            };
903            Ok(Arc::new(Mutex::new(c)))
904        }
905        async fn create<RT: Runtime>(
906            _: &ChanMgr<RT>,
907            rt: &RT,
908            _guard_status: &GuardStatusHandle,
909            ct: &OwnedCircTarget,
910            _: CircParameters,
911            _usage: ChannelUsage,
912        ) -> Result<Arc<Self>> {
913            let (d1, d2) = timeouts_from_chantarget(ct);
914            rt.sleep(d1).await;
915            if !d2.is_zero() {
916                rt.allow_one_advance(d2);
917            }
918
919            let c = FakeCirc {
920                hops: vec![RelayIds::from_relay_ids(ct)],
921                onehop: false,
922            };
923            Ok(Arc::new(Mutex::new(c)))
924        }
925        async fn extend<RT: Runtime>(
926            &self,
927            rt: &RT,
928            ct: &OwnedCircTarget,
929            _: CircParameters,
930        ) -> Result<()> {
931            let (d1, d2) = timeouts_from_chantarget(ct);
932            rt.sleep(d1).await;
933            if !d2.is_zero() {
934                rt.allow_one_advance(d2);
935            }
936
937            {
938                let mut c = self.lock().unwrap();
939                c.hops.push(RelayIds::from_relay_ids(ct));
940            }
941            Ok(())
942        }
943    }
944
945    /// Fake implementation of TimeoutEstimator that just records its inputs.
946    struct TimeoutRecorder<R> {
947        runtime: R,
948        hist: Vec<(bool, u8, Duration)>,
949        // How much advance to permit after being told of a timeout?
950        on_timeout: Duration,
951        // How much advance to permit after being told of a success?
952        on_success: Duration,
953
954        snd_success: Option<oneshot::Sender<()>>,
955        rcv_success: Option<oneshot::Receiver<()>>,
956    }
957
958    impl<R> TimeoutRecorder<R> {
959        fn new(runtime: R) -> Self {
960            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
961        }
962
963        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
964            let (snd_success, rcv_success) = oneshot::channel();
965            Self {
966                runtime,
967                hist: Vec::new(),
968                on_timeout,
969                on_success,
970                rcv_success: Some(rcv_success),
971                snd_success: Some(snd_success),
972            }
973        }
974    }
975    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
976        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
977            if !is_last {
978                return;
979            }
980            let (rt, advance) = {
981                let mut this = self.lock().unwrap();
982                this.hist.push((true, hop, delay));
983                let _ = this.snd_success.take().unwrap().send(());
984                (this.runtime.clone(), this.on_success)
985            };
986            if !advance.is_zero() {
987                rt.allow_one_advance(advance);
988            }
989        }
990        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
991            let (rt, advance) = {
992                let mut this = self.lock().unwrap();
993                this.hist.push((false, hop, delay));
994                (this.runtime.clone(), this.on_timeout)
995            };
996            if !advance.is_zero() {
997                rt.allow_one_advance(advance);
998            }
999        }
1000        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
1001            (Duration::from_secs(3), Duration::from_secs(100))
1002        }
1003        fn learning_timeouts(&self) -> bool {
1004            false
1005        }
1006        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
1007
1008        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
1009            None
1010        }
1011    }
1012
1013    /// Testing only: create a bogus circuit target
1014    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1015        let mut builder = OwnedCircTarget::builder();
1016        builder
1017            .chan_target()
1018            .ed_identity(id)
1019            .rsa_identity([0x20; 20].into());
1020        builder
1021            .ntor_onion_key([0x33; 32].into())
1022            .protocols("".parse().unwrap())
1023            .build()
1024            .unwrap()
1025    }
1026    /// Testing only: create a bogus channel target
1027    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1028        OwnedChanTarget::builder()
1029            .ed_identity(id)
1030            .rsa_identity([0x20; 20].into())
1031            .build()
1032            .unwrap()
1033    }
1034
1035    async fn run_builder_test(
1036        rt: tor_rtmock::MockRuntime,
1037        advance_initial: Duration,
1038        path: OwnedPath,
1039        advance_on_timeout: Option<(Duration, Duration)>,
1040        usage: ChannelUsage,
1041    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1042        let chanmgr = Arc::new(ChanMgr::new(
1043            rt.clone(),
1044            &ChannelConfig::default(),
1045            Default::default(),
1046            &Default::default(),
1047            ToplevelAccount::new_noop(),
1048        ));
1049        // always has 3 second timeout, 100 second abandon.
1050        let timeouts = match advance_on_timeout {
1051            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1052            None => TimeoutRecorder::new(rt.clone()),
1053        };
1054        let timeouts = Arc::new(Mutex::new(timeouts));
1055        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1056            rt.clone(),
1057            chanmgr,
1058            timeouts::Estimator::new(Arc::clone(&timeouts)),
1059        );
1060
1061        rt.block_advance("manually controlling advances");
1062        rt.allow_one_advance(advance_initial);
1063        let outcome = rt.spawn_join("build-owned", async move {
1064            let arcbuilder = Arc::new(builder);
1065            let params = exit_circparams_from_netparams(&NetParameters::default())?;
1066            arcbuilder.build_owned(path, &params, gs(), usage).await
1067        });
1068
1069        // Now we wait for a success to finally, finally be reported.
1070        if advance_on_timeout.is_some() {
1071            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1072            rt.spawn_identified("receiver", async move {
1073                receiver.await.unwrap();
1074            });
1075        }
1076        rt.advance_until_stalled().await;
1077
1078        let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1079        let timeouts = timeouts.lock().unwrap().hist.clone();
1080
1081        (circ, timeouts)
1082    }
1083
1084    #[test]
1085    fn build_onehop() {
1086        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1087            let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
1088            let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
1089
1090            let (outcome, timeouts) =
1091                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1092            let circ = outcome.unwrap();
1093            assert!(circ.onehop);
1094            assert_eq!(circ.hops.len(), 1);
1095            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1096
1097            assert_eq!(timeouts.len(), 1);
1098            assert!(timeouts[0].0); // success
1099            assert_eq!(timeouts[0].1, 0); // one-hop
1100            assert_eq!(timeouts[0].2, Duration::from_millis(100));
1101        });
1102    }
1103
1104    #[test]
1105    fn build_threehop() {
1106        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1107            let id_100ms =
1108                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1109            let id_200ms =
1110                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
1111            let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
1112            let path =
1113                OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
1114
1115            let (outcome, timeouts) =
1116                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1117            let circ = outcome.unwrap();
1118            assert!(!circ.onehop);
1119            assert_eq!(circ.hops.len(), 3);
1120            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1121            assert!(circ.hops[1].same_relay_ids(&chan_t(id_200ms)));
1122            assert!(circ.hops[2].same_relay_ids(&chan_t(id_300ms)));
1123
1124            assert_eq!(timeouts.len(), 1);
1125            assert!(timeouts[0].0); // success
1126            assert_eq!(timeouts[0].1, 2); // three-hop
1127            assert_eq!(timeouts[0].2, Duration::from_millis(600));
1128        });
1129    }
1130
1131    #[test]
1132    fn build_huge_timeout() {
1133        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1134            let id_100ms =
1135                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1136            let id_200ms =
1137                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1138            let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
1139
1140            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
1141
1142            let (outcome, timeouts) =
1143                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1144            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1145
1146            assert_eq!(timeouts.len(), 1);
1147            assert!(!timeouts[0].0); // timeout
1148
1149            // BUG: Sometimes this is 1 and sometimes this is 2.
1150            // assert_eq!(timeouts[0].1, 2); // at third hop.
1151            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1152        });
1153    }
1154
1155    #[test]
1156    fn build_modest_timeout() {
1157        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1158            let id_100ms =
1159                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1160            let id_200ms =
1161                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1162            let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));
1163
1164            let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));
1165
1166            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);
1167
1168            let (outcome, timeouts) = run_builder_test(
1169                rt.clone(),
1170                Duration::from_millis(100),
1171                path,
1172                Some(timeout_advance),
1173                CU::UserTraffic,
1174            )
1175            .await;
1176            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1177
1178            assert_eq!(timeouts.len(), 2);
1179            assert!(!timeouts[0].0); // timeout
1180
1181            // BUG: Sometimes this is 1 and sometimes this is 2.
1182            //assert_eq!(timeouts[0].1, 2); // at third hop.
1183            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1184
1185            assert!(timeouts[1].0); // success
1186            assert_eq!(timeouts[1].1, 2); // three-hop
1187                                          // BUG: This timer is not always reliable, due to races.
1188                                          //assert_eq!(timeouts[1].2, Duration::from_millis(3300));
1189        });
1190    }
1191}