tor_circmgr/
build.rs

1//! Facilities to build circuits directly, instead of via a circuit manager.
2
3use crate::path::{OwnedPath, TorPath};
4use crate::timeouts::{self, Action};
5use crate::{Error, Result};
6use async_trait::async_trait;
7use futures::task::SpawnExt;
8use futures::Future;
9use oneshot_fused_workaround as oneshot;
10use std::sync::{
11    atomic::{AtomicU32, Ordering},
12    Arc,
13};
14use std::time::{Duration, Instant};
15use tor_chanmgr::{ChanMgr, ChanProvenance, ChannelUsage};
16use tor_error::into_internal;
17use tor_guardmgr::GuardStatus;
18use tor_linkspec::{ChanTarget, CircTarget, IntoOwnedChanTarget, OwnedChanTarget, OwnedCircTarget};
19use tor_netdir::params::NetParameters;
20use tor_proto::ccparams::{self, AlgorithmType};
21use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
22use tor_protover::named::FLOWCTRL_CC;
23use tor_protover::Protocols;
24use tor_rtcompat::{Runtime, SleepProviderExt};
25use tor_units::Percentage;
26
27#[cfg(all(feature = "vanguards", feature = "hs-common"))]
28use tor_guardmgr::vanguards::VanguardMgr;
29
30mod guardstatus;
31
32pub(crate) use guardstatus::GuardStatusHandle;
33
34/// Represents an objects that can be constructed in a circuit-like way.
35///
36/// This is only a separate trait for testing purposes, so that we can swap
37/// our some other type when we're testing Builder.
38///
39/// TODO: I'd like to have a simpler testing strategy here; this one
40/// complicates things a bit.
41#[async_trait]
42pub(crate) trait Buildable: Sized {
43    /// Launch a new one-hop circuit to a given relay, given only a
44    /// channel target `ct` specifying that relay.
45    ///
46    /// (Since we don't have a CircTarget here, we can't extend the circuit
47    /// to be multihop later on.)
48    async fn create_chantarget<RT: Runtime>(
49        chanmgr: &ChanMgr<RT>,
50        rt: &RT,
51        guard_status: &GuardStatusHandle,
52        ct: &OwnedChanTarget,
53        params: CircParameters,
54        usage: ChannelUsage,
55    ) -> Result<Arc<Self>>;
56
57    /// Launch a new circuit through a given relay, given a circuit target
58    /// `ct` specifying that relay.
59    async fn create<RT: Runtime>(
60        chanmgr: &ChanMgr<RT>,
61        rt: &RT,
62        guard_status: &GuardStatusHandle,
63        ct: &OwnedCircTarget,
64        params: CircParameters,
65        usage: ChannelUsage,
66    ) -> Result<Arc<Self>>;
67
68    /// Extend this circuit-like object by one hop, to the location described
69    /// in `ct`.
70    async fn extend<RT: Runtime>(
71        &self,
72        rt: &RT,
73        ct: &OwnedCircTarget,
74        params: CircParameters,
75    ) -> Result<()>;
76}
77
78/// Try to make a [`PendingClientCirc`] to a given relay, and start its
79/// reactor.
80///
81/// This is common code, shared by all the first-hop functions in the
82/// implementation of `Buildable` for `Arc<ClientCirc>`.
83async fn create_common<RT: Runtime, CT: ChanTarget>(
84    chanmgr: &ChanMgr<RT>,
85    rt: &RT,
86    target: &CT,
87    guard_status: &GuardStatusHandle,
88    usage: ChannelUsage,
89) -> Result<PendingClientCirc> {
90    // Get or construct the channel.
91    let result = chanmgr.get_or_launch(target, usage).await;
92
93    // Report the clock skew if appropriate, and exit if there has been an error.
94    let chan = match result {
95        Ok((chan, ChanProvenance::NewlyCreated)) => {
96            guard_status.skew(chan.clock_skew());
97            chan
98        }
99        Ok((chan, _)) => chan,
100        Err(cause) => {
101            if let Some(skew) = cause.clock_skew() {
102                guard_status.skew(skew);
103            }
104            return Err(Error::Channel {
105                peer: target.to_logged(),
106                cause,
107            });
108        }
109    };
110    // Construct the (zero-hop) circuit.
111    let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol {
112        error,
113        peer: None, // we don't blame the peer, because new_circ() does no networking.
114        action: "initializing circuit",
115        unique_id: None,
116    })?;
117
118    rt.spawn(async {
119        let _ = reactor.run().await;
120    })
121    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
122
123    Ok(pending_circ)
124}
125
126#[async_trait]
127impl Buildable for ClientCirc {
128    async fn create_chantarget<RT: Runtime>(
129        chanmgr: &ChanMgr<RT>,
130        rt: &RT,
131        guard_status: &GuardStatusHandle,
132        ct: &OwnedChanTarget,
133        params: CircParameters,
134        usage: ChannelUsage,
135    ) -> Result<Arc<Self>> {
136        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
137        let unique_id = Some(circ.peek_unique_id());
138        circ.create_firsthop_fast(params)
139            .await
140            .map_err(|error| Error::Protocol {
141                peer: Some(ct.to_logged()),
142                error,
143                action: "running CREATE_FAST handshake",
144                unique_id,
145            })
146    }
147    async fn create<RT: Runtime>(
148        chanmgr: &ChanMgr<RT>,
149        rt: &RT,
150        guard_status: &GuardStatusHandle,
151        ct: &OwnedCircTarget,
152        params: CircParameters,
153        usage: ChannelUsage,
154    ) -> Result<Arc<Self>> {
155        let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
156        let unique_id = Some(circ.peek_unique_id());
157
158        let handshake_res = circ.create_firsthop(ct, params).await;
159
160        handshake_res.map_err(|error| Error::Protocol {
161            peer: Some(ct.to_logged()),
162            error,
163            action: "creating first hop",
164            unique_id,
165        })
166    }
167    async fn extend<RT: Runtime>(
168        &self,
169        _rt: &RT,
170        ct: &OwnedCircTarget,
171        params: CircParameters,
172    ) -> Result<()> {
173        // use "ClientCirc::" name to avoid calling _this_ method.
174        let res = ClientCirc::extend(self, ct, params).await;
175
176        res.map_err(|error| Error::Protocol {
177            error,
178            // We can't know who caused the error, since it may have been
179            // the hop we were extending from, or the hop we were extending
180            // to.
181            peer: None,
182            action: "extending circuit",
183            unique_id: Some(self.unique_id()),
184        })
185    }
186}
187
188/// An implementation type for [`CircuitBuilder`].
189///
190/// A `CircuitBuilder` holds references to all the objects that are needed
191/// to build circuits correctly.
192///
193/// In general, you should not need to construct or use this object yourself,
194/// unless you are choosing your own paths.
195struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
196    /// The runtime used by this circuit builder.
197    runtime: R,
198    /// A channel manager that this circuit builder uses to make channels.
199    chanmgr: Arc<ChanMgr<R>>,
200    /// An estimator to determine the correct timeouts for circuit building.
201    timeouts: timeouts::Estimator,
202    /// We don't actually hold any clientcircs, so we need to put this
203    /// type here so the compiler won't freak out.
204    _phantom: std::marker::PhantomData<C>,
205}
206
207impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
208    /// Construct a new [`Builder`].
209    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
210        Builder {
211            runtime,
212            chanmgr,
213            timeouts,
214            _phantom: std::marker::PhantomData,
215        }
216    }
217
218    /// Helper function that takes the circuit parameters and apply any changes from the given
219    /// subprotocol versions.
220    fn apply_protovers_to_circparams(params: &mut CircParameters, protocols: &Protocols) {
221        // Not supporting FlowCtrl=2 means we have to use the fallback congestion control algorithm
222        // which is the FixedWindow one.
223        if !protocols.supports_named_subver(FLOWCTRL_CC) {
224            params.ccontrol.use_fallback_alg();
225        }
226    }
227
228    /// Build a circuit, without performing any timeout operations.
229    ///
230    /// After each hop is built, increments n_hops_built.  Make sure that
231    /// `guard_status` has its pending status set correctly to correspond
232    /// to a circuit failure at any given stage.
233    ///
234    /// (TODO: Find
235    /// a better design there.)
236    async fn build_notimeout(
237        self: Arc<Self>,
238        path: OwnedPath,
239        params: CircParameters,
240        start_time: Instant,
241        n_hops_built: Arc<AtomicU32>,
242        guard_status: Arc<GuardStatusHandle>,
243        usage: ChannelUsage,
244    ) -> Result<Arc<C>> {
245        match path {
246            OwnedPath::ChannelOnly(target) => {
247                // If we fail now, it's the guard's fault.
248                guard_status.pending(GuardStatus::Failure);
249                let circ = C::create_chantarget(
250                    &self.chanmgr,
251                    &self.runtime,
252                    &guard_status,
253                    &target,
254                    params,
255                    usage,
256                )
257                .await?;
258                self.timeouts
259                    .note_hop_completed(0, self.runtime.now() - start_time, true);
260                n_hops_built.fetch_add(1, Ordering::SeqCst);
261                Ok(circ)
262            }
263            OwnedPath::Normal(p) => {
264                assert!(!p.is_empty());
265                let n_hops = p.len() as u8;
266                // If we fail now, it's the guard's fault.
267                guard_status.pending(GuardStatus::Failure);
268                // Each hop has its own circ parameters. This is for the first hop (CREATE).
269                let mut first_hop_params = params.clone();
270                Self::apply_protovers_to_circparams(&mut first_hop_params, p[0].protovers());
271                let circ = C::create(
272                    &self.chanmgr,
273                    &self.runtime,
274                    &guard_status,
275                    &p[0],
276                    first_hop_params,
277                    usage,
278                )
279                .await?;
280                self.timeouts
281                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
282                // If we fail after this point, we can't tell whether it's
283                // the fault of the guard or some later relay.
284                guard_status.pending(GuardStatus::Indeterminate);
285                n_hops_built.fetch_add(1, Ordering::SeqCst);
286                let mut hop_num = 1;
287                for relay in p[1..].iter() {
288                    // Get the params per subsequent hop (EXTEND).
289                    let mut hop_params = params.clone();
290                    Self::apply_protovers_to_circparams(&mut hop_params, relay.protovers());
291                    circ.extend(&self.runtime, relay, hop_params).await?;
292                    n_hops_built.fetch_add(1, Ordering::SeqCst);
293                    self.timeouts.note_hop_completed(
294                        hop_num,
295                        self.runtime.now() - start_time,
296                        hop_num == (n_hops - 1),
297                    );
298                    hop_num += 1;
299                }
300                Ok(circ)
301            }
302        }
303    }
304
305    /// Build a circuit from an [`OwnedPath`].
306    async fn build_owned(
307        self: &Arc<Self>,
308        path: OwnedPath,
309        params: &CircParameters,
310        guard_status: Arc<GuardStatusHandle>,
311        usage: ChannelUsage,
312    ) -> Result<Arc<C>> {
313        let action = Action::BuildCircuit { length: path.len() };
314        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
315        let start_time = self.runtime.now();
316
317        // TODO: This is probably not the best way for build_notimeout to
318        // tell us how many hops it managed to build, but at least it is
319        // isolated here.
320        let hops_built = Arc::new(AtomicU32::new(0));
321
322        let self_clone = Arc::clone(self);
323        let params = params.clone();
324
325        let circuit_future = self_clone.build_notimeout(
326            path,
327            params,
328            start_time,
329            Arc::clone(&hops_built),
330            guard_status,
331            usage,
332        );
333
334        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
335            Ok(circuit) => Ok(circuit),
336            Err(Error::CircTimeout(unique_id)) => {
337                let n_built = hops_built.load(Ordering::SeqCst);
338                self.timeouts
339                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
340                Err(Error::CircTimeout(unique_id))
341            }
342            Err(e) => Err(e),
343        }
344    }
345
346    /// Return a reference to this Builder runtime.
347    pub(crate) fn runtime(&self) -> &R {
348        &self.runtime
349    }
350
351    /// Return a reference to this Builder's timeout estimator.
352    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
353        &self.timeouts
354    }
355}
356
357/// A factory object to build circuits.
358///
359/// A `CircuitBuilder` holds references to all the objects that are needed
360/// to build circuits correctly.
361///
362/// In general, you should not need to construct or use this object yourself,
363/// unless you are choosing your own paths.
364pub struct CircuitBuilder<R: Runtime> {
365    /// The underlying [`Builder`] object
366    builder: Arc<Builder<R, ClientCirc>>,
367    /// Configuration for how to choose paths for circuits.
368    path_config: tor_config::MutCfg<crate::PathConfig>,
369    /// State-manager object to use in storing current state.
370    storage: crate::TimeoutStateHandle,
371    /// Guard manager to tell us which guards nodes to use for the circuits
372    /// we build.
373    guardmgr: tor_guardmgr::GuardMgr<R>,
374    /// The vanguard manager object used for HS circuits.
375    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
376    vanguardmgr: Arc<VanguardMgr<R>>,
377}
378
379impl<R: Runtime> CircuitBuilder<R> {
380    /// Construct a new [`CircuitBuilder`].
381    // TODO: eventually I'd like to make this a public function, but
382    // TimeoutStateHandle is private.
383    pub(crate) fn new(
384        runtime: R,
385        chanmgr: Arc<ChanMgr<R>>,
386        path_config: crate::PathConfig,
387        storage: crate::TimeoutStateHandle,
388        guardmgr: tor_guardmgr::GuardMgr<R>,
389        #[cfg(all(feature = "vanguards", feature = "hs-common"))] vanguardmgr: VanguardMgr<R>,
390    ) -> Self {
391        let timeouts = timeouts::Estimator::from_storage(&storage);
392
393        CircuitBuilder {
394            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
395            path_config: path_config.into(),
396            storage,
397            guardmgr,
398            #[cfg(all(feature = "vanguards", feature = "hs-common"))]
399            vanguardmgr: Arc::new(vanguardmgr),
400        }
401    }
402
403    /// Return this builder's [`PathConfig`](crate::PathConfig).
404    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
405        self.path_config.get()
406    }
407
408    /// Replace this builder's [`PathConfig`](crate::PathConfig).
409    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
410        self.path_config.replace(new_config);
411    }
412
413    /// Flush state to the state manager if we own the lock.
414    ///
415    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
416    pub(crate) fn save_state(&self) -> Result<bool> {
417        if !self.storage.can_store() {
418            return Ok(false);
419        }
420        // TODO: someday we'll want to only do this if there is something
421        // changed.
422        self.builder.timeouts.save_state(&self.storage)?;
423        self.guardmgr.store_persistent_state()?;
424        Ok(true)
425    }
426
427    /// Replace our state with a new owning state, assuming we have
428    /// storage permission.
429    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
430        self.builder
431            .timeouts
432            .upgrade_to_owning_storage(&self.storage);
433        self.guardmgr.upgrade_to_owned_persistent_state()?;
434        Ok(())
435    }
436
437    /// Reload persistent state from disk, if we don't have storage permission.
438    pub(crate) fn reload_state(&self) -> Result<()> {
439        if !self.storage.can_store() {
440            self.builder
441                .timeouts
442                .reload_readonly_from_storage(&self.storage);
443        }
444        self.guardmgr.reload_persistent_state()?;
445        Ok(())
446    }
447
448    /// Reconfigure this builder using the latest set of network parameters.
449    ///
450    /// (NOTE: for now, this only affects circuit timeout estimation.)
451    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
452        self.builder.timeouts.update_params(p);
453    }
454
455    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
456    pub(crate) async fn build_owned(
457        &self,
458        path: OwnedPath,
459        params: &CircParameters,
460        guard_status: Arc<GuardStatusHandle>,
461        usage: ChannelUsage,
462    ) -> Result<Arc<ClientCirc>> {
463        self.builder
464            .build_owned(path, params, guard_status, usage)
465            .await
466    }
467
468    /// Try to construct a new circuit from a given path, using appropriate
469    /// timeouts.
470    ///
471    /// This circuit is _not_ automatically registered with any
472    /// circuit manager; if you don't hang on it it, it will
473    /// automatically go away when the last reference is dropped.
474    pub async fn build(
475        &self,
476        path: &TorPath<'_>,
477        params: &CircParameters,
478        usage: ChannelUsage,
479    ) -> Result<Arc<ClientCirc>> {
480        let owned = path.try_into()?;
481        self.build_owned(owned, params, Arc::new(None.into()), usage)
482            .await
483    }
484
485    /// Return true if this builder is currently learning timeout info.
486    pub(crate) fn learning_timeouts(&self) -> bool {
487        self.builder.timeouts.learning_timeouts()
488    }
489
490    /// Return a reference to this builder's `GuardMgr`.
491    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
492        &self.guardmgr
493    }
494
495    /// Return a reference to this builder's `VanguardMgr`.
496    #[cfg(all(feature = "vanguards", feature = "hs-common"))]
497    pub(crate) fn vanguardmgr(&self) -> &Arc<VanguardMgr<R>> {
498        &self.vanguardmgr
499    }
500
501    /// Return a reference to this builder's runtime
502    pub(crate) fn runtime(&self) -> &R {
503        self.builder.runtime()
504    }
505
506    /// Return a reference to this builder's timeout estimator.
507    pub(crate) fn estimator(&self) -> &timeouts::Estimator {
508        self.builder.estimator()
509    }
510}
511
512/// Return the congestion control Vegas algorithm using the given network parameters.
513#[cfg(feature = "flowctl-cc")]
514fn build_cc_vegas(
515    inp: &NetParameters,
516    vegas_queue_params: ccparams::VegasQueueParams,
517) -> ccparams::Algorithm {
518    ccparams::Algorithm::Vegas(
519        ccparams::VegasParamsBuilder::default()
520            .cell_in_queue_params(vegas_queue_params)
521            .ss_cwnd_max(inp.cc_ss_max.into())
522            .cwnd_full_gap(inp.cc_cwnd_full_gap.into())
523            .cwnd_full_min_pct(Percentage::new(
524                inp.cc_cwnd_full_minpct.as_percent().get() as u32
525            ))
526            .cwnd_full_per_cwnd(inp.cc_cwnd_full_per_cwnd.into())
527            .build()
528            .expect("Unable to build Vegas params from NetParams"),
529    )
530}
531
532/// Return the congestion control FixedWindow algorithm using the given network parameters.
533fn build_cc_fixedwindow(inp: &NetParameters) -> ccparams::Algorithm {
534    ccparams::Algorithm::FixedWindow(
535        ccparams::FixedWindowParamsBuilder::default()
536            .circ_window_start(inp.circuit_window.get() as u16)
537            .circ_window_min(inp.circuit_window.lower() as u16)
538            .circ_window_max(inp.circuit_window.upper() as u16)
539            .build()
540            .expect("Unable to build FixedWindow params from NetParams"),
541    )
542}
543
544/// Return a new circuit parameter struct using the given network parameters and algorithm to use.
545fn circparameters_from_netparameters(
546    inp: &NetParameters,
547    alg: ccparams::Algorithm,
548) -> Result<CircParameters> {
549    let cwnd_params = ccparams::CongestionWindowParamsBuilder::default()
550        .cwnd_init(inp.cc_cwnd_init.into())
551        .cwnd_inc_pct_ss(Percentage::new(
552            inp.cc_cwnd_inc_pct_ss.as_percent().get() as u32
553        ))
554        .cwnd_inc(inp.cc_cwnd_inc.into())
555        .cwnd_inc_rate(inp.cc_cwnd_inc_rate.into())
556        .cwnd_min(inp.cc_cwnd_min.into())
557        .cwnd_max(inp.cc_cwnd_max.into())
558        .sendme_inc(inp.cc_sendme_inc.into())
559        .build()
560        .map_err(into_internal!(
561            "Unable to build CongestionWindow params from NetParams"
562        ))?;
563    let rtt_params = ccparams::RoundTripEstimatorParamsBuilder::default()
564        .ewma_cwnd_pct(Percentage::new(
565            inp.cc_ewma_cwnd_pct.as_percent().get() as u32
566        ))
567        .ewma_max(inp.cc_ewma_max.into())
568        .ewma_ss_max(inp.cc_ewma_ss.into())
569        .rtt_reset_pct(Percentage::new(
570            inp.cc_rtt_reset_pct.as_percent().get() as u32
571        ))
572        .build()
573        .map_err(into_internal!("Unable to build RTT params from NetParams"))?;
574    let ccontrol = ccparams::CongestionControlParamsBuilder::default()
575        .alg(alg)
576        .fallback_alg(build_cc_fixedwindow(inp))
577        .cwnd_params(cwnd_params)
578        .rtt_params(rtt_params)
579        .build()
580        .map_err(into_internal!(
581            "Unable to build CongestionControl params from NetParams"
582        ))?;
583    Ok(CircParameters::new(
584        inp.extend_by_ed25519_id.into(),
585        ccontrol,
586    ))
587}
588
589/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an exit circuit or
590/// single onion service (when implemented).
591pub fn exit_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
592    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
593        #[cfg(feature = "flowctl-cc")]
594        AlgorithmType::VEGAS => {
595            // TODO(arti#88): We always use fixed window for now,
596            // even with the "flowctl-cc" feature enabled:
597            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
598            // We use `if false` so that the vegas cc code is still type checked.
599            if false {
600                build_cc_vegas(
601                    inp,
602                    (
603                        inp.cc_vegas_alpha_exit.into(),
604                        inp.cc_vegas_beta_exit.into(),
605                        inp.cc_vegas_delta_exit.into(),
606                        inp.cc_vegas_gamma_exit.into(),
607                        inp.cc_vegas_sscap_exit.into(),
608                    )
609                        .into(),
610                )
611            } else {
612                build_cc_fixedwindow(inp)
613            }
614        }
615        // Unrecognized, fallback to fixed window as in SENDME v0.
616        _ => build_cc_fixedwindow(inp),
617    };
618    circparameters_from_netparameters(inp, alg)
619}
620
621/// Extract a [`CircParameters`] from the [`NetParameters`] from a consensus for an onion circuit
622/// which also includes an onion service with Vanguard.
623pub fn onion_circparams_from_netparams(inp: &NetParameters) -> Result<CircParameters> {
624    let alg = match AlgorithmType::from(inp.cc_alg.get()) {
625        #[cfg(feature = "flowctl-cc")]
626        AlgorithmType::VEGAS => {
627            // TODO(arti#88): We always use fixed window for now,
628            // even with the "flowctl-cc" feature enabled:
629            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2932#note_3191196
630            // We use `if false` so that the vegas cc code is still type checked.
631            if false {
632                build_cc_vegas(
633                    inp,
634                    (
635                        inp.cc_vegas_alpha_onion.into(),
636                        inp.cc_vegas_beta_onion.into(),
637                        inp.cc_vegas_delta_onion.into(),
638                        inp.cc_vegas_gamma_onion.into(),
639                        inp.cc_vegas_sscap_onion.into(),
640                    )
641                        .into(),
642                )
643            } else {
644                build_cc_fixedwindow(inp)
645            }
646        }
647        // Unrecognized, fallback to fixed window as in SENDME v0.
648        _ => build_cc_fixedwindow(inp),
649    };
650    circparameters_from_netparameters(inp, alg)
651}
652
653/// Helper function: spawn a future as a background task, and run it with
654/// two separate timeouts.
655///
656/// If the future does not complete by `timeout`, then return a
657/// timeout error immediately, but keep running the future in the
658/// background.
659///
660/// If the future does not complete by `abandon`, then abandon the
661/// future completely.
662async fn double_timeout<R, F, T>(
663    runtime: &R,
664    fut: F,
665    timeout: Duration,
666    abandon: Duration,
667) -> Result<T>
668where
669    R: Runtime,
670    F: Future<Output = Result<T>> + Send + 'static,
671    T: Send + 'static,
672{
673    let (snd, rcv) = oneshot::channel();
674    let rt = runtime.clone();
675    // We create these futures now, since we want them to look at the current
676    // time when they decide when to expire.
677    let inner_timeout_future = rt.timeout(abandon, fut);
678    let outer_timeout_future = rt.timeout(timeout, rcv);
679
680    runtime
681        .spawn(async move {
682            let result = inner_timeout_future.await;
683            let _ignore_cancelled_error = snd.send(result);
684        })
685        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
686
687    let outcome = outer_timeout_future.await;
688    // 4 layers of error to collapse:
689    //     One from the receiver being cancelled.
690    //     One from the outer timeout.
691    //     One from the inner timeout.
692    //     One from the actual future's result.
693    //
694    // (Technically, we could refrain from unwrapping the future's result,
695    // but doing it this way helps make it more certain that we really are
696    // collapsing all the layers into one.)
697    outcome
698        .map_err(|_| Error::CircTimeout(None))??
699        .map_err(|_| Error::CircTimeout(None))?
700}
701
702#[cfg(test)]
703mod test {
704    // @@ begin test lint list maintained by maint/add_warning @@
705    #![allow(clippy::bool_assert_comparison)]
706    #![allow(clippy::clone_on_copy)]
707    #![allow(clippy::dbg_macro)]
708    #![allow(clippy::mixed_attributes_style)]
709    #![allow(clippy::print_stderr)]
710    #![allow(clippy::print_stdout)]
711    #![allow(clippy::single_char_pattern)]
712    #![allow(clippy::unwrap_used)]
713    #![allow(clippy::unchecked_duration_subtraction)]
714    #![allow(clippy::useless_vec)]
715    #![allow(clippy::needless_pass_by_value)]
716    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
717    use super::*;
718    use crate::timeouts::TimeoutEstimator;
719    use futures::FutureExt;
720    use std::sync::Mutex;
721    use tor_chanmgr::ChannelConfig;
722    use tor_chanmgr::ChannelUsage as CU;
723    use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
724    use tor_llcrypto::pk::ed25519::Ed25519Identity;
725    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
726    use tor_proto::memquota::ToplevelAccount;
727    use tor_rtcompat::SleepProvider;
728    use tracing::trace;
729
730    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
731    fn gs() -> Arc<GuardStatusHandle> {
732        Arc::new(None.into())
733    }
734
735    #[test]
736    // Re-enabled after work from eta, discussed in arti#149
737    fn test_double_timeout() {
738        let t1 = Duration::from_secs(1);
739        let t10 = Duration::from_secs(10);
740        /// Return true if d1 is in range [d2...d2 + 0.5sec]
741        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
742            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
743        }
744
745        tor_rtmock::MockRuntime::test_with_various(|rto| async move {
746            // Try a future that's ready immediately.
747            let x = double_timeout(&rto, async { Ok(3_u32) }, t1, t10).await;
748            assert!(x.is_ok());
749            assert_eq!(x.unwrap(), 3_u32);
750
751            trace!("acquiesce after test1");
752            #[allow(clippy::clone_on_copy)]
753            #[allow(deprecated)] // TODO #1885
754            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
755
756            // Try a future that's ready after a short delay.
757            let rt_clone = rt.clone();
758            // (We only want the short delay to fire, not any of the other timeouts.)
759            rt_clone.block_advance("manually controlling advances");
760            let x = rt
761                .wait_for(double_timeout(
762                    &rt,
763                    async move {
764                        let sl = rt_clone.sleep(Duration::from_millis(100));
765                        rt_clone.allow_one_advance(Duration::from_millis(100));
766                        sl.await;
767                        Ok(4_u32)
768                    },
769                    t1,
770                    t10,
771                ))
772                .await;
773            assert!(x.is_ok());
774            assert_eq!(x.unwrap(), 4_u32);
775
776            trace!("acquiesce after test2");
777            #[allow(clippy::clone_on_copy)]
778            #[allow(deprecated)] // TODO #1885
779            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
780
781            // Try a future that passes the first timeout, and make sure that
782            // it keeps running after it times out.
783            let rt_clone = rt.clone();
784            let (snd, rcv) = oneshot::channel();
785            let start = rt.now();
786            rt.block_advance("manually controlling advances");
787            let x = rt
788                .wait_for(double_timeout(
789                    &rt,
790                    async move {
791                        let sl = rt_clone.sleep(Duration::from_secs(2));
792                        rt_clone.allow_one_advance(Duration::from_secs(2));
793                        sl.await;
794                        snd.send(()).unwrap();
795                        Ok(4_u32)
796                    },
797                    t1,
798                    t10,
799                ))
800                .await;
801            assert!(matches!(x, Err(Error::CircTimeout(_))));
802            let end = rt.now();
803            assert!(duration_close_to(end - start, Duration::from_secs(1)));
804            let waited = rt.wait_for(rcv).await;
805            assert_eq!(waited, Ok(()));
806
807            trace!("acquiesce after test3");
808            #[allow(clippy::clone_on_copy)]
809            #[allow(deprecated)] // TODO #1885
810            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());
811
812            // Try a future that times out and gets abandoned.
813            let rt_clone = rt.clone();
814            rt.block_advance("manually controlling advances");
815            let (snd, rcv) = oneshot::channel();
816            let start = rt.now();
817            // Let it hit the first timeout...
818            rt.allow_one_advance(Duration::from_secs(1));
819            let x = rt
820                .wait_for(double_timeout(
821                    &rt,
822                    async move {
823                        rt_clone.sleep(Duration::from_secs(30)).await;
824                        snd.send(()).unwrap();
825                        Ok(4_u32)
826                    },
827                    t1,
828                    t10,
829                ))
830                .await;
831            assert!(matches!(x, Err(Error::CircTimeout(_))));
832            let end = rt.now();
833            // ...and let it hit the second, too.
834            rt.allow_one_advance(Duration::from_secs(9));
835            let waited = rt.wait_for(rcv).await;
836            assert!(waited.is_err());
837            let end2 = rt.now();
838            assert!(duration_close_to(end - start, Duration::from_secs(1)));
839            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
840        });
841    }
842
843    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
844    ///
845    /// In our FakeCircuit code below, the first timeout is the amount of
846    /// time that we should sleep while building a hop to this key,
847    /// and the second timeout is the length of time-advance we should allow
848    /// after the hop is built.
849    ///
850    /// (This is pretty silly, but it's good enough for testing.)
851    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
852        let mut be = [0; 8];
853        be[..].copy_from_slice(&id.as_bytes()[0..8]);
854        let dur = u64::from_be_bytes(be);
855        be[..].copy_from_slice(&id.as_bytes()[8..16]);
856        let dur2 = u64::from_be_bytes(be);
857        (Duration::from_millis(dur), Duration::from_millis(dur2))
858    }
859    /// Encode a pair of timeouts as an Ed25519 identity.
860    ///
861    /// In our FakeCircuit code below, the first timeout is the amount of
862    /// time that we should sleep while building a hop to this key,
863    /// and the second timeout is the length of time-advance we should allow
864    /// after the hop is built.
865    ///
866    /// (This is pretty silly but it's good enough for testing.)
867    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
868        let mut bytes = [0; 32];
869        let dur = (d1.as_millis() as u64).to_be_bytes();
870        bytes[0..8].copy_from_slice(&dur);
871        let dur = (d2.as_millis() as u64).to_be_bytes();
872        bytes[8..16].copy_from_slice(&dur);
873        bytes.into()
874    }
875
876    /// As [`timeouts_from_key`], but first extract the relevant key from the
877    /// OwnedChanTarget.
878    fn timeouts_from_chantarget<CT: ChanTarget>(ct: &CT) -> (Duration, Duration) {
879        // Extracting the Ed25519 identity should always succeed in this case:
880        // we put it there ourselves!
881        let ed_id = ct
882            .identity(RelayIdType::Ed25519)
883            .expect("No ed25519 key was present for fake ChanTarget‽")
884            .try_into()
885            .expect("ChanTarget provided wrong key type");
886        timeouts_from_key(ed_id)
887    }
888
889    /// Replacement type for circuit, to implement buildable.
890    #[derive(Debug, Clone)]
891    struct FakeCirc {
892        hops: Vec<RelayIds>,
893        onehop: bool,
894    }
895    #[async_trait]
896    impl Buildable for Mutex<FakeCirc> {
897        async fn create_chantarget<RT: Runtime>(
898            _: &ChanMgr<RT>,
899            rt: &RT,
900            _guard_status: &GuardStatusHandle,
901            ct: &OwnedChanTarget,
902            _: CircParameters,
903            _usage: ChannelUsage,
904        ) -> Result<Arc<Self>> {
905            let (d1, d2) = timeouts_from_chantarget(ct);
906            rt.sleep(d1).await;
907            if !d2.is_zero() {
908                rt.allow_one_advance(d2);
909            }
910
911            let c = FakeCirc {
912                hops: vec![RelayIds::from_relay_ids(ct)],
913                onehop: true,
914            };
915            Ok(Arc::new(Mutex::new(c)))
916        }
917        async fn create<RT: Runtime>(
918            _: &ChanMgr<RT>,
919            rt: &RT,
920            _guard_status: &GuardStatusHandle,
921            ct: &OwnedCircTarget,
922            _: CircParameters,
923            _usage: ChannelUsage,
924        ) -> Result<Arc<Self>> {
925            let (d1, d2) = timeouts_from_chantarget(ct);
926            rt.sleep(d1).await;
927            if !d2.is_zero() {
928                rt.allow_one_advance(d2);
929            }
930
931            let c = FakeCirc {
932                hops: vec![RelayIds::from_relay_ids(ct)],
933                onehop: false,
934            };
935            Ok(Arc::new(Mutex::new(c)))
936        }
937        async fn extend<RT: Runtime>(
938            &self,
939            rt: &RT,
940            ct: &OwnedCircTarget,
941            _: CircParameters,
942        ) -> Result<()> {
943            let (d1, d2) = timeouts_from_chantarget(ct);
944            rt.sleep(d1).await;
945            if !d2.is_zero() {
946                rt.allow_one_advance(d2);
947            }
948
949            {
950                let mut c = self.lock().unwrap();
951                c.hops.push(RelayIds::from_relay_ids(ct));
952            }
953            Ok(())
954        }
955    }
956
957    /// Fake implementation of TimeoutEstimator that just records its inputs.
958    struct TimeoutRecorder<R> {
959        runtime: R,
960        hist: Vec<(bool, u8, Duration)>,
961        // How much advance to permit after being told of a timeout?
962        on_timeout: Duration,
963        // How much advance to permit after being told of a success?
964        on_success: Duration,
965
966        snd_success: Option<oneshot::Sender<()>>,
967        rcv_success: Option<oneshot::Receiver<()>>,
968    }
969
970    impl<R> TimeoutRecorder<R> {
971        fn new(runtime: R) -> Self {
972            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
973        }
974
975        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
976            let (snd_success, rcv_success) = oneshot::channel();
977            Self {
978                runtime,
979                hist: Vec::new(),
980                on_timeout,
981                on_success,
982                rcv_success: Some(rcv_success),
983                snd_success: Some(snd_success),
984            }
985        }
986    }
987    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
988        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
989            if !is_last {
990                return;
991            }
992            let (rt, advance) = {
993                let mut this = self.lock().unwrap();
994                this.hist.push((true, hop, delay));
995                let _ = this.snd_success.take().unwrap().send(());
996                (this.runtime.clone(), this.on_success)
997            };
998            if !advance.is_zero() {
999                rt.allow_one_advance(advance);
1000            }
1001        }
1002        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
1003            let (rt, advance) = {
1004                let mut this = self.lock().unwrap();
1005                this.hist.push((false, hop, delay));
1006                (this.runtime.clone(), this.on_timeout)
1007            };
1008            if !advance.is_zero() {
1009                rt.allow_one_advance(advance);
1010            }
1011        }
1012        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
1013            (Duration::from_secs(3), Duration::from_secs(100))
1014        }
1015        fn learning_timeouts(&self) -> bool {
1016            false
1017        }
1018        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
1019
1020        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
1021            None
1022        }
1023    }
1024
1025    /// Testing only: create a bogus circuit target
1026    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
1027        let mut builder = OwnedCircTarget::builder();
1028        builder
1029            .chan_target()
1030            .ed_identity(id)
1031            .rsa_identity([0x20; 20].into());
1032        builder
1033            .ntor_onion_key([0x33; 32].into())
1034            .protocols("".parse().unwrap())
1035            .build()
1036            .unwrap()
1037    }
1038    /// Testing only: create a bogus channel target
1039    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
1040        OwnedChanTarget::builder()
1041            .ed_identity(id)
1042            .rsa_identity([0x20; 20].into())
1043            .build()
1044            .unwrap()
1045    }
1046
1047    async fn run_builder_test(
1048        rt: tor_rtmock::MockRuntime,
1049        advance_initial: Duration,
1050        path: OwnedPath,
1051        advance_on_timeout: Option<(Duration, Duration)>,
1052        usage: ChannelUsage,
1053    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
1054        let chanmgr = Arc::new(ChanMgr::new(
1055            rt.clone(),
1056            &ChannelConfig::default(),
1057            Default::default(),
1058            &Default::default(),
1059            ToplevelAccount::new_noop(),
1060        ));
1061        // always has 3 second timeout, 100 second abandon.
1062        let timeouts = match advance_on_timeout {
1063            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
1064            None => TimeoutRecorder::new(rt.clone()),
1065        };
1066        let timeouts = Arc::new(Mutex::new(timeouts));
1067        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
1068            rt.clone(),
1069            chanmgr,
1070            timeouts::Estimator::new(Arc::clone(&timeouts)),
1071        );
1072
1073        rt.block_advance("manually controlling advances");
1074        rt.allow_one_advance(advance_initial);
1075        let outcome = rt.spawn_join("build-owned", async move {
1076            let arcbuilder = Arc::new(builder);
1077            let params = exit_circparams_from_netparams(&NetParameters::default())?;
1078            arcbuilder.build_owned(path, &params, gs(), usage).await
1079        });
1080
1081        // Now we wait for a success to finally, finally be reported.
1082        if advance_on_timeout.is_some() {
1083            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
1084            rt.spawn_identified("receiver", async move {
1085                receiver.await.unwrap();
1086            });
1087        }
1088        rt.advance_until_stalled().await;
1089
1090        let circ = outcome.map(|m| Ok(m?.lock().unwrap().clone())).await;
1091        let timeouts = timeouts.lock().unwrap().hist.clone();
1092
1093        (circ, timeouts)
1094    }
1095
1096    #[test]
1097    fn build_onehop() {
1098        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1099            let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
1100            let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
1101
1102            let (outcome, timeouts) =
1103                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1104            let circ = outcome.unwrap();
1105            assert!(circ.onehop);
1106            assert_eq!(circ.hops.len(), 1);
1107            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1108
1109            assert_eq!(timeouts.len(), 1);
1110            assert!(timeouts[0].0); // success
1111            assert_eq!(timeouts[0].1, 0); // one-hop
1112            assert_eq!(timeouts[0].2, Duration::from_millis(100));
1113        });
1114    }
1115
1116    #[test]
1117    fn build_threehop() {
1118        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1119            let id_100ms =
1120                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1121            let id_200ms =
1122                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
1123            let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
1124            let path =
1125                OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
1126
1127            let (outcome, timeouts) =
1128                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1129            let circ = outcome.unwrap();
1130            assert!(!circ.onehop);
1131            assert_eq!(circ.hops.len(), 3);
1132            assert!(circ.hops[0].same_relay_ids(&chan_t(id_100ms)));
1133            assert!(circ.hops[1].same_relay_ids(&chan_t(id_200ms)));
1134            assert!(circ.hops[2].same_relay_ids(&chan_t(id_300ms)));
1135
1136            assert_eq!(timeouts.len(), 1);
1137            assert!(timeouts[0].0); // success
1138            assert_eq!(timeouts[0].1, 2); // three-hop
1139            assert_eq!(timeouts[0].2, Duration::from_millis(600));
1140        });
1141    }
1142
1143    #[test]
1144    fn build_huge_timeout() {
1145        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1146            let id_100ms =
1147                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1148            let id_200ms =
1149                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1150            let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
1151
1152            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
1153
1154            let (outcome, timeouts) =
1155                run_builder_test(rt, Duration::from_millis(100), path, None, CU::UserTraffic).await;
1156            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1157
1158            assert_eq!(timeouts.len(), 1);
1159            assert!(!timeouts[0].0); // timeout
1160
1161            // BUG: Sometimes this is 1 and sometimes this is 2.
1162            // assert_eq!(timeouts[0].1, 2); // at third hop.
1163            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1164        });
1165    }
1166
1167    #[test]
1168    fn build_modest_timeout() {
1169        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
1170            let id_100ms =
1171                key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
1172            let id_200ms =
1173                key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
1174            let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));
1175
1176            let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));
1177
1178            let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);
1179
1180            let (outcome, timeouts) = run_builder_test(
1181                rt.clone(),
1182                Duration::from_millis(100),
1183                path,
1184                Some(timeout_advance),
1185                CU::UserTraffic,
1186            )
1187            .await;
1188            assert!(matches!(outcome, Err(Error::CircTimeout(_))));
1189
1190            assert_eq!(timeouts.len(), 2);
1191            assert!(!timeouts[0].0); // timeout
1192
1193            // BUG: Sometimes this is 1 and sometimes this is 2.
1194            //assert_eq!(timeouts[0].1, 2); // at third hop.
1195            assert_eq!(timeouts[0].2, Duration::from_millis(3000));
1196
1197            assert!(timeouts[1].0); // success
1198            assert_eq!(timeouts[1].1, 2); // three-hop
1199                                          // BUG: This timer is not always reliable, due to races.
1200                                          //assert_eq!(timeouts[1].2, Duration::from_millis(3300));
1201        });
1202    }
1203}