tor_hsclient/
connect.rs

1//! Main implementation of the connection functionality
2
3use std::time::Duration;
4
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::sync::Arc;
9use std::time::Instant;
10
11use async_trait::async_trait;
12use educe::Educe;
13use futures::{AsyncRead, AsyncWrite};
14use itertools::Itertools;
15use rand::Rng;
16use tor_bytes::Writeable;
17use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
18use tor_cell::relaycell::hs::pow::ProofOfWork;
19use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
20use tor_circmgr::build::onion_circparams_from_netparams;
21use tor_dirclient::SourceInfo;
22use tor_error::{debug_report, warn_report, Bug};
23use tor_hscrypto::Subcredential;
24use tor_proto::circuit::handshake::hs_ntor;
25use tracing::{debug, trace};
26
27use retry_error::RetryError;
28use safelog::Sensitive;
29use tor_cell::relaycell::hs::{
30    AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
31};
32use tor_cell::relaycell::RelayMsg;
33use tor_checkable::{timed::TimerangeBound, Timebound};
34use tor_circmgr::hspool::{HsCircKind, HsCircPool};
35use tor_circmgr::timeouts::Action as TimeoutsAction;
36use tor_dirclient::request::Requestable as _;
37use tor_error::{internal, into_internal};
38use tor_error::{HasRetryTime as _, RetryTime};
39use tor_hscrypto::pk::{HsBlindId, HsId, HsIdKey};
40use tor_hscrypto::RendCookie;
41use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
42use tor_llcrypto::pk::ed25519::Ed25519Identity;
43use tor_netdir::{NetDir, Relay};
44use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
45use tor_proto::circuit::{CircParameters, ClientCirc, MetaCellDisposition, MsgHandler};
46use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
47
48use crate::pow::HsPowClient;
49use crate::proto_oneshot;
50use crate::relay_info::ipt_to_circtarget;
51use crate::state::MockableConnectorData;
52use crate::Config;
53use crate::{rend_pt_identity_for_error, FailedAttemptError, IntroPtIndex, RendPtIdentityForError};
54use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
55use crate::{HsClientConnector, HsClientSecretKeys};
56
57use ConnError as CE;
58use FailedAttemptError as FAE;
59
/// Number of hops in our hsdir, introduction, and rendezvous circuits
///
/// Required by `tor_circmgr`'s timeout estimation API
/// ([`tor_circmgr::CircMgr::estimate_timeout`], [`HsCircPool::estimate_timeout`]).
///
/// In this module it is passed as the `length` of the `TimeoutsAction::BuildCircuit`
/// and `TimeoutsAction::RoundTrip` actions we describe to the estimator.
/// (The one exception is the service's own circuit to the rendezvous point
/// for a single onion service, which we describe as length 1.)
///
/// TODO HS hardcoding the number of hops to 3 seems wrong.
/// This is really something that HsCircPool knows.  And some setups might want to make
/// shorter circuits for some reason.  And it will become wrong with vanguards?
/// But right now I think this is what HsCircPool does.
//
// Some commentary from
//   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1342#note_2918050
// Possibilities:
//  * Look at n_hops() on the circuits we get, if we don't need this estimate
//    till after we have the circuit.
//  * Add a function to HsCircPool to tell us what length of circuit to expect
//    for each given type of circuit.
const HOPS: usize = 3;
78
/// Given `R, M` where `M: MocksForConnect<R>`, expand to the mockable `ClientCirc` type
///
/// That is: the `ClientCirc` associated type of the circuit pool
/// which `M` selects for the runtime `R`.
// Having a macro for this is a bit annoying.  But the alternative is to write out
// the fully qualified `<... as ...>` projection at every use site,
// since otherwise the compiler complains about ambiguous associated types.
macro_rules! ClientCirc {
    ($R:ty, $M:ty) => {
        <<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::ClientCirc
    };
}
85
/// Information about a hidden service, including our connection history
///
/// This is the per-service state that a connection attempt both consults and updates:
/// the two fields are handed, separately, to the descriptor-fetching code
/// and to the introduction/rendezvous code.
#[derive(Default, Educe)]
#[educe(Debug)]
// This type is actually crate-private, since it isn't re-exported, but it must
// be `pub` because it appears as a default for a type parameter in HsClientConnector.
pub struct Data {
    /// The latest known onion service descriptor for this service.
    ///
    /// `None` if we have not stored one yet.
    /// A stored descriptor is reused for as long as it is valid at the current time;
    /// after that it is replaced by a freshly downloaded one.
    desc: DataHsDesc,
    /// Information about the latest status of trying to connect to this service
    /// through each of its introduction points.
    ///
    /// Entries for introduction points which are not usable according to the
    /// descriptor we are currently working from are deleted before each use.
    ipts: DataIpts,
}
98
/// Part of `Data` that relates to the HS descriptor
///
/// `None` means we have no stored descriptor.
/// The descriptor is kept inside its `TimerangeBound`,
/// so its validity must be (re)checked whenever it is taken out for use.
type DataHsDesc = Option<TimerangeBound<HsDesc>>;
101
/// Part of `Data` that relates to our information about introduction points
///
/// Maps a relay identity (see [`RelayIdForExperience`] for which one)
/// to what happened the last time we tried that relay as an introduction point.
/// An IPT with no entry here is treated as untried.
type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
104
/// How things went last time we tried to use this introduction point
///
/// Neither this data structure, nor [`Data`], is responsible for arranging that we expire this
/// information eventually.  If we keep reconnecting to the service, we'll retain information
/// about each IPT indefinitely, at least so long as they remain listed in the descriptors we
/// receive.
///
/// Expiry of unused data is handled by `state.rs`, according to `last_used` in `ServiceState`.
///
/// Choosing which IPT to prefer is done by obtaining an `IptSortKey`
/// (from this and other information).
/// The conversion from `Option<&IptExperience>` to the outcome part of that key
/// is the `From` impl on `IptSortKeyOutcome`.
//
// Don't impl Ord for IptExperience.  We obtain `Option<&IptExperience>` from our
// data structure, and if IptExperience were Ord then Option<&IptExperience> would be Ord
// but it would be the wrong sort order: it would always prefer None, ie untried IPTs.
#[derive(Debug)]
struct IptExperience {
    /// How long it took us to get whatever outcome occurred
    ///
    /// We prefer fast successes to slow ones.
    /// Then, we prefer failures with earlier `RetryTime`,
    /// and, lastly, faster failures to slower ones.
    duration: Duration,

