tor_hsclient/
connect.rs

1//! Main implementation of the connection functionality
2
3use std::time::Duration;
4
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::sync::Arc;
9use std::time::Instant;
10
11use async_trait::async_trait;
12use educe::Educe;
13use futures::{AsyncRead, AsyncWrite};
14use itertools::Itertools;
15use rand::Rng;
16use tor_bytes::Writeable;
17use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
18use tor_cell::relaycell::hs::pow::ProofOfWork;
19use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
20use tor_circmgr::build::onion_circparams_from_netparams;
21use tor_error::{debug_report, warn_report, Bug};
22use tor_hscrypto::Subcredential;
23use tor_proto::circuit::handshake::hs_ntor;
24use tracing::{debug, trace};
25
26use retry_error::RetryError;
27use safelog::Sensitive;
28use tor_cell::relaycell::hs::{
29    AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
30};
31use tor_cell::relaycell::RelayMsg;
32use tor_checkable::{timed::TimerangeBound, Timebound};
33use tor_circmgr::hspool::{HsCircKind, HsCircPool};
34use tor_circmgr::timeouts::Action as TimeoutsAction;
35use tor_dirclient::request::Requestable as _;
36use tor_error::{internal, into_internal};
37use tor_error::{HasRetryTime as _, RetryTime};
38use tor_hscrypto::pk::{HsBlindId, HsId, HsIdKey};
39use tor_hscrypto::RendCookie;
40use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
41use tor_llcrypto::pk::ed25519::Ed25519Identity;
42use tor_netdir::{NetDir, Relay};
43use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
44use tor_proto::circuit::{CircParameters, ClientCirc, MetaCellDisposition, MsgHandler};
45use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
46
47use crate::pow::HsPowClient;
48use crate::proto_oneshot;
49use crate::relay_info::ipt_to_circtarget;
50use crate::state::MockableConnectorData;
51use crate::Config;
52use crate::{rend_pt_identity_for_error, FailedAttemptError, IntroPtIndex, RendPtIdentityForError};
53use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
54use crate::{HsClientConnector, HsClientSecretKeys};
55
56use ConnError as CE;
57use FailedAttemptError as FAE;
58
/// Number of hops in our hsdir, introduction, and rendezvous circuits
///
/// Required by `tor_circmgr`'s timeout estimation API
/// ([`tor_circmgr::CircMgr::estimate_timeout`], [`HsCircPool::estimate_timeout`]).
///
/// In this module it is passed as the `length` of the `TimeoutsAction::BuildCircuit`
/// and `TimeoutsAction::RoundTrip` actions from which the per-attempt timeouts are estimated.
///
/// TODO HS hardcoding the number of hops to 3 seems wrong.
/// This is really something that HsCircPool knows.  And some setups might want to make
/// shorter circuits for some reason.  And it will become wrong with vanguards?
/// But right now I think this is what HsCircPool does.
//
// Some commentary from
//   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1342#note_2918050
// Possibilities:
//  * Look at n_hops() on the circuits we get, if we don't need this estimate
//    till after we have the circuit.
//  * Add a function to HsCircPool to tell us what length of circuit to expect
//    for each given type of circuit.
const HOPS: usize = 3;
77
/// Given `R, M` where `M: MocksForConnect<R>`, expand to the mockable `ClientCirc`
///
/// That is, the `ClientCirc` associated type of the `HsCircPool` that `M` selects.
// This is quite annoying.  But the alternative is to write out `<... as ...>`
// each time, since otherwise the compiler complains about ambiguous associated types.
macro_rules! ClientCirc { { $R:ty, $M:ty } => {
    <<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::ClientCirc
} }
84
/// Information about a hidden service, including our connection history
///
/// Holds the two pieces of per-service state that a connection attempt reads and updates:
/// the cached descriptor, and our experiences with the service's introduction points.
/// The `Default` value has no descriptor and no recorded experiences.
#[derive(Default, Educe)]
#[educe(Debug)]
// This type is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
pub struct Data {
    /// The latest known onion service descriptor for this service.
    ///
    /// `None` until a descriptor has been obtained.
    desc: DataHsDesc,
    /// Information about the latest status of trying to connect to this service
    /// through each of its introduction points.
    ipts: DataIpts,
}
97
/// Part of `Data` that relates to the HS descriptor
///
/// `None` means we hold no descriptor.
/// A held descriptor stays wrapped in its `TimerangeBound`,
/// so its validity must be checked (or explicitly assumed) each time it is used.
type DataHsDesc = Option<TimerangeBound<HsDesc>>;
100
/// Part of `Data` that relates to our information about introduction points
///
/// Maps a relay identity (see [`RelayIdForExperience`]) to what happened
/// the last time we tried to use that relay as an introduction point.
type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
103
/// How things went last time we tried to use this introduction point
///
/// Neither this data structure, nor [`Data`], is responsible for arranging that we expire this
/// information eventually.  If we keep reconnecting to the service, we'll retain information
/// about each IPT indefinitely, at least so long as they remain listed in the descriptors we
/// receive.
///
/// Expiry of unused data is handled by `state.rs`, according to `last_used` in `ServiceState`.
///
/// Choosing which IPT to prefer is done by obtaining an `IptSortKey`
/// (from this and other information).
//
// Don't impl Ord for IptExperience.  We obtain `Option<&IptExperience>` from our
// data structure, and if IptExperience were Ord then Option<&IptExperience> would be Ord
// but it would be the wrong sort order: it would always prefer None, ie untried IPTs.
#[derive(Debug)]
struct IptExperience {
    /// How long it took us to get whatever outcome occurred
    ///
    /// We prefer fast successes to slow ones.
    /// Then, we prefer failures with earlier `RetryTime`,
    /// and, lastly, faster failures to slower ones.
    duration: Duration,

    /// What happened and when we might try again
    ///
    /// `Ok(())` records a success; `Err(retry_time)` records a failure,
    /// along with when a retry might be worthwhile.
    ///
    /// Note that we don't actually *enforce* the `RetryTime` here, just sort by it
    /// using `RetryTime::loose_cmp`.
    ///
    /// We *do* return an error that is itself `HasRetryTime` and expect our callers
    /// to honour that.
    outcome: Result<(), RetryTime>,
}
137
138/// Actually make a HS connection, updating our recorded state as necessary
139///
140/// `connector` is provided only for obtaining the runtime and netdir (and `mock_for_state`).
141/// Obviously, `connect` is not supposed to go looking in `services`.
142///
143/// This function handles all necessary retrying of fallible operations,
144/// (and, therefore, must also limit the total work done for a particular call).
145///
146/// This function has a minimum of functionality, since it is the boundary
147/// between "mock connection, used for testing `state.rs`" and
148/// "mock circuit and netdir, used for testing `connect.rs`",
149/// so it is not, itself, unit-testable.
150pub(crate) async fn connect<R: Runtime>(
151    connector: &HsClientConnector<R>,
152    netdir: Arc<NetDir>,
153    config: Arc<Config>,
154    hsid: HsId,
155    data: &mut Data,
156    secret_keys: HsClientSecretKeys,
157) -> Result<Arc<ClientCirc>, ConnError> {
158    Context::new(
159        &connector.runtime,
160        &*connector.circpool,
161        netdir,
162        config,
163        hsid,
164        secret_keys,
165        (),
166    )?
167    .connect(data)
168    .await
169}
170
/// Common context for a single request to connect to a hidden service
///
/// This saves on passing this same set of (immutable) values (or subsets thereof)
/// to each method in the principal functional code, everywhere.
/// It also provides a convenient type to be `Self`.
///
/// Its lifetime is one request to make a new client circuit to a hidden service,
/// including all the retries and timeouts.
struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
    /// Runtime
    runtime: &'c R,
    /// Circpool
    circpool: &'c M::HsCircPool,
    /// Netdir
    //
    // TODO holding onto the netdir for the duration of our attempts is not ideal
    // but doing better is fairly complicated.  See discussions here:
    //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1228#note_2910545
    //   https://gitlab.torproject.org/tpo/core/arti/-/issues/884
    netdir: Arc<NetDir>,
    /// Configuration
    config: Arc<Config>,
    /// Secret keys to use
    secret_keys: HsClientSecretKeys,
    /// HS ID
    hsid: HsId,
    /// Blinded HS ID
    ///
    /// Derived from `hsid` for the netdir's HS time period when the `Context` is made.
    hs_blind_id: HsBlindId,
    /// The subcredential to use during this time period
    ///
    /// Computed together with `hs_blind_id`.
    subcredential: Subcredential,
    /// Mock data
    mocks: M,
}
204
/// Details of an established rendezvous point
///
/// Intermediate value for progress during a connection attempt.
struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
    /// RPT as a `Relay`
    rend_relay: Relay<'r>,
    /// Rendezvous circuit
    rend_circ: Arc<ClientCirc!(R, M)>,
    /// Rendezvous cookie
    rend_cookie: RendCookie,

    /// Receiver that will give us the RENDEZVOUS2 message.
    ///
    /// The sending end is owned by the handler
    /// which receives control messages on the rendezvous circuit,
    /// and which was installed when we sent `ESTABLISH_RENDEZVOUS`.
    ///
    /// (`RENDEZVOUS2` is the message containing the onion service's side of the handshake.)
    rend2_rx: proto_oneshot::Receiver<Rendezvous2>,

    /// Dummy, to placate compiler
    ///
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
230
/// Random value used as part of IPT selection
///
/// One is drawn per usable introduction point (see `UsableIntroPt::sort_rand`);
/// it is the final component of [`IptSortKey`], so it orders IPTs whose
/// recorded outcomes compare equal.
type IptSortRand = u32;
233
/// Details of an apparently-usable introduction point
///
/// Intermediate value for progress during a connection attempt.
struct UsableIntroPt<'i> {
    /// Index in HS descriptor
    ///
    /// This is the position in the descriptor's list of introduction points,
    /// not the position in any filtered or sorted list built from it.
    intro_index: IntroPtIndex,
    /// IPT descriptor
    intro_desc: &'i IntroPointDesc,
    /// IPT `CircTarget`
    intro_target: OwnedCircTarget,
    /// Random value used as part of IPT selection
    sort_rand: IptSortRand,
}
247
/// Lookup key for looking up and recording our IPT use experiences
///
/// Used to identify a relay when looking to see what happened last time we used it,
/// and storing that information after we tried it.
///
/// We store the experience information under an arbitrary one of the relay's identities,
/// as returned by the `HasRelayIds::identities().next()`.
/// When we do lookups, we check all the relay's identities to see if we find
/// anything relevant.
/// If relay identities permute in strange ways, whether we find our previous
/// knowledge about them is not particularly well defined, but that's fine.
///
/// While this is, structurally, a relay identity, it is not suitable for other purposes.
///
/// Construct one with `for_store` (when recording) or `for_lookup` (when searching).
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct RelayIdForExperience(RelayId);
263
/// Details of an apparently-successful INTRODUCE exchange
///
/// Intermediate value for progress during a connection attempt.
struct Introduced<R: Runtime, M: MocksForConnect<R>> {
    /// End-to-end crypto NTORv3 handshake with the service
    ///
    /// Created as part of generating our `INTRODUCE1`,
    /// and then used when processing `RENDEZVOUS2`.
    handshake_state: hs_ntor::HsNtorClientState,

