tor_hsservice/ipt_establish.rs
1//! IPT Establisher
2//!
3//! Responsible for maintaining and establishing one introduction point.
4//!
5//! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
6//!
7//! See the docs for
8//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
9//! for details of our algorithm.
10
11use crate::internal_prelude::*;
12
13use tor_cell::relaycell::{
14 hs::est_intro::{self, EstablishIntroDetails},
15 msg::IntroEstablished,
16};
17use tor_proto::TargetHop;
18
19/// Handle onto the task which is establishing and maintaining one IPT
20pub(crate) struct IptEstablisher {
21 /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
22 /// down.
23 _terminate_tx: oneshot::Sender<Void>,
24
25 /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
26 state: Arc<Mutex<EstablisherState>>,
27}
28
29/// When the `IptEstablisher` is dropped it is torn down
30///
31/// Synchronously
32///
33/// * No rendezvous requests will be accepted
34/// that arrived after `Drop::drop` returns.
35///
36/// Asynchronously
37///
38/// * Circuits constructed for this IPT are torn down
39/// * The `rend_reqs` sink is closed (dropped)
40/// * `IptStatusStatus::Faulty` will be indicated
41impl Drop for IptEstablisher {
42 fn drop(&mut self) {
43 // Make sure no more requests are accepted once this returns.
44 //
45 // (Note that if we didn't care about the "no more rendezvous
46 // requests will be accepted" requirement, we could do away with this
47 // code and the corresponding check for `RequestDisposition::Shutdown` in
48 // `IptMsgHandler::handle_msg`.)
49 self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
50
51 // Tell the reactor to shut down... by doing nothing.
52 //
53 // (When terminate_tx is dropped, it will send an error to the
54 // corresponding terminate_rx.)
55 }
56}
57
58/// An error from trying to work with an IptEstablisher.
59#[derive(Clone, Debug, thiserror::Error)]
60pub(crate) enum IptEstablisherError {
61 /// We encountered a faulty IPT.
62 #[error("{0}")]
63 Ipt(#[from] IptError),
64
65 /// The network directory provider is shutting down without giving us the
66 /// netdir we asked for.
67 #[error("{0}")]
68 NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),
69
70 /// We encountered an error while building a circuit to an intro point.
71 #[error("Unable to build circuit to introduction point")]
72 BuildCircuit(#[source] tor_circmgr::Error),
73
74 /// We encountered an error while querying the circuit state.
75 #[error("Unable to query circuit state")]
76 CircuitState(#[source] tor_proto::Error),
77
78 /// We encountered an error while building and signing our establish_intro
79 /// message.
80 #[error("Unable to construct signed ESTABLISH_INTRO message")]
81 CreateEstablishIntro(#[source] tor_cell::Error),
82
83 /// We encountered a timeout after building the circuit.
84 #[error("Timeout during ESTABLISH_INTRO handshake.")]
85 EstablishTimeout,
86
87 /// We encountered an error while sending our establish_intro
88 /// message.
89 #[error("Unable to send an ESTABLISH_INTRO message")]
90 SendEstablishIntro(#[source] tor_proto::Error),
91
92 /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
93 /// circuit was closed.
94 #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
95 ClosedWithoutAck,
96
97 /// We encountered a programming error.
98 #[error("Internal error")]
99 Bug(#[from] tor_error::Bug),
100}
101
102/// An error caused by a faulty IPT.
103#[derive(Clone, Debug, thiserror::Error)]
104#[non_exhaustive]
105pub enum IptError {
106 /// When we tried to establish this introduction point, we found that the
107 /// netdir didn't list it.
108 ///
109 /// This introduction point is not strictly "faulty", but unlisted in the directory means we
110 /// can't use the introduction point.
111 #[error("Introduction point not listed in network directory")]
112 IntroPointNotListed,
113
114 /// We received an invalid INTRO_ESTABLISHED message.
115 ///
116 /// This is definitely the introduction point's fault: it sent us
117 /// an authenticated message, but the contents of that message were
118 /// definitely wrong.
119 #[error("Got an invalid INTRO_ESTABLISHED message")]
120 // Eventually, once we expect intro_established extensions, we will make
121 // sure that they are well-formed.
122 #[allow(dead_code)]
123 BadEstablished,
124
125 /// We received a message that not a valid part of the introduction-point
126 /// protocol.
127 #[error("Invalid message: {0}")]
128 BadMessage(String),
129
130 /// This introduction point has gone too long without a success.
131 #[error("introduction point has gone too long without a success")]
132 Timeout,
133}
134
135impl tor_error::HasKind for IptEstablisherError {
136 fn kind(&self) -> tor_error::ErrorKind {
137 use tor_error::ErrorKind as EK;
138 use IptEstablisherError as E;
139 match self {
140 E::Ipt(e) => e.kind(),
141 E::NetdirProviderShutdown(e) => e.kind(),
142 E::BuildCircuit(e) => e.kind(),
143 E::CircuitState(e) => e.kind(),
144 E::EstablishTimeout => EK::TorNetworkTimeout,
145 E::SendEstablishIntro(e) => e.kind(),
146 E::ClosedWithoutAck => EK::CircuitCollapse,
147 E::CreateEstablishIntro(_) => EK::Internal,
148 E::Bug(e) => e.kind(),
149 }
150 }
151}
152
153impl tor_error::HasKind for IptError {
154 fn kind(&self) -> tor_error::ErrorKind {
155 use tor_error::ErrorKind as EK;
156 use IptError as E;
157 match self {
158 E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
159 E::BadEstablished => EK::RemoteProtocolViolation,
160 E::BadMessage(_) => EK::RemoteProtocolViolation,
161 E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
162 }
163 }
164}
165
166impl IptEstablisherError {
167 /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
168 ///
169 /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`,
170 /// it means that we should try another relay as an introduction point,
171 /// though we don't necessarily need to give up on this one.
172 ///
173 /// Note that the intro point may be to blame even if we return `None`;
174 /// we only return `true` when we are certain that the intro point is
175 /// unlisted, unusable, or misbehaving.
176 fn ipt_failure(&self) -> Option<&IptError> {
177 use IptEstablisherError as IE;
178 match self {
179 // If we don't have a netdir, then no intro point is better than any other.
180 IE::NetdirProviderShutdown(_) => None,
181 IE::Ipt(e) => Some(e),
182 // This _might_ be the introduction point's fault, but it might not.
183 // We can't be certain.
184 IE::BuildCircuit(_) => None,
185 IE::CircuitState(_) => None,
186 IE::EstablishTimeout => None,
187 IE::ClosedWithoutAck => None,
188 // These are, most likely, not the introduction point's fault,
189 // though they might or might not be survivable.
190 IE::CreateEstablishIntro(_) => None,
191 IE::SendEstablishIntro(_) => None,
192 IE::Bug(_) => None,
193 }
194 }
195}
196
197/// Parameters for an introduction point
198///
199/// Consumed by `IptEstablisher::new`.
200/// Primarily serves as a convenient way to bundle the many arguments required.
201///
202/// Does not include:
203/// * The runtime (which would force this struct to have a type parameter)
204/// * The circuit builder (leaving this out makes it possible to use this
205/// struct during mock execution, where we don't call `IptEstablisher::new`).
206#[derive(Educe)]
207#[educe(Debug)]
208pub(crate) struct IptParameters {
209 /// A receiver that we can use to tell us about updates in our configuration.
210 ///
211 /// Configuration changes may tell us to change our introduction points or build new
212 /// circuits to them.
213 //
214 // TODO (#1209):
215 //
216 // We want to make a new introduction circuit if our dos parameters change,
217 // which means that we should possibly be watching for changes in our
218 // configuration. Right now, though, we only copy out the configuration
219 // on startup.
220 #[educe(Debug(ignore))]
221 pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
222 /// A `NetDirProvider` that we'll use to find changes in the network
223 /// parameters, and to look up information about routers.
224 #[educe(Debug(ignore))]
225 pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
226 /// A shared sender that we'll use to report incoming INTRODUCE2 requests
227 /// for rendezvous circuits.
228 #[educe(Debug(ignore))]
229 pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
230 /// Opaque local ID for this introduction point.
231 ///
232 /// This ID does not change within the lifetime of an [`IptEstablisher`].
233 /// See [`IptLocalId`] for information about what changes would require a
234 /// new ID (and hence a new `IptEstablisher`).
235 pub(crate) lid: IptLocalId,
236 /// Persistent log for INTRODUCE2 requests.
237 ///
238 /// We use this to record the requests that we see, and to prevent replays.
239 #[educe(Debug(ignore))]
240 pub(crate) replay_log: IptReplayLog,
241 /// A set of identifiers for the Relay that we intend to use as the
242 /// introduction point.
243 ///
244 /// We use this to identify the relay within a `NetDir`, and to make sure
245 /// we're connecting to the right introduction point.
246 pub(crate) target: RelayIds,
247 /// Keypair used to authenticate and identify ourselves to this introduction
248 /// point.
249 ///
250 /// Later, we publish the public component of this keypair in our HsDesc,
251 /// and clients use it to tell the introduction point which introduction circuit
252 /// should receive their requests.
253 ///
254 /// This is the `K_hs_ipt_sid` keypair.
255 pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
256 /// Whether this `IptEstablisher` should begin by accepting requests, or
257 /// wait to be told that requests are okay.
258 pub(crate) accepting_requests: RequestDisposition,
259 /// Keypair used to decrypt INTRODUCE2 requests from clients.
260 ///
261 /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
262 /// form a shared key set of keys with the client, and decrypt information
263 /// about the client's chosen rendezvous point and extensions.
264 pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
265}
266
267impl IptEstablisher {
268 /// Try to set up, and maintain, an IPT at `target`.
269 ///
270 /// Rendezvous requests will be rejected or accepted
271 /// depending on the value of `accepting_requests`
272 /// (which must be `Advertised` or `NotAdvertised`).
273 ///
274 /// Also returns a stream of events that is produced whenever we have a
275 /// change in the IptStatus for this intro point. Note that this stream is
276 /// potentially lossy.
277 ///
278 /// The returned `watch::Receiver` will yield `Faulty` if the IPT
279 /// establisher is shut down (or crashes).
280 ///
281 /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
282 /// and close all circuits used to establish this introduction point.
283 pub(crate) fn launch<R: Runtime>(
284 runtime: &R,
285 params: IptParameters,
286 pool: Arc<HsCircPool<R>>,
287 keymgr: &Arc<KeyMgr>,
288 ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
289 // This exhaustive deconstruction ensures that we don't
290 // accidentally forget to handle any of our inputs.
291 let IptParameters {
292 config_rx,
293 netdir_provider,
294 introduce_tx,
295 lid,
296 target,
297 k_sid,
298 k_ntor,
299 accepting_requests,
300 replay_log,
301 } = params;
302 let config = Arc::clone(&config_rx.borrow());
303 let nickname = config.nickname().clone();
304
305 if matches!(accepting_requests, RequestDisposition::Shutdown) {
306 return Err(bad_api_usage!(
307 "Tried to create a IptEstablisher that that was already shutting down?"
308 )
309 .into());
310 }
311
312 let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
313
314 let request_context = Arc::new(RendRequestContext {
315 nickname: nickname.clone(),
316 keymgr: Arc::clone(keymgr),
317 kp_hss_ntor: Arc::clone(&k_ntor),
318 kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
319 filter: config.filter_settings(),
320 netdir_provider: netdir_provider.clone(),
321 circ_pool: pool.clone(),
322 });
323
324 let reactor = Reactor {
325 runtime: runtime.clone(),
326 nickname,
327 pool,
328 netdir_provider,
329 lid,
330 target,
331 k_sid,
332 introduce_tx,
333 extensions: EstIntroExtensionSet {
334 // Updates to this are handled by the IPT manager: when it changes,
335 // this IPT will be replaced with one with the correct parameters.
336 dos_params: config.dos_extension()?,
337 },
338 state: state.clone(),
339 request_context,
340 replay_log: Arc::new(replay_log.into()),
341 };
342
343 let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
344 let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
345 let status_tx = DropNotifyWatchSender::new(status_tx);
346
347 // Spawn a task to keep the intro established. The task will shut down
348 // when terminate_tx is dropped.
349 runtime
350 .spawn(async move {
351 futures::select_biased!(
352 terminated = terminate_rx => {
353 // Only Err is possible, but the compiler can't tell that.
354 let oneshot::Canceled = terminated.void_unwrap_err();
355 }
356 outcome = reactor.keep_intro_established(status_tx).fuse() => {
357 warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
358 }
359 );
360 })
361 .map_err(|e| FatalError::Spawn {
362 spawning: "introduction point establisher",
363 cause: Arc::new(e),
364 })?;
365 let establisher = IptEstablisher {
366 _terminate_tx: terminate_tx,
367 state,
368 };
369 Ok((establisher, status_rx))
370 }
371
372 /// Begin accepting requests from this introduction point.
373 ///
374 /// If any introduction requests are sent before we have called this method,
375 /// they are treated as an error and our connection to this introduction
376 /// point is closed.
377 pub(crate) fn start_accepting(&self) {
378 self.state.lock().expect("poisoned lock").accepting_requests =
379 RequestDisposition::Advertised;
380 }
381}
382
383/// The current status of an introduction point, as defined in
384/// `hssvc-ipt-algorithms.md`.
385///
386/// TODO (#1235) Make that file unneeded.
387#[derive(Clone, Debug)]
388pub(crate) enum IptStatusStatus {
389 /// We are (re)establishing our connection to the IPT
390 ///
391 /// But we don't think there's anything wrong with it.
392 ///
393 /// The IPT manager should *not* arrange to include this in descriptors.
394 Establishing,
395
396 /// The IPT is established and ready to accept rendezvous requests
397 ///
398 /// Also contains information about the introduction point
399 /// necessary for making descriptors,
400 /// including information from the netdir about the relay
401 ///
402 /// The IPT manager *should* arrange to include this in descriptors.
403 Good(GoodIptDetails),
404
405 /// We don't have the IPT and it looks like it was the IPT's fault
406 ///
407 /// This should be used whenever trying another IPT relay is likely to work better;
408 /// regardless of whether attempts to establish *this* IPT can continue.
409 ///
410 /// The IPT manager should *not* arrange to include this in descriptors.
411 /// If this persists, the IPT manager should replace this IPT
412 /// with a new IPT at a different relay.
413 Faulty(Option<IptError>),
414}
415
416/// Details of a good introduction point
417///
418/// This struct contains similar information to
419/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
420/// However, that insists that the contained `T` is a [`CircTarget`],
421/// which `<NtorPublicKey>` isn't.
422/// And, we don't use this as a circuit target (at least, not here -
423/// the client will do so, as a result of us publishing the information).
424///
425/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
426#[derive(Clone, Debug, Eq, PartialEq)]
427pub(crate) struct GoodIptDetails {
428 /// The link specifiers to be used in the descriptor
429 ///
430 /// As obtained and converted from the netdir.
431 pub(crate) link_specifiers: LinkSpecs,
432
433 /// The introduction point relay's ntor key (from the netdir)
434 pub(crate) ipt_kp_ntor: NtorPublicKey,
435}
436
437impl GoodIptDetails {
438 /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
439 fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
440 Ok(Self {
441 link_specifiers: relay
442 .linkspecs()
443 .map_err(into_internal!("Unable to encode relay link specifiers"))?,
444 ipt_kp_ntor: *relay.ntor_onion_key(),
445 })
446 }
447}
448
/// Marker meaning "the IPT Establisher wants to retire this IPT"
///
/// Reported as `Err(IptWantsToRetire)`,
/// once the IPT has had (too) many rendezvous requests.
///
/// Do *not* use this to report *errors*: it makes the IPT manager start
/// replacing the IPT *immediately*, regardless of rate limits etc.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
457
458/// State shared between the IptEstablisher and the Reactor.
459struct EstablisherState {
460 /// True if we are accepting requests right now.
461 accepting_requests: RequestDisposition,
462}
463
/// What an introduction point's message handler should do with any
/// incoming messages, given the introduction point's current state.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// Not advertised yet: if any request arrives, the message handler
    /// should complain and shut down.
    NotAdvertised,
    /// Advertised: the message handler should pass along any requests
    Advertised,
    /// Shutting down cleanly: the message handler should exit, without complaining.
    Shutdown,
}
476
477/// The current status of an introduction point.
478#[derive(Clone, Debug)]
479pub(crate) struct IptStatus {
480 /// The current state of this introduction point as defined by
481 /// `hssvc-ipt-algorithms.md`.
482 ///
483 /// TODO (#1235): Make that file unneeded.
484 pub(crate) status: IptStatusStatus,
485
486 /// The current status of whether this introduction point circuit wants to be
487 /// retired based on having processed too many requests.
488 pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
489
490 /// If Some, a time after which all attempts have been unsuccessful.
491 pub(crate) failing_since: Option<Instant>,
492}
493
/// How long an unbroken run of failed attempts to reach an introduction
/// point must last before we declare that introduction point faulty.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(15 * 60);
500
501impl IptStatus {
502 /// Record that we have successfully connected to an introduction point.
503 fn note_open(&mut self, ipt_details: GoodIptDetails) {
504 self.status = IptStatusStatus::Good(ipt_details);
505 self.failing_since = None;
506 }
507
508 /// Record that we are trying to connect to an introduction point.
509 fn note_attempt(&mut self) {
510 use IptStatusStatus::*;
511 self.status = match &self.status {
512 Establishing | Good(..) => Establishing,
513 Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
514 }
515 }
516
517 /// Record that an error has occurred.
518 fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
519 use IptStatusStatus::*;
520 let failing_since = *self.failing_since.get_or_insert(now);
521 #[allow(clippy::if_same_then_else)]
522 if let Some(ipt_err) = err.ipt_failure() {
523 // This error always indicates a faulty introduction point.
524 self.status = Faulty(Some(ipt_err.clone()));
525 } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
526 // This introduction point has gone too long without a success.
527 self.status = Faulty(Some(IptError::Timeout));
528 }
529 }
530
531 /// Return an `IptStatus` representing an establisher that has not yet taken
532 /// any action.
533 fn new() -> Self {
534 Self {
535 status: IptStatusStatus::Establishing,
536 wants_to_retire: Ok(()),
537 failing_since: None,
538 }
539 }
540
541 /// Produce an `IptStatus` representing a shut down or crashed establisher
542 fn new_terminated() -> Self {
543 IptStatus {
544 status: IptStatusStatus::Faulty(None),
545 // If we're broken, we simply tell the manager that that is the case.
546 // It will decide for itself whether it wants to replace us.
547 wants_to_retire: Ok(()),
548 failing_since: None,
549 }
550 }
551}
552
553impl Default for IptStatus {
554 fn default() -> Self {
555 Self::new()
556 }
557}
558
559impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
560 fn eof() -> IptStatus {
561 IptStatus::new_terminated()
562 }
563}
564
// `IptMsgHandler::handle_msg` converts every incoming `AnyRelayMsg` into
// this type with `try_into()`; a message of any kind not listed here makes
// that conversion fail, and the handler then reports it as an invalid message.
tor_cell::restricted_msg! {
    /// An acceptable message to receive from an introduction point.
    enum IptMsg : RelayMsg {
        IntroEstablished,
        Introduce2,
    }
}
572
573/// A set of extensions to send with our `ESTABLISH_INTRO` message.
574///
575/// NOTE: we eventually might want to support unrecognized extensions. But
576/// that's potentially troublesome, since the set of extensions we sent might
577/// have an affect on how we validate the reply.
578#[derive(Clone, Debug)]
579pub(crate) struct EstIntroExtensionSet {
580 /// Parameters related to rate-limiting to prevent denial-of-service
581 /// attacks.
582 dos_params: Option<est_intro::DosParams>,
583}
584
585/// Implementation structure for the task that implements an IptEstablisher.
586struct Reactor<R: Runtime> {
587 /// A copy of our runtime, used for timeouts and sleeping.
588 runtime: R,
589 /// The nickname of the onion service we're running. Used when logging.
590 nickname: HsNickname,
591 /// A pool used to create circuits to the introduction point.
592 pool: Arc<HsCircPool<R>>,
593 /// A provider used to select the other relays in the circuit.
594 netdir_provider: Arc<dyn NetDirProvider>,
595 /// Identifier for the intro point.
596 lid: IptLocalId,
597 /// The target introduction point.
598 target: RelayIds,
599 /// The keypair to use when establishing the introduction point.
600 ///
601 /// Knowledge of this private key prevents anybody else from impersonating
602 /// us to the introduction point.
603 k_sid: Arc<HsIntroPtSessionIdKeypair>,
604 /// The extensions to use when establishing the introduction point.
605 ///
606 /// TODO (#1209): This should be able to change over time as we re-establish
607 /// the intro point.
608 extensions: EstIntroExtensionSet,
609
610 /// The stream that will receive INTRODUCE2 messages.
611 introduce_tx: mpsc::Sender<RendRequest>,
612
613 /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
614 state: Arc<Mutex<EstablisherState>>,
615
616 /// Context information that we'll need to answer rendezvous requests.
617 request_context: Arc<RendRequestContext>,
618
619 /// Introduction request replay log
620 ///
621 /// Shared between multiple IPT circuit control message handlers -
622 /// [`IptMsgHandler`] contains the lock guard.
623 ///
624 /// Has to be an async mutex since it's locked for a long time,
625 /// so we mustn't block the async executor thread on it.
626 replay_log: Arc<futures::lock::Mutex<IptReplayLog>>,
627}
628
629/// An open session with a single introduction point.
630//
631// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
632pub(crate) struct IntroPtSession {
633 /// The circuit to the introduction point, on which we're receiving
634 /// Introduce2 messages.
635 intro_circ: Arc<ClientCirc>,
636}
637
638impl<R: Runtime> Reactor<R> {
639 /// Run forever, keeping an introduction point established.
640 #[allow(clippy::blocks_in_conditions)]
641 #[allow(clippy::cognitive_complexity)]
642 async fn keep_intro_established(
643 &self,
644 mut status_tx: DropNotifyWatchSender<IptStatus>,
645 ) -> Result<Void, IptEstablisherError> {
646 let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
647 loop {
648 status_tx.borrow_mut().note_attempt();
649 match self.establish_intro_once().await {
650 Ok((session, good_ipt_details)) => {
651 // TODO (#1239): we need to monitor the netdir for changes to this relay
652 // Eg,
653 // - if it becomes unlisted, we should declare the IPT faulty
654 // (until it perhaps reappears)
655 //
656 // TODO SPEC Continuing to use an unlisted relay is dangerous
657 // It might be malicious. We should withdraw our IPT then,
658 // and hope that clients find another, working, IPT.
659 //
660 // - if it changes its ntor key or link specs,
661 // we need to update the GoodIptDetails in our status report,
662 // so that the updated info can make its way to the descriptor
663 //
664 // Possibly some this could/should be done by the IPT Manager instead,
665 // but Diziet thinks it is probably cleanest to do it here.
666
667 status_tx.borrow_mut().note_open(good_ipt_details);
668
669 debug!(
670 "{}: Successfully established introduction point with {}",
671 &self.nickname,
672 self.target.display_relay_ids().redacted()
673 );
674 // Now that we've succeeded, we can stop backing off for our
675 // next attempt.
676 retry_delay.reset();
677
678 // Wait for the session to be closed.
679 session.wait_for_close().await;
680 }
681 Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
682 // The network directory didn't include this relay. Wait
683 // until it does.
684 //
685 // Note that this `note_error` will necessarily mark the
686 // ipt as Faulty. That's important, since we may be about to
687 // wait indefinitely when we call wait_for_netdir_to_list.
688 status_tx.borrow_mut().note_error(&e, self.runtime.now());
689 self.netdir_provider
690 .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
691 .await?;
692 }
693 Err(e) => {
694 status_tx.borrow_mut().note_error(&e, self.runtime.now());
695 debug_report!(
696 e,
697 "{}: Problem establishing introduction point with {}",
698 &self.nickname,
699 self.target.display_relay_ids().redacted()
700 );
701 let retry_after = retry_delay.next_delay(&mut rand::rng());
702 self.runtime.sleep(retry_after).await;
703 }
704 }
705 }
706 }
707
708 /// Try, once, to make a circuit to a single relay and establish an introduction
709 /// point there.
710 ///
711 /// Does not retry. Does not time out except via `HsCircPool`.
712 async fn establish_intro_once(
713 &self,
714 ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
715 let (protovers, circuit, ipt_details) = {
716 let netdir = self
717 .netdir_provider
718 .wait_for_netdir(tor_netdir::Timeliness::Timely)
719 .await?;
720 let circ_target = netdir
721 .by_ids(&self.target)
722 .ok_or(IptError::IntroPointNotListed)?;
723 let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;
724
725 let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
726 let protovers = circ_target.protovers().clone();
727 let circuit = self
728 .pool
729 .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
730 .await
731 .map_err(IptEstablisherError::BuildCircuit)?;
732 // note that netdir is dropped here, to avoid holding on to it any
733 // longer than necessary.
734 (protovers, circuit, ipt_details)
735 };
736
737 let establish_intro = {
738 let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
739 let mut details = EstablishIntroDetails::new(ipt_sid_id);
740 if let Some(dos_params) = &self.extensions.dos_params {
741 // We only send the Dos extension when the relay is known to
742 // support it.
743 use tor_protover::named::HSINTRO_RATELIM;
744 if protovers.supports_named_subver(HSINTRO_RATELIM) {
745 details.set_extension_dos(dos_params.clone());
746 }
747 }
748 let circuit_binding_key = circuit
749 .binding_key(TargetHop::LastHop)
750 .await
751 .map_err(IptEstablisherError::CircuitState)?
752 .ok_or(internal!("No binding key for introduction point!?"))?;
753 let body: Vec<u8> = details
754 .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
755 .map_err(IptEstablisherError::CreateEstablishIntro)?;
756
757 // TODO: This is ugly, but it is the sensible way to munge the above
758 // body into a format that AnyRelayMsgOuter will accept without doing a
759 // redundant parse step.
760 //
761 // One alternative would be allowing start_conversation to take an `impl
762 // RelayMsg` rather than an AnyRelayMsg.
763 //
764 // Or possibly, when we feel like it, we could rename one or more of
765 // these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll
766 // potentially face breaking changes up and down our crate stack.
767 AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
768 tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
769 body,
770 ))
771 };
772
773 let (established_tx, established_rx) = oneshot::channel();
774
775 // In theory there ought to be only one IptMsgHandler in existence at any one time,
776 // for any one IptLocalId (ie for any one IptReplayLog). However, the teardown
777 // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
778 // (so we need to synchronise). Therefore:
779 //
780 // Make sure we don't start writing to the replay log until any previous
781 // IptMsgHandler has been torn down. (Using an async mutex means we
782 // don't risk blocking the whole executor even if we have teardown bugs.)
783 let replay_log = self.replay_log.clone().lock_owned().await;
784
785 let handler = IptMsgHandler {
786 established_tx: Some(established_tx),
787 introduce_tx: self.introduce_tx.clone(),
788 state: self.state.clone(),
789 lid: self.lid,
790 request_context: self.request_context.clone(),
791 replay_log,
792 };
793 let _conversation = circuit
794 .start_conversation(Some(establish_intro), handler, TargetHop::LastHop)
795 .await
796 .map_err(IptEstablisherError::SendEstablishIntro)?;
797 // At this point, we have `await`ed for the Conversation to exist, so we know
798 // that the message was sent. We have to wait for any actual `established`
799 // message, though.
800
801 let length = circuit
802 .n_hops()
803 .map_err(into_internal!("failed to get circuit length"))?;
804 let ack_timeout = self
805 .pool
806 .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip { length });
807 let _established: IntroEstablished = self
808 .runtime
809 .timeout(ack_timeout, established_rx)
810 .await
811 .map_err(|_| IptEstablisherError::EstablishTimeout)?
812 .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;
813
814 // This session will be owned by keep_intro_established(), and dropped
815 // when the circuit closes, or when the keep_intro_established() future
816 // is dropped.
817 let session = IntroPtSession {
818 intro_circ: circuit,
819 };
820 Ok((session, ipt_details))
821 }
822}
823
824impl IntroPtSession {
825 /// Wait for this introduction point session to be closed.
826 fn wait_for_close(&self) -> impl Future<Output = ()> {
827 self.intro_circ.wait_for_close()
828 }
829}
830
/// MsgHandler type to implement a conversation with an introduction point.
///
/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
/// to handle otherwise unrecognized message types.
///
/// The handler deals with two kinds of incoming message: `INTRO_ESTABLISHED`
/// (reported once, via `established_tx`) and `INTRODUCE2` (turned into a
/// `RendRequest` and offered on `introduce_tx`).
struct IptMsgHandler {
    /// A oneshot sender used to report our IntroEstablished message.
    ///
    /// `handle_msg` takes the sender out of this field the first time it has
    /// anything to report: either the `IntroEstablished` message itself, or
    /// an error (an unrecognized message type, or an `INTRODUCE2` that arrived
    /// before `INTRO_ESTABLISHED`).
    ///
    /// If this is None, then we already reported an outcome and we shouldn't
    /// send any more; a further `INTRO_ESTABLISHED` is then treated as a
    /// protocol violation.
    established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,

