tor_hsclient/connect.rs
1//! Main implementation of the connection functionality
2
3use std::time::Duration;
4
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::sync::Arc;
9use std::time::Instant;
10
11use async_trait::async_trait;
12use educe::Educe;
13use futures::{AsyncRead, AsyncWrite};
14use itertools::Itertools;
15use rand::Rng;
16use tor_bytes::Writeable;
17use tor_cell::relaycell::hs::intro_payload::{self, IntroduceHandshakePayload};
18use tor_cell::relaycell::hs::pow::ProofOfWork;
19use tor_cell::relaycell::msg::{AnyRelayMsg, Introduce1, Rendezvous2};
20use tor_circmgr::build::onion_circparams_from_netparams;
21use tor_error::{debug_report, warn_report, Bug};
22use tor_hscrypto::Subcredential;
23use tor_proto::circuit::handshake::hs_ntor;
24use tracing::{debug, trace};
25
26use retry_error::RetryError;
27use safelog::Sensitive;
28use tor_cell::relaycell::hs::{
29 AuthKeyType, EstablishRendezvous, IntroduceAck, RendezvousEstablished,
30};
31use tor_cell::relaycell::RelayMsg;
32use tor_checkable::{timed::TimerangeBound, Timebound};
33use tor_circmgr::hspool::{HsCircKind, HsCircPool};
34use tor_circmgr::timeouts::Action as TimeoutsAction;
35use tor_dirclient::request::Requestable as _;
36use tor_error::{internal, into_internal};
37use tor_error::{HasRetryTime as _, RetryTime};
38use tor_hscrypto::pk::{HsBlindId, HsId, HsIdKey};
39use tor_hscrypto::RendCookie;
40use tor_linkspec::{CircTarget, HasRelayIds, OwnedCircTarget, RelayId};
41use tor_llcrypto::pk::ed25519::Ed25519Identity;
42use tor_netdir::{NetDir, Relay};
43use tor_netdoc::doc::hsdesc::{HsDesc, IntroPointDesc};
44use tor_proto::circuit::{CircParameters, ClientCirc, MetaCellDisposition, MsgHandler};
45use tor_rtcompat::{Runtime, SleepProviderExt as _, TimeoutError};
46
47use crate::pow::HsPowClient;
48use crate::proto_oneshot;
49use crate::relay_info::ipt_to_circtarget;
50use crate::state::MockableConnectorData;
51use crate::Config;
52use crate::{rend_pt_identity_for_error, FailedAttemptError, IntroPtIndex, RendPtIdentityForError};
53use crate::{ConnError, DescriptorError, DescriptorErrorDetail};
54use crate::{HsClientConnector, HsClientSecretKeys};
55
56use ConnError as CE;
57use FailedAttemptError as FAE;
58
/// Hop count assumed for our hsdir, introduction, and rendezvous circuits
///
/// `tor_circmgr`'s timeout-estimation API
/// ([`tor_circmgr::CircMgr::estimate_timeout`], [`HsCircPool::estimate_timeout`])
/// must be told how long each circuit is; this is the figure we give it.
///
/// TODO HS hardcoding the number of hops to 3 seems wrong.
/// HsCircPool is what really knows this; some setups may want shorter
/// circuits for some reason; and the number will become wrong with vanguards.
/// As far as we know, though, 3 is what HsCircPool builds today.
//
// Options for doing better, from
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1342#note_2918050
//  * Ask the circuits we get for their n_hops(), in places where the estimate
//    is not needed until after we already hold the circuit.
//  * Give HsCircPool a way to report the circuit length to expect
//    for each kind of circuit.
const HOPS: usize = 3;
77
/// Given `R, M` where `M: MocksForConnect<R>`, expand to the mockable `ClientCirc` type
//
// This is quite annoying. But the alternative is to write out the whole
// `<<M as MocksForConnect<R>>::HsCircPool as MockableCircPool<R>>::ClientCirc`
// each time, since otherwise the compiler complains about ambiguous associated types.
macro_rules! ClientCirc { { $R:ty, $M:ty } => {
    <<$M as MocksForConnect<$R>>::HsCircPool as MockableCircPool<$R>>::ClientCirc
} }
84
85/// Information about a hidden service, including our connection history
86#[derive(Default, Educe)]
87#[educe(Debug)]
88// This type is actually crate-private, since it isn't re-exported, but it must
89// be `pub` because it appears as a default for a type parameter in HsClientConnector.
90pub struct Data {
91 /// The latest known onion service descriptor for this service.
92 desc: DataHsDesc,
93 /// Information about the latest status of trying to connect to this service
94 /// through each of its introduction points.
95 ipts: DataIpts,
96}
97
98/// Part of `Data` that relates to the HS descriptor
99type DataHsDesc = Option<TimerangeBound<HsDesc>>;
100
101/// Part of `Data` that relates to our information about introduction points
102type DataIpts = HashMap<RelayIdForExperience, IptExperience>;
103
104/// How things went last time we tried to use this introduction point
105///
106/// Neither this data structure, nor [`Data`], is responsible for arranging that we expire this
107/// information eventually. If we keep reconnecting to the service, we'll retain information
108/// about each IPT indefinitely, at least so long as they remain listed in the descriptors we
109/// receive.
110///
111/// Expiry of unused data is handled by `state.rs`, according to `last_used` in `ServiceState`.
112///
113/// Choosing which IPT to prefer is done by obtaining an `IptSortKey`
114/// (from this and other information).
115//
116// Don't impl Ord for IptExperience. We obtain `Option<&IptExperience>` from our
117// data structure, and if IptExperience were Ord then Option<&IptExperience> would be Ord
118// but it would be the wrong sort order: it would always prefer None, ie untried IPTs.
119#[derive(Debug)]
120struct IptExperience {
121 /// How long it took us to get whatever outcome occurred
122 ///
123 /// We prefer fast successes to slow ones.
124 /// Then, we prefer failures with earlier `RetryTime`,
125 /// and, lastly, faster failures to slower ones.
126 duration: Duration,
127
128 /// What happened and when we might try again
129 ///
130 /// Note that we don't actually *enforce* the `RetryTime` here, just sort by it
131 /// using `RetryTime::loose_cmp`.
132 ///
133 /// We *do* return an error that is itself `HasRetryTime` and expect our callers
134 /// to honour that.
135 outcome: Result<(), RetryTime>,
136}
137
138/// Actually make a HS connection, updating our recorded state as necessary
139///
140/// `connector` is provided only for obtaining the runtime and netdir (and `mock_for_state`).
141/// Obviously, `connect` is not supposed to go looking in `services`.
142///
143/// This function handles all necessary retrying of fallible operations,
144/// (and, therefore, must also limit the total work done for a particular call).
145///
146/// This function has a minimum of functionality, since it is the boundary
147/// between "mock connection, used for testing `state.rs`" and
148/// "mock circuit and netdir, used for testing `connect.rs`",
149/// so it is not, itself, unit-testable.
150pub(crate) async fn connect<R: Runtime>(
151 connector: &HsClientConnector<R>,
152 netdir: Arc<NetDir>,
153 config: Arc<Config>,
154 hsid: HsId,
155 data: &mut Data,
156 secret_keys: HsClientSecretKeys,
157) -> Result<Arc<ClientCirc>, ConnError> {
158 Context::new(
159 &connector.runtime,
160 &*connector.circpool,
161 netdir,
162 config,
163 hsid,
164 secret_keys,
165 (),
166 )?
167 .connect(data)
168 .await
169}
170
171/// Common context for a single request to connect to a hidden service
172///
173/// This saves on passing this same set of (immutable) values (or subsets thereof)
174/// to each method in the principal functional code, everywhere.
175/// It also provides a convenient type to be `Self`.
176///
177/// Its lifetime is one request to make a new client circuit to a hidden service,
178/// including all the retries and timeouts.
179struct Context<'c, R: Runtime, M: MocksForConnect<R>> {
180 /// Runtime
181 runtime: &'c R,
182 /// Circpool
183 circpool: &'c M::HsCircPool,
184 /// Netdir
185 //
186 // TODO holding onto the netdir for the duration of our attempts is not ideal
187 // but doing better is fairly complicated. See discussions here:
188 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1228#note_2910545
189 // https://gitlab.torproject.org/tpo/core/arti/-/issues/884
190 netdir: Arc<NetDir>,
191 /// Configuration
192 config: Arc<Config>,
193 /// Secret keys to use
194 secret_keys: HsClientSecretKeys,
195 /// HS ID
196 hsid: HsId,
197 /// Blinded HS ID
198 hs_blind_id: HsBlindId,
199 /// The subcredential to use during this time period
200 subcredential: Subcredential,
201 /// Mock data
202 mocks: M,
203}
204
205/// Details of an established rendezvous point
206///
207/// Intermediate value for progress during a connection attempt.
208struct Rendezvous<'r, R: Runtime, M: MocksForConnect<R>> {
209 /// RPT as a `Relay`
210 rend_relay: Relay<'r>,
211 /// Rendezvous circuit
212 rend_circ: Arc<ClientCirc!(R, M)>,
213 /// Rendezvous cookie
214 rend_cookie: RendCookie,
215
216 /// Receiver that will give us the RENDEZVOUS2 message.
217 ///
218 /// The sending ended is owned by the handler
219 /// which receives control messages on the rendezvous circuit,
220 /// and which was installed when we sent `ESTABLISH_RENDEZVOUS`.
221 ///
222 /// (`RENDEZVOUS2` is the message containing the onion service's side of the handshake.)
223 rend2_rx: proto_oneshot::Receiver<Rendezvous2>,
224
225 /// Dummy, to placate compiler
226 ///
227 /// Covariant without dropck or interfering with Send/Sync will do fine.
228 marker: PhantomData<fn() -> (R, M)>,
229}
230
/// Random tie-breaker used when ordering introduction points for selection
type IptSortRand = u32;
233
234/// Details of an apparently-useable introduction point
235///
236/// Intermediate value for progress during a connection attempt.
237struct UsableIntroPt<'i> {
238 /// Index in HS descriptor
239 intro_index: IntroPtIndex,
240 /// IPT descriptor
241 intro_desc: &'i IntroPointDesc,
242 /// IPT `CircTarget`
243 intro_target: OwnedCircTarget,
244 /// Random value used as part of IPT selection
245 sort_rand: IptSortRand,
246}
247
248/// Lookup key for looking up and recording our IPT use experiences
249///
250/// Used to identify a relay when looking to see what happened last time we used it,
251/// and storing that information after we tried it.
252///
253/// We store the experience information under an arbitrary one of the relay's identities,
254/// as returned by the `HasRelayIds::identities().next()`.
255/// When we do lookups, we check all the relay's identities to see if we find
256/// anything relevant.
257/// If relay identities permute in strange ways, whether we find our previous
258/// knowledge about them is not particularly well defined, but that's fine.
259///
260/// While this is, structurally, a relay identity, it is not suitable for other purposes.
261#[derive(Hash, Eq, PartialEq, Ord, PartialOrd, Debug)]
262struct RelayIdForExperience(RelayId);
263
264/// Details of an apparently-successful INTRODUCE exchange
265///
266/// Intermediate value for progress during a connection attempt.
267struct Introduced<R: Runtime, M: MocksForConnect<R>> {
268 /// End-to-end crypto NTORv3 handshake with the service
269 ///
270 /// Created as part of generating our `INTRODUCE1`,
271 /// and then used when processing `RENDEZVOUS2`.
272 handshake_state: hs_ntor::HsNtorClientState,
273
274 /// Dummy, to placate compiler
275 ///
276 /// `R` and `M` only used for getting to mocks.
277 /// Covariant without dropck or interfering with Send/Sync will do fine.
278 marker: PhantomData<fn() -> (R, M)>,
279}
280
281impl RelayIdForExperience {
282 /// Identities to use to try to find previous experience information about this IPT
283 fn for_lookup(intro_target: &OwnedCircTarget) -> impl Iterator<Item = Self> + '_ {
284 intro_target
285 .identities()
286 .map(|id| RelayIdForExperience(id.to_owned()))
287 }
288
289 /// Identity to use to store previous experience information about this IPT
290 fn for_store(intro_target: &OwnedCircTarget) -> Result<Self, Bug> {
291 let id = intro_target
292 .identities()
293 .next()
294 .ok_or_else(|| internal!("introduction point relay with no identities"))?
295 .to_owned();
296 Ok(RelayIdForExperience(id))
297 }
298}
299
300/// Sort key for an introduction point, for selecting the best IPTs to try first
301///
302/// Ordering is most preferable first.
303///
304/// We use this to sort our `UsableIpt`s using `.sort_by_key`.
305/// (This implementation approach ensures that we obey all the usual ordering invariants.)
306#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
307struct IptSortKey {
308 /// Sort by how preferable the experience was
309 outcome: IptSortKeyOutcome,
310 /// Failing that, choose randomly
311 sort_rand: IptSortRand,
312}
313
314/// Component of the [`IptSortKey`] representing outcome of our last attempt, if any
315///
316/// This is the main thing we use to decide which IPTs to try first.
317/// It is calculated for each IPT
318/// (via `.sort_by_key`, so repeatedly - it should therefore be cheap to make.)
319///
320/// Ordering is most preferable first.
321#[derive(Ord, PartialOrd, Eq, PartialEq, Debug)]
322enum IptSortKeyOutcome {
323 /// Prefer successes
324 Success {
325 /// Prefer quick ones
326 duration: Duration,
327 },
328 /// Failing that, try one we don't know to have failed
329 Untried,
330 /// Failing that, it'll have to be ones that didn't work last time
331 Failed {
332 /// Prefer failures with an earlier retry time
333 retry_time: tor_error::LooseCmpRetryTime,
334 /// Failing that, prefer quick failures (rather than slow ones eg timeouts)
335 duration: Duration,
336 },
337}
338
339impl From<Option<&IptExperience>> for IptSortKeyOutcome {
340 fn from(experience: Option<&IptExperience>) -> IptSortKeyOutcome {
341 use IptSortKeyOutcome as O;
342 match experience {
343 None => O::Untried,
344 Some(IptExperience { duration, outcome }) => match outcome {
345 Ok(()) => O::Success {
346 duration: *duration,
347 },
348 Err(retry_time) => O::Failed {
349 retry_time: (*retry_time).into(),
350 duration: *duration,
351 },
352 },
353 }
354 }
355}
356
357impl<'c, R: Runtime, M: MocksForConnect<R>> Context<'c, R, M> {
358 /// Make a new `Context` from the input data
359 fn new(
360 runtime: &'c R,
361 circpool: &'c M::HsCircPool,
362 netdir: Arc<NetDir>,
363 config: Arc<Config>,
364 hsid: HsId,
365 secret_keys: HsClientSecretKeys,
366 mocks: M,
367 ) -> Result<Self, ConnError> {
368 let time_period = netdir.hs_time_period();
369 let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
370 .map_err(|_| CE::InvalidHsId)?
371 .compute_blinded_key(time_period)
372 .map_err(
373 // TODO HS what on earth do these errors mean, in practical terms ?
374 // In particular, we'll want to convert them to a ConnError variant,
375 // but what ErrorKind should they have ?
376 into_internal!("key blinding error, don't know how to handle"),
377 )?;
378 let hs_blind_id = hs_blind_id_key.id();
379
380 Ok(Context {
381 netdir,
382 config,
383 hsid,
384 hs_blind_id,
385 subcredential,
386 circpool,
387 runtime,
388 secret_keys,
389 mocks,
390 })
391 }
392
393 /// Actually make a HS connection, updating our recorded state as necessary
394 ///
395 /// Called by the `connect` function in this module.
396 ///
397 /// This function handles all necessary retrying of fallible operations,
398 /// (and, therefore, must also limit the total work done for a particular call).
399 async fn connect(&self, data: &mut Data) -> Result<Arc<ClientCirc!(R, M)>, ConnError> {
400 // This function must do the following, retrying as appropriate.
401 // - Look up the onion descriptor in the state.
402 // - Download the onion descriptor if one isn't there.
403 // - In parallel:
404 // - Pick a rendezvous point from the netdirprovider and launch a
405 // rendezvous circuit to it. Then send ESTABLISH_INTRO.
406 // - Pick a number of introduction points (1 or more) and try to
407 // launch circuits to them.
408 // - On a circuit to an introduction point, send an INTRODUCE1 cell.
409 // - Wait for a RENDEZVOUS2 cell on the rendezvous circuit
410 // - Add a virtual hop to the rendezvous circuit.
411 // - Return the rendezvous circuit.
412
413 let mocks = self.mocks.clone();
414
415 let desc = self.descriptor_ensure(&mut data.desc).await?;
416
417 mocks.test_got_desc(desc);
418
419 let circ = self.intro_rend_connect(desc, &mut data.ipts).await?;
420 mocks.test_got_circ(&circ);
421
422 Ok(circ)
423 }
424
425 /// Ensure that `Data.desc` contains the HS descriptor
426 ///
427 /// If we have a previously-downloaded descriptor, which is still valid,
428 /// just returns a reference to it.
429 ///
430 /// Otherwise, tries to obtain the descriptor by downloading it from hsdir(s).
431 ///
432 /// Does all necessary retries and timeouts.
433 /// Returns an error if no valid descriptor could be found.
434 async fn descriptor_ensure<'d>(&self, data: &'d mut DataHsDesc) -> Result<&'d HsDesc, CE> {
435 // Maximum number of hsdir connection and retrieval attempts we'll make
436 let max_total_attempts = self
437 .config
438 .retry
439 .hs_desc_fetch_attempts()
440 .try_into()
441 // User specified a very large u32. We must be downcasting it to 16bit!
442 // let's give them as many retries as we can manage.
443 .unwrap_or(usize::MAX);
444
445 // Limit on the duration of each retrieval attempt
446 let each_timeout = self.estimate_timeout(&[
447 (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
448 (1, TimeoutsAction::RoundTrip { length: HOPS }), // One HTTP query/response
449 ]);
450
451 // We retain a previously obtained descriptor precisely until its lifetime expires,
452 // and pay no attention to the descriptor's revision counter.
453 // When it expires, we discard it completely and try to obtain a new one.
454 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914448
455 // TODO SPEC: Discuss HS descriptor lifetime and expiry client behaviour
456 if let Some(previously) = data {
457 let now = self.runtime.wallclock();
458 if let Ok(_desc) = previously.as_ref().check_valid_at(&now) {
459 // Ideally we would just return desc but that confuses borrowck.
460 // https://github.com/rust-lang/rust/issues/51545
461 return Ok(data
462 .as_ref()
463 .expect("Some but now None")
464 .as_ref()
465 .check_valid_at(&now)
466 .expect("Ok but now Err"));
467 }
468 // Seems to be not valid now. Try to fetch a fresh one.
469 }
470
471 let hs_dirs = self.netdir.hs_dirs_download(
472 self.hs_blind_id,
473 self.netdir.hs_time_period(),
474 &mut self.mocks.thread_rng(),
475 )?;
476
477 trace!(
478 "HS desc fetch for {}, using {} hsdirs",
479 &self.hsid,
480 hs_dirs.len()
481 );
482
483 // We might consider launching requests to multiple HsDirs in parallel.
484 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1118#note_2894463
485 // But C Tor doesn't and our HS experts don't consider that important:
486 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914436
487 // (Additionally, making multiple HSDir requests at once may make us
488 // more vulnerable to traffic analysis.)
489 let mut attempts = hs_dirs.iter().cycle().take(max_total_attempts);
490 let mut errors = RetryError::in_attempt_to("retrieve hidden service descriptor");
491 let desc = loop {
492 let relay = match attempts.next() {
493 Some(relay) => relay,
494 None => {
495 return Err(if errors.is_empty() {
496 CE::NoHsDirs
497 } else {
498 CE::DescriptorDownload(errors)
499 })
500 }
501 };
502 let hsdir_for_error: Sensitive<Ed25519Identity> = (*relay.id()).into();
503 match self
504 .runtime
505 .timeout(each_timeout, self.descriptor_fetch_attempt(relay))
506 .await
507 .unwrap_or(Err(DescriptorErrorDetail::Timeout))
508 {
509 Ok(desc) => break desc,
510 Err(error) => {
511 debug_report!(
512 &error,
513 "failed hsdir desc fetch for {} from {}/{}",
514 &self.hsid,
515 &relay.id(),
516 &relay.rsa_id()
517 );
518 errors.push(tor_error::Report(DescriptorError {
519 hsdir: hsdir_for_error,
520 error,
521 }));
522 }
523 }
524 };
525
526 // Store the bounded value in the cache for reuse,
527 // but return a reference to the unwrapped `HsDesc`.
528 //
529 // The `HsDesc` must be owned by `data.desc`,
530 // so first add it to `data.desc`,
531 // and then dangerously_assume_timely to get a reference out again.
532 //
533 // It is safe to dangerously_assume_timely,
534 // as descriptor_fetch_attempt has already checked the timeliness of the descriptor.
535 let ret = data.insert(desc);
536 Ok(ret.as_ref().dangerously_assume_timely())
537 }
538
539 /// Make one attempt to fetch the descriptor from a specific hsdir
540 ///
541 /// No timeout
542 ///
543 /// On success, returns the descriptor.
544 ///
545 /// While the returned descriptor is `TimerangeBound`, its validity at the current time *has*
546 /// been checked.
547 async fn descriptor_fetch_attempt(
548 &self,
549 hsdir: &Relay<'_>,
550 ) -> Result<TimerangeBound<HsDesc>, DescriptorErrorDetail> {
551 let max_len: usize = self
552 .netdir
553 .params()
554 .hsdir_max_desc_size
555 .get()
556 .try_into()
557 .map_err(into_internal!("BoundedInt was not truly bounded!"))?;
558 let request = {
559 let mut r = tor_dirclient::request::HsDescDownloadRequest::new(self.hs_blind_id);
560 r.set_max_len(max_len);
561 r
562 };
563 trace!(
564 "hsdir for {}, trying {}/{}, request {:?} (http request {:?})",
565 &self.hsid,
566 &hsdir.id(),
567 &hsdir.rsa_id(),
568 &request,
569 request.debug_request()
570 );
571
572 let circuit = self
573 .circpool
574 .m_get_or_launch_specific(
575 &self.netdir,
576 HsCircKind::ClientHsDir,
577 OwnedCircTarget::from_circ_target(hsdir),
578 )
579 .await?;
580 let mut stream = circuit
581 .m_begin_dir_stream()
582 .await
583 .map_err(DescriptorErrorDetail::Stream)?;
584
585 let response = tor_dirclient::send_request(self.runtime, &request, &mut stream, None)
586 .await
587 .map_err(|dir_error| match dir_error {
588 tor_dirclient::Error::RequestFailed(rfe) => DescriptorErrorDetail::from(rfe.error),
589 tor_dirclient::Error::CircMgr(ce) => into_internal!(
590 "tor-dirclient complains about circmgr going wrong but we gave it a stream"
591 )(ce)
592 .into(),
593 other => into_internal!(
594 "tor-dirclient gave unexpected error, tor-hsclient code needs updating"
595 )(other)
596 .into(),
597 })?;
598
599 let desc_text = response.into_output_string().map_err(|rfe| rfe.error)?;
600 let hsc_desc_enc = self.secret_keys.keys.ks_hsc_desc_enc.as_ref();
601
602 let now = self.runtime.wallclock();
603
604 HsDesc::parse_decrypt_validate(
605 &desc_text,
606 &self.hs_blind_id,
607 now,
608 &self.subcredential,
609 hsc_desc_enc,
610 )
611 .map_err(DescriptorErrorDetail::from)
612 }
613
614 /// Given the descriptor, try to connect to service
615 ///
616 /// Does all necessary retries, timeouts, etc.
617 async fn intro_rend_connect(
618 &self,
619 desc: &HsDesc,
620 data: &mut DataIpts,
621 ) -> Result<Arc<ClientCirc!(R, M)>, CE> {
622 // Maximum number of rendezvous/introduction attempts we'll make
623 let max_total_attempts = self
624 .config
625 .retry
626 .hs_intro_rend_attempts()
627 .try_into()
628 // User specified a very large u32. We must be downcasting it to 16bit!
629 // let's give them as many retries as we can manage.
630 .unwrap_or(usize::MAX);
631
632 // Limit on the duration of each attempt to establish a rendezvous point
633 //
634 // This *might* include establishing a fresh circuit,
635 // if the HsCircPool's pool is empty.
636 let rend_timeout = self.estimate_timeout(&[
637 (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
638 (1, TimeoutsAction::RoundTrip { length: HOPS }), // One ESTABLISH_RENDEZVOUS
639 ]);
640
641 // Limit on the duration of each attempt to negotiate with an introduction point
642 //
643 // *Does* include establishing the circuit.
644 let intro_timeout = self.estimate_timeout(&[
645 (1, TimeoutsAction::BuildCircuit { length: HOPS }), // build circuit
646 // This does some crypto too, but we don't account for that.
647 (1, TimeoutsAction::RoundTrip { length: HOPS }), // One INTRODUCE1/INTRODUCE_ACK
648 ]);
649
650 // Timeout estimator for the action that the HS will take in building
651 // its circuit to the RPT.
652 let hs_build_action = TimeoutsAction::BuildCircuit {
653 length: if desc.is_single_onion_service() {
654 1
655 } else {
656 HOPS
657 },
658 };
659 // Limit on the duration of each attempt for activities involving both
660 // RPT and IPT.
661 let rpt_ipt_timeout = self.estimate_timeout(&[
662 // The API requires us to specify a number of circuit builds and round trips.
663 // So what we tell the estimator is a rather imprecise description.
664 // (TODO it would be nice if the circmgr offered us a one-way trip Action).
665 //
666 // What we are timing here is:
667 //
668 // INTRODUCE2 goes from IPT to HS
669 // but that happens in parallel with us waiting for INTRODUCE_ACK,
670 // which is controlled by `intro_timeout` so not pat of `ipt_rpt_timeout`.
671 // and which has to come HOPS hops. So don't count INTRODUCE2 here.
672 //
673 // HS builds to our RPT
674 (1, hs_build_action),
675 //
676 // RENDEZVOUS1 goes from HS to RPT. `hs_hops`, one-way.
677 // RENDEZVOUS2 goes from RPT to us. HOPS, one-way.
678 // Together, we squint a bit and call this a HOPS round trip:
679 (1, TimeoutsAction::RoundTrip { length: HOPS }),
680 ]);
681
682 // We can't reliably distinguish IPT failure from RPT failure, so we iterate over IPTs
683 // (best first) and each time use a random RPT.
684
685 // We limit the number of rendezvous establishment attempts, separately, since we don't
686 // try to talk to the intro pt until we've established the rendezvous circuit.
687 let mut rend_attempts = 0..max_total_attempts;
688
689 // But, we put all the errors into the same bucket, since we might have a mixture.
690 let mut errors = RetryError::in_attempt_to("make circuit to to hidden service");
691
692 // Note that IntroPtIndex is *not* the index into this Vec.
693 // It is the index into the original list of introduction points in the descriptor.
694 let mut usable_intros: Vec<UsableIntroPt> = desc
695 .intro_points()
696 .iter()
697 .enumerate()
698 .map(|(intro_index, intro_desc)| {
699 let intro_index = intro_index.into();
700 let intro_target = ipt_to_circtarget(intro_desc, &self.netdir)
701 .map_err(|error| FAE::UnusableIntro { error, intro_index })?;
702 // Lack of TAIT means this clone
703 let intro_target = OwnedCircTarget::from_circ_target(&intro_target);
704 Ok::<_, FailedAttemptError>(UsableIntroPt {
705 intro_index,
706 intro_desc,
707 intro_target,
708 sort_rand: self.mocks.thread_rng().random(),
709 })
710 })
711 .filter_map(|entry| match entry {
712 Ok(y) => Some(y),
713 Err(e) => {
714 errors.push(e);
715 None
716 }
717 })
718 .collect_vec();
719
720 // Delete experience information for now-unlisted intro points
721 // Otherwise, as the IPTs change `Data` might grow without bound,
722 // if we keep reconnecting to the same HS.
723 data.retain(|k, _v| {
724 usable_intros
725 .iter()
726 .any(|ipt| RelayIdForExperience::for_lookup(&ipt.intro_target).any(|id| &id == k))
727 });
728
729 // Join with existing state recording our experiences,
730 // sort by descending goodness, and then randomly
731 // (so clients without any experience don't all pile onto the same, first, IPT)
732 usable_intros.sort_by_key(|ipt: &UsableIntroPt| {
733 let experience =
734 RelayIdForExperience::for_lookup(&ipt.intro_target).find_map(|id| data.get(&id));
735 IptSortKey {
736 outcome: experience.into(),
737 sort_rand: ipt.sort_rand,
738 }
739 });
740 self.mocks.test_got_ipts(&usable_intros);
741
742 let mut intro_attempts = usable_intros.iter().cycle().take(max_total_attempts);
743
744 // We retain a rendezvous we managed to set up in here. That way if we created it, and
745 // then failed before we actually needed it, we can reuse it.
746 // If we exit with an error, we will waste it - but because we isolate things we do
747 // for different services, it wouldn't be reusable anyway.
748 let mut saved_rendezvous = None;
749
750 // If we are using proof-of-work DoS mitigation, this chooses an
751 // algorithm and initial effort, and adjusts that effort when we retry.
752 let mut pow_client = HsPowClient::new(&self.hs_blind_id, desc);
753
754 // We might consider making multiple INTRODUCE attempts to different
755 // IPTs in in parallel, and somehow aggregating the errors and
756 // experiences.
757 // However our HS experts don't consider that important:
758 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
759 // Parallelizing our HsCircPool circuit building would likely have
760 // greater impact. (See #1149.)
761 loop {
762 // When did we start doing things that depended on the IPT?
763 //
764 // Used for recording our experience with the selected IPT
765 let mut ipt_use_started = None::<Instant>;
766
767 // Error handling inner async block (analogous to an IEFE):
768 // * Ok(Some()) means this attempt succeeded
769 // * Ok(None) means all attempts exhausted
770 // * Err(error) means this attempt failed
771 //
772 // Error handling is rather complex here. It's the primary job of *this* code to
773 // make sure that it's done right for timeouts. (The individual component
774 // functions handle non-timeout errors.) The different timeout errors have
775 // different amounts of information about the identity of the RPT and IPT: in each
776 // case, the error only mentions the RPT or IPT if that node is implicated in the
777 // timeout.
778 let outcome = async {
779 // We establish a rendezvous point first. Although it appears from reading
780 // this code that this means we serialise establishment of the rendezvous and
781 // introduction circuits, this isn't actually the case. The circmgr maintains
782 // a pool of circuits. What actually happens in the "standing start" case is
783 // that we obtain a circuit for rendezvous from the circmgr's pool, expecting
784 // one to be available immediately; the circmgr will then start to build a new
785 // one to replenish its pool, and that happens in parallel with the work we do
786 // here - but in arrears. If the circmgr pool is empty, then we must wait.
787 //
788 // Perhaps this should be parallelised here. But that's really what the pool
789 // is for, since we expect building the rendezvous circuit and building the
790 // introduction circuit to take about the same length of time.
791 //
792 // We *do* serialise the ESTABLISH_RENDEZVOUS exchange, with the
793 // building of the introduction circuit. That could be improved, at the cost
794 // of some additional complexity here.
795 //
796 // Our HS experts don't consider it important to increase the parallelism:
797 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914444
798 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914445
799 if saved_rendezvous.is_none() {
800 debug!("hs conn to {}: setting up rendezvous point", &self.hsid);
801 // Establish a rendezvous circuit.
802 let Some(_): Option<usize> = rend_attempts.next() else {
803 return Ok(None);
804 };
805
806 let mut using_rend_pt = None;
807 saved_rendezvous = Some(
808 self.runtime
809 .timeout(rend_timeout, self.establish_rendezvous(&mut using_rend_pt))
810 .await
811 .map_err(|_: TimeoutError| match using_rend_pt {
812 None => FAE::RendezvousCircuitObtain {
813 error: tor_circmgr::Error::CircTimeout(None),
814 },
815 Some(rend_pt) => FAE::RendezvousEstablishTimeout { rend_pt },
816 })??,
817 );
818 }
819
820 let Some(ipt) = intro_attempts.next() else {
821 return Ok(None);
822 };
823 let intro_index = ipt.intro_index;
824
825 let proof_of_work = match pow_client.solve().await {
826 Ok(solution) => solution,
827 Err(e) => {
828 debug!(
829 "failing to compute proof-of-work, trying without. ({:?})",
830 e
831 );
832 None
833 }
834 };
835
836 // We record how long things take, starting from here, as
837 // as a statistic we'll use for the IPT in future.
838 // This is stored in a variable outside this async block,
839 // so that the outcome handling can use it.
840 ipt_use_started = Some(self.runtime.now());
841
842 // No `Option::get_or_try_insert_with`, or we'd avoid this expect()
843 let rend_pt_for_error = rend_pt_identity_for_error(
844 &saved_rendezvous
845 .as_ref()
846 .expect("just made Some")
847 .rend_relay,
848 );
849 debug!(
850 "hs conn to {}: RPT {}",
851 &self.hsid,
852 rend_pt_for_error.as_inner()
853 );
854
855 let (rendezvous, introduced) = self
856 .runtime
857 .timeout(
858 intro_timeout,
859 self.exchange_introduce(ipt, &mut saved_rendezvous,
860 proof_of_work),
861 )
862 .await
863 .map_err(|_: TimeoutError| {
864 // The intro point ought to give us a prompt ACK regardless of HS
865 // behaviour or whatever is happening at the RPT, so blame the IPT.
866 FAE::IntroductionTimeout { intro_index }
867 })?
868 // TODO: Maybe try, once, to extend-and-reuse the intro circuit.
869 //
870 // If the introduction fails, the introduction circuit is in principle
871 // still usable. We believe that in this case, C Tor extends the intro
872 // circuit by one hop to the next IPT to try. That saves on building a
873 // whole new 3-hop intro circuit. However, our HS experts tell us that
874 // if introduction fails at one IPT it is likely to fail at the others too,
875 // so that optimisation might reduce our network impact and time to failure,
876 // but isn't likely to improve our chances of success.
877 //
878 // However, it's not clear whether this approach risks contaminating
879 // the 2nd attempt with some fault relating to the introduction point.
880 // The 1st ipt might also gain more knowledge about which HS we're talking to.
881 //
882 // TODO SPEC: Discuss extend-and-reuse HS intro circuit after nack
883 ?;
884 #[allow(unused_variables)] // it's *supposed* to be unused
885 let saved_rendezvous = (); // don't use `saved_rendezvous` any more, use rendezvous
886
887 let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
888 let circ = self
889 .runtime
890 .timeout(
891 rpt_ipt_timeout,
892 self.complete_rendezvous(ipt, rendezvous, introduced),
893 )
894 .await
895 .map_err(|_: TimeoutError| FAE::RendezvousCompletionTimeout {
896 intro_index,
897 rend_pt: rend_pt.clone(),
898 })??;
899
900 debug!(
901 "hs conn to {}: RPT {} IPT {}: success",
902 &self.hsid,
903 rend_pt.as_inner(),
904 intro_index,
905 );
906 Ok::<_, FAE>(Some((intro_index, circ)))
907 }
908 .await;
909
910 // Store the experience `outcome` we had with IPT `intro_index`, in `data`
911 #[allow(clippy::unused_unit)] // -> () is here for error handling clarity
912 let mut store_experience = |intro_index, outcome| -> () {
913 (|| {
914 let ipt = usable_intros
915 .iter()
916 .find(|ipt| ipt.intro_index == intro_index)
917 .ok_or_else(|| internal!("IPT not found by index"))?;
918 let id = RelayIdForExperience::for_store(&ipt.intro_target)?;
919 let started = ipt_use_started.ok_or_else(|| {
920 internal!("trying to record IPT use but no IPT start time noted")
921 })?;
922 let duration = self
923 .runtime
924 .now()
925 .checked_duration_since(started)
926 .ok_or_else(|| internal!("clock overflow calculating IPT use duration"))?;
927 data.insert(id, IptExperience { duration, outcome });
928 Ok::<_, Bug>(())
929 })()
930 .unwrap_or_else(|e| warn_report!(e, "error recording HS IPT use experience"));
931 };
932
933 match outcome {
934 Ok(Some((intro_index, y))) => {
935 // Record successful outcome in Data
936 store_experience(intro_index, Ok(()));
937 return Ok(y);
938 }
939 Ok(None) => return Err(CE::Failed(errors)),
940 Err(error) => {
941 debug_report!(&error, "hs conn to {}: attempt failed", &self.hsid);
942 // Record error outcome in Data, if in fact we involved the IPT
943 // at all. The IPT information is be retrieved from `error`,
944 // since only some of the errors implicate the introduction point.
945 if let Some(intro_index) = error.intro_index() {
946 store_experience(intro_index, Err(error.retry_time()));
947 }
948 errors.push(error);
949
950 // If we are using proof-of-work DoS mitigation, try harder next time
951 pow_client.increase_effort();
952 }
953 }
954 }
955 }
956
957 /// Make one attempt to establish a rendezvous circuit
958 ///
959 /// This doesn't really depend on anything,
960 /// other than (obviously) the isolation implied by our circuit pool.
961 /// In particular it doesn't depend on the introduction point.
962 ///
963 /// Does not apply a timeout.
964 ///
965 /// On entry `using_rend_pt` is `None`.
966 /// This function will store `Some` when it finds out which relay
967 /// it is talking to and starts to converse with it.
968 /// That way, if a timeout occurs, the caller can add that information to the error.
969 async fn establish_rendezvous(
970 &'c self,
971 using_rend_pt: &mut Option<RendPtIdentityForError>,
972 ) -> Result<Rendezvous<'c, R, M>, FAE> {
973 let (rend_circ, rend_relay) = self
974 .circpool
975 .m_get_or_launch_client_rend(&self.netdir)
976 .await
977 .map_err(|error| FAE::RendezvousCircuitObtain { error })?;
978
979 let rend_pt = rend_pt_identity_for_error(&rend_relay);
980 *using_rend_pt = Some(rend_pt.clone());
981
982 let rend_cookie: RendCookie = self.mocks.thread_rng().random();
983 let message = EstablishRendezvous::new(rend_cookie);
984
985 let (rend_established_tx, rend_established_rx) = proto_oneshot::channel();
986 let (rend2_tx, rend2_rx) = proto_oneshot::channel();
987
988 /// Handler which expects `RENDEZVOUS_ESTABLISHED` and then
989 /// `RENDEZVOUS2`. Returns each message via the corresponding `oneshot`.
990 struct Handler {
991 /// Sender for a RENDEZVOUS_ESTABLISHED message.
992 rend_established_tx: proto_oneshot::Sender<RendezvousEstablished>,
993 /// Sender for a RENDEZVOUS2 message.
994 rend2_tx: proto_oneshot::Sender<Rendezvous2>,
995 }
996 impl MsgHandler for Handler {
997 fn handle_msg(
998 &mut self,
999 msg: AnyRelayMsg,
1000 ) -> Result<MetaCellDisposition, tor_proto::Error> {
1001 // The first message we expect is a RENDEZVOUS_ESTABALISHED.
1002 if self.rend_established_tx.still_expected() {
1003 self.rend_established_tx
1004 .deliver_expected_message(msg, MetaCellDisposition::Consumed)
1005 } else {
1006 self.rend2_tx
1007 .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
1008 }
1009 }
1010 }
1011
1012 debug!(
1013 "hs conn to {}: RPT {}: sending ESTABLISH_RENDEZVOUS",
1014 &self.hsid,
1015 rend_pt.as_inner(),
1016 );
1017
1018 let handle_proto_error = |error| FAE::RendezvousEstablish {
1019 error,
1020 rend_pt: rend_pt.clone(),
1021 };
1022 let handler = Handler {
1023 rend_established_tx,
1024 rend2_tx,
1025 };
1026
1027 rend_circ
1028 .m_start_conversation_last_hop(Some(message.into()), handler)
1029 .await
1030 .map_err(handle_proto_error)?;
1031
1032 // `start_conversation` returns as soon as the control message has been sent.
1033 // We need to obtain the RENDEZVOUS_ESTABLISHED message, which is "returned" via the oneshot.
1034 let _: RendezvousEstablished = rend_established_rx.recv(handle_proto_error).await?;
1035
1036 debug!(
1037 "hs conn to {}: RPT {}: got RENDEZVOUS_ESTABLISHED",
1038 &self.hsid,
1039 rend_pt.as_inner(),
1040 );
1041
1042 Ok(Rendezvous {
1043 rend_circ,
1044 rend_cookie,
1045 rend_relay,
1046 rend2_rx,
1047 marker: PhantomData,
1048 })
1049 }
1050
1051 /// Attempt (once) to send an INTRODUCE1 and wait for the INTRODUCE_ACK
1052 ///
1053 /// `take`s the input `rendezvous` (but only takes it if it gets that far)
1054 /// and, if successful, returns it.
1055 /// (This arranges that the rendezvous is "used up" precisely if
1056 /// we sent its secret somewhere.)
1057 ///
1058 /// Although this function handles the `Rendezvous`,
1059 /// nothing in it actually involves the rendezvous point.
1060 /// So if there's a failure, it's purely to do with the introduction point.
1061 ///
1062 /// Does not apply a timeout.
1063 async fn exchange_introduce(
1064 &'c self,
1065 ipt: &UsableIntroPt<'_>,
1066 rendezvous: &mut Option<Rendezvous<'c, R, M>>,
1067 proof_of_work: Option<ProofOfWork>,
1068 ) -> Result<(Rendezvous<'c, R, M>, Introduced<R, M>), FAE> {
1069 let intro_index = ipt.intro_index;
1070
1071 debug!(
1072 "hs conn to {}: IPT {}: obtaining intro circuit",
1073 &self.hsid, intro_index,
1074 );
1075
1076 let intro_circ = self
1077 .circpool
1078 .m_get_or_launch_specific(
1079 &self.netdir,
1080 HsCircKind::ClientIntro,
1081 ipt.intro_target.clone(), // &OwnedCircTarget isn't CircTarget apparently
1082 )
1083 .await
1084 .map_err(|error| FAE::IntroductionCircuitObtain { error, intro_index })?;
1085
1086 let rendezvous = rendezvous.take().ok_or_else(|| internal!("no rend"))?;
1087
1088 let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
1089
1090 debug!(
1091 "hs conn to {}: RPT {} IPT {}: making introduction",
1092 &self.hsid,
1093 rend_pt.as_inner(),
1094 intro_index,
1095 );
1096
1097 // Now we construct an introduce1 message and perform the first part of the
1098 // rendezvous handshake.
1099 //
1100 // This process is tricky because the header of the INTRODUCE1 message
1101 // -- which depends on the IntroPt configuration -- is authenticated as
1102 // part of the HsDesc handshake.
1103
1104 // Construct the header, since we need it as input to our encryption.
1105 let intro_header = {
1106 let ipt_sid_key = ipt.intro_desc.ipt_sid_key();
1107 let intro1 = Introduce1::new(
1108 AuthKeyType::ED25519_SHA3_256,
1109 ipt_sid_key.as_bytes().to_vec(),
1110 vec![],
1111 );
1112 let mut header = vec![];
1113 intro1
1114 .encode_onto(&mut header)
1115 .map_err(into_internal!("couldn't encode intro1 header"))?;
1116 header
1117 };
1118
1119 // Construct the introduce payload, which tells the onion service how to find
1120 // our rendezvous point. (We could do this earlier if we wanted.)
1121 let intro_payload = {
1122 let onion_key =
1123 intro_payload::OnionKey::NtorOnionKey(*rendezvous.rend_relay.ntor_onion_key());
1124 let linkspecs = rendezvous
1125 .rend_relay
1126 .linkspecs()
1127 .map_err(into_internal!("Couldn't encode link specifiers"))?;
1128 let payload = IntroduceHandshakePayload::new(
1129 rendezvous.rend_cookie,
1130 onion_key,
1131 linkspecs,
1132 proof_of_work,
1133 );
1134 let mut encoded = vec![];
1135 payload
1136 .write_onto(&mut encoded)
1137 .map_err(into_internal!("Couldn't encode introduce1 payload"))?;
1138 encoded
1139 };
1140
1141 // Perform the cryptographic handshake with the onion service.
1142 let service_info = hs_ntor::HsNtorServiceInfo::new(
1143 ipt.intro_desc.svc_ntor_key().clone(),
1144 ipt.intro_desc.ipt_sid_key().clone(),
1145 self.subcredential,
1146 );
1147 let handshake_state =
1148 hs_ntor::HsNtorClientState::new(&mut self.mocks.thread_rng(), service_info);
1149 let encrypted_body = handshake_state
1150 .client_send_intro(&intro_header, &intro_payload)
1151 .map_err(into_internal!("can't begin hs-ntor handshake"))?;
1152
1153 // Build our actual INTRODUCE1 message.
1154 let intro1_real = Introduce1::new(
1155 AuthKeyType::ED25519_SHA3_256,
1156 ipt.intro_desc.ipt_sid_key().as_bytes().to_vec(),
1157 encrypted_body,
1158 );
1159
1160 /// Handler which expects just `INTRODUCE_ACK`
1161 struct Handler {
1162 /// Sender for `INTRODUCE_ACK`
1163 intro_ack_tx: proto_oneshot::Sender<IntroduceAck>,
1164 }
1165 impl MsgHandler for Handler {
1166 fn handle_msg(
1167 &mut self,
1168 msg: AnyRelayMsg,
1169 ) -> Result<MetaCellDisposition, tor_proto::Error> {
1170 self.intro_ack_tx
1171 .deliver_expected_message(msg, MetaCellDisposition::ConversationFinished)
1172 }
1173 }
1174 let handle_intro_proto_error = |error| FAE::IntroductionExchange { error, intro_index };
1175 let (intro_ack_tx, intro_ack_rx) = proto_oneshot::channel();
1176 let handler = Handler { intro_ack_tx };
1177
1178 debug!(
1179 "hs conn to {}: RPT {} IPT {}: making introduction - sending INTRODUCE1",
1180 &self.hsid,
1181 rend_pt.as_inner(),
1182 intro_index,
1183 );
1184
1185 intro_circ
1186 .m_start_conversation_last_hop(Some(intro1_real.into()), handler)
1187 .await
1188 .map_err(handle_intro_proto_error)?;
1189
1190 // Status is checked by `.success()`, and we don't look at the extensions;
1191 // just discard the known-successful `IntroduceAck`
1192 let _: IntroduceAck = intro_ack_rx
1193 .recv(handle_intro_proto_error)
1194 .await?
1195 .success()
1196 .map_err(|status| FAE::IntroductionFailed {
1197 status,
1198 intro_index,
1199 })?;
1200
1201 debug!(
1202 "hs conn to {}: RPT {} IPT {}: making introduction - success",
1203 &self.hsid,
1204 rend_pt.as_inner(),
1205 intro_index,
1206 );
1207
1208 // Having received INTRODUCE_ACK. we can forget about this circuit
1209 // (and potentially tear it down).
1210 drop(intro_circ);
1211
1212 Ok((
1213 rendezvous,
1214 Introduced {
1215 handshake_state,
1216 marker: PhantomData,
1217 },
1218 ))
1219 }
1220
1221 /// Attempt (once) to connect a rendezvous circuit using the given intro pt
1222 ///
1223 /// Timeouts here might be due to the IPT, RPT, service,
1224 /// or any of the intermediate relays.
1225 ///
1226 /// If, rather than a timeout, we actually encounter some kind of error,
1227 /// we'll return the appropriate `FailedAttemptError`.
1228 /// (Who is responsible may vary, so the `FailedAttemptError` variant will reflect that.)
1229 ///
1230 /// Does not apply a timeout
1231 async fn complete_rendezvous(
1232 &'c self,
1233 ipt: &UsableIntroPt<'_>,
1234 rendezvous: Rendezvous<'c, R, M>,
1235 introduced: Introduced<R, M>,
1236 ) -> Result<Arc<ClientCirc!(R, M)>, FAE> {
1237 use tor_proto::circuit::handshake;
1238
1239 let rend_pt = rend_pt_identity_for_error(&rendezvous.rend_relay);
1240 let intro_index = ipt.intro_index;
1241 let handle_proto_error = |error| FAE::RendezvousCompletionCircuitError {
1242 error,
1243 intro_index,
1244 rend_pt: rend_pt.clone(),
1245 };
1246
1247 debug!(
1248 "hs conn to {}: RPT {} IPT {}: awaiting rendezvous completion",
1249 &self.hsid,
1250 rend_pt.as_inner(),
1251 intro_index,
1252 );
1253
1254 let rend2_msg: Rendezvous2 = rendezvous.rend2_rx.recv(handle_proto_error).await?;
1255
1256 debug!(
1257 "hs conn to {}: RPT {} IPT {}: received RENDEZVOUS2",
1258 &self.hsid,
1259 rend_pt.as_inner(),
1260 intro_index,
1261 );
1262
1263 // In theory would be great if we could have multiple introduction attempts in parallel
1264 // with similar x,X values but different IPTs. However, our HS experts don't
1265 // think increasing parallelism here is important:
1266 // https://gitlab.torproject.org/tpo/core/arti/-/issues/913#note_2914438
1267 let handshake_state = introduced.handshake_state;
1268
1269 // Try to complete the cryptographic handshake.
1270 let keygen = handshake_state
1271 .client_receive_rend(rend2_msg.handshake_info())
1272 // If this goes wrong. either the onion service has mangled the crypto,
1273 // or the rendezvous point has misbehaved (that that is possible is a protocol bug),
1274 // or we have used the wrong handshake_state (let's assume that's not true).
1275 //
1276 // If this happens we'll go and try another RPT.
1277 .map_err(|error| FAE::RendezvousCompletionHandshake {
1278 error,
1279 intro_index,
1280 rend_pt: rend_pt.clone(),
1281 })?;
1282
1283 let params = onion_circparams_from_netparams(self.netdir.params())
1284 .map_err(into_internal!("Failed to build CircParameters"))?;
1285
1286 rendezvous
1287 .rend_circ
1288 .m_extend_virtual(
1289 handshake::RelayProtocol::HsV3,
1290 handshake::HandshakeRole::Initiator,
1291 keygen,
1292 params,
1293 )
1294 .await
1295 .map_err(into_internal!(
1296 "actually this is probably a 'circuit closed' error" // TODO HS
1297 ))?;
1298
1299 debug!(
1300 "hs conn to {}: RPT {} IPT {}: HS circuit established",
1301 &self.hsid,
1302 rend_pt.as_inner(),
1303 intro_index,
1304 );
1305
1306 Ok(rendezvous.rend_circ)
1307 }
1308
1309 /// Helper to estimate a timeout for a complicated operation
1310 ///
1311 /// `actions` is a list of `(count, action)`, where each entry
1312 /// represents doing `action`, `count` times sequentially.
1313 ///
1314 /// Combines the timeout estimates and returns an overall timeout.
1315 fn estimate_timeout(&self, actions: &[(u32, TimeoutsAction)]) -> Duration {
1316 // This algorithm is, perhaps, wrong. For uncorrelated variables, a particular
1317 // percentile estimate for a sum of random variables, is not calculated by adding the
1318 // percentile estimates of the individual variables.
1319 //
1320 // But the actual lengths of times of the operations aren't uncorrelated.
1321 // If they were *perfectly* correlated, then this addition would be correct.
1322 // It will do for now; it just might be rather longer than it ought to be.
1323 actions
1324 .iter()
1325 .map(|(count, action)| {
1326 self.circpool
1327 .m_estimate_timeout(action)
1328 .saturating_mul(*count)
1329 })
1330 .fold(Duration::ZERO, Duration::saturating_add)
1331 }
1332}
1333
1334/// Mocks used for testing `connect.rs`
1335///
1336/// This is different to `MockableConnectorData`,
1337/// which is used to *replace* this file, when testing `state.rs`.
1338///
1339/// `MocksForConnect` provides mock facilities for *testing* this file.
1340//
1341// TODO this should probably live somewhere else, maybe tor-circmgr even?
1342// TODO this really ought to be made by macros or something
1343trait MocksForConnect<R>: Clone {
1344 /// HS circuit pool
1345 type HsCircPool: MockableCircPool<R>;
1346
1347 /// A random number generator
1348 type Rng: rand::Rng + rand::CryptoRng;
1349
1350 /// Tell tests we got this descriptor text
1351 fn test_got_desc(&self, _: &HsDesc) {}
1352 /// Tell tests we got this circuit
1353 fn test_got_circ(&self, _: &Arc<ClientCirc!(R, Self)>) {}
1354 /// Tell tests we have obtained and sorted the intros like this
1355 fn test_got_ipts(&self, _: &[UsableIntroPt]) {}
1356
1357 /// Return a random number generator
1358 fn thread_rng(&self) -> Self::Rng;
1359}
1360/// Mock for `HsCircPool`
1361///
1362/// Methods start with `m_` to avoid the following problem:
1363/// `ClientCirc::start_conversation` (say) means
1364/// to use the inherent method if one exists,
1365/// but will use a trait method if there isn't an inherent method.
1366///
1367/// So if the inherent method is renamed, the call in the impl here
1368/// turns into an always-recursive call.
1369/// This is not detected by the compiler due to the situation being
1370/// complicated by futures, `#[async_trait]` etc.
1371/// <https://github.com/rust-lang/rust/issues/111177>
1372#[async_trait]
1373trait MockableCircPool<R> {
1374 /// Client circuit
1375 type ClientCirc: MockableClientCirc;
1376 async fn m_get_or_launch_specific(
1377 &self,
1378 netdir: &NetDir,
1379 kind: HsCircKind,
1380 target: impl CircTarget + Send + Sync + 'async_trait,
1381 ) -> tor_circmgr::Result<Arc<Self::ClientCirc>>;
1382
1383 /// Client circuit
1384 async fn m_get_or_launch_client_rend<'a>(
1385 &self,
1386 netdir: &'a NetDir,
1387 ) -> tor_circmgr::Result<(Arc<Self::ClientCirc>, Relay<'a>)>;
1388
1389 /// Estimate timeout
1390 fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration;
1391}
1392/// Mock for `ClientCirc`
1393#[async_trait]
1394trait MockableClientCirc: Debug {
1395 /// Client circuit
1396 type DirStream: AsyncRead + AsyncWrite + Send + Unpin;
1397 async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream>;
1398
1399 /// Converse
1400 async fn m_start_conversation_last_hop(
1401 &self,
1402 msg: Option<AnyRelayMsg>,
1403 reply_handler: impl MsgHandler + Send + 'static,
1404 ) -> tor_proto::Result<Self::Conversation<'_>>;
1405 /// Conversation
1406 type Conversation<'r>
1407 where
1408 Self: 'r;
1409
1410 /// Add a virtual hop to the circuit.
1411 async fn m_extend_virtual(
1412 &self,
1413 protocol: tor_proto::circuit::handshake::RelayProtocol,
1414 role: tor_proto::circuit::handshake::HandshakeRole,
1415 handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
1416 params: CircParameters,
1417 ) -> tor_proto::Result<()>;
1418}
1419
1420impl<R: Runtime> MocksForConnect<R> for () {
1421 type HsCircPool = HsCircPool<R>;
1422 type Rng = rand::rngs::ThreadRng;
1423
1424 fn thread_rng(&self) -> Self::Rng {
1425 rand::rng()
1426 }
1427}
1428#[async_trait]
1429impl<R: Runtime> MockableCircPool<R> for HsCircPool<R> {
1430 type ClientCirc = ClientCirc;
1431 async fn m_get_or_launch_specific(
1432 &self,
1433 netdir: &NetDir,
1434 kind: HsCircKind,
1435 target: impl CircTarget + Send + Sync + 'async_trait,
1436 ) -> tor_circmgr::Result<Arc<ClientCirc>> {
1437 HsCircPool::get_or_launch_specific(self, netdir, kind, target).await
1438 }
1439 async fn m_get_or_launch_client_rend<'a>(
1440 &self,
1441 netdir: &'a NetDir,
1442 ) -> tor_circmgr::Result<(Arc<ClientCirc>, Relay<'a>)> {
1443 HsCircPool::get_or_launch_client_rend(self, netdir).await
1444 }
1445 fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
1446 HsCircPool::estimate_timeout(self, action)
1447 }
1448}
1449#[async_trait]
1450impl MockableClientCirc for ClientCirc {
1451 /// Client circuit
1452 type DirStream = tor_proto::stream::DataStream;
1453 async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
1454 ClientCirc::begin_dir_stream(self).await
1455 }
1456 async fn m_start_conversation_last_hop(
1457 &self,
1458 msg: Option<AnyRelayMsg>,
1459 reply_handler: impl MsgHandler + Send + 'static,
1460 ) -> tor_proto::Result<Self::Conversation<'_>> {
1461 let last_hop = self.last_hop_num()?;
1462 ClientCirc::start_conversation(self, msg, reply_handler, last_hop).await
1463 }
1464 type Conversation<'r> = tor_proto::circuit::Conversation<'r>;
1465
1466 async fn m_extend_virtual(
1467 &self,
1468 protocol: tor_proto::circuit::handshake::RelayProtocol,
1469 role: tor_proto::circuit::handshake::HandshakeRole,
1470 handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
1471 params: CircParameters,
1472 ) -> tor_proto::Result<()> {
1473 ClientCirc::extend_virtual(self, protocol, role, handshake, params).await
1474 }
1475}
1476
1477#[async_trait]
1478impl MockableConnectorData for Data {
1479 type ClientCirc = ClientCirc;
1480 type MockGlobalState = ();
1481
1482 async fn connect<R: Runtime>(
1483 connector: &HsClientConnector<R>,
1484 netdir: Arc<NetDir>,
1485 config: Arc<Config>,
1486 hsid: HsId,
1487 data: &mut Self,
1488 secret_keys: HsClientSecretKeys,
1489 ) -> Result<Arc<Self::ClientCirc>, ConnError> {
1490 connect(connector, netdir, config, hsid, data, secret_keys).await
1491 }
1492
1493 fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool {
1494 !circuit.is_closing()
1495 }
1496}
1497
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    #![allow(dead_code, unused_variables)] // TODO HS TESTS delete, after tests are completed

    use super::*;
    use crate::*;
    use futures::FutureExt as _;
    use std::{iter, panic::AssertUnwindSafe};
    use tokio_crate as tokio;
    use tor_async_utils::JoinReadWrite;
    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
    use tor_hscrypto::pk::{HsClientDescEncKey, HsClientDescEncKeypair};
    use tor_llcrypto::pk::curve25519;
    use tor_netdoc::doc::{hsdesc::test_data, netstatus::Lifetime};
    use tor_rtcompat::tokio::TokioNativeTlsRuntime;
    use tor_rtcompat::RuntimeSubstExt as _;
    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::time::MockSleepProvider;
    use tracing_test::traced_test;

    /// State shared by all clones of a `Mocks`, inspected by tests afterwards.
    #[derive(Debug, Default)]
    struct MocksGlobal {
        /// Targets of every HsDir circuit requested, in order.
        hsdirs_asked: Vec<OwnedCircTarget>,
        /// The most recent descriptor reported via `test_got_desc`.
        got_desc: Option<HsDesc>,
    }

    /// Test double used for every mockable interface in `connect.rs`.
    #[derive(Clone, Debug)]
    struct Mocks<I> {
        /// Shared state.
        mglobal: Arc<Mutex<MocksGlobal>>,
        /// Per-instance identifier.
        id: I,
    }

    impl<I> Mocks<I> {
        /// Make a `Mocks` sharing our global state, with an id computed by `f`.
        fn map_id<J>(&self, f: impl FnOnce(&I) -> J) -> Mocks<J> {
            Mocks {
                mglobal: self.mglobal.clone(),
                id: f(&self.id),
            }
        }
    }

    impl<R: Runtime> MocksForConnect<R> for Mocks<()> {
        type HsCircPool = Mocks<()>;
        type Rng = TestingRng;

        fn test_got_desc(&self, desc: &HsDesc) {
            self.mglobal.lock().unwrap().got_desc = Some(desc.clone());
        }

        fn test_got_ipts(&self, _ipts: &[UsableIntroPt]) {}

        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }
    }
    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
    #[async_trait]
    impl<R: Runtime> MockableCircPool<R> for Mocks<()> {
        type ClientCirc = Mocks<()>;
        async fn m_get_or_launch_specific(
            &self,
            _netdir: &NetDir,
            kind: HsCircKind,
            target: impl CircTarget + Send + Sync + 'async_trait,
        ) -> tor_circmgr::Result<Arc<Self::ClientCirc>> {
            assert_eq!(kind, HsCircKind::ClientHsDir);
            let target = OwnedCircTarget::from_circ_target(&target);
            self.mglobal.lock().unwrap().hsdirs_asked.push(target);
            // Adding the `Arc` here is a little ugly, but that's what we get
            // for using the same Mocks for everything.
            Ok(Arc::new(self.clone()))
        }
        /// Client rendezvous circuit: not implemented yet (panics via `todo!()`).
        async fn m_get_or_launch_client_rend<'a>(
            &self,
            netdir: &'a NetDir,
        ) -> tor_circmgr::Result<(Arc<ClientCirc!(R, Self)>, Relay<'a>)> {
            todo!()
        }

        fn m_estimate_timeout(&self, action: &TimeoutsAction) -> Duration {
            Duration::from_secs(10)
        }
    }
    #[allow(clippy::diverging_sub_expression)] // async_trait + todo!()
    #[async_trait]
    impl MockableClientCirc for Mocks<()> {
        type DirStream = JoinReadWrite<futures::io::Cursor<Box<[u8]>>, futures::io::Sink>;
        type Conversation<'r> = &'r ();
        async fn m_begin_dir_stream(self: Arc<Self>) -> tor_proto::Result<Self::DirStream> {
            let response = format!(
                r#"HTTP/1.1 200 OK

{}"#,
                test_data::TEST_DATA_2
            )
            .into_bytes()
            .into_boxed_slice();

            Ok(JoinReadWrite::new(
                futures::io::Cursor::new(response),
                futures::io::sink(),
            ))
        }
        async fn m_start_conversation_last_hop(
            &self,
            msg: Option<AnyRelayMsg>,
            reply_handler: impl MsgHandler + Send + 'static,
        ) -> tor_proto::Result<Self::Conversation<'_>> {
            todo!()
        }

        async fn m_extend_virtual(
            &self,
            protocol: tor_proto::circuit::handshake::RelayProtocol,
            role: tor_proto::circuit::handshake::HandshakeRole,
            handshake: impl tor_proto::circuit::handshake::KeyGenerator + Send,
            params: CircParameters,
        ) -> tor_proto::Result<()> {
            todo!()
        }
    }

    #[traced_test]
    #[tokio::test]
    async fn test_connect() {
        let valid_after = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
        let fresh_until = valid_after + humantime::parse_duration("1 hours").unwrap();
        let valid_until = valid_after + humantime::parse_duration("24 hours").unwrap();
        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();

        let netdir = tor_netdir::testnet::construct_custom_netdir_with_params(
            tor_netdir::testnet::simple_net_func,
            iter::empty::<(&str, _)>(),
            Some(lifetime),
        )
        .expect("failed to build default testing netdir");

        let netdir = Arc::new(netdir.unwrap_if_sufficient().unwrap());
        let runtime = TokioNativeTlsRuntime::current().unwrap();
        let now = humantime::parse_rfc3339("2023-02-09T12:00:00Z").unwrap();
        #[allow(deprecated)] // TODO #1885
        let mock_sp = MockSleepProvider::new(now);
        let runtime = runtime
            .with_sleep_provider(mock_sp.clone())
            .with_coarse_time_provider(mock_sp);
        let time_period = netdir.hs_time_period();

        let mglobal = Arc::new(Mutex::new(MocksGlobal::default()));
        let mocks = Mocks { mglobal, id: () };
        // From C Tor src/test/test_hs_common.c test_build_address
        let hsid = test_data::TEST_HSID_2.into();
        let mut data = Data::default();

        let pk: HsClientDescEncKey = curve25519::PublicKey::from(test_data::TEST_PUBKEY_2).into();
        let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();
        let mut secret_keys_builder = HsClientSecretKeysBuilder::default();
        secret_keys_builder.ks_hsc_desc_enc(HsClientDescEncKeypair::new(pk.clone(), sk));
        let secret_keys = secret_keys_builder.build().unwrap();

        let ctx = Context::new(
            &runtime,
            &mocks,
            netdir,
            Default::default(),
            hsid,
            secret_keys,
            mocks.clone(),
        )
        .unwrap();

        let _got = AssertUnwindSafe(ctx.connect(&mut data))
            .catch_unwind() // TODO HS TESTS: remove this and the AssertUnwindSafe
            .await;

        let (hs_blind_id_key, subcredential) = HsIdKey::try_from(hsid)
            .unwrap()
            .compute_blinded_key(time_period)
            .unwrap();
        let hs_blind_id = hs_blind_id_key.id();

        let sk = curve25519::StaticSecret::from(test_data::TEST_SECKEY_2).into();

        let hsdesc = HsDesc::parse_decrypt_validate(
            test_data::TEST_DATA_2,
            &hs_blind_id,
            now,
            &subcredential,
            Some(&HsClientDescEncKeypair::new(pk, sk)),
        )
        .unwrap()
        .dangerously_assume_timely();

        let mglobal = mocks.mglobal.lock().unwrap();
        assert_eq!(mglobal.hsdirs_asked.len(), 1);
        // TODO hs: here and in other places, consider implementing PartialEq instead, or creating
        // an assert_dbg_eq macro (which would be part of a test_helpers crate or something)
        assert_eq!(
            format!("{:?}", mglobal.got_desc),
            format!("{:?}", Some(hsdesc))
        );

        // Check how long the descriptor is valid for
        let (start_time, end_time) = data.desc.as_ref().unwrap().bounds();
        assert_eq!(start_time, None);

        let desc_valid_until = humantime::parse_rfc3339("2023-02-11T20:00:00Z").unwrap();
        assert_eq!(end_time, Some(desc_valid_until));

        // TODO HS TESTS: check the circuit in got is the one we gave out

        // TODO HS TESTS: continue with this
    }

    // TODO HS TESTS: Test IPT state management and expiry:
    //   - obtain a test descriptor with only a broken ipt
    //     (broken in the sense that intro can be attempted, but will fail somehow)
    //   - try to make a connection and expect it to fail
    //   - assert that the ipt data isn't empty
    //   - cause the descriptor to expire (advance clock)
    //   - start using a mocked RNG if we weren't already and pin its seed here
    //   - make a new descriptor with two IPTs: the broken one from earlier, and a new one
    //   - make a new connection
    //   - use test_got_ipts to check that the random numbers
    //     would sort the bad intro first, *and* that the good one actually appears first
    //   - assert that connection succeeded
    //   - cause the circuit and descriptor to expire (advance clock)
    //   - go back to the previous descriptor contents, but with a new validity period
    //   - try to make a connection
    //   - use test_got_ipts to check that only the broken ipt is present

    // TODO HS TESTS: test retries (of every retry loop we have here)
    // TODO HS TESTS: test error paths
}