    /// Dummy, to placate compiler
    ///
    /// `R` and `M` only used for getting to mocks.
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
280
281impl RelayIdForExperience {
282    /// Identities to use to try to find previous experience information about this IPT
283    fn for_lookup(intro_target: &OwnedCircTarget) -> impl Iterator<Item = Self> + '_ {
284        intro_target
285            .identities()
286            .map(|id| RelayIdForExperience(id.to_owned()))
287    }
288
289    /// Identity to use to store previous experience information about this IPT
290    fn for_store(intro_target: &OwnedCircTarget) -> Result<Self, Bug> {
291        let id = intro_target
292            .identities()
293            .next()
294            .ok_or_else(|| internal!("introduction point relay with no identities"))?
295            .to_owned();
296        Ok(RelayIdForExperience(id))
297    }
298}
299
/// Sort key for an introduction point, for selecting the best IPTs to try first
///
/// Ordering is most preferable first.
///
/// We use this to sort our `UsableIntroPt`s using `.sort_by_key`.
/// (This implementation approach ensures that we obey all the usual ordering invariants.)
//
// The derived `Ord` compares fields in declaration order,
// so `outcome` must stay ahead of `sort_rand`.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
struct IptSortKey {
    /// Sort by how preferable the experience was
    outcome: IptSortKeyOutcome,
    /// Failing that, choose randomly
    sort_rand: IptSortRand,
}
313
/// Component of the [`IptSortKey`] representing outcome of our last attempt, if any
///
/// This is the main thing we use to decide which IPTs to try first.
/// It is calculated for each IPT
/// (via `.sort_by_key`, so repeatedly - it should therefore be cheap to make.)
///
/// Ordering is most preferable first.
//
// The derived `Ord` ranks variants by declaration order (and, within a variant,
// fields by declaration order), so reordering anything here changes IPT preference.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
enum IptSortKeyOutcome {
    /// Prefer successes
    Success {
        /// Prefer quick ones
        duration: Duration,
    },
    /// Failing that, try one we don't know to have failed
    Untried,
    /// Failing that, it'll have to be ones that didn't work last time
    Failed {
        /// Prefer failures with an earlier retry time
        retry_time: tor_error::LooseCmpRetryTime,
        /// Failing that, prefer quick failures (rather than slow ones eg timeouts)
        duration: Duration,
    },
}
338
339impl From<Option<&IptExperience>> for IptSortKeyOutcome {
340    fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
341        use IptSortKeyOutcome as O;
342        match experience {
343            None => O::Untried,
344            Some(IptExperience { duration, outcome }) => match outcome {
345                Ok(()) => O::Success {
346                    duration: *duration,
347                },
348                Err(retry_time) => O::Failed {
349                    retry_time: (*retry_time).into(),
350                    duration: *duration,
351                },
352            },
353        }
354    }
355}
356
357impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
358    /// Make a new `Context` from the input data
359    fn new(
360        runtime: &'c R,
361        circpool: &'c M::HsCircPool,
362        netdir: Arc<NetDir>,
363        config: Arc<Config>,
364        hsid: HsId,
365        secret_keys: HsClientSecretKeys,
366        mocks: M,
367    ) -> Result<Self, ConnError> {
368        let time_period = netdir.hs_time_period();
369        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
370            .map_err(|_| CE::InvalidHsId)?
371            .compute_blinded_key(time_period)
372            .map_err(
373                // TODO HS what on earth do these errors mean, in practical terms ?
374                // In particular, we'll want to convert them to a ConnError variant,
375                // but what ErrorKind should they have ?
376                into_internal!("key blinding error, don't know how to handle"),
377            )?;
378        let hs_blind_id = hs_blind_id_key.id();
379
380        Ok(Context {
381            netdir,
382            config,
383            hsid,
384            hs_blind_id,
385            subcredential,
386            circpool,
387            runtime,
388            secret_keys,
389            mocks,
390        })
391    }
392
393    /// Actually make a HS connection, updating our recorded state as necessary
394    ///
395    /// Called by the `connect` function in this module.
396    ///
397    /// This function handles all necessary retrying of fallible operations,
398    /// (and, therefore, must also limit the total work done for a particular call).
399    async fn connect(&self, data: &mut Data) -> Result<Arc<ClientCirc!(R, M)>, ConnError> {
400        // This function must do the following, retrying as appropriate.
401        //  - Look up the onion descriptor in the state.
402        //  - Download the onion descriptor if one isn't there.
403        //  - In parallel:
404        //    - Pick a rendezvous point from the netdirprovider and launch a
405        //      rendezvous circuit to it. Then send ESTABLISH_INTRO.
406        //    - Pick a number of introduction points (1 or more) and try to
407        //      launch circuits to them.
408        //  - On a circuit to an introduction point, send an INTRODUCE1 cell.
409        //  - Wait for a RENDEZVOUS2 cell on the rendezvous circuit
410        //  - Add a virtual hop to the rendezvous circuit.
411        //  - Return the rendezvous circuit.
412
413        let mocks = self.mocks.clone();
414
415        let desc = self.descriptor_ensure(&mut data.desc).await?;
416
417        mocks.test_got_desc(desc);
418
419        let circ = self.intro_rend_connect(desc, &mut data.ipts).await?;
420        mocks.test_got_circ(&circ);
421
422        Ok(circ)
423    }
424
    /// Ensure that `Data.desc` contains the HS descriptor
    ///
    /// If we have a previously-downloaded descriptor, which is still valid,
    /// just returns a reference to it.
    ///
    /// Otherwise, tries to obtain the descriptor by downloading it from hsdir(s).
    /// On success the new descriptor replaces whatever was in `data`.
    ///
    /// Does all necessary retries and timeouts.
    /// Returns an error if no valid descriptor could be found:
    /// `NoHsDirs` if we ran out of attempts without recording a single failure
    /// (so, no attempt was actually made),
    /// or `DescriptorDownload` carrying the failure of every attempt.
    /// Failure to select hsdirs from the netdir is propagated as-is.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn descriptor_ensure<'d>(&self, data: &'d mut DataHsDesc) -> Result<&'d HsDesc, CE> {
        // Maximum number of hsdir connection and retrieval attempts we'll make
        let max_total_attempts = self
            .config
            .retry
            .hs_desc_fetch_attempts()
            .try_into()
            // User specified a very large u32.  We must be downcasting it to 16bit!
            // let's give them as many retries as we can manage.
            .unwrap_or(usize::MAX);

        // Limit on the duration of each retrieval attempt
        let each_timeout = self.estimate_timeout(&[
            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
            (1, TimeoutsAction::RoundTrip { length: HOPS }),    // One HTTP query/response
        ]);

        // We retain a previously obtained descriptor precisely until its lifetime expires,
        // and pay no attention to the descriptor's revision counter.
        // When it expires, we discard it completely and try to obtain a new one.
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914448
        // TODO SPEC: Discuss HS descriptor lifetime and expiry client behaviour
        if let Some(previously) = data {
            let now = self.runtime.wallclock();
            if let Ok(_desc) = previously.as_ref().check_valid_at(&now) {
                // Ideally we would just return desc but that confuses borrowck.
                // https://github.com/rust-lang/rust/issues/51545
                //
                // So we repeat the lookup and the validity check, against the same `now`;
                // having just succeeded, neither can fail.
                return Ok(data
                    .as_ref()
                    .expect("Some but now None")
                    .as_ref()
                    .check_valid_at(&now)
                    .expect("Ok but now Err"));
            }
            // Seems to be not valid now.  Try to fetch a fresh one.
        }

        // Which hsdirs to ask, for our blinded ID in the netdir's current time period.
        let hs_dirs = self.netdir.hs_dirs_download(
            self.hs_blind_id,
            self.netdir.hs_time_period(),
            &mut self.mocks.thread_rng(),
        )?;

        trace!(
            "HS desc fetch for {}, using {} hsdirs",
            &self.hsid,
            hs_dirs.len()
        );

        // We might consider launching requests to multiple HsDirs in parallel.
        //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1118#note_2894463
        // But C Tor doesn't and our HS experts don't consider that important:
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914436
        // (Additionally, making multiple HSDir requests at once may make us
        // more vulnerable to traffic analysis.)
        //
        // Instead we go round the hsdirs one at a time, returning to the first after the last,
        // until we succeed or have made `max_total_attempts` attempts.
        // (If there are no hsdirs at all, this yields nothing.)
        let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
        let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
        let desc = loop {
            let relay = match attempts.next() {
                Some(relay) => relay,
                None => {
                    // Out of attempts.  With no errors recorded, we never tried anything.
                    return Err(if errors.is_empty() {
                        CE::NoHsDirs
                    } else {
                        CE::DescriptorDownload(errors)
                    })
                }
            };
            let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();
            match self
                .runtime
                .timeout(each_timeout, self.descriptor_fetch_attempt(relay))
                .await
                // Running out of time is treated as just another failed attempt.
                .unwrap_or(Err(DescriptorErrorDetail::Timeout))
            {
                Ok(desc) => break desc,
                Err(error) => {
                    debug_report!(
                        &error,
                        "failed hsdir desc fetch for {} from {}/{}",
                        &self.hsid,
                        &relay.id(),
                        &relay.rsa_id()
                    );
                    // Remember the failure, and which hsdir it was, then try the next one.
                    errors.push(tor_error::Report(DescriptorError {
                        hsdir: hsdir_for_error,
                        error,
                    }));
                }
            }
        };

        // Store the bounded value in the cache for reuse,
        // but return a reference to the unwrapped `HsDesc`.
        //
        // The `HsDesc` must be owned by `data.desc`,
        // so first add it to `data.desc`,
        // and then dangerously_assume_timely to get a reference out again.
        //
        // It is safe to dangerously_assume_timely,
        // as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
        let ret = data.insert(desc);
        Ok(ret.as_ref().dangerously_assume_timely())
    }