    /// What happened and when we might try again
    ///
    /// `Ok(())` records a success; `Err` records a failure,
    /// along with the failure's `RetryTime`.
    ///
    /// Note that we don't actually *enforce* the `RetryTime` here, just sort by it
    /// using `RetryTime::loose_cmp`.
    ///
    /// We *do* return an error that is itself `HasRetryTime` and expect our callers
    /// to honour that.
    outcome: Result<(), RetryTime>,
}
138
139/// Actually make a HS connection, updating our recorded state as necessary
140///
141/// `connector` is provided only for obtaining the runtime and netdir (and `mock_for_state`).
142/// Obviously, `connect` is not supposed to go looking in `services`.
143///
144/// This function handles all necessary retrying of fallible operations,
145/// (and, therefore, must also limit the total work done for a particular call).
146///
147/// This function has a minimum of functionality, since it is the boundary
148/// between "mock connection, used for testing `state.rs`" and
149/// "mock circuit and netdir, used for testing `connect.rs`",
150/// so it is not, itself, unit-testable.
151pub(crate) async fn connect<R: Runtime>(
152    connector: &HsClientConnector<R>,
153    netdir: Arc<NetDir>,
154    config: Arc<Config>,
155    hsid: HsId,
156    data: &mut Data,
157    secret_keys: HsClientSecretKeys,
158) -> Result<Arc<ClientCirc>, ConnError> {
159    Context::new(
160        &connector.runtime,
161        &*connector.circpool,
162        netdir,
163        config,
164        hsid,
165        secret_keys,
166        (),
167    )?
168    .connect(data)
169    .await
170}
171
/// Common context for a single request to connect to a hidden service
///
/// This saves on passing this same set of (immutable) values (or subsets thereof)
/// to each method in the principal functional code, everywhere.
/// It also provides a convenient type to be `Self`.
///
/// Its lifetime is one request to make a new client circuit to a hidden service,
/// including all the retries and timeouts.
///
/// Constructed by `Context::new`, which derives `hs_blind_id` and `subcredential`
/// from `hsid` and the netdir's HS time period.
struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
    /// Runtime
    ///
    /// Used for the wallclock and for imposing timeouts.
    runtime: &'c R,
    /// Circpool
    ///
    /// Where we get our circuits from.
    circpool: &'c M::HsCircPool,
    /// Netdir
    ///
    /// Used for the HS time period, hsdir selection, network parameters,
    /// and turning introduction point descriptions into circuit targets.
    //
    // TODO holding onto the netdir for the duration of our attempts is not ideal
    // but doing better is fairly complicated.  See discussions here:
    //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1228#note_2910545
    //   https://gitlab.torproject.org/tpo/core/arti/-/issues/884
    netdir: Arc<NetDir>,
    /// Configuration
    ///
    /// Supplies the limits on how many attempts we make.
    config: Arc<Config>,
    /// Secret keys to use
    ///
    /// The descriptor encryption key in here (if any) is used when decrypting the descriptor.
    secret_keys: HsClientSecretKeys,
    /// HS ID
    hsid: HsId,
    /// Blinded HS ID
    ///
    /// Derived from `hsid` for the time period that was current when this `Context` was made.
    hs_blind_id: HsBlindId,
    /// The subcredential to use during this time period
    subcredential: Subcredential,
    /// Mock data
    ///
    /// Source of randomness, and of the `test_got_*` hooks, so that tests can observe
    /// and control what the connection code does.
    mocks: M,
}
205
/// Details of an established rendezvous point
///
/// Intermediate value for progress during a connection attempt.
struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
    /// RPT as a `Relay`
    rend_relay: Relay<'r>,
    /// Rendezvous circuit
    rend_circ: Arc<ClientCirc!(R, M)>,
    /// Rendezvous cookie
    rend_cookie: RendCookie,

    /// Receiver that will give us the RENDEZVOUS2 message.
    ///
    /// The sending end is owned by the handler
    /// which receives control messages on the rendezvous circuit,
    /// and which was installed when we sent `ESTABLISH_RENDEZVOUS`.
    ///
    /// (`RENDEZVOUS2` is the message containing the onion service's side of the handshake.)
    rend2_rx: proto_oneshot::Receiver<Rendezvous2>,

    /// Dummy, to placate compiler
    ///
    /// `R` and `M` appear in this struct only via the `ClientCirc!` projection,
    /// so we mention them here explicitly.
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
231
/// Random value used as part of IPT selection
///
/// One of these is drawn for each usable IPT when the list of candidates is built.
/// It is the least significant part of the [`IptSortKey`],
/// so it only decides the order of IPTs whose outcomes compare equal.
type IptSortRand = u32;
234
/// Details of an apparently-usable introduction point
///
/// "Usable" here means that we were able to turn the descriptor's information
/// about this introduction point into a `CircTarget`.
///
/// Intermediate value for progress during a connection attempt.
struct UsableIntroPt<'i> {
    /// Index in HS descriptor
    ///
    /// This is the position in the descriptor's original list of introduction points,
    /// which is *not* the same as the position in our (filtered, sorted) list of usable ones.
    intro_index: IntroPtIndex,
    /// IPT descriptor
    intro_desc: &'i IntroPointDesc,
    /// IPT `CircTarget`
    intro_target: OwnedCircTarget,
    /// Random value used as part of IPT selection
    sort_rand: IptSortRand,
}
248
/// Lookup key for looking up and recording our IPT use experiences
///
/// Used to identify a relay when looking to see what happened last time we used it,
/// and storing that information after we tried it.
///
/// We store the experience information under an arbitrary one of the relay's identities,
/// as returned by the `HasRelayIds::identities().next()`
/// (see `RelayIdForExperience::for_store`).
/// When we do lookups, we check all the relay's identities to see if we find
/// anything relevant
/// (see `RelayIdForExperience::for_lookup`).
/// If relay identities permute in strange ways, whether we find our previous
/// knowledge about them is not particularly well defined, but that's fine.
///
/// While this is, structurally, a relay identity, it is not suitable for other purposes.
#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
struct RelayIdForExperience(RelayId);
264
/// Details of an apparently-successful INTRODUCE exchange
///
/// Intermediate value for progress during a connection attempt.
struct Introduced<R: Runtime, M: MocksForConnect<R>> {
    /// End-to-end crypto NTORv3 handshake with the service
    ///
    /// Created as part of generating our `INTRODUCE1`,
    /// and then used when processing `RENDEZVOUS2`.
    handshake_state: hs_ntor::HsNtorClientState,