    /// A channel used to report Introduce2 messages.
    ///
    /// Requests are offered with `try_send`: if the channel is full the
    /// request is dropped, and if the receiver is disconnected the circuit
    /// is closed.
    introduce_tx: mpsc::Sender<RendRequest>,

    /// Keys that we'll need to answer the introduction requests.
    ///
    /// A clone of this `Arc` is attached to every `RendRequest` we create.
    request_context: Arc<RendRequestContext>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    ///
    /// The handler reads `accepting_requests` from it to decide whether an
    /// incoming `INTRODUCE2` may be processed.
    state: Arc<Mutex<EstablisherState>>,

    /// Unique identifier for the introduction point (including the current
    /// keys). Used to tag requests.
    lid: IptLocalId,

    /// A replay log used to detect replayed introduction requests.
    ///
    /// Consulted (via `check_for_replay`) for every `INTRODUCE2` that passes
    /// the earlier checks. Stored as an owned mutex guard, so the log stays
    /// locked for as long as this handler exists.
    replay_log: futures::lock::OwnedMutexGuard<IptReplayLog>,
}
858
859impl tor_proto::circuit::MsgHandler for IptMsgHandler {
860 fn handle_msg(&mut self, any_msg: AnyRelayMsg) -> tor_proto::Result<MetaCellDisposition> {
861 let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
862 if let Some(tx) = self.established_tx.take() {
863 let _ = tx.send(Err(IptError::BadMessage(format!(
864 "Invalid message type {}",
865 m.cmd()
866 ))
867 .into()));
868 }
869 // TODO: It's not completely clear whether CircProto is the right
870 // type for use in this function (here and elsewhere);
871 // possibly, we should add a different tor_proto::Error type
872 // for protocol violations at a higher level than the circuit
873 // protocol.
874 //
875 // For now, however, this error type is fine: it will cause the
876 // circuit to be shut down, which is what we want.
877 tor_proto::Error::CircProto(format!(
878 "Invalid message type {} on introduction circuit",
879 m.cmd()
880 ))
881 })?;
882
883 if match msg {
884 IptMsg::IntroEstablished(established) => match self.established_tx.take() {
885 Some(tx) => {
886 // TODO: Once we want to enforce any properties on the
887 // intro_established message (like checking for correct
888 // extensions) we should do it here.
889 let established = Ok(established);
890 tx.send(established).map_err(|_| ())
891 }
892 None => {
893 return Err(tor_proto::Error::CircProto(
894 "Received a redundant INTRO_ESTABLISHED".into(),
895 ));
896 }
897 },
898 IptMsg::Introduce2(introduce2) => {
899 if let Some(tx) = self.established_tx.take() {
900 let _ = tx.send(Err(IptError::BadMessage(
901 "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
902 )
903 .into()));
904 return Err(tor_proto::Error::CircProto(
905 "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
906 ));
907 }
908 let disp = self.state.lock().expect("poisoned lock").accepting_requests;
909 match disp {
910 RequestDisposition::NotAdvertised => {
911 return Err(tor_proto::Error::CircProto(
912 "Received an INTRODUCE2 message before we were accepting requests!"
913 .into(),
914 ))
915 }
916 RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
917 RequestDisposition::Advertised => {}
918 }
919 match self.replay_log.check_for_replay(&introduce2) {
920 Ok(()) => {}
921 Err(ReplayError::AlreadySeen) => {
922 // This is probably a replay, but maybe an accident. We
923 // just drop the request.
924
925 // TODO (#1233): Log that this has occurred, with a rate
926 // limit. Possibly, we should allow it to fail once or
927 // twice per circuit before we log, since we expect
928 // a nonzero false-positive rate.
929 //
930 // Note that we should NOT close the circuit in this
931 // case: the repeated message could come from a hostile
932 // introduction point trying to do traffic analysis, but
933 // it could also come from a user trying to make it look
934 // like the intro point is doing traffic analysis.
935 return Ok(MetaCellDisposition::Consumed);
936 }
937 Err(ReplayError::Log(_)) => {
938 // Uh-oh! We failed to write the data persistently!
939 //
940 // TODO (#1226): We need to decide what to do here. Right
941 // now we close the circuit, which is wrong.
942 return Ok(MetaCellDisposition::CloseCirc);
943 }
944 }
945
946 let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
947 let send_outcome = self.introduce_tx.try_send(request);
948
949 // We only want to report full-stream problems as errors here.
950 // Disconnected streams are expected.
951 let report_outcome = match &send_outcome {
952 Err(e) if e.is_full() => Err(StreamWasFull {}),
953 _ => Ok(()),
954 };
955 // TODO: someday we might want to start tracking this by
956 // introduction or service point separately, though we would
957 // expect their failures to be correlated.
958 log_ratelim!("sending rendezvous request to handler task"; report_outcome);
959
960 match send_outcome {
961 Ok(()) => Ok(()),
962 Err(e) => {
963 if e.is_disconnected() {
964 // The receiver is disconnected, meaning that
965 // messages from this intro point are no longer
966 // wanted. Close the circuit.
967 Err(())
968 } else {
969 // The receiver is full; we have no real option but
970 // to drop the request like C-tor does when the
971 // backlog is too large.
972 //
973 // See discussion at
974 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
975 Ok(())
976 }
977 }
978 }
979 }
980 } == Err(())
981 {
982 // If the above return an error, we failed to send. That means that
983 // we need to close the circuit, since nobody is listening on the
984 // other end of the tx.
985 return Ok(MetaCellDisposition::CloseCirc);
986 }
987
988 Ok(MetaCellDisposition::Consumed)
989 }
990}
991
/// We failed to send a rendezvous request onto the handler task that should
/// have handled it, because it was not handling requests fast enough.
///
/// This is only used as the error value reported to the rate-limited logger
/// when the rendezvous-request channel is full.
///
/// (This is a separate type so that we can have it implement Clone.)
#[derive(Clone, Debug, thiserror::Error)]
#[error("Could not send request; stream was full.")]
struct StreamWasFull {}