539
540    /// Make one attempt to fetch the descriptor from a specific hsdir
541    ///
542    /// No timeout
543    ///
544    /// On success, returns the descriptor.
545    ///
546    /// While the returned descriptor is `TimerangeBound`, its validity at the current time *has*
547    /// been checked.
548    async fn descriptor_fetch_attempt(
549        &self,
550        hsdir: &Relay<'_>,
551    ) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
552        let max_len: usize = self
553            .netdir
554            .params()
555            .hsdir_max_desc_size
556            .get()
557            .try_into()
558            .map_err(into_internal!("BoundedInt was not truly bounded!"))?;
559        let request = {
560            let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
561            r.set_max_len(max_len);
562            r
563        };
564        trace!(
565            "hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
566            &self.hsid,
567            &hsdir.id(),
568            &hsdir.rsa_id(),
569            &request,
570            request.debug_request()
571        );
572
573        let circuit = self
574            .circpool
575            .m_get_or_launch_specific(
576                &self.netdir,
577                HsCircKind::ClientHsDir,
578                OwnedCircTarget::from_circ_target(hsdir),
579            )
580            .await?;
581        let mut stream = circuit
582            .m_begin_dir_stream()
583            .await
584            .map_err(DescriptorErrorDetail::Stream)?;
585
586        let response = tor_dirclient::send_request(self.runtime, &request, &mut stream, None)
587            .await
588            .map_err(|dir_error| match dir_error {
589                tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
590                tor_dirclient::Error::CircMgr(ce) => into_internal!(
591                    "tor-dirclient complains about circmgr going wrong but we gave it a stream"
592                )(ce)
593                .into(),
594                other => into_internal!(
595                    "tor-dirclient gave unexpected error, tor-hsclient code needs updating"
596                )(other)
597                .into(),
598            })?;
599
600        let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
601        let hsc_desc_enc = self.secret_keys.keys.ks_hsc_desc_enc.as_ref();
602
603        let now = self.runtime.wallclock();
604
605        HsDesc::parse_decrypt_validate(
606            &desc_text,
607            &self.hs_blind_id,
608            now,
609            &self.subcredential,
610            hsc_desc_enc,
611        )
612        .map_err(DescriptorErrorDetail::from)
613    }
614
615    /// Given the descriptor, try to connect to service
616    ///
617    /// Does all necessary retries, timeouts, etc.
618    async fn intro_rend_connect(
619        &self,
620        desc: &HsDesc,
621        data: &mut DataIpts,
622    ) -> Result<Arc<ClientCirc!(R, M)>, CE> {
623        // Maximum number of rendezvous/introduction attempts we'll make
624        let max_total_attempts = self
625            .config
626            .retry
627            .hs_intro_rend_attempts()
628            .try_into()
629            // User specified a very large u32.  We must be downcasting it to 16bit!
630            // let's give them as many retries as we can manage.
631            .unwrap_or(usize::MAX);
632
633        // Limit on the duration of each attempt to establish a rendezvous point
634        //
635        // This *might* include establishing a fresh circuit,
636        // if the HsCircPool's pool is empty.
637        let rend_timeout = self.estimate_timeout(&[
638            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
639            (1, TimeoutsAction::RoundTrip { length: HOPS }),    // One ESTABLISH_RENDEZVOUS
640        ]);
641
642        // Limit on the duration of each attempt to negotiate with an introduction point
643        //
644        // *Does* include establishing the circuit.
645        let intro_timeout = self.estimate_timeout(&[
646            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
647            // This does some crypto too, but we don't account for that.
648            (1, TimeoutsAction::RoundTrip { length: HOPS }), // One INTRODUCE1/INTRODUCE_ACK
649        ]);
650
651        // Timeout estimator for the action that the HS will take in building
652        // its circuit to the RPT.
653        let hs_build_action = TimeoutsAction::BuildCircuit {
654            length: if desc.is_single_onion_service() {
655                1
656            } else {
657                HOPS
658            },
659        };
660        // Limit on the duration of each attempt for activities involving both
661        // RPT and IPT.
662        let rpt_ipt_timeout = self.estimate_timeout(&[
663            // The API requires us to specify a number of circuit builds and round trips.
664            // So what we tell the estimator is a rather imprecise description.
665            // (TODO it would be nice if the circmgr offered us a one-way trip Action).
666            //
667            // What we are timing here is:
668            //
669            //    INTRODUCE2 goes from IPT to HS
670            //    but that happens in parallel with us waiting for INTRODUCE_ACK,
            //    which is controlled by `intro_timeout` so not part of `rpt_ipt_timeout`,
672            //    and which has to come HOPS hops.  So don't count INTRODUCE2 here.
673            //
674            //    HS builds to our RPT
675            (1, hs_build_action),
676            //
677            //    RENDEZVOUS1 goes from HS to RPT.  `hs_hops`, one-way.
678            //    RENDEZVOUS2 goes from RPT to us.  HOPS, one-way.
679            //    Together, we squint a bit and call this a HOPS round trip:
680            (1, TimeoutsAction::RoundTrip { length: HOPS }),
681        ]);
682
683        // We can't reliably distinguish IPT failure from RPT failure, so we iterate over IPTs
684        // (best first) and each time use a random RPT.
685
686        // We limit the number of rendezvous establishment attempts, separately, since we don't
687        // try to talk to the intro pt until we've established the rendezvous circuit.
688        let mut rend_attempts = 0..max_total_attempts;
689
690        // But, we put all the errors into the same bucket, since we might have a mixture.
691        let mut errors = RetryError::in_attempt_to("make circuit to to hidden service");
692
693        // Note that IntroPtIndex is *not* the index into this Vec.
694        // It is the index into the original list of introduction points in the descriptor.
695        let mut usable_intros: Vec<UsableIntroPt> = desc
696            .intro_points()
697            .iter()
698            .enumerate()
699            .map(|(intro_index, intro_desc)| {
700                let intro_index = intro_index.into();
701                let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
702                    .map_err(|error| FAE::UnusableIntro { error, intro_index })?;
703                // Lack of TAIT means this clone
704                let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
705                Ok::<_, FailedAttemptError>(UsableIntroPt {
706                    intro_index,
707                    intro_desc,
708                    intro_target,
709                    sort_rand: self.mocks.thread_rng().random(),
710                })
711            })
712            .filter_map(|entry| match entry {
713                Ok(y) => Some(y),
714                Err(e) => {
715                    errors.push(e);
716                    None
717                }
718            })
719            .collect_vec();
720
721        // Delete experience information for now-unlisted intro points
722        // Otherwise, as the IPTs change `Data` might grow without bound,
723        // if we keep reconnecting to the same HS.
724        data.retain(|k, _v| {
725            usable_intros
726                .iter()
727                .any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
728        });
729
730        // Join with existing state recording our experiences,
731        // sort by descending goodness, and then randomly
732        // (so clients without any experience don't all pile onto the same, first, IPT)
733        usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
734            let experience =
735                RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
736            IptSortKey {
737                outcome: experience.into(),
738                sort_rand: ipt.sort_rand,
739            }
740        });
741        self.mocks.test_got_ipts(&usable_intros);
742
743        let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);
744
745        // We retain a rendezvous we managed to set up in here.  That way if we created it, and
746        // then failed before we actually needed it, we can reuse it.
747        // If we exit with an error, we will waste it - but because we isolate things we do
748        // for different services, it wouldn't be reusable anyway.
749        let mut saved_rendezvous = None;
750
751        // If we are using proof-of-work DoS mitigation, this chooses an
752        // algorithm and initial effort, and adjusts that effort when we retry.
753        let mut pow_client = HsPowClient::new(&self.hs_blind_id, desc);
754
755        // We might consider making multiple INTRODUCE attempts to different
        // IPTs in parallel, and somehow aggregating the errors and