    /// Dummy, to placate compiler
    ///
    /// No field of this struct mentions `R` or `M`, so without this marker
    /// the type parameters would be unused.
    /// `R` and `M` only used for getting to mocks.
    /// Covariant without dropck or interfering with Send/Sync will do fine.
    marker: PhantomData<fn() -> (R, M)>,
}
281
282impl RelayIdForExperience {
283    /// Identities to use to try to find previous experience information about this IPT
284    fn for_lookup(intro_target: &OwnedCircTarget) -> impl Iterator<Item = Self> + '_ {
285        intro_target
286            .identities()
287            .map(|id| RelayIdForExperience(id.to_owned()))
288    }
289
290    /// Identity to use to store previous experience information about this IPT
291    fn for_store(intro_target: &OwnedCircTarget) -> Result<Self, Bug> {
292        let id = intro_target
293            .identities()
294            .next()
295            .ok_or_else(|| internal!("introduction point relay with no identities"))?
296            .to_owned();
297        Ok(RelayIdForExperience(id))
298    }
299}
300
/// Sort key for an introduction point, for selecting the best IPTs to try first
///
/// Ordering is most preferable first.
///
/// We use this to sort our `UsableIntroPt`s using `.sort_by_key`.
/// (This implementation approach ensures that we obey all the usual ordering invariants.)
//
// The ordering is the derived one, so the order of the fields is significant:
// `outcome` is compared first, and `sort_rand` only breaks ties.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
struct IptSortKey {
    /// Sort by how preferable the experience was
    outcome: IptSortKeyOutcome,
    /// Failing that, choose randomly
    sort_rand: IptSortRand,
}
314
/// Component of the [`IptSortKey`] representing outcome of our last attempt, if any
///
/// This is the main thing we use to decide which IPTs to try first.
/// It is calculated for each IPT
/// (via `.sort_by_key`, so repeatedly - it should therefore be cheap to make.)
///
/// Ordering is most preferable first.
//
// The ordering is the derived one, so the order of the variants,
// and of the fields within each variant, is significant.
// Do not reorder them without intending to change which IPTs we prefer.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
enum IptSortKeyOutcome {
    /// Prefer successes
    Success {
        /// Prefer quick ones
        duration: Duration,
    },
    /// Failing that, try one we don't know to have failed
    Untried,
    /// Failing that, it'll have to be ones that didn't work last time
    Failed {
        /// Prefer failures with an earlier retry time
        retry_time: tor_error::LooseCmpRetryTime,
        /// Failing that, prefer quick failures (rather than slow ones eg timeouts)
        duration: Duration,
    },
}
339
340impl From<Option<&IptExperience>> for IptSortKeyOutcome {
341    fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
342        use IptSortKeyOutcome as O;
343        match experience {
344            None => O::Untried,
345            Some(IptExperience { duration, outcome }) => match outcome {
346                Ok(()) => O::Success {
347                    duration: *duration,
348                },
349                Err(retry_time) => O::Failed {
350                    retry_time: (*retry_time).into(),
351                    duration: *duration,
352                },
353            },
354        }
355    }
356}
357
358impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
359    /// Make a new `Context` from the input data
360    fn new(
361        runtime: &'c R,
362        circpool: &'c M::HsCircPool,
363        netdir: Arc<NetDir>,
364        config: Arc<Config>,
365        hsid: HsId,
366        secret_keys: HsClientSecretKeys,
367        mocks: M,
368    ) -> Result<Self, ConnError> {
369        let time_period = netdir.hs_time_period();
370        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
371            .map_err(|_| CE::InvalidHsId)?
372            .compute_blinded_key(time_period)
373            .map_err(
374                // TODO HS what on earth do these errors mean, in practical terms ?
375                // In particular, we'll want to convert them to a ConnError variant,
376                // but what ErrorKind should they have ?
377                into_internal!("key blinding error, don't know how to handle"),
378            )?;
379        let hs_blind_id = hs_blind_id_key.id();
380
381        Ok(Context {
382            netdir,
383            config,
384            hsid,
385            hs_blind_id,
386            subcredential,
387            circpool,
388            runtime,
389            secret_keys,
390            mocks,
391        })
392    }
393
394    /// Actually make a HS connection, updating our recorded state as necessary
395    ///
396    /// Called by the `connect` function in this module.
397    ///
398    /// This function handles all necessary retrying of fallible operations,
399    /// (and, therefore, must also limit the total work done for a particular call).
400    async fn connect(&self, data: &mut Data) -> Result<Arc<ClientCirc!(R, M)>, ConnError> {
401        // This function must do the following, retrying as appropriate.
402        //  - Look up the onion descriptor in the state.
403        //  - Download the onion descriptor if one isn't there.
404        //  - In parallel:
405        //    - Pick a rendezvous point from the netdirprovider and launch a
406        //      rendezvous circuit to it. Then send ESTABLISH_INTRO.
407        //    - Pick a number of introduction points (1 or more) and try to
408        //      launch circuits to them.
409        //  - On a circuit to an introduction point, send an INTRODUCE1 cell.
410        //  - Wait for a RENDEZVOUS2 cell on the rendezvous circuit
411        //  - Add a virtual hop to the rendezvous circuit.
412        //  - Return the rendezvous circuit.
413
414        let mocks = self.mocks.clone();
415
416        let desc = self.descriptor_ensure(&mut data.desc).await?;
417
418        mocks.test_got_desc(desc);
419
420        let circ = self.intro_rend_connect(desc, &mut data.ipts).await?;
421        mocks.test_got_circ(&circ);
422
423        Ok(circ)
424    }
425
426    /// Ensure that `Data.desc` contains the HS descriptor
427    ///
428    /// If we have a previously-downloaded descriptor, which is still valid,
429    /// just returns a reference to it.
430    ///
431    /// Otherwise, tries to obtain the descriptor by downloading it from hsdir(s).
432    ///
433    /// Does all necessary retries and timeouts.
434    /// Returns an error if no valid descriptor could be found.
435    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
436    async fn descriptor_ensure<'d>(&self, data: &'d mut DataHsDesc) -> Result<&'d HsDesc, CE> {
437        // Maximum number of hsdir connection and retrieval attempts we'll make
438        let max_total_attempts = self
439            .config
440            .retry
441            .hs_desc_fetch_attempts()
442            .try_into()
443            // User specified a very large u32.  We must be downcasting it to 16bit!
444            // let's give them as many retries as we can manage.
445            .unwrap_or(usize::MAX);
446
447        // Limit on the duration of each retrieval attempt
448        let each_timeout = self.estimate_timeout(&[
449            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
450            (1, TimeoutsAction::RoundTrip { length: HOPS }),    // One HTTP query/response
451        ]);
452
453        // We retain a previously obtained descriptor precisely until its lifetime expires,
454        // and pay no attention to the descriptor's revision counter.
455        // When it expires, we discard it completely and try to obtain a new one.
456        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914448
457        // TODO SPEC: Discuss HS descriptor lifetime and expiry client behaviour
458        if let Some(previously) = data {
459            let now = self.runtime.wallclock();
460            if let Ok(_desc) = previously.as_ref().check_valid_at(&now) {
461                // Ideally we would just return desc but that confuses borrowck.
462                // https://github.com/rust-lang/rust/issues/51545
463                return Ok(data
464                    .as_ref()
465                    .expect("Some but now None")
466                    .as_ref()
467                    .check_valid_at(&now)
468                    .expect("Ok but now Err"));
469            }
470            // Seems to be not valid now.  Try to fetch a fresh one.
471        }
472
473        let hs_dirs = self.netdir.hs_dirs_download(
474            self.hs_blind_id,
475            self.netdir.hs_time_period(),
476            &mut self.mocks.thread_rng(),
477        )?;
478
479        trace!(
480            "HS desc fetch for {}, using {} hsdirs",
481            &self.hsid,
482            hs_dirs.len()
483        );
484
485        // We might consider launching requests to multiple HsDirs in parallel.
486        //   https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1118#note_2894463
487        // But C Tor doesn't and our HS experts don't consider that important:
488        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914436
489        // (Additionally, making multiple HSDir requests at once may make us
490        // more vulnerable to traffic analysis.)
491        let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
492        let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
493        let desc = loop {
494            let relay = match attempts.next() {
495                Some(relay) => relay,
496                None => {
497                    return Err(if errors.is_empty() {
498                        CE::NoHsDirs
499                    } else {
500                        CE::DescriptorDownload(errors)
501                    })
502                }
503            };
504            let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();
505            match self
506                .runtime
507                .timeout(each_timeout, self.descriptor_fetch_attempt(relay))
508                .await
509                .unwrap_or(Err(DescriptorErrorDetail::Timeout))
510            {
511                Ok(desc) => break desc,
512                Err(error) => {
513                    if error.should_report_as_suspicious() {
514                        // Note that not every protocol violation is suspicious:
515                        // we only warn on the protocol violations that look like attempts
516                        // to do a traffic tagging attack via hsdir inflation.
517                        // (See proposal 360.)
518                        warn_report!(
519                            &error,
520                            "Suspicious failure while downloading hsdesc for {} from relay {}",
521                            &self.hsid,
522                            relay.display_relay_ids(),
523                        );
524                    } else {
525                        debug_report!(
526                            &error,
527                            "failed hsdir desc fetch for {} from {}/{}",
528                            &self.hsid,
529                            &relay.id(),
530                            &relay.rsa_id()
531                        );
532                    }
533                    errors.push(tor_error::Report(DescriptorError {
534                        hsdir: hsdir_for_error,
535                        error,
536                    }));
537                }
538            }
539        };
540
541        // Store the bounded value in the cache for reuse,
542        // but return a reference to the unwrapped `HsDesc`.
543        //
544        // The `HsDesc` must be owned by `data.desc`,
545        // so first add it to `data.desc`,
546        // and then dangerously_assume_timely to get a reference out again.
547        //
548        // It is safe to dangerously_assume_timely,
549        // as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
550        let ret = data.insert(desc);
551        Ok(ret.as_ref().dangerously_assume_timely())
552    }
553
554    /// Make one attempt to fetch the descriptor from a specific hsdir
555    ///
556    /// No timeout
557    ///
558    /// On success, returns the descriptor.
559    ///
560    /// While the returned descriptor is `TimerangeBound`, its validity at the current time *has*
561    /// been checked.
562    async fn descriptor_fetch_attempt(
563        &self,
564        hsdir: &Relay<'_>,
565    ) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
566        let max_len: usize = self
567            .netdir
568            .params()
569            .hsdir_max_desc_size
570            .get()
571            .try_into()
572            .map_err(into_internal!("BoundedInt was not truly bounded!"))?;
573        let request = {
574            let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
575            r.set_max_len(max_len);
576            r
577        };
578        trace!(
579            "hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
580            &self.hsid,
581            &hsdir.id(),
582            &hsdir.rsa_id(),
583            &request,
584            request.debug_request()
585        );
586
587        let circuit = self
588            .circpool
589            .m_get_or_launch_specific(
590                &self.netdir,
591                HsCircKind::ClientHsDir,
592                OwnedCircTarget::from_circ_target(hsdir),
593            )
594            .await?;
595        let source: Option<SourceInfo> = circuit
596            .m_source_info()
597            .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
598        let mut stream = circuit
599            .m_begin_dir_stream()
600            .await
601            .map_err(DescriptorErrorDetail::Stream)?;
602
603        let response = tor_dirclient::send_request(self.runtime, &request, &mut stream, source)
604            .await
605            .map_err(|dir_error| match dir_error {
606                tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
607                tor_dirclient::Error::CircMgr(ce) => into_internal!(
608                    "tor-dirclient complains about circmgr going wrong but we gave it a stream"
609                )(ce)
610                .into(),
611                other => into_internal!(
612                    "tor-dirclient gave unexpected error, tor-hsclient code needs updating"
613                )(other)
614                .into(),
615            })?;
616
617        let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
618        let hsc_desc_enc = self.secret_keys.keys.ks_hsc_desc_enc.as_ref();
619
620        let now = self.runtime.wallclock();
621
622        HsDesc::parse_decrypt_validate(
623            &desc_text,
624            &self.hs_blind_id,
625            now,
626            &self.subcredential,
627            hsc_desc_enc,
628        )
629        .map_err(DescriptorErrorDetail::from)
630    }
631
632    /// Given the descriptor, try to connect to service
633    ///
634    /// Does all necessary retries, timeouts, etc.
635    async fn intro_rend_connect(
636        &self,
637        desc: &HsDesc,
638        data: &mut DataIpts,
639    ) -> Result<Arc<ClientCirc!(R, M)>, CE> {
640        // Maximum number of rendezvous/introduction attempts we'll make
641        let max_total_attempts = self
642            .config
643            .retry
644            .hs_intro_rend_attempts()
645            .try_into()
646            // User specified a very large u32.  We must be downcasting it to 16bit!
647            // let's give them as many retries as we can manage.
648            .unwrap_or(usize::MAX);
649
650        // Limit on the duration of each attempt to establish a rendezvous point
651        //
652        // This *might* include establishing a fresh circuit,
653        // if the HsCircPool's pool is empty.
654        let rend_timeout = self.estimate_timeout(&[
655            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
656            (1, TimeoutsAction::RoundTrip { length: HOPS }),    // One ESTABLISH_RENDEZVOUS
657        ]);
658
659        // Limit on the duration of each attempt to negotiate with an introduction point
660        //
661        // *Does* include establishing the circuit.
662        let intro_timeout = self.estimate_timeout(&[
663            (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
664            // This does some crypto too, but we don't account for that.
665            (1, TimeoutsAction::RoundTrip { length: HOPS }), // One INTRODUCE1/INTRODUCE_ACK
666        ]);
667
668        // Timeout estimator for the action that the HS will take in building
669        // its circuit to the RPT.
670        let hs_build_action = TimeoutsAction::BuildCircuit {
671            length: if desc.is_single_onion_service() {
672                1
673            } else {
674                HOPS
675            },
676        };
677        // Limit on the duration of each attempt for activities involving both
678        // RPT and IPT.
679        let rpt_ipt_timeout = self.estimate_timeout(&[
680            // The API requires us to specify a number of circuit builds and round trips.
681            // So what we tell the estimator is a rather imprecise description.
682            // (TODO it would be nice if the circmgr offered us a one-way trip Action).
683            //
684            // What we are timing here is:
685            //
686            //    INTRODUCE2 goes from IPT to HS
687            //    but that happens in parallel with us waiting for INTRODUCE_ACK,
            //    which is controlled by `intro_timeout` so not part of `rpt_ipt_timeout`.
689            //    and which has to come HOPS hops.  So don't count INTRODUCE2 here.
690            //
691            //    HS builds to our RPT
692            (1, hs_build_action),
693            //
694            //    RENDEZVOUS1 goes from HS to RPT.  `hs_hops`, one-way.
695            //    RENDEZVOUS2 goes from RPT to us.  HOPS, one-way.
696            //    Together, we squint a bit and call this a HOPS round trip:
697            (1, TimeoutsAction::RoundTrip { length: HOPS }),
698        ]);
699
700        // We can't reliably distinguish IPT failure from RPT failure, so we iterate over IPTs
701        // (best first) and each time use a random RPT.
702
703        // We limit the number of rendezvous establishment attempts, separately, since we don't
704        // try to talk to the intro pt until we've established the rendezvous circuit.
705        let mut rend_attempts = 0..max_total_attempts;
706
707        // But, we put all the errors into the same bucket, since we might have a mixture.
708        let mut errors = RetryError::in_attempt_to("make circuit to to hidden service");
709
710        // Note that IntroPtIndex is *not* the index into this Vec.
711        // It is the index into the original list of introduction points in the descriptor.
712        let mut usable_intros: Vec<UsableIntroPt> = desc
713            .intro_points()
714            .iter()
715            .enumerate()
716            .map(|(intro_index, intro_desc)| {
717                let intro_index = intro_index.into();
718                let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
719                    .map_err(|error| FAE::UnusableIntro { error, intro_index })?;
720                // Lack of TAIT means this clone
721                let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
722                Ok::<_, FailedAttemptError>(UsableIntroPt {
723                    intro_index,
724                    intro_desc,
725                    intro_target,
726                    sort_rand: self.mocks.thread_rng().random(),
727                })
728            })
729            .filter_map(|entry| match entry {
730                Ok(y) => Some(y),
731                Err(e) => {
732                    errors.push(e);
733                    None
734                }
735            })
736            .collect_vec();
737
738        // Delete experience information for now-unlisted intro points
739        // Otherwise, as the IPTs change `Data` might grow without bound,
740        // if we keep reconnecting to the same HS.
741        data.retain(|k, _v| {
742            usable_intros
743                .iter()
744                .any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
745        });
746
747        // Join with existing state recording our experiences,
748        // sort by descending goodness, and then randomly
749        // (so clients without any experience don't all pile onto the same, first, IPT)
750        usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
751            let experience =
752                RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
753            IptSortKey {
754                outcome: experience.into(),
755                sort_rand: ipt.sort_rand,
756            }
757        });
758        self.mocks.test_got_ipts(&usable_intros);
759
760        let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);
761
762        // We retain a rendezvous we managed to set up in here.  That way if we created it, and
763        // then failed before we actually needed it, we can reuse it.
764        // If we exit with an error, we will waste it - but because we isolate things we do
765        // for different services, it wouldn't be reusable anyway.
766        let mut saved_rendezvous = None;
767
768        // If we are using proof-of-work DoS mitigation, this chooses an
769        // algorithm and initial effort, and adjusts that effort when we retry.
770        let mut pow_client = HsPowClient::new(&self.hs_blind_id, desc);
771
772        // We might consider making multiple INTRODUCE attempts to different
        // IPTs in parallel, and somehow aggregating the errors and