757        // experiences.
758        // However our HS experts don't consider that important:
759        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
760        // Parallelizing our HsCircPool circuit building would likely have
761        // greater impact. (See #1149.)
762        loop {
763            // When did we start doing things that depended on the IPT?
764            //
765            // Used for recording our experience with the selected IPT
766            let mut ipt_use_started = None::<Instant>;
767
768            // Error handling inner async block (analogous to an IEFE):
769            //  * Ok(Some()) means this attempt succeeded
770            //  * Ok(None) means all attempts exhausted
771            //  * Err(error) means this attempt failed
772            //
773            // Error handling is rather complex here.  It's the primary job of *this* code to
774            // make sure that it's done right for timeouts.  (The individual component
775            // functions handle non-timeout errors.)  The different timeout errors have
776            // different amounts of information about the identity of the RPT and IPT: in each
777            // case, the error only mentions the RPT or IPT if that node is implicated in the
778            // timeout.
779            let outcome = async {
780                // We establish a rendezvous point first.  Although it appears from reading
781                // this code that this means we serialise establishment of the rendezvous and
782                // introduction circuits, this isn't actually the case.  The circmgr maintains
783                // a pool of circuits.  What actually happens in the "standing start" case is
784                // that we obtain a circuit for rendezvous from the circmgr's pool, expecting
785                // one to be available immediately; the circmgr will then start to build a new
786                // one to replenish its pool, and that happens in parallel with the work we do
787                // here - but in arrears.  If the circmgr pool is empty, then we must wait.
788                //
789                // Perhaps this should be parallelised here.  But that's really what the pool
790                // is for, since we expect building the rendezvous circuit and building the
791                // introduction circuit to take about the same length of time.
792                //
793                // We *do* serialise the ESTABLISH_RENDEZVOUS exchange, with the
794                // building of the introduction circuit.  That could be improved, at the cost
795                // of some additional complexity here.
796                //
797                // Our HS experts don't consider it important to increase the parallelism:
798                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914444
799                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914445
800                if saved_rendezvous.is_none() {
801                    debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
802                    // Establish a rendezvous circuit.
803                    let Some(_): Option<usize> = rend_attempts.next() else {
804                        return Ok(None);
805                    };
806
807                    let mut using_rend_pt = None;
808                    saved_rendezvous = Some(
809                        self.runtime
810                            .timeout(rend_timeout, self.establish_rendezvous(&mut using_rend_pt))
811                            .await
812                            .map_err(|_: TimeoutError| match using_rend_pt {
813                                None => FAE::RendezvousCircuitObtain {
814                                    error: tor_circmgr::Error::CircTimeout(None),
815                                },
816                                Some(rend_pt) => FAE::RendezvousEstablishTimeout { rend_pt },
817                            })??,
818                    );
819                }
820
821                let Some(ipt) = intro_attempts.next() else {
822                    return Ok(None);
823                };
824                let intro_index = ipt.intro_index;
825
826                let proof_of_work = match pow_client.solve().await {
827                    Ok(solution) => solution,
828                    Err(e) => {
829                        debug!(
830                            "failing to compute proof-of-work, trying without. ({:?})",
831                            e
832                        );
833                        None
834                    }
835                };
836
                // We record how long things take, starting from here,
                // as a statistic we'll use for the IPT in future.
839                // This is stored in a variable outside this async block,
840                // so that the outcome handling can use it.
841                ipt_use_started = Some(self.runtime.now());
842
843                // No `Option::get_or_try_insert_with`, or we'd avoid this expect()
844                let rend_pt_for_error = rend_pt_identity_for_error(
845                    &saved_rendezvous
846                        .as_ref()
847                        .expect("just made Some")
848                        .rend_relay,
849                );
850                debug!(
851                    "hs conn to {}: RPT {}",
852                    &self.hsid,
853                    rend_pt_for_error.as_inner()
854                );
855
856                let (rendezvous, introduced) = self
857                    .runtime
858                    .timeout(
859                        intro_timeout,
860                        self.exchange_introduce(ipt, &mut saved_rendezvous,
861                            proof_of_work),
862                    )
863                    .await
864                    .map_err(|_: TimeoutError| {
865                        // The intro point ought to give us a prompt ACK regardless of HS
866                        // behaviour or whatever is happening at the RPT, so blame the IPT.
867                        FAE::IntroductionTimeout { intro_index }
868                    })?
869                    // TODO: Maybe try, once, to extend-and-reuse the intro circuit.
870                    //
871                    // If the introduction fails, the introduction circuit is in principle
872                    // still usable.  We believe that in this case, C Tor extends the intro
873                    // circuit by one hop to the next IPT to try.  That saves on building a
874                    // whole new 3-hop intro circuit.  However, our HS experts tell us that
875                    // if introduction fails at one IPT it is likely to fail at the others too,
876                    // so that optimisation might reduce our network impact and time to failure,
877                    // but isn't likely to improve our chances of success.
878                    //
879                    // However, it's not clear whether this approach risks contaminating
880                    // the 2nd attempt with some fault relating to the introduction point.
881                    // The 1st ipt might also gain more knowledge about which HS we're talking to.
882                    //
883                    // TODO SPEC: Discuss extend-and-reuse HS intro circuit after nack
884                    ?;
885                #[allow(unused_variables)] // it's *supposed* to be unused
886                let saved_rendezvous = (); // don't use `saved_rendezvous` any more, use rendezvous
887
888                let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
889                let circ = self
890                    .runtime
891                    .timeout(
892                        rpt_ipt_timeout,
893                        self.complete_rendezvous(ipt, rendezvous, introduced),
894                    )
895                    .await
896                    .map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
897                        intro_index,
898                        rend_pt: rend_pt.clone(),
899                    })??;
900
901                debug!(
902                    "hs conn to {}: RPT {} IPT {}: success",
903                    &self.hsid,
904                    rend_pt.as_inner(),
905                    intro_index,
906                );
907                Ok::<_, FAE>(Some((intro_index, circ)))
908            }
909            .await;
910
911            // Store the experience `outcome` we had with IPT `intro_index`, in `data`
912            #[allow(clippy::unused_unit)] // -> () is here for error handling clarity
913            let mut store_experience = |intro_index, outcome| -> () {
914                (|| {
915                    let ipt = usable_intros
916                        .iter()
917                        .find(|ipt| ipt.intro_index == intro_index)
918                        .ok_or_else(|| internal!("IPT not found by index"))?;
919                    let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
920                    let started = ipt_use_started.ok_or_else(|| {
921                        internal!("trying to record IPT use but no IPT start time noted")
922                    })?;
923                    let duration = self
924                        .runtime
925                        .now()
926                        .checked_duration_since(started)
927                        .ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
928                    data.insert(id, IptExperience { duration, outcome });
929                    Ok::<_, Bug>(())
930                })()
931                .unwrap_or_else(|e| warn_report!(e, "error recording HS IPT use experience"));
932            };
933
934            match outcome {
935                Ok(Some((intro_index, y))) => {
936                    // Record successful outcome in Data
937                    store_experience(intro_index, Ok(()));
938                    return Ok(y);
939                }
940                Ok(None) => return Err(CE::Failed(errors)),
941                Err(error) => {
942                    debug_report!(&error, "hs conn to {}: attempt failed", &self.hsid);
943                    // Record error outcome in Data, if in fact we involved the IPT
                    // at all.  The IPT information is retrieved from `error`,
945                    // since only some of the errors implicate the introduction point.
946                    if let Some(intro_index) = error.intro_index() {
947                        store_experience(intro_index, Err(error.retry_time()));
948                    }
949                    errors.push(error);
950
951                    // If we are using proof-of-work DoS mitigation, try harder next time
952                    pow_client.increase_effort();
953                }
954            }
955        }
956    }
957
    /// Make one attempt to establish a rendezvous circuit
    ///
    /// This doesn't really depend on anything,
    /// other than (obviously) the isolation implied by our circuit pool.
    /// In particular it doesn't depend on the introduction point.
    ///
    /// The steps are:
    ///
    ///  1. obtain a rendezvous circuit, and the relay it ends at, from the circuit pool;
    ///  2. pick a fresh random rendezvous cookie (using the mockable RNG);
    ///  3. send `ESTABLISH_RENDEZVOUS` containing that cookie to the last hop;
    ///  4. wait for the `RENDEZVOUS_ESTABLISHED` reply.
    ///
    /// On success, returns a `Rendezvous` holding the circuit, the cookie, the relay,
    /// and the receiver (`rend2_rx`) on which the handler installed here
    /// will deliver the `RENDEZVOUS2` message.
    ///
    /// Does not apply a timeout.
    ///
    /// On entry `using_rend_pt` is `None`.
    /// This function will store `Some` when it finds out which relay
    /// it is talking to and starts to converse with it.
    /// That way, if a timeout occurs, the caller can add that information to the error.
    ///
    /// # Errors
    ///
    ///  * `FAE::RendezvousCircuitObtain` if the circuit pool fails to provide a circuit.
    ///  * `FAE::RendezvousEstablish` (naming the rendezvous point)
    ///    if sending `ESTABLISH_RENDEZVOUS` fails,
    ///    or if an error is reported while waiting for `RENDEZVOUS_ESTABLISHED`.
    async fn establish_rendezvous(
        &'c self,
        using_rend_pt: &mut Option<RendPtIdentityForError>,
    ) -> Result<Rendezvous<'c, R, M>, FAE> {
        // Get a circuit from the pool; the pool also tells us which relay it ends at.
        let (rend_circ, rend_relay) = self
            .circpool
            .m_get_or_launch_client_rend(&self.netdir)
            .await
            .map_err(|error| FAE::RendezvousCircuitObtain { error })?;

        // Publish the rendezvous point's identity to the caller *before* we start
        // talking to it, so that it is available if we are cancelled by a timeout.
        let rend_pt = rend_pt_identity_for_error(&rend_relay);
        *using_rend_pt = Some(rend_pt.clone());

        let rend_cookie: RendCookie = self.mocks.thread_rng().random();
        let message = EstablishRendezvous::new(rend_cookie);

        // One channel per reply message we expect on this circuit.
        // The senders go into the `Handler`; we keep `rend_established_rx` for use below,
        // and hand `rend2_rx` back to the caller inside the returned `Rendezvous`.
        let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
        let (rend2_tx, rend2_rx) = proto_oneshot::channel();

        /// Handler which expects `RENDEZVOUS_ESTABLISHED` and then
        /// `RENDEZVOUS2`.  Returns each message via the corresponding `oneshot`.
        struct Handler {
            /// Sender for a RENDEZVOUS_ESTABLISHED message.
            rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
            /// Sender for a RENDEZVOUS2 message.
            rend2_tx: proto_oneshot::Sender<Rendezvous2>,
        }
        impl MsgHandler for Handler {
            fn handle_msg(
                &mut self,
                msg: AnyRelayMsg,
            ) -> Result<MetaCellDisposition, tor_proto::Error> {
                // The first message we expect is a RENDEZVOUS_ESTABLISHED.
                //
                // NOTE(review): `still_expected()` is presumably true until that first
                // message has been delivered; confirm against `proto_oneshot`.
                if self.rend_established_tx.still_expected() {
                    // More messages are still to come, so report only `Consumed`.
                    self.rend_established_tx
                        .deliver_expected_message(msg, MetaCellDisposition::Consumed)
                } else {
                    // RENDEZVOUS2 is the last message we want on this conversation.
                    self.rend2_tx
                        .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
                }
            }
        }

        debug!(
            "hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
            &self.hsid,
            rend_pt.as_inner(),
        );

        // Wraps an underlying error, adding the rendezvous point's identity.
        // Used for both the send step and the receive step below.
        let handle_proto_error = |error| FAE::RendezvousEstablish {
            error,
            rend_pt: rend_pt.clone(),
        };
        let handler = Handler {
            rend_established_tx,
            rend2_tx,
        };

        rend_circ
            .m_start_conversation_last_hop(Some(message.into()), handler)
            .await
            .map_err(handle_proto_error)?;

        // `start_conversation` returns as soon as the control message has been sent.
        // We need to obtain the RENDEZVOUS_ESTABLISHED message, which is "returned" via the oneshot.
        // Its contents are not inspected; we only need to know that it arrived.
        let _: RendezvousEstablished = rend_established_rx.recv(handle_proto_error).await?;

        debug!(
            "hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
            &self.hsid,
            rend_pt.as_inner(),
        );

        Ok(Rendezvous {
            rend_circ,
            rend_cookie,
            rend_relay,
            rend2_rx,
            marker: PhantomData,
        })
    }
1051
    /// Attempt (once) to send an INTRODUCE1 and wait for the INTRODUCE_ACK
    ///
    /// `take`s the input `rendezvous` (but only takes it if it gets that far)
    /// and, if successful, returns it.
    /// (This arranges that the rendezvous is "used up" precisely if
    /// we sent its secret somewhere.)
    ///
    /// Concretely: the introduction circuit is obtained *before* `rendezvous` is taken,
    /// so if obtaining that circuit fails, the caller's `Option` is still `Some`.
    /// Once it has been taken, any later failure in this function drops it.
    ///
    /// Although this function handles the `Rendezvous`,
    /// nothing in it actually involves the rendezvous point.
    /// So if there's a failure, it's purely to do with the introduction point.
    ///
    /// `proof_of_work`, if provided, is included in the encrypted introduce payload.
    ///
    /// On success, returns the `Rendezvous` along with an `Introduced`,
    /// which carries the hs-ntor client handshake state
    /// needed to process the service's eventual reply.
    ///
    /// Does not apply a timeout.
    ///
    /// # Errors
    ///
    ///  * `FAE::IntroductionCircuitObtain` if the circuit pool can't supply
    ///    a circuit to the introduction point.
    ///  * `FAE::IntroductionExchange` if sending INTRODUCE1 fails,
    ///    or if an error is reported while waiting for INTRODUCE_ACK.
    ///  * `FAE::IntroductionFailed` if the INTRODUCE_ACK does not indicate success.
    ///  * An internal error if `rendezvous` is `None` on entry,
    ///    or if encoding a message or starting the hs-ntor handshake fails.
    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
    async fn exchange_introduce(
        &'c self,
        ipt: &UsableIntroPt<'_>,
        rendezvous: &mut Option<Rendezvous<'c, R, M>>,
        proof_of_work: Option<ProofOfWork>,
    ) -> Result<(Rendezvous<'c, R, M>, Introduced<R, M>), FAE> {
        let intro_index = ipt.intro_index;

        debug!(
            "hs conn to {}: IPT {}: obtaining intro circuit",
            &self.hsid, intro_index,
        );

        let intro_circ = self
            .circpool
            .m_get_or_launch_specific(
                &self.netdir,
                HsCircKind::ClientIntro,
                ipt.intro_target.clone(), // &OwnedCircTarget isn't CircTarget apparently
            )
            .await
            .map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;

        // Only now do we take the rendezvous out of the caller's `Option`:
        // from here on we will be sending its details to the service.
        let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;

        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);

        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        // Now we construct an introduce1 message and perform the first part of the
        // rendezvous handshake.
        //
        // This process is tricky because the header of the INTRODUCE1 message
        // -- which depends on the IntroPt configuration -- is authenticated as
        // part of the HsDesc handshake.
        // (The encoded header is passed to `client_send_intro`, below.)

        // Construct the header, since we need it as input to our encryption.
        //
        // This is an INTRODUCE1 with an empty body, encoded to bytes;
        // the message we actually send is built separately (`intro1_real`).
        let intro_header = {
            let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
            let intro1 = Introduce1::new(
                AuthKeyType::ED25519_SHA3_256,
                ipt_sid_key.as_bytes().to_vec(),
                vec![],
            );
            let mut header = vec![];
            intro1
                .encode_onto(&mut header)
                .map_err(into_internal!("couldn't encode intro1 header"))?;
            header
        };

        // Construct the introduce payload, which tells the onion service how to find
        // our rendezvous point.  (We could do this earlier if we wanted.)
        //
        // It contains the rendezvous cookie, the rendezvous relay's ntor onion key
        // and link specifiers, and the proof-of-work solution if we have one.
        let intro_payload = {
            let onion_key =
                intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
            let linkspecs = rendezvous
                .rend_relay
                .linkspecs()
                .map_err(into_internal!("Couldn't encode link specifiers"))?;
            let payload = IntroduceHandshakePayload::new(
                rendezvous.rend_cookie,
                onion_key,
                linkspecs,
                proof_of_work,
            );
            let mut encoded = vec![];
            payload
                .write_onto(&mut encoded)
                .map_err(into_internal!("Couldn't encode introduce1 payload"))?;
            encoded
        };

        // Perform the cryptographic handshake with the onion service.
        //
        // `handshake_state` is kept (and returned inside `Introduced`),
        // for use when the service's reply arrives.
        let service_info = hs_ntor::HsNtorServiceInfo::new(
            ipt.intro_desc.svc_ntor_key().clone(),
            ipt.intro_desc.ipt_sid_key().clone(),
            self.subcredential,
        );
        let handshake_state =
            hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
        let encrypted_body = handshake_state
            .client_send_intro(&intro_header, &intro_payload)
            .map_err(into_internal!("can't begin hs-ntor handshake"))?;

        // Build our actual INTRODUCE1 message.
        // (Same auth key type and key as the header above, but with the encrypted body.)
        let intro1_real = Introduce1::new(
            AuthKeyType::ED25519_SHA3_256,
            ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
            encrypted_body,
        );

        /// Handler which expects just `INTRODUCE_ACK`
        struct Handler {
            /// Sender for `INTRODUCE_ACK`
            intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
        }
        impl MsgHandler for Handler {
            fn handle_msg(
                &mut self,
                msg: AnyRelayMsg,
            ) -> Result<MetaCellDisposition, tor_proto::Error> {
                // We expect exactly one message, so the conversation ends with it.
                self.intro_ack_tx
                    .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
            }
        }
        // Wraps an underlying error, adding which introduction point was involved.
        // Used for both the send step and the receive step below.
        let handle_intro_proto_error = |error| FAE::IntroductionExchange { error, intro_index };
        let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
        let handler = Handler { intro_ack_tx };

        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        intro_circ
            .m_start_conversation_last_hop(Some(intro1_real.into()), handler)
            .await
            .map_err(handle_intro_proto_error)?;

        // Status is checked by `.success()`, and we don't look at the extensions;
        // just discard the known-successful `IntroduceAck`
        let _: IntroduceAck = intro_ack_rx
            .recv(handle_intro_proto_error)
            .await?
            .success()
            .map_err(|status| FAE::IntroductionFailed {
                status,
                intro_index,
            })?;

        debug!(
            "hs conn to {}: RPT {} IPT {}: making introduction - success",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        // Having received INTRODUCE_ACK, we can forget about this circuit
        // (and potentially tear it down).
        drop(intro_circ);

        Ok((
            rendezvous,
            Introduced {
                handshake_state,
                marker: PhantomData,
            },
        ))
    }