774        // experiences.
775        // However our HS experts don't consider that important:
776        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
777        // Parallelizing our HsCircPool circuit building would likely have
778        // greater impact. (See #1149.)
779        loop {
780            // When did we start doing things that depended on the IPT?
781            //
782            // Used for recording our experience with the selected IPT
783            let mut ipt_use_started = None::<Instant>;
784
785            // Error handling inner async block (analogous to an IEFE):
786            //  * Ok(Some()) means this attempt succeeded
787            //  * Ok(None) means all attempts exhausted
788            //  * Err(error) means this attempt failed
789            //
790            // Error handling is rather complex here.  It's the primary job of *this* code to
791            // make sure that it's done right for timeouts.  (The individual component
792            // functions handle non-timeout errors.)  The different timeout errors have
793            // different amounts of information about the identity of the RPT and IPT: in each
794            // case, the error only mentions the RPT or IPT if that node is implicated in the
795            // timeout.
796            let outcome = async {
797                // We establish a rendezvous point first.  Although it appears from reading
798                // this code that this means we serialise establishment of the rendezvous and
799                // introduction circuits, this isn't actually the case.  The circmgr maintains
800                // a pool of circuits.  What actually happens in the "standing start" case is
801                // that we obtain a circuit for rendezvous from the circmgr's pool, expecting
802                // one to be available immediately; the circmgr will then start to build a new
803                // one to replenish its pool, and that happens in parallel with the work we do
804                // here - but in arrears.  If the circmgr pool is empty, then we must wait.
805                //
806                // Perhaps this should be parallelised here.  But that's really what the pool
807                // is for, since we expect building the rendezvous circuit and building the
808                // introduction circuit to take about the same length of time.
809                //
810                // We *do* serialise the ESTABLISH_RENDEZVOUS exchange, with the
811                // building of the introduction circuit.  That could be improved, at the cost
812                // of some additional complexity here.
813                //
814                // Our HS experts don't consider it important to increase the parallelism:
815                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914444
816                //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914445
817                if saved_rendezvous.is_none() {
818                    debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
819                    // Establish a rendezvous circuit.
820                    let Some(_): Option<usize> = rend_attempts.next() else {
821                        return Ok(None);
822                    };
823
824                    let mut using_rend_pt = None;
825                    saved_rendezvous = Some(
826                        self.runtime
827                            .timeout(rend_timeout, self.establish_rendezvous(&mut using_rend_pt))
828                            .await
829                            .map_err(|_: TimeoutError| match using_rend_pt {
830                                None => FAE::RendezvousCircuitObtain {
831                                    error: tor_circmgr::Error::CircTimeout(None),
832                                },
833                                Some(rend_pt) => FAE::RendezvousEstablishTimeout { rend_pt },
834                            })??,
835                    );
836                }
837
838                let Some(ipt) = intro_attempts.next() else {
839                    return Ok(None);
840                };
841                let intro_index = ipt.intro_index;
842
843                let proof_of_work = match pow_client.solve().await {
844                    Ok(solution) => solution,
845                    Err(e) => {
846                        debug!(
847                            "failing to compute proof-of-work, trying without. ({:?})",
848                            e
849                        );
850                        None
851                    }
852                };
853
854                // We record how long things take, starting from here, as
                // a statistic we'll use for the IPT in future.
856                // This is stored in a variable outside this async block,
857                // so that the outcome handling can use it.
858                ipt_use_started = Some(self.runtime.now());
859
860                // No `Option::get_or_try_insert_with`, or we'd avoid this expect()
861                let rend_pt_for_error = rend_pt_identity_for_error(
862                    &saved_rendezvous
863                        .as_ref()
864                        .expect("just made Some")
865                        .rend_relay,
866                );
867                debug!(
868                    "hs conn to {}: RPT {}",
869                    &self.hsid,
870                    rend_pt_for_error.as_inner()
871                );
872
873                let (rendezvous, introduced) = self
874                    .runtime
875                    .timeout(
876                        intro_timeout,
877                        self.exchange_introduce(ipt, &mut saved_rendezvous,
878                            proof_of_work),
879                    )
880                    .await
881                    .map_err(|_: TimeoutError| {
882                        // The intro point ought to give us a prompt ACK regardless of HS
883                        // behaviour or whatever is happening at the RPT, so blame the IPT.
884                        FAE::IntroductionTimeout { intro_index }
885                    })?
886                    // TODO: Maybe try, once, to extend-and-reuse the intro circuit.
887                    //
888                    // If the introduction fails, the introduction circuit is in principle
889                    // still usable.  We believe that in this case, C Tor extends the intro
890                    // circuit by one hop to the next IPT to try.  That saves on building a
891                    // whole new 3-hop intro circuit.  However, our HS experts tell us that
892                    // if introduction fails at one IPT it is likely to fail at the others too,
893                    // so that optimisation might reduce our network impact and time to failure,
894                    // but isn't likely to improve our chances of success.
895                    //
896                    // However, it's not clear whether this approach risks contaminating
897                    // the 2nd attempt with some fault relating to the introduction point.
898                    // The 1st ipt might also gain more knowledge about which HS we're talking to.
899                    //
900                    // TODO SPEC: Discuss extend-and-reuse HS intro circuit after nack
901                    ?;
902                #[allow(unused_variables)] // it's *supposed* to be unused
903                let saved_rendezvous = (); // don't use `saved_rendezvous` any more, use rendezvous
904
905                let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
906                let circ = self
907                    .runtime
908                    .timeout(
909                        rpt_ipt_timeout,
910                        self.complete_rendezvous(ipt, rendezvous, introduced),
911                    )
912                    .await
913                    .map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
914                        intro_index,
915                        rend_pt: rend_pt.clone(),
916                    })??;
917
918                debug!(
919                    "hs conn to {}: RPT {} IPT {}: success",
920                    &self.hsid,
921                    rend_pt.as_inner(),
922                    intro_index,
923                );
924                Ok::<_, FAE>(Some((intro_index, circ)))
925            }
926            .await;
927
928            // Store the experience `outcome` we had with IPT `intro_index`, in `data`
929            #[allow(clippy::unused_unit)] // -> () is here for error handling clarity
930            let mut store_experience = |intro_index, outcome| -> () {
931                (|| {
932                    let ipt = usable_intros
933                        .iter()
934                        .find(|ipt| ipt.intro_index == intro_index)
935                        .ok_or_else(|| internal!("IPT not found by index"))?;
936                    let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
937                    let started = ipt_use_started.ok_or_else(|| {
938                        internal!("trying to record IPT use but no IPT start time noted")
939                    })?;
940                    let duration = self
941                        .runtime
942                        .now()
943                        .checked_duration_since(started)
944                        .ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
945                    data.insert(id, IptExperience { duration, outcome });
946                    Ok::<_, Bug>(())
947                })()
948                .unwrap_or_else(|e| warn_report!(e, "error recording HS IPT use experience"));
949            };
950
951            match outcome {
952                Ok(Some((intro_index, y))) => {
953                    // Record successful outcome in Data
954                    store_experience(intro_index, Ok(()));
955                    return Ok(y);
956                }
957                Ok(None) => return Err(CE::Failed(errors)),
958                Err(error) => {
959                    debug_report!(&error, "hs conn to {}: attempt failed", &self.hsid);
960                    // Record error outcome in Data, if in fact we involved the IPT
                    // at all.  The IPT information is retrieved from `error`,
962                    // since only some of the errors implicate the introduction point.
963                    if let Some(intro_index) = error.intro_index() {
964                        store_experience(intro_index, Err(error.retry_time()));
965                    }
966                    errors.push(error);
967
968                    // If we are using proof-of-work DoS mitigation, try harder next time
969                    pow_client.increase_effort();
970                }
971            }
972        }
973    }
974
975    /// Make one attempt to establish a rendezvous circuit
976    ///
977    /// This doesn't really depend on anything,
978    /// other than (obviously) the isolation implied by our circuit pool.
979    /// In particular it doesn't depend on the introduction point.
980    ///
981    /// Does not apply a timeout.
982    ///
983    /// On entry `using_rend_pt` is `None`.
984    /// This function will store `Some` when it finds out which relay
985    /// it is talking to and starts to converse with it.
986    /// That way, if a timeout occurs, the caller can add that information to the error.
987    async fn establish_rendezvous(
988        &'c self,
989        using_rend_pt: &mut Option<RendPtIdentityForError>,
990    ) -> Result<Rendezvous<'c, R, M>, FAE> {
991        let (rend_circ, rend_relay) = self
992            .circpool
993            .m_get_or_launch_client_rend(&self.netdir)
994            .await
995            .map_err(|error| FAE::RendezvousCircuitObtain { error })?;
996
997        let rend_pt = rend_pt_identity_for_error(&rend_relay);
998        *using_rend_pt = Some(rend_pt.clone());
999
1000        let rend_cookie: RendCookie = self.mocks.thread_rng().random();
1001        let message = EstablishRendezvous::new(rend_cookie);
1002
1003        let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
1004        let (rend2_tx, rend2_rx) = proto_oneshot::channel();
1005
1006        /// Handler which expects `RENDEZVOUS_ESTABLISHED` and then
1007        /// `RENDEZVOUS2`.   Returns each message via the corresponding `oneshot`.
1008        struct Handler {
1009            /// Sender for a RENDEZVOUS_ESTABLISHED message.
1010            rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
1011            /// Sender for a RENDEZVOUS2 message.
1012            rend2_tx: proto_oneshot::Sender<Rendezvous2>,
1013        }
1014        impl MsgHandler for Handler {
1015            fn handle_msg(
1016                &mut self,
1017                msg: AnyRelayMsg,
1018            ) -> Result<MetaCellDisposition, tor_proto::Error> {
1019                // The first message we expect is a RENDEZVOUS_ESTABALISHED.
1020                if self.rend_established_tx.still_expected() {
1021                    self.rend_established_tx
1022                        .deliver_expected_message(msg, MetaCellDisposition::Consumed)
1023                } else {
1024                    self.rend2_tx
1025                        .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
1026                }
1027            }
1028        }
1029
1030        debug!(
1031            "hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
1032            &self.hsid,
1033            rend_pt.as_inner(),
1034        );
1035
1036        let handle_proto_error = |error| FAE::RendezvousEstablish {
1037            error,
1038            rend_pt: rend_pt.clone(),
1039        };
1040        let handler = Handler {
1041            rend_established_tx,
1042            rend2_tx,
1043        };
1044
1045        rend_circ
1046            .m_start_conversation_last_hop(Some(message.into()), handler)
1047            .await
1048            .map_err(handle_proto_error)?;
1049
1050        // `start_conversation` returns as soon as the control message has been sent.
1051        // We need to obtain the RENDEZVOUS_ESTABLISHED message, which is "returned" via the oneshot.
1052        let _: RendezvousEstablished = rend_established_rx.recv(handle_proto_error).await?;
1053
1054        debug!(
1055            "hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
1056            &self.hsid,
1057            rend_pt.as_inner(),
1058        );
1059
1060        Ok(Rendezvous {
1061            rend_circ,
1062            rend_cookie,
1063            rend_relay,
1064            rend2_rx,
1065            marker: PhantomData,
1066        })
1067    }
1068
1069    /// Attempt (once) to send an INTRODUCE1 and wait for the INTRODUCE_ACK
1070    ///
1071    /// `take`s the input `rendezvous` (but only takes it if it gets that far)
1072    /// and, if successful, returns it.
1073    /// (This arranges that the rendezvous is "used up" precisely if
1074    /// we sent its secret somewhere.)
1075    ///
1076    /// Although this function handles the `Rendezvous`,
1077    /// nothing in it actually involves the rendezvous point.
1078    /// So if there's a failure, it's purely to do with the introduction point.
1079    ///
1080    /// Does not apply a timeout.
1081    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
1082    async fn exchange_introduce(
1083        &'c self,
1084        ipt: &UsableIntroPt<'_>,
1085        rendezvous: &mut Option<Rendezvous<'c, R, M>>,
1086        proof_of_work: Option<ProofOfWork>,
1087    ) -> Result<(Rendezvous<'c, R, M>, Introduced<R, M>), FAE> {
1088        let intro_index = ipt.intro_index;
1089
1090        debug!(
1091            "hs conn to {}: IPT {}: obtaining intro circuit",
1092            &self.hsid, intro_index,
1093        );
1094
1095        let intro_circ = self
1096            .circpool
1097            .m_get_or_launch_specific(
1098                &self.netdir,
1099                HsCircKind::ClientIntro,
1100                ipt.intro_target.clone(), // &OwnedCircTarget isn't CircTarget apparently
1101            )
1102            .await
1103            .map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
1104
1105        let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;
1106
1107        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
1108
1109        debug!(
1110            "hs conn to {}: RPT {} IPT {}: making introduction",
1111            &self.hsid,
1112            rend_pt.as_inner(),
1113            intro_index,
1114        );
1115
1116        // Now we construct an introduce1 message and perform the first part of the
1117        // rendezvous handshake.
1118        //
1119        // This process is tricky because the header of the INTRODUCE1 message
1120        // -- which depends on the IntroPt configuration -- is authenticated as
1121        // part of the HsDesc handshake.
1122
1123        // Construct the header, since we need it as input to our encryption.
1124        let intro_header = {
1125            let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
1126            let intro1 = Introduce1::new(
1127                AuthKeyType::ED25519_SHA3_256,
1128                ipt_sid_key.as_bytes().to_vec(),
1129                vec![],
1130            );
1131            let mut header = vec![];
1132            intro1
1133                .encode_onto(&mut header)
1134                .map_err(into_internal!("couldn't encode intro1 header"))?;
1135            header
1136        };
1137
1138        // Construct the introduce payload, which tells the onion service how to find
1139        // our rendezvous point.  (We could do this earlier if we wanted.)
1140        let intro_payload = {
1141            let onion_key =
1142                intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
1143            let linkspecs = rendezvous
1144                .rend_relay
1145                .linkspecs()
1146                .map_err(into_internal!("Couldn't encode link specifiers"))?;
1147            let payload = IntroduceHandshakePayload::new(
1148                rendezvous.rend_cookie,
1149                onion_key,
1150                linkspecs,
1151                proof_of_work,
1152            );
1153            let mut encoded = vec![];
1154            payload
1155                .write_onto(&mut encoded)
1156                .map_err(into_internal!("Couldn't encode introduce1 payload"))?;
1157            encoded
1158        };
1159
1160        // Perform the cryptographic handshake with the onion service.
1161        let service_info = hs_ntor::HsNtorServiceInfo::new(
1162            ipt.intro_desc.svc_ntor_key().clone(),
1163            ipt.intro_desc.ipt_sid_key().clone(),
1164            self.subcredential,
1165        );
1166        let handshake_state =
1167            hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
1168        let encrypted_body = handshake_state
1169            .client_send_intro(&intro_header, &intro_payload)
1170            .map_err(into_internal!("can't begin hs-ntor handshake"))?;
1171
1172        // Build our actual INTRODUCE1 message.
1173        let intro1_real = Introduce1::new(
1174            AuthKeyType::ED25519_SHA3_256,
1175            ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
1176            encrypted_body,
1177        );
1178
1179        /// Handler which expects just `INTRODUCE_ACK`
1180        struct Handler {
1181            /// Sender for `INTRODUCE_ACK`
1182            intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
1183        }
1184        impl MsgHandler for Handler {
1185            fn handle_msg(
1186                &mut self,
1187                msg: AnyRelayMsg,
1188            ) -> Result<MetaCellDisposition, tor_proto::Error> {
1189                self.intro_ack_tx
1190                    .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
1191            }
1192        }
1193        let handle_intro_proto_error = |error| FAE::IntroductionExchange { error, intro_index };
1194        let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
1195        let handler = Handler { intro_ack_tx };
1196
1197        debug!(
1198            "hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
1199            &self.hsid,
1200            rend_pt.as_inner(),
1201            intro_index,
1202        );
1203
1204        intro_circ
1205            .m_start_conversation_last_hop(Some(intro1_real.into()), handler)
1206            .await
1207            .map_err(handle_intro_proto_error)?;
1208
1209        // Status is checked by `.success()`, and we don't look at the extensions;
1210        // just discard the known-successful `IntroduceAck`
1211        let _: IntroduceAck = intro_ack_rx
1212            .recv(handle_intro_proto_error)
1213            .await?
1214            .success()
1215            .map_err(|status| FAE::IntroductionFailed {
1216                status,
1217                intro_index,
1218            })?;
1219
1220        debug!(
1221            "hs conn to {}: RPT {} IPT {}: making introduction - success",
1222            &self.hsid,
1223            rend_pt.as_inner(),
1224            intro_index,
1225        );
1226
1227        // Having received INTRODUCE_ACK. we can forget about this circuit
1228        // (and potentially tear it down).
1229        drop(intro_circ);
1230
1231        Ok((
1232            rendezvous,
1233            Introduced {
1234                handshake_state,
1235                marker: PhantomData,
1236            },
1237        ))
1238    }
1239
1240    /// Attempt (once) to connect a rendezvous circuit using the given intro pt
1241    ///
1242    /// Timeouts here might be due to the IPT, RPT, service,
1243    /// or any of the intermediate relays.
1244    ///
1245    /// If, rather than a timeout, we actually encounter some kind of error,
1246    /// we'll return the appropriate `FailedAttemptError`.
1247    /// (Who is responsible may vary, so the `FailedAttemptError` variant will reflect that.)
1248    ///
1249    /// Does not apply a timeout
1250    async fn complete_rendezvous(
1251        &'c self,
1252        ipt: &UsableIntroPt<'_>,
1253        rendezvous: Rendezvous<'c, R, M>,
1254        introduced: Introduced<R, M>,
1255    ) -> Result<Arc<ClientCirc!(R, M)>, FAE> {
1256        use tor_proto::circuit::handshake;
1257
1258        let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
1259        let intro_index = ipt.intro_index;
1260        let handle_proto_error = |error| FAE::RendezvousCompletionCircuitError {
1261            error,
1262            intro_index,
1263            rend_pt: rend_pt.clone(),
1264        };
1265
1266        debug!(
1267            "hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
1268            &self.hsid,
1269            rend_pt.as_inner(),
1270            intro_index,
1271        );
1272
1273        let rend2_msg: Rendezvous2 = rendezvous.rend2_rx.recv(handle_proto_error).await?;
1274
1275        debug!(
1276            "hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
1277            &self.hsid,
1278            rend_pt.as_inner(),
1279            intro_index,
1280        );
1281
1282        // In theory would be great if we could have multiple introduction attempts in parallel
1283        // with similar x,X values but different IPTs.  However, our HS experts don't
1284        // think increasing parallelism here is important:
1285        //   https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
1286        let handshake_state = introduced.handshake_state;
1287
1288        // Try to complete the cryptographic handshake.
1289        let keygen = handshake_state
1290            .client_receive_rend(rend2_msg.handshake_info())
1291            // If this goes wrong. either the onion service has mangled the crypto,
1292            // or the rendezvous point has misbehaved (that that is possible is a protocol bug),
1293            // or we have used the wrong handshake_state (let's assume that's not true).
1294            //
1295            // If this happens we'll go and try another RPT.
1296            .map_err(|error| FAE::RendezvousCompletionHandshake {
1297                error,
1298                intro_index,
1299                rend_pt: rend_pt.clone(),
1300            })?;
1301
1302        let params = onion_circparams_from_netparams(self.netdir.params())
1303            .map_err(into_internal!("Failed to build CircParameters"))?;
1304        // TODO: We may be able to infer more about the supported protocols of the other side from our
1305        // handshake, and from its descriptors.
1306        //
1307        // TODO CC: This is relevant for congestion control!
1308        let protocols = self.netdir.client_protocol_status().required_protocols();
1309
1310        rendezvous
1311            .rend_circ
1312            .m_extend_virtual(
1313                handshake::RelayProtocol::HsV3,
1314                handshake::HandshakeRole::Initiator,
1315                keygen,
1316                params,
1317                protocols,
1318            )
1319            .await
1320            .map_err(into_internal!(
1321                "actually this is probably a 'circuit closed' error" // TODO HS
1322            ))?;
1323
1324        debug!(
1325            "hs conn to {}: RPT {} IPT {}: HS circuit established",
1326            &self.hsid,
1327            rend_pt.as_inner(),
1328            intro_index,
1329        );
1330
1331        Ok(rendezvous.rend_circ)
1332    }
1333
1334    /// Helper to estimate a timeout for a complicated operation
1335    ///
1336    /// `actions` is a list of `(count, action)`, where each entry
1337    /// represents doing `action`, `count` times sequentially.
1338    ///
1339    /// Combines the timeout estimates and returns an overall timeout.
1340    fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
1341        // This algorithm is, perhaps, wrong.  For uncorrelated variables, a particular
1342        // percentile estimate for a sum of random variables, is not calculated by adding the
1343        // percentile estimates of the individual variables.
1344        //
1345        // But the actual lengths of times of the operations aren't uncorrelated.
1346        // If they were *perfectly* correlated, then this addition would be correct.
1347        // It will do for now; it just might be rather longer than it ought to be.
1348        actions
1349            .iter()
1350            .map(|(count, action)| {
1351                self.circpool
1352                    .m_estimate_timeout(action)
1353                    .saturating_mul(*count)
1354            })
1355            .fold(Duration::ZERO, Duration::saturating_add)
1356    }
1357}
1358
/// Mocks used for testing `connect.rs`
///
/// This is different to `MockableConnectorData`,
/// which is used to *replace* this file, when testing `state.rs`.
///
/// `MocksForConnect` provides mock facilities for *testing* this file.
///
/// An implementation selects the circuit pool type and the random number
/// generator type, and may override the `test_got_*` hooks,
/// whose default implementations do nothing.
//
// TODO this should probably live somewhere else, maybe tor-circmgr even?
// TODO this really ought to be made by macros or something
trait MocksForConnect<R>: Clone {
    /// HS circuit pool
    ///
    /// Must implement [`MockableCircPool`].
    type HsCircPool: MockableCircPool<R>;