1222
    /// Attempt (once) to connect a rendezvous circuit using the given intro pt
    ///
    /// Waits for the `RENDEZVOUS2` message on the rendezvous circuit,
    /// completes the hs-ntor handshake using the state saved in `introduced`,
    /// and then adds a virtual hop to the rendezvous circuit
    /// using the keys derived from that handshake.
    /// On success, returns the rendezvous circuit.
    ///
    /// `ipt` is used only for its `intro_index`, for error reporting and logging.
    ///
    /// Timeouts here might be due to the IPT, RPT, service,
    /// or any of the intermediate relays.
    ///
    /// If, rather than a timeout, we actually encounter some kind of error,
    /// we'll return the appropriate `FailedAttemptError`.
    /// (Who is responsible may vary, so the `FailedAttemptError` variant will reflect that.)
    ///
    /// Does not apply a timeout
    ///
    /// # Errors
    ///
    ///  * `FAE::RendezvousCompletionCircuitError` if an error is reported
    ///    while waiting for `RENDEZVOUS2`.
    ///  * `FAE::RendezvousCompletionHandshake` if the handshake data
    ///    in `RENDEZVOUS2` is not accepted.
    ///  * An internal error if the circuit parameters can't be built,
    ///    or if adding the virtual hop fails (but see the `TODO HS` below).
    async fn complete_rendezvous(
        &'c self,
        ipt: &UsableIntroPt<'_>,
        rendezvous: Rendezvous<'c, R, M>,
        introduced: Introduced<R, M>,
    ) -> Result<Arc<ClientCirc!(R, M)>, FAE> {
        use tor_proto::circuit::handshake;

        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
        let intro_index = ipt.intro_index;
        // Wraps an underlying error, adding both the IPT and the RPT,
        // since at this stage either might be implicated.
        let handle_proto_error = |error| FAE::RendezvousCompletionCircuitError {
            error,
            intro_index,
            rend_pt: rend_pt.clone(),
        };

        debug!(
            "hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        // The receiving end of the channel set up when the rendezvous was established.
        let rend2_msg: Rendezvous2 = rendezvous.rend2_rx.recv(handle_proto_error).await?;

        debug!(
            "hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        // In theory it would be great if we could have multiple introduction attempts in
        // parallel with similar x,X values but different IPTs.  However, our HS experts don't
        // think increasing parallelism here is important:
        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
        let handshake_state = introduced.handshake_state;

        // Try to complete the cryptographic handshake.
        let keygen = handshake_state
            .client_receive_rend(rend2_msg.handshake_info())
            // If this goes wrong, either the onion service has mangled the crypto,
            // or the rendezvous point has misbehaved (that this is possible is a protocol bug),
            // or we have used the wrong handshake_state (let's assume that's not true).
            //
            // If this happens we'll go and try another RPT.
            .map_err(|error| FAE::RendezvousCompletionHandshake {
                error,
                intro_index,
                rend_pt: rend_pt.clone(),
            })?;

        let params = onion_circparams_from_netparams(self.netdir.params())
            .map_err(into_internal!("Failed to build CircParameters"))?;

        // Add the onion service as a virtual hop at the end of the rendezvous circuit,
        // keyed from the handshake we just completed.
        //
        // NOTE(review): a failure here is currently reported as an internal error,
        // which the message below admits is probably wrong.
        rendezvous
            .rend_circ
            .m_extend_virtual(
                handshake::RelayProtocol::HsV3,
                handshake::HandshakeRole::Initiator,
                keygen,
                params,
            )
            .await
            .map_err(into_internal!(
                "actually this is probably a 'circuit closed' error" // TODO HS
            ))?;

        debug!(
            "hs conn to {}: RPT {} IPT {}: HS circuit established",
            &self.hsid,
            rend_pt.as_inner(),
            intro_index,
        );

        Ok(rendezvous.rend_circ)
    }
1310
1311    /// Helper to estimate a timeout for a complicated operation
1312    ///
1313    /// `actions` is a list of `(count, action)`, where each entry
1314    /// represents doing `action`, `count` times sequentially.
1315    ///
1316    /// Combines the timeout estimates and returns an overall timeout.
1317    fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
1318        // This algorithm is, perhaps, wrong.  For uncorrelated variables, a particular
1319        // percentile estimate for a sum of random variables, is not calculated by adding the
1320        // percentile estimates of the individual variables.
1321        //
1322        // But the actual lengths of times of the operations aren't uncorrelated.
1323        // If they were *perfectly* correlated, then this addition would be correct.
1324        // It will do for now; it just might be rather longer than it ought to be.
1325        actions
1326            .iter()
1327            .map(|(count, action)| {
1328                self.circpool
1329                    .m_estimate_timeout(action)
1330                    .saturating_mul(*count)
1331            })
1332            .fold(Duration::ZERO, Duration::saturating_add)
1333    }
1334}
1335
/// Mocks used for testing `connect.rs`
///
/// This is different to `MockableConnectorData`,
/// which is used to *replace* this file, when testing `state.rs`.
///
/// `MocksForConnect` provides mock facilities for *testing* this file.
///
/// The `test_got_*` methods are notification hooks:
/// each has a default implementation which does nothing,
/// so an implementor only needs to override the ones it is interested in.
/// `thread_rng` has no default and must always be provided.
//
// TODO this should probably live somewhere else, maybe tor-circmgr even?
// TODO this really ought to be made by macros or something
trait MocksForConnect<R>: Clone {
    /// HS circuit pool
    ///
    /// The (possibly mocked) pool type from which circuits are obtained.
    type HsCircPool: MockableCircPool<R>;

    /// A random number generator
    ///
    /// The type returned by `thread_rng`.
    type Rng: rand::Rng + rand::CryptoRng;

    /// Tell tests we got this descriptor
    ///
    /// Does nothing by default.
    fn test_got_desc(&self, _: &HsDesc) {}
    /// Tell tests we got this circuit
    ///
    /// Does nothing by default.
    fn test_got_circ(&self, _: &Arc<ClientCirc!(R, Self)>) {}
    /// Tell tests we have obtained and sorted the intros like this
    ///
    /// Does nothing by default.
    fn test_got_ipts(&self, _: &[UsableIntroPt]) {}

    /// Return a random number generator
    ///
    /// All randomness in this file is drawn via this method,
    /// so that tests can substitute their own generator.
    fn thread_rng(&self) -> Self::Rng;
}
/// Mock for `HsCircPool`
///
/// Methods start with `m_` to avoid the following problem:
/// `ClientCirc::start_conversation` (say) means
/// to use the inherent method if one exists,
/// but will use a trait method if there isn't an inherent method.
///
/// So if the inherent method is renamed, the call in the impl here
/// turns into an always-recursive call.
/// This is not detected by the compiler due to the situation being
/// complicated by futures, `#[async_trait]` etc.
/// <https://github.com/rust-lang/rust/issues/111177>
#[async_trait]
trait MockableCircPool<R> {
    /// Client circuit
    ///
    /// The (possibly mocked) circuit type which this pool hands out.
    type ClientCirc: MockableClientCirc;

    /// Get or launch a circuit of kind `kind` to the specific relay `target`
    async fn m_get_or_launch_specific(
        &self,
        netdir: &NetDir,
        kind: HsCircKind,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Arc<Self::ClientCirc>>;

    /// Get or launch a circuit suitable for use as a client rendezvous circuit
    ///
    /// Returns the circuit, along with the relay (from `netdir`)
    /// which is to act as the rendezvous point.
    async fn m_get_or_launch_client_rend<'a>(
        &self,
        netdir: &'a NetDir,
    ) -> tor_circmgr::Result<(Arc<Self::ClientCirc>, Relay<'a>)>;

    /// Estimate timeout
    ///
    /// Returns an estimate of how long `action` might take.
    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
}
/// Mock for `ClientCirc`
///
/// As with `MockableCircPool`, the methods are prefixed `m_`
/// so that they cannot be confused with the inherent methods they wrap.
#[async_trait]
trait MockableClientCirc: Debug {
    /// Directory stream
    ///
    /// The type of stream returned by `m_begin_dir_stream`.
    type DirStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Open a directory stream over this circuit
    async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream>;

    /// Converse
    ///
    /// Start a conversation with the last hop of this circuit:
    /// send `msg` (if there is one),
    /// and install `reply_handler` to deal with the messages that come back.
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_proto::Result<Self::Conversation<'_>>;
    /// Conversation
    ///
    /// The type returned by `m_start_conversation_last_hop`;
    /// it may borrow from the circuit.
    type Conversation<'r>
    where
        Self: 'r;

    /// Add a virtual hop to the circuit.
    ///
    /// `handshake` supplies the key material for the new hop;
    /// `protocol`, `role` and `params` are passed on to the underlying circuit.
    async fn m_extend_virtual(
        &self,
        protocol: tor_proto::circuit::handshake::RelayProtocol,
        role: tor_proto::circuit::handshake::HandshakeRole,
        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
        params: CircParameters,
    ) -> tor_proto::Result<()>;
}
1421
/// The non-mock implementation: `()` stands for "no mocking"
///
/// Uses the real `HsCircPool`, and `rand`'s thread-local generator.
/// The `test_got_*` hooks are left as their do-nothing defaults.
impl<R: Runtime> MocksForConnect<R> for () {
    type HsCircPool = HsCircPool<R>;
    type Rng = rand::rngs::ThreadRng;