    /// A random number generator
    ///
    /// This is the type returned by [`thread_rng`](MocksForConnect::thread_rng).
    type Rng: rand::Rng + rand::CryptoRng;

    /// Tell tests we got this descriptor
    ///
    /// The default implementation does nothing.
    fn test_got_desc(&self, _: &HsDesc) {}
    /// Tell tests we got this circuit
    ///
    /// The default implementation does nothing.
    fn test_got_circ(&self, _: &Arc<ClientCirc!(R, Self)>) {}
    /// Tell tests we have obtained and sorted the intros like this
    ///
    /// The default implementation does nothing.
    fn test_got_ipts(&self, _: &[UsableIntroPt]) {}

    /// Return a random number generator
    fn thread_rng(&self) -> Self::Rng;
}
/// Mock for `HsCircPool`
///
/// Methods start with `m_` to avoid the following problem:
/// `ClientCirc::start_conversation` (say) means
/// to use the inherent method if one exists,
/// but will use a trait method if there isn't an inherent method.
///
/// So if the inherent method is renamed, the call in the impl here
/// turns into an always-recursive call.
/// This is not detected by the compiler due to the situation being
/// complicated by futures, `#[async_trait]` etc.
/// <https://github.com/rust-lang/rust/issues/111177>
#[async_trait]
trait MockableCircPool<R> {
    /// Client circuit
    ///
    /// The circuit type handed out by this pool.
    type ClientCirc: MockableClientCirc;

    /// Mockable counterpart of `HsCircPool::get_or_launch_specific`
    ///
    /// Takes the circuit `kind` and the `target`, and on success returns a circuit.
    async fn m_get_or_launch_specific(
        &self,
        netdir: &NetDir,
        kind: HsCircKind,
        target: impl CircTarget + Send + Sync + 'async_trait,
    ) -> tor_circmgr::Result<Arc<Self::ClientCirc>>;

    /// Mockable counterpart of `HsCircPool::get_or_launch_client_rend`
    ///
    /// On success, returns a circuit together with a `Relay` borrowed from `netdir`.
    async fn m_get_or_launch_client_rend<'a>(
        &self,
        netdir: &'a NetDir,
    ) -> tor_circmgr::Result<(Arc<Self::ClientCirc>, Relay<'a>)>;

    /// Estimate timeout
    ///
    /// Mockable counterpart of `HsCircPool::estimate_timeout`.
    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
}
/// Mock for `ClientCirc`
///
/// As with `MockableCircPool`, the methods are prefixed with `m_`.
#[async_trait]
trait MockableClientCirc: Debug {
    /// Directory stream type, as returned by `m_begin_dir_stream`
    type DirStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Mockable counterpart of `ClientCirc::begin_dir_stream`
    async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream>;