    /// Return the generator provided by `rand::rng()`
    fn thread_rng(&self) -> Self::Rng {
        rand::rng()
    }
}
/// The real circuit pool, as a `MockableCircPool`
///
/// Each method simply forwards to the `HsCircPool` function of the same name
/// without the `m_` prefix.
/// The calls are deliberately written in fully-qualified `HsCircPool::…(self, …)` form;
/// see the documentation for `MockableCircPool` for why the names differ.
#[async_trait]
impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
    type ClientCirc = ClientCirc;
    /// Forwards to `HsCircPool::get_or_launch_specific`
    async fn m_get_or_launch_specific(
        &self,
        netdir: &NetDir,
        kind: HsCircKind,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Arc<ClientCirc>> {
        HsCircPool::get_or_launch_specific(self, netdir, kind, target).await
    }
    /// Forwards to `HsCircPool::get_or_launch_client_rend`
    async fn m_get_or_launch_client_rend<'a>(
        &self,
        netdir: &'a NetDir,
    ) -> tor_circmgr::Result<(Arc<ClientCirc>, Relay<'a>)> {
        HsCircPool::get_or_launch_client_rend(self, netdir).await
    }
    /// Forwards to `HsCircPool::estimate_timeout`
    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
        HsCircPool::estimate_timeout(self, action)
    }
}
/// The real circuit type, as a `MockableClientCirc`
///
/// Each method forwards to the corresponding `ClientCirc` function,
/// written in fully-qualified `ClientCirc::…(self, …)` form.
#[async_trait]
impl MockableClientCirc for ClientCirc {
    /// Directory stream: a real `DataStream`
    type DirStream = tor_proto::stream::DataStream;
    /// Forwards to `ClientCirc::begin_dir_stream`
    async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
        ClientCirc::begin_dir_stream(self).await
    }
    /// Forwards to `ClientCirc::start_conversation`, targeting the circuit's last hop
    ///
    /// Fails, without sending anything, if the last hop can't be determined.
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_proto::Result<Self::Conversation<'_>> {
        let last_hop = self.last_hop_num()?;
        ClientCirc::start_conversation(self, msg, reply_handler, last_hop).await
    }
    /// Conversation: a real `tor_proto` conversation
    type Conversation<'r> = tor_proto::circuit::Conversation<'r>;

    /// Forwards to `ClientCirc::extend_virtual`
    async fn m_extend_virtual(
        &self,
        protocol: tor_proto::circuit::handshake::RelayProtocol,
        role: tor_proto::circuit::handshake::HandshakeRole,
        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
        params: CircParameters,
    ) -> tor_proto::Result<()> {
        ClientCirc::extend_virtual(self, protocol, role, handshake, params).await
    }
}
1478
1479#[async_trait]
1480impl MockableConnectorData for Data {
1481    type ClientCirc = ClientCirc;
1482    type MockGlobalState = ();
1483
1484    async fn connect<R: Runtime>(
1485        connector: &HsClientConnector<R>,
1486        netdir: Arc<NetDir>,
1487        config: Arc<Config>,
1488        hsid: HsId,
1489        data: &mut Self,
1490        secret_keys: HsClientSecretKeys,
1491    ) -> Result<Arc<Self::ClientCirc>, ConnError> {
1492        connect(connector, netdir, config, hsid, data, secret_keys).await
1493    }
1494
1495    fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
1496        !circuit.is_closing()
1497    }
1498}
1499
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    #![allow(dead_code, unused_variables)] // TODO HS TESTS delete, after tests are completed

    use super::*;
    use crate::*;
    use futures::FutureExt as _;
    use std::{iter, panic::AssertUnwindSafe};
    use tokio_crate as tokio;
    use tor_async_utils::JoinReadWrite;
    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
    use tor_hscrypto::pk::{HsClientDescEncKey, HsClientDescEncKeypair};
    use tor_llcrypto::pk::curve25519;
    use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
    use tor_rtcompat::tokio::TokioNativeTlsRuntime;
    use tor_rtcompat::RuntimeSubstExt as _;
    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::time::MockSleepProvider;
    use tracing_test::traced_test;

    /// Observations recorded by the mocks, shared between all clones of a [`Mocks`].
    #[derive(Debug, Default)]
    struct MocksGlobal {
        /// One entry per call to the mock `m_get_or_launch_specific`.
        hsdirs_asked: Vec<OwnedCircTarget>,
        /// The descriptor most recently passed to `test_got_desc`, if any.
        got_desc: Option<HsDesc>,
    }

    /// A single mock type used for every mockable role.
    ///
    /// Cloning shares `mglobal` (it is behind an `Arc`); `id` is per-value.
    #[derive(Clone, Debug)]
    struct Mocks<I> {
        mglobal: Arc<Mutex<MocksGlobal>>,
        id: I,
    }

    impl<I> Mocks<I> {
        /// Make a `Mocks<J>` that shares this value's global state and whose
        /// `id` is `f(&self.id)`.
        fn map_id<J>(&self, f: impl FnOnce(&I) -> J) -> Mocks<J> {
            let mglobal = Arc::clone(&self.mglobal);
            let id = f(&self.id);
            Mocks { mglobal, id }
        }
    }

    impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
        type HsCircPool = Mocks<()>;
        type Rng = TestingRng;

        /// Remember a copy of `desc` in the shared state.
        fn test_got_desc(&self, desc: &HsDesc) {
            let recorded = Some(desc.clone());
            self.mglobal.lock().unwrap().got_desc = recorded;
        }

        /// Deliberately a no-op: nothing is recorded about intro points yet.
        fn test_got_ipts(&self, _ipts: &[UsableIntroPt]) {}

        /// Use the test RNG from `tor_basic_utils`.
        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }
    }

    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
    #[async_trait]
    impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
        type ClientCirc = Mocks<()>;

        /// Record the requested target and hand back a clone of `self` as the "circuit".
        ///
        /// Panics (via `assert_eq!`) unless `kind` is `HsCircKind::ClientHsDir`.
        async fn m_get_or_launch_specific(
            &self,
            _netdir: &NetDir,
            kind: HsCircKind,
            target: impl CircTarget + Send + Sync + 'async_trait,
        ) -> tor_circmgr::Result<Arc<Self::ClientCirc>> {
            assert_eq!(kind, HsCircKind::ClientHsDir);
            let asked = OwnedCircTarget::from_circ_target(&target);
            {
                let mut global = self.mglobal.lock().unwrap();
                global.hsdirs_asked.push(asked);
            }
            // Wrapping in a fresh `Arc` is a bit ugly, but it is the price of
            // using one `Mocks` type for every role.
            let circ = Arc::new(self.clone());
            Ok(circ)
        }

        /// Not implemented in the mock yet; reaching this hits `todo!()`.
        async fn m_get_or_launch_client_rend<'a>(
            &self,
            netdir: &'a NetDir,
        ) -> tor_circmgr::Result<(Arc<ClientCirc!(R, Self)>, Relay<'a>)> {
            todo!()
        }

        /// Always report the same fixed estimate, whatever the action.
        fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
            const MOCK_ESTIMATE: Duration = Duration::from_secs(10);
            MOCK_ESTIMATE
        }
    }

    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
    #[async_trait]
    impl MockableClientCirc for Mocks<()> {
        type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
        type Conversation<'r> = &'r ();

        /// Produce a stream that reads a canned `200 OK` response carrying
        /// `test_data::TEST_DATA_2`, and whose write half is `futures::io::sink()`.
        async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
            let text = format!(
                r#"HTTP/1.1 200 OK

{}"#,
                test_data::TEST_DATA_2
            );
            let canned: Box<[u8]> = text.into_bytes().into_boxed_slice();
            let read_half = futures::io::Cursor::new(canned);
            let write_half = futures::io::sink();

            Ok(JoinReadWrite::new(read_half, write_half))
        }

        /// Not implemented in the mock yet; reaching this hits `todo!()`.
        async fn m_start_conversation_last_hop(
            &self,
            msg: Option<AnyRelayMsg>,
            reply_handler: impl MsgHandler + Send + 'static,
        ) -> tor_proto::Result<Self::Conversation<'_>> {
            todo!()
        }

        /// Not implemented in the mock yet; reaching this hits `todo!()`.
        async fn m_extend_virtual(
            &self,
            protocol: tor_proto::circuit::handshake::RelayProtocol,
            role: tor_proto::circuit::handshake::HandshakeRole,
            handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
            params: CircParameters,
        ) -> tor_proto::Result<()> {
            todo!()
        }
    }

    /// Run `Context::connect` against the mocks and check what it recorded:
    /// exactly one directory circuit request, the descriptor passed to
    /// `test_got_desc`, and the validity bounds of the descriptor stored in `Data`.
    #[traced_test]
    #[tokio::test]
    async fn test_connect() {
        // Small parsing helpers; each unwraps because the inputs are fixed literals.
        let rfc3339 = |text: &str| humantime::parse_rfc3339(text).unwrap();
        let span = |text: &str| humantime::parse_duration(text).unwrap();

        // Lifetime of the test network directory.
        let valid_after = rfc3339("2023-02-09T12:00:00Z");
        let fresh_until = valid_after + span("1 hours");
        let valid_until = valid_after + span("24 hours");
        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();

        let partial_netdir = tor_netdir::testnet::construct_custom_netdir_with_params(
            tor_netdir::testnet::simple_net_func,
            iter::empty::<(&str, _)>(),
            Some(lifetime),
        )
        .expect("failed to build default testing netdir");
        let netdir = Arc::new(partial_netdir.unwrap_if_sufficient().unwrap());

        // Tokio runtime whose sleeping and coarse time come from a mock provider.
        let tokio_rt = TokioNativeTlsRuntime::current().unwrap();
        let now = rfc3339("2023-02-09T12:00:00Z");
        #[allow(deprecated)] // TODO #1885
        let mock_sp = MockSleepProvider::new(now);
        let runtime = tokio_rt
            .with_sleep_provider(mock_sp.clone())
            .with_coarse_time_provider(mock_sp);

        // Must be read here: `netdir` is moved into the context below.
        let time_period = netdir.hs_time_period();

        let mocks = Mocks {
            mglobal: Arc::new(Mutex::new(MocksGlobal::default())),
            id: (),
        };
        // From C Tor src/test/test_hs_common.c test_build_address
        let hsid = test_data::TEST_HSID_2.into();
        let mut data = Data::default();

        // Client descriptor-encryption keys, packaged as the client's secret keys.
        let desc_enc_pk: HsClientDescEncKey =
            curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
        let secret_keys = {
            let desc_enc_sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
            let keypair = HsClientDescEncKeypair::new(desc_enc_pk.clone(), desc_enc_sk);
            let mut builder = HsClientSecretKeysBuilder::default();
            builder.ks_hsc_desc_enc(keypair);
            builder.build().unwrap()
        };

        let ctx = Context::new(
            &runtime,
            &mocks,
            netdir,
            Default::default(),
            hsid,
            secret_keys,
            mocks.clone(),
        )
        .unwrap();

        // The outcome is ignored for now; `catch_unwind` contains any panic
        // raised while connecting (several mock methods are still `todo!()`).
        let attempt = AssertUnwindSafe(ctx.connect(&mut data));
        let _got = attempt
            .catch_unwind() // TODO HS TESTS: remove this and the AssertUnwindSafe
            .await;

        // Independently parse the same test descriptor to get the expected value.
        let id_key = HsIdKey::try_from(hsid).unwrap();
        let (blind_id_key, subcredential) = id_key.compute_blinded_key(time_period).unwrap();
        let blind_id = blind_id_key.id();
        let expected_keypair = HsClientDescEncKeypair::new(
            desc_enc_pk,
            curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into(),
        );
        let expected_desc = HsDesc::parse_decrypt_validate(
            test_data::TEST_DATA_2,
            &blind_id,
            now,
            &subcredential,
            Some(&expected_keypair),
        )
        .unwrap()
        .dangerously_assume_timely();

        let recorded = mocks.mglobal.lock().unwrap();
        assert_eq!(recorded.hsdirs_asked.len(), 1);
        // TODO hs: here and in other places, consider implementing PartialEq instead, or creating
        // an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
        assert_eq!(
            format!("{:?}", recorded.got_desc),
            format!("{:?}", Some(expected_desc))
        );

        // Check how long the descriptor is valid for
        let cached_desc = data.desc.as_ref().unwrap();
        let (start_time, end_time) = cached_desc.bounds();
        assert_eq!(start_time, None);

        let desc_valid_until = rfc3339("2023-02-11T20:00:00Z");
        assert_eq!(end_time, Some(desc_valid_until));

        // TODO HS TESTS: check the circuit in got is the one we gave out

        // TODO HS TESTS: continue with this
    }

    // TODO HS TESTS: Test IPT state management and expiry:
    //   - obtain a test descriptor with only a broken ipt
    //     (broken in the sense that intro can be attempted, but will fail somehow)
    //   - try to make a connection and expect it to fail
    //   - assert that the ipt data isn't empty
    //   - cause the descriptor to expire (advance clock)
    //   - start using a mocked RNG if we weren't already and pin its seed here
    //   - make a new descriptor with two IPTs: the broken one from earlier, and a new one
    //   - make a new connection
    //   - use test_got_ipts to check that the random numbers
    //     would sort the bad intro first, *and* that the good one appears first
    //   - assert that connection succeeded
    //   - cause the circuit and descriptor to expire (advance clock)
    //   - go back to the previous descriptor contents, but with a new validity period
    //   - try to make a connection
    //   - use test_got_ipts to check that only the broken ipt is present

    // TODO HS TESTS: test retries (of every retry loop we have here)
    // TODO HS TESTS: test error paths
}