    /// Converse
    ///
    /// Optionally sends `msg`, and installs `reply_handler` to receive the replies.
    /// The real implementation calls `ClientCirc::start_conversation`,
    /// targeting the last hop.
    async fn m_start_conversation_last_hop(
        &self,
        msg: Option<AnyRelayMsg>,
        reply_handler: impl MsgHandler + Send + 'static,
    ) -> tor_proto::Result<Self::Conversation<'_>>;
    /// Conversation
    ///
    /// The type returned by `m_start_conversation_last_hop` on success.
    type Conversation<'r>
    where
        Self: 'r;

    /// Get a tor_dirclient::SourceInfo for this circuit, if possible.
    fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;

    /// Add a virtual hop to the circuit.
    ///
    /// `handshake` is the key generator for the new hop.
    async fn m_extend_virtual(
        &self,
        protocol: tor_proto::circuit::handshake::RelayProtocol,
        role: tor_proto::circuit::handshake::HandshakeRole,
        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
        params: CircParameters,
        capabilities: &tor_protover::Protocols,
    ) -> tor_proto::Result<()>;
}
1448
1449impl<R: Runtime> MocksForConnect<R> for () {
1450    type HsCircPool = HsCircPool<R>;
1451    type Rng = rand::rngs::ThreadRng;
1452
1453    fn thread_rng(&self) -> Self::Rng {
1454        rand::rng()
1455    }
1456}
1457#[async_trait]
1458impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
1459    type ClientCirc = ClientCirc;
1460    async fn m_get_or_launch_specific(
1461        &self,
1462        netdir: &NetDir,
1463        kind: HsCircKind,
1464        target: impl CircTarget + Send + Sync + 'async_trait,
1465    ) -> tor_circmgr::Result<Arc<ClientCirc>> {
1466        HsCircPool::get_or_launch_specific(self, netdir, kind, target).await
1467    }
1468    async fn m_get_or_launch_client_rend<'a>(
1469        &self,
1470        netdir: &'a NetDir,
1471    ) -> tor_circmgr::Result<(Arc<ClientCirc>, Relay<'a>)> {
1472        HsCircPool::get_or_launch_client_rend(self, netdir).await
1473    }
1474    fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
1475        HsCircPool::estimate_timeout(self, action)
1476    }
1477}
1478#[async_trait]
1479impl MockableClientCirc for ClientCirc {
1480    /// Client circuit
1481    type DirStream = tor_proto::stream::DataStream;
1482    async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
1483        ClientCirc::begin_dir_stream(self).await
1484    }
1485    async fn m_start_conversation_last_hop(
1486        &self,
1487        msg: Option<AnyRelayMsg>,
1488        reply_handler: impl MsgHandler + Send + 'static,
1489    ) -> tor_proto::Result<Self::Conversation<'_>> {
1490        ClientCirc::start_conversation(self, msg, reply_handler, tor_proto::TargetHop::LastHop)
1491            .await
1492    }
1493    type Conversation<'r> = tor_proto::circuit::Conversation<'r>;
1494
1495    async fn m_extend_virtual(
1496        &self,
1497        protocol: tor_proto::circuit::handshake::RelayProtocol,
1498        role: tor_proto::circuit::handshake::HandshakeRole,
1499        handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
1500        params: CircParameters,
1501        capabilities: &tor_protover::Protocols,
1502    ) -> tor_proto::Result<()> {
1503        ClientCirc::extend_virtual(self, protocol, role, handshake, &params, capabilities).await
1504    }
1505
1506    /// Get a tor_dirclient::SourceInfo for this circuit, if possible.
1507    fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
1508        SourceInfo::from_circuit(self)
1509    }
1510}
1511
1512#[async_trait]
1513impl MockableConnectorData for Data {
1514    type ClientCirc = ClientCirc;
1515    type MockGlobalState = ();
1516
1517    async fn connect<R: Runtime>(
1518        connector: &HsClientConnector<R>,
1519        netdir: Arc<NetDir>,
1520        config: Arc<Config>,
1521        hsid: HsId,
1522        data: &mut Self,
1523        secret_keys: HsClientSecretKeys,
1524    ) -> Result<Arc<Self::ClientCirc>, ConnError> {
1525        connect(connector, netdir, config, hsid, data, secret_keys).await
1526    }
1527
1528    fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
1529        !circuit.is_closing()
1530    }
1531}
1532
1533#[cfg(test)]
1534mod test {
1535    // @@ begin test lint list maintained by maint/add_warning @@
1536    #![allow(clippy::bool_assert_comparison)]
1537    #![allow(clippy::clone_on_copy)]
1538    #![allow(clippy::dbg_macro)]
1539    #![allow(clippy::mixed_attributes_style)]
1540    #![allow(clippy::print_stderr)]
1541    #![allow(clippy::print_stdout)]
1542    #![allow(clippy::single_char_pattern)]
1543    #![allow(clippy::unwrap_used)]
1544    #![allow(clippy::unchecked_duration_subtraction)]
1545    #![allow(clippy::useless_vec)]
1546    #![allow(clippy::needless_pass_by_value)]
1547    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1548
1549    #![allow(dead_code, unused_variables)] // TODO HS TESTS delete, after tests are completed
1550
1551    use super::*;
1552    use crate::*;
1553    use futures::FutureExt as _;
1554    use std::{iter, panic::AssertUnwindSafe};
1555    use tokio_crate as tokio;
1556    use tor_async_utils::JoinReadWrite;
1557    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
1558    use tor_hscrypto::pk::{HsClientDescEncKey, HsClientDescEncKeypair};
1559    use tor_llcrypto::pk::curve25519;
1560    use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
1561    use tor_rtcompat::tokio::TokioNativeTlsRuntime;
1562    use tor_rtcompat::RuntimeSubstExt as _;
1563    #[allow(deprecated)] // TODO #1885
1564    use tor_rtmock::time::MockSleepProvider;
1565    use tracing_test::traced_test;
1566
1567    #[derive(Debug, Default)]
1568    struct MocksGlobal {
1569        hsdirs_asked: Vec<OwnedCircTarget>,
1570        got_desc: Option<HsDesc>,
1571    }
1572    #[derive(Clone, Debug)]
1573    struct Mocks<I> {
1574        mglobal: Arc<Mutex<MocksGlobal>>,
1575        id: I,
1576    }
1577
1578    impl<I> Mocks<I> {
1579        fn map_id<J>(&self, f: impl FnOnce(&I) -> J) -> Mocks<J> {
1580            Mocks {
1581                mglobal: self.mglobal.clone(),
1582                id: f(&self.id),
1583            }
1584        }
1585    }
1586
1587    impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
1588        type HsCircPool = Mocks<()>;
1589        type Rng = TestingRng;
1590
1591        fn test_got_desc(&self, desc: &HsDesc) {
1592            self.mglobal.lock().unwrap().got_desc = Some(desc.clone());
1593        }
1594
1595        fn test_got_ipts(&self, desc: &[UsableIntroPt]) {}
1596
1597        fn thread_rng(&self) -> Self::Rng {
1598            testing_rng()
1599        }
1600    }
1601    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
1602    #[async_trait]
1603    impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
1604        type ClientCirc = Mocks<()>;
1605        async fn m_get_or_launch_specific(
1606            &self,
1607            _netdir: &NetDir,
1608            kind: HsCircKind,
1609            target: impl CircTarget + Send + Sync + 'async_trait,
1610        ) -> tor_circmgr::Result<Arc<Self::ClientCirc>> {
1611            assert_eq!(kind, HsCircKind::ClientHsDir);
1612            let target = OwnedCircTarget::from_circ_target(&target);
1613            self.mglobal.lock().unwrap().hsdirs_asked.push(target);
1614            // Adding the `Arc` here is a little ugly, but that's what we get
1615            // for using the same Mocks for everything.
1616            Ok(Arc::new(self.clone()))
1617        }
1618        /// Client circuit
1619        async fn m_get_or_launch_client_rend<'a>(
1620            &self,
1621            netdir: &'a NetDir,
1622        ) -> tor_circmgr::Result<(Arc<ClientCirc!(R, Self)>, Relay<'a>)> {
1623            todo!()
1624        }
1625
1626        fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
1627            Duration::from_secs(10)
1628        }
1629    }
1630    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
1631    #[async_trait]
1632    impl MockableClientCirc for Mocks<()> {
1633        type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
1634        type Conversation<'r> = &'r ();
1635        async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
1636            let response = format!(
1637                r#"HTTP/1.1 200 OK
1638
1639{}"#,
1640                test_data::TEST_DATA_2
1641            )
1642            .into_bytes()
1643            .into_boxed_slice();
1644
1645            Ok(JoinReadWrite::new(
1646                futures::io::Cursor::new(response),
1647                futures::io::sink(),
1648            ))
1649        }
1650        async fn m_start_conversation_last_hop(
1651            &self,
1652            msg: Option<AnyRelayMsg>,
1653            reply_handler: impl MsgHandler + Send + 'static,
1654        ) -> tor_proto::Result<Self::Conversation<'_>> {
1655            todo!()
1656        }
1657
1658        async fn m_extend_virtual(
1659            &self,
1660            protocol: tor_proto::circuit::handshake::RelayProtocol,
1661            role: tor_proto::circuit::handshake::HandshakeRole,
1662            handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
1663            params: CircParameters,
1664            capabilities: &tor_protover::Protocols,
1665        ) -> tor_proto::Result<()> {
1666            todo!()
1667        }
1668
1669        /// Get a tor_dirclient::SourceInfo for this circuit, if possible.
1670        fn m_source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
1671            Ok(None)
1672        }
1673    }
1674
1675    #[traced_test]
1676    #[tokio::test]
1677    async fn test_connect() {
1678        let valid_after = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
1679        let fresh_until = valid_after + humantime::parse_duration("1 hours").unwrap();
1680        let valid_until = valid_after + humantime::parse_duration("24 hours").unwrap();
1681        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();
1682
1683        let netdir = tor_netdir::testnet::construct_custom_netdir_with_params(
1684            tor_netdir::testnet::simple_net_func,
1685            iter::empty::<(&str, _)>(),
1686            Some(lifetime),
1687        )
1688        .expect("failed to build default testing netdir");
1689
1690        let netdir = Arc::new(netdir.unwrap_if_sufficient().unwrap());
1691        let runtime = TokioNativeTlsRuntime::current().unwrap();
1692        let now = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
1693        #[allow(deprecated)] // TODO #1885
1694        let mock_sp = MockSleepProvider::new(now);
1695        let runtime = runtime
1696            .with_sleep_provider(mock_sp.clone())
1697            .with_coarse_time_provider(mock_sp);
1698        let time_period = netdir.hs_time_period();
1699
1700        let mglobal = Arc::new(Mutex::new(MocksGlobal::default()));
1701        let mocks = Mocks { mglobal, id: () };
1702        // From C Tor src/test/test_hs_common.c test_build_address
1703        let hsid = test_data::TEST_HSID_2.into();
1704        let mut data = Data::default();
1705
1706        let pk: HsClientDescEncKey = curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
1707        let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
1708        let mut secret_keys_builder = HsClientSecretKeysBuilder::default();
1709        secret_keys_builder.ks_hsc_desc_enc(HsClientDescEncKeypair::new(pk.clone(), sk));
1710        let secret_keys = secret_keys_builder.build().unwrap();
1711
1712        let ctx = Context::new(
1713            &runtime,
1714            &mocks,
1715            netdir,
1716            Default::default(),
1717            hsid,
1718            secret_keys,
1719            mocks.clone(),
1720        )
1721        .unwrap();
1722
1723        let _got = AssertUnwindSafe(ctx.connect(&mut data))
1724            .catch_unwind() // TODO HS TESTS: remove this and the AssertUnwindSafe
1725            .await;
1726
1727        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
1728            .unwrap()
1729            .compute_blinded_key(time_period)
1730            .unwrap();
1731        let hs_blind_id = hs_blind_id_key.id();
1732
1733        let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
1734
1735        let hsdesc = HsDesc::parse_decrypt_validate(
1736            test_data::TEST_DATA_2,
1737            &hs_blind_id,
1738            now,
1739            &subcredential,
1740            Some(&HsClientDescEncKeypair::new(pk, sk)),
1741        )
1742        .unwrap()
1743        .dangerously_assume_timely();
1744
1745        let mglobal = mocks.mglobal.lock().unwrap();
1746        assert_eq!(mglobal.hsdirs_asked.len(), 1);
1747        // TODO hs: here and in other places, consider implementing PartialEq instead, or creating
1748        // an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
1749        assert_eq!(
1750            format!("{:?}", mglobal.got_desc),
1751            format!("{:?}", Some(hsdesc))
1752        );
1753
1754        // Check how long the descriptor is valid for
1755        let (start_time, end_time) = data.desc.as_ref().unwrap().bounds();
1756        assert_eq!(start_time, None);
1757
1758        let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
1759        assert_eq!(end_time, Some(desc_valid_until));
1760
1761        // TODO HS TESTS: check the circuit in got is the one we gave out
1762
1763        // TODO HS TESTS: continue with this
1764    }
1765
1766    // TODO HS TESTS: Test IPT state management and expiry:
1767    //   - obtain a test descriptor with only a broken ipt
1768    //     (broken in the sense that intro can be attempted, but will fail somehow)
1769    //   - try to make a connection and expect it to fail
1770    //   - assert that the ipt data isn't empty
1771    //   - cause the descriptor to expire (advance clock)
1772    //   - start using a mocked RNG if we weren't already and pin its seed here
1773    //   - make a new descriptor with two IPTs: the broken one from earlier, and a new one
1774    //   - make a new connection
1775    //   - use test_got_ipts to check that the random numbers
    //     would sort the bad intro first, *and* that the good one appears first
1777    //   - assert that connection succeeded
1778    //   - cause the circuit and descriptor to expire (advance clock)
1779    //   - go back to the previous descriptor contents, but with a new validity period
1780    //   - try to make a connection
1781    //   - use test_got_ipts to check that only the broken ipt is present
1782
1783    // TODO HS TESTS: test retries (of every retry loop we have here)
1784    // TODO HS TESTS: test error paths
1785}