tor_hsservice/ipt_establish.rs
1//! IPT Establisher
2//!
3//! Responsible for maintaining and establishing one introduction point.
4//!
5//! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
6//!
7//! See the docs for
8//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
9//! for details of our algorithm.
10
11use crate::internal_prelude::*;
12
13use tor_cell::relaycell::{
14 hs::est_intro::{self, EstablishIntroDetails},
15 msg::IntroEstablished,
16};
17use tor_circmgr::ServiceOnionServiceIntroTunnel;
18use tor_proto::TargetHop;
19use tracing::instrument;
20
21/// Handle onto the task which is establishing and maintaining one IPT
22pub(crate) struct IptEstablisher {
23 /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
24 /// down.
25 _terminate_tx: oneshot::Sender<Void>,
26
27 /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
28 state: Arc<Mutex<EstablisherState>>,
29}
30
31/// When the `IptEstablisher` is dropped it is torn down
32///
33/// Synchronously
34///
35/// * No rendezvous requests will be accepted
36/// that arrived after `Drop::drop` returns.
37///
38/// Asynchronously
39///
40/// * Circuits constructed for this IPT are torn down
41/// * The `rend_reqs` sink is closed (dropped)
42/// * `IptStatusStatus::Faulty` will be indicated
43impl Drop for IptEstablisher {
44 fn drop(&mut self) {
45 // Make sure no more requests are accepted once this returns.
46 //
47 // (Note that if we didn't care about the "no more rendezvous
48 // requests will be accepted" requirement, we could do away with this
49 // code and the corresponding check for `RequestDisposition::Shutdown` in
50 // `IptMsgHandler::handle_msg`.)
51 self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
52
53 // Tell the reactor to shut down... by doing nothing.
54 //
55 // (When terminate_tx is dropped, it will send an error to the
56 // corresponding terminate_rx.)
57 }
58}
59
60/// An error from trying to work with an IptEstablisher.
61#[derive(Clone, Debug, thiserror::Error)]
62pub(crate) enum IptEstablisherError {
63 /// We encountered a faulty IPT.
64 #[error("{0}")]
65 Ipt(#[from] IptError),
66
67 /// The network directory provider is shutting down without giving us the
68 /// netdir we asked for.
69 #[error("{0}")]
70 NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),
71
72 /// We encountered an error while building a circuit to an intro point.
73 #[error("Unable to build circuit to introduction point")]
74 BuildCircuit(#[source] tor_circmgr::Error),
75
76 /// We encountered an error while querying the circuit state.
77 #[error("Unable to query circuit state")]
78 CircuitState(#[source] tor_circmgr::Error),
79
80 /// We encountered an error while building and signing our establish_intro
81 /// message.
82 #[error("Unable to construct signed ESTABLISH_INTRO message")]
83 CreateEstablishIntro(#[source] tor_cell::Error),
84
85 /// We encountered a timeout after building the circuit.
86 #[error("Timeout during ESTABLISH_INTRO handshake.")]
87 EstablishTimeout,
88
89 /// We encountered an error while sending our establish_intro
90 /// message.
91 #[error("Unable to send an ESTABLISH_INTRO message")]
92 SendEstablishIntro(#[source] tor_circmgr::Error),
93
94 /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
95 /// circuit was closed.
96 #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
97 ClosedWithoutAck,
98
99 /// We encountered a programming error.
100 #[error("Internal error")]
101 Bug(#[from] tor_error::Bug),
102}
103
104/// An error caused by a faulty IPT.
105#[derive(Clone, Debug, thiserror::Error)]
106#[non_exhaustive]
107pub enum IptError {
108 /// When we tried to establish this introduction point, we found that the
109 /// netdir didn't list it.
110 ///
111 /// This introduction point is not strictly "faulty", but unlisted in the directory means we
112 /// can't use the introduction point.
113 #[error("Introduction point not listed in network directory")]
114 IntroPointNotListed,
115
116 /// We received an invalid INTRO_ESTABLISHED message.
117 ///
118 /// This is definitely the introduction point's fault: it sent us
119 /// an authenticated message, but the contents of that message were
120 /// definitely wrong.
121 #[error("Got an invalid INTRO_ESTABLISHED message")]
122 // Eventually, once we expect intro_established extensions, we will make
123 // sure that they are well-formed.
124 #[allow(dead_code)]
125 BadEstablished,
126
127 /// We received a message that not a valid part of the introduction-point
128 /// protocol.
129 #[error("Invalid message: {0}")]
130 BadMessage(String),
131
132 /// This introduction point has gone too long without a success.
133 #[error("introduction point has gone too long without a success")]
134 Timeout,
135}
136
137impl tor_error::HasKind for IptEstablisherError {
138 fn kind(&self) -> tor_error::ErrorKind {
139 use IptEstablisherError as E;
140 use tor_error::ErrorKind as EK;
141 match self {
142 E::Ipt(e) => e.kind(),
143 E::NetdirProviderShutdown(e) => e.kind(),
144 E::BuildCircuit(e) => e.kind(),
145 E::CircuitState(e) => e.kind(),
146 E::EstablishTimeout => EK::TorNetworkTimeout,
147 E::SendEstablishIntro(e) => e.kind(),
148 E::ClosedWithoutAck => EK::CircuitCollapse,
149 E::CreateEstablishIntro(_) => EK::Internal,
150 E::Bug(e) => e.kind(),
151 }
152 }
153}
154
155impl tor_error::HasKind for IptError {
156 fn kind(&self) -> tor_error::ErrorKind {
157 use IptError as E;
158 use tor_error::ErrorKind as EK;
159 match self {
160 E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
161 E::BadEstablished => EK::RemoteProtocolViolation,
162 E::BadMessage(_) => EK::RemoteProtocolViolation,
163 E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
164 }
165 }
166}
167
168impl IptEstablisherError {
169 /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
170 ///
171 /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`,
172 /// it means that we should try another relay as an introduction point,
173 /// though we don't necessarily need to give up on this one.
174 ///
175 /// Note that the intro point may be to blame even if we return `None`;
176 /// we only return `true` when we are certain that the intro point is
177 /// unlisted, unusable, or misbehaving.
178 fn ipt_failure(&self) -> Option<&IptError> {
179 use IptEstablisherError as IE;
180 match self {
181 // If we don't have a netdir, then no intro point is better than any other.
182 IE::NetdirProviderShutdown(_) => None,
183 IE::Ipt(e) => Some(e),
184 // This _might_ be the introduction point's fault, but it might not.
185 // We can't be certain.
186 IE::BuildCircuit(_) => None,
187 IE::CircuitState(_) => None,
188 IE::EstablishTimeout => None,
189 IE::ClosedWithoutAck => None,
190 // These are, most likely, not the introduction point's fault,
191 // though they might or might not be survivable.
192 IE::CreateEstablishIntro(_) => None,
193 IE::SendEstablishIntro(_) => None,
194 IE::Bug(_) => None,
195 }
196 }
197}
198
199/// Parameters for an introduction point
200///
201/// Consumed by `IptEstablisher::new`.
202/// Primarily serves as a convenient way to bundle the many arguments required.
203///
204/// Does not include:
205/// * The runtime (which would force this struct to have a type parameter)
206/// * The circuit builder (leaving this out makes it possible to use this
207/// struct during mock execution, where we don't call `IptEstablisher::new`).
208#[derive(Educe)]
209#[educe(Debug)]
210pub(crate) struct IptParameters {
211 /// A receiver that we can use to tell us about updates in our configuration.
212 ///
213 /// Configuration changes may tell us to change our introduction points or build new
214 /// circuits to them.
215 //
216 // TODO (#1209):
217 //
218 // We want to make a new introduction circuit if our dos parameters change,
219 // which means that we should possibly be watching for changes in our
220 // configuration. Right now, though, we only copy out the configuration
221 // on startup.
222 #[educe(Debug(ignore))]
223 pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
224 /// A `NetDirProvider` that we'll use to find changes in the network
225 /// parameters, and to look up information about routers.
226 #[educe(Debug(ignore))]
227 pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
228 /// A shared sender that we'll use to report incoming INTRODUCE2 requests
229 /// for rendezvous circuits.
230 #[educe(Debug(ignore))]
231 pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
232 /// Opaque local ID for this introduction point.
233 ///
234 /// This ID does not change within the lifetime of an [`IptEstablisher`].
235 /// See [`IptLocalId`] for information about what changes would require a
236 /// new ID (and hence a new `IptEstablisher`).
237 pub(crate) lid: IptLocalId,
238 /// Persistent log for INTRODUCE2 requests.
239 ///
240 /// We use this to record the requests that we see, and to prevent replays.
241 #[educe(Debug(ignore))]
242 pub(crate) replay_log: IptReplayLog,
243 /// A set of identifiers for the Relay that we intend to use as the
244 /// introduction point.
245 ///
246 /// We use this to identify the relay within a `NetDir`, and to make sure
247 /// we're connecting to the right introduction point.
248 pub(crate) target: RelayIds,
249 /// Keypair used to authenticate and identify ourselves to this introduction
250 /// point.
251 ///
252 /// Later, we publish the public component of this keypair in our HsDesc,
253 /// and clients use it to tell the introduction point which introduction circuit
254 /// should receive their requests.
255 ///
256 /// This is the `K_hs_ipt_sid` keypair.
257 pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
258 /// Whether this `IptEstablisher` should begin by accepting requests, or
259 /// wait to be told that requests are okay.
260 pub(crate) accepting_requests: RequestDisposition,
261 /// Keypair used to decrypt INTRODUCE2 requests from clients.
262 ///
263 /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
264 /// form a shared key set of keys with the client, and decrypt information
265 /// about the client's chosen rendezvous point and extensions.
266 pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
267}
268
269impl IptEstablisher {
270 /// Try to set up, and maintain, an IPT at `target`.
271 ///
272 /// Rendezvous requests will be rejected or accepted
273 /// depending on the value of `accepting_requests`
274 /// (which must be `Advertised` or `NotAdvertised`).
275 ///
276 /// Also returns a stream of events that is produced whenever we have a
277 /// change in the IptStatus for this intro point. Note that this stream is
278 /// potentially lossy.
279 ///
280 /// The returned `watch::Receiver` will yield `Faulty` if the IPT
281 /// establisher is shut down (or crashes).
282 ///
283 /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
284 /// and close all circuits used to establish this introduction point.
285 #[instrument(level = "trace", skip_all)]
286 pub(crate) fn launch<R: Runtime>(
287 runtime: &R,
288 params: IptParameters,
289 pool: Arc<HsCircPool<R>>,
290 keymgr: &Arc<KeyMgr>,
291 ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
292 // This exhaustive deconstruction ensures that we don't
293 // accidentally forget to handle any of our inputs.
294 let IptParameters {
295 config_rx,
296 netdir_provider,
297 introduce_tx,
298 lid,
299 target,
300 k_sid,
301 k_ntor,
302 accepting_requests,
303 replay_log,
304 } = params;
305 let config = Arc::clone(&config_rx.borrow());
306 let nickname = config.nickname().clone();
307
308 if matches!(accepting_requests, RequestDisposition::Shutdown) {
309 return Err(bad_api_usage!(
310 "Tried to create a IptEstablisher that that was already shutting down?"
311 )
312 .into());
313 }
314
315 let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
316
317 let request_context = Arc::new(RendRequestContext {
318 nickname: nickname.clone(),
319 keymgr: Arc::clone(keymgr),
320 kp_hss_ntor: Arc::clone(&k_ntor),
321 kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
322 filter: config.filter_settings(),
323 netdir_provider: netdir_provider.clone(),
324 circ_pool: pool.clone(),
325 });
326
327 let reactor = Reactor {
328 runtime: runtime.clone(),
329 nickname,
330 pool,
331 netdir_provider,
332 lid,
333 target,
334 k_sid,
335 introduce_tx,
336 extensions: EstIntroExtensionSet {
337 // Updates to this are handled by the IPT manager: when it changes,
338 // this IPT will be replaced with one with the correct parameters.
339 dos_params: config.dos_extension()?,
340 },
341 state: state.clone(),
342 request_context,
343 replay_log: Arc::new(replay_log.into()),
344 };
345
346 let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
347 let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
348 let status_tx = DropNotifyWatchSender::new(status_tx);
349
350 // Spawn a task to keep the intro established. The task will shut down
351 // when terminate_tx is dropped.
352 runtime
353 .spawn(async move {
354 futures::select_biased!(
355 terminated = terminate_rx => {
356 // Only Err is possible, but the compiler can't tell that.
357 let oneshot::Canceled = terminated.void_unwrap_err();
358 }
359 outcome = reactor.keep_intro_established(status_tx).fuse() => {
360 warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
361 }
362 );
363 })
364 .map_err(|e| FatalError::Spawn {
365 spawning: "introduction point establisher",
366 cause: Arc::new(e),
367 })?;
368 let establisher = IptEstablisher {
369 _terminate_tx: terminate_tx,
370 state,
371 };
372 Ok((establisher, status_rx))
373 }
374
375 /// Begin accepting requests from this introduction point.
376 ///
377 /// If any introduction requests are sent before we have called this method,
378 /// they are treated as an error and our connection to this introduction
379 /// point is closed.
380 pub(crate) fn start_accepting(&self) {
381 self.state.lock().expect("poisoned lock").accepting_requests =
382 RequestDisposition::Advertised;
383 }
384}
385
386/// The current status of an introduction point, as defined in
387/// `hssvc-ipt-algorithms.md`.
388///
389/// TODO (#1235) Make that file unneeded.
390#[derive(Clone, Debug)]
391pub(crate) enum IptStatusStatus {
392 /// We are (re)establishing our connection to the IPT
393 ///
394 /// But we don't think there's anything wrong with it.
395 ///
396 /// The IPT manager should *not* arrange to include this in descriptors.
397 Establishing,
398
399 /// The IPT is established and ready to accept rendezvous requests
400 ///
401 /// Also contains information about the introduction point
402 /// necessary for making descriptors,
403 /// including information from the netdir about the relay
404 ///
405 /// The IPT manager *should* arrange to include this in descriptors.
406 Good(GoodIptDetails),
407
408 /// We don't have the IPT and it looks like it was the IPT's fault
409 ///
410 /// This should be used whenever trying another IPT relay is likely to work better;
411 /// regardless of whether attempts to establish *this* IPT can continue.
412 ///
413 /// The IPT manager should *not* arrange to include this in descriptors.
414 /// If this persists, the IPT manager should replace this IPT
415 /// with a new IPT at a different relay.
416 Faulty(Option<IptError>),
417}
418
419/// Details of a good introduction point
420///
421/// This struct contains similar information to
422/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
423/// However, that insists that the contained `T` is a [`CircTarget`],
424/// which `<NtorPublicKey>` isn't.
425/// And, we don't use this as a circuit target (at least, not here -
426/// the client will do so, as a result of us publishing the information).
427///
428/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
429#[derive(Clone, Debug, Eq, PartialEq)]
430pub(crate) struct GoodIptDetails {
431 /// The link specifiers to be used in the descriptor
432 ///
433 /// As obtained and converted from the netdir.
434 pub(crate) link_specifiers: LinkSpecs,
435
436 /// The introduction point relay's ntor key (from the netdir)
437 pub(crate) ipt_kp_ntor: NtorPublicKey,
438}
439
440impl GoodIptDetails {
441 /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
442 fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
443 Ok(Self {
444 link_specifiers: relay
445 .linkspecs()
446 .map_err(into_internal!("Unable to encode relay link specifiers"))?,
447 ipt_kp_ntor: *relay.ntor_onion_key(),
448 })
449 }
450}
451
/// Marker saying that the IPT Establisher would like this IPT to be retired.
///
/// It is reported as `Err(IptWantsToRetire)`, which happens once the IPT has
/// handled (too) many rendezvous requests.
///
/// Never use this to report an *error*: the IPT manager reacts by replacing
/// the IPT *immediately*, regardless of rate limits etc.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
460
461/// State shared between the IptEstablisher and the Reactor.
462struct EstablisherState {
463 /// True if we are accepting requests right now.
464 accepting_requests: RequestDisposition,
465}
466
/// Where an introduction point is in its lifecycle, which decides how the
/// message handler treats incoming messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// Not advertised yet: any incoming request is something to complain
    /// about, and the message handler should shut down.
    NotAdvertised,
    /// Advertised: the message handler should pass incoming requests along.
    Advertised,
    /// Shutting down cleanly: the message handler should exit without
    /// complaining.
    Shutdown,
}
479
480/// The current status of an introduction point.
481#[derive(Clone, Debug)]
482pub(crate) struct IptStatus {
483 /// The current state of this introduction point as defined by
484 /// `hssvc-ipt-algorithms.md`.
485 ///
486 /// TODO (#1235): Make that file unneeded.
487 pub(crate) status: IptStatusStatus,
488
489 /// The current status of whether this introduction point circuit wants to be
490 /// retired based on having processed too many requests.
491 pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
492
493 /// If Some, a time after which all attempts have been unsuccessful.
494 pub(crate) failing_since: Option<Instant>,
495}
496
/// How long every attempt to reach an introduction point may keep failing
/// before we declare that introduction point faulty.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(900); // 15 minutes
503
504impl IptStatus {
505 /// Record that we have successfully connected to an introduction point.
506 fn note_open(&mut self, ipt_details: GoodIptDetails) {
507 self.status = IptStatusStatus::Good(ipt_details);
508 self.failing_since = None;
509 }
510
511 /// Record that we are trying to connect to an introduction point.
512 fn note_attempt(&mut self) {
513 use IptStatusStatus::*;
514 self.status = match &self.status {
515 Establishing | Good(..) => Establishing,
516 Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
517 }
518 }
519
520 /// Record that an error has occurred.
521 fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
522 use IptStatusStatus::*;
523 let failing_since = *self.failing_since.get_or_insert(now);
524 #[allow(clippy::if_same_then_else)]
525 if let Some(ipt_err) = err.ipt_failure() {
526 // This error always indicates a faulty introduction point.
527 self.status = Faulty(Some(ipt_err.clone()));
528 } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
529 // This introduction point has gone too long without a success.
530 self.status = Faulty(Some(IptError::Timeout));
531 }
532 }
533
534 /// Return an `IptStatus` representing an establisher that has not yet taken
535 /// any action.
536 fn new() -> Self {
537 Self {
538 status: IptStatusStatus::Establishing,
539 wants_to_retire: Ok(()),
540 failing_since: None,
541 }
542 }
543
544 /// Produce an `IptStatus` representing a shut down or crashed establisher
545 fn new_terminated() -> Self {
546 IptStatus {
547 status: IptStatusStatus::Faulty(None),
548 // If we're broken, we simply tell the manager that that is the case.
549 // It will decide for itself whether it wants to replace us.
550 wants_to_retire: Ok(()),
551 failing_since: None,
552 }
553 }
554}
555
556impl Default for IptStatus {
557 fn default() -> Self {
558 Self::new()
559 }
560}
561
562impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
563 fn eof() -> IptStatus {
564 IptStatus::new_terminated()
565 }
566}
567
568tor_cell::restricted_msg! {
569 /// An acceptable message to receive from an introduction point.
570 enum IptMsg : RelayMsg {
571 IntroEstablished,
572 Introduce2,
573 }
574}
575
576/// A set of extensions to send with our `ESTABLISH_INTRO` message.
577///
578/// NOTE: we eventually might want to support unrecognized extensions. But
579/// that's potentially troublesome, since the set of extensions we sent might
580/// have an affect on how we validate the reply.
581#[derive(Clone, Debug)]
582pub(crate) struct EstIntroExtensionSet {
583 /// Parameters related to rate-limiting to prevent denial-of-service
584 /// attacks.
585 dos_params: Option<est_intro::DosParams>,
586}
587
588/// Implementation structure for the task that implements an IptEstablisher.
589struct Reactor<R: Runtime> {
590 /// A copy of our runtime, used for timeouts and sleeping.
591 runtime: R,
592 /// The nickname of the onion service we're running. Used when logging.
593 nickname: HsNickname,
594 /// A pool used to create circuits to the introduction point.
595 pool: Arc<HsCircPool<R>>,
596 /// A provider used to select the other relays in the circuit.
597 netdir_provider: Arc<dyn NetDirProvider>,
598 /// Identifier for the intro point.
599 lid: IptLocalId,
600 /// The target introduction point.
601 target: RelayIds,
602 /// The keypair to use when establishing the introduction point.
603 ///
604 /// Knowledge of this private key prevents anybody else from impersonating
605 /// us to the introduction point.
606 k_sid: Arc<HsIntroPtSessionIdKeypair>,
607 /// The extensions to use when establishing the introduction point.
608 ///
609 /// TODO (#1209): This should be able to change over time as we re-establish
610 /// the intro point.
611 extensions: EstIntroExtensionSet,
612
613 /// The stream that will receive INTRODUCE2 messages.
614 introduce_tx: mpsc::Sender<RendRequest>,
615
616 /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
617 state: Arc<Mutex<EstablisherState>>,
618
619 /// Context information that we'll need to answer rendezvous requests.
620 request_context: Arc<RendRequestContext>,
621
622 /// Introduction request replay log
623 ///
624 /// Shared between multiple IPT circuit control message handlers -
625 /// [`IptMsgHandler`] contains the lock guard.
626 ///
627 /// Has to be an async mutex since it's locked for a long time,
628 /// so we mustn't block the async executor thread on it.
629 replay_log: Arc<futures::lock::Mutex<IptReplayLog>>,
630}
631
632/// An open session with a single introduction point.
633//
634// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
635pub(crate) struct IntroPtSession {
636 /// The circuit to the introduction point, on which we're receiving
637 /// Introduce2 messages.
638 intro_tunnel: Arc<ServiceOnionServiceIntroTunnel>,
639}
640
641impl<R: Runtime> Reactor<R> {
642 /// Run forever, keeping an introduction point established.
643 #[allow(clippy::blocks_in_conditions)]
644 #[allow(clippy::cognitive_complexity)]
645 #[instrument(level = "trace", skip_all)]
646 async fn keep_intro_established(
647 &self,
648 mut status_tx: DropNotifyWatchSender<IptStatus>,
649 ) -> Result<Void, IptEstablisherError> {
650 let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
651 loop {
652 status_tx.borrow_mut().note_attempt();
653 match self.establish_intro_once().await {
654 Ok((session, good_ipt_details)) => {
655 // TODO (#1239): we need to monitor the netdir for changes to this relay
656 // Eg,
657 // - if it becomes unlisted, we should declare the IPT faulty
658 // (until it perhaps reappears)
659 //
660 // TODO SPEC Continuing to use an unlisted relay is dangerous
661 // It might be malicious. We should withdraw our IPT then,
662 // and hope that clients find another, working, IPT.
663 //
664 // - if it changes its ntor key or link specs,
665 // we need to update the GoodIptDetails in our status report,
666 // so that the updated info can make its way to the descriptor
667 //
668 // Possibly some this could/should be done by the IPT Manager instead,
669 // but Diziet thinks it is probably cleanest to do it here.
670
671 status_tx.borrow_mut().note_open(good_ipt_details);
672
673 debug!(
674 "{}: Successfully established introduction point with {}",
675 &self.nickname,
676 self.target.display_relay_ids().redacted()
677 );
678 // Now that we've succeeded, we can stop backing off for our
679 // next attempt.
680 retry_delay.reset();
681
682 // Wait for the session to be closed.
683 session.wait_for_close().await;
684 }
685 Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
686 // The network directory didn't include this relay. Wait
687 // until it does.
688 //
689 // Note that this `note_error` will necessarily mark the
690 // ipt as Faulty. That's important, since we may be about to
691 // wait indefinitely when we call wait_for_netdir_to_list.
692 status_tx.borrow_mut().note_error(&e, self.runtime.now());
693 self.netdir_provider
694 .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
695 .await?;
696 }
697 Err(e) => {
698 status_tx.borrow_mut().note_error(&e, self.runtime.now());
699 debug_report!(
700 e,
701 "{}: Problem establishing introduction point with {}",
702 &self.nickname,
703 self.target.display_relay_ids().redacted()
704 );
705 let retry_after = retry_delay.next_delay(&mut rand::rng());
706 self.runtime.sleep(retry_after).await;
707 }
708 }
709 }
710 }
711
712 /// Try, once, to make a circuit to a single relay and establish an introduction
713 /// point there.
714 ///
715 /// Does not retry. Does not time out except via `HsCircPool`.
716 #[instrument(level = "trace", skip_all)]
717 async fn establish_intro_once(
718 &self,
719 ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
720 let (protovers, tunnel, ipt_details) = {
721 let netdir = self
722 .netdir_provider
723 .wait_for_netdir(tor_netdir::Timeliness::Timely)
724 .await?;
725 let circ_target = netdir
726 .by_ids(&self.target)
727 .ok_or(IptError::IntroPointNotListed)?;
728 let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;
729
730 let protovers = circ_target.protovers().clone();
731 let tunnel = self
732 .pool
733 .get_or_launch_svc_intro(netdir.as_ref(), circ_target)
734 .await
735 .map_err(IptEstablisherError::BuildCircuit)?;
736 // note that netdir is dropped here, to avoid holding on to it any
737 // longer than necessary.
738 (protovers, tunnel, ipt_details)
739 };
740
741 let establish_intro = {
742 let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
743 let mut details = EstablishIntroDetails::new(ipt_sid_id);
744 if let Some(dos_params) = &self.extensions.dos_params {
745 // We only send the Dos extension when the relay is known to
746 // support it.
747 use tor_protover::named::HSINTRO_RATELIM;
748 if protovers.supports_named_subver(HSINTRO_RATELIM) {
749 details.set_extension_dos(dos_params.clone());
750 }
751 }
752 let circuit_binding_key = tunnel
753 .binding_key(TargetHop::LastHop)
754 .await
755 .map_err(IptEstablisherError::CircuitState)?
756 .ok_or(internal!("No binding key for introduction point!?"))?;
757 let body: Vec<u8> = details
758 .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
759 .map_err(IptEstablisherError::CreateEstablishIntro)?;
760
761 // TODO: This is ugly, but it is the sensible way to munge the above
762 // body into a format that AnyRelayMsgOuter will accept without doing a
763 // redundant parse step.
764 //
765 // One alternative would be allowing start_conversation to take an `impl
766 // RelayMsg` rather than an AnyRelayMsg.
767 //
768 // Or possibly, when we feel like it, we could rename one or more of
769 // these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll
770 // potentially face breaking changes up and down our crate stack.
771 AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
772 tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
773 body,
774 ))
775 };
776
777 let (established_tx, established_rx) = oneshot::channel();
778
779 // In theory there ought to be only one IptMsgHandler in existence at any one time,
780 // for any one IptLocalId (ie for any one IptReplayLog). However, the teardown
781 // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
782 // (so we need to synchronise). Therefore:
783 //
784 // Make sure we don't start writing to the replay log until any previous
785 // IptMsgHandler has been torn down. (Using an async mutex means we
786 // don't risk blocking the whole executor even if we have teardown bugs.)
787 let replay_log = self.replay_log.clone().lock_owned().await;
788
789 let handler = IptMsgHandler {
790 established_tx: Some(established_tx),
791 introduce_tx: self.introduce_tx.clone(),
792 state: self.state.clone(),
793 lid: self.lid,
794 request_context: self.request_context.clone(),
795 replay_log,
796 };
797 let _conversation = tunnel
798 .start_conversation(Some(establish_intro), handler, TargetHop::LastHop)
799 .await
800 .map_err(IptEstablisherError::SendEstablishIntro)?;
801 // At this point, we have `await`ed for the Conversation to exist, so we know
802 // that the message was sent. We have to wait for any actual `established`
803 // message, though.
804
805 let length = tunnel
806 .n_hops()
807 .map_err(into_internal!("failed to get circuit length"))?;
808 let ack_timeout = self
809 .pool
810 .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip { length });
811 let _established: IntroEstablished = self
812 .runtime
813 .timeout(ack_timeout, established_rx)
814 .await
815 .map_err(|_| IptEstablisherError::EstablishTimeout)?
816 .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;
817
818 // This session will be owned by keep_intro_established(), and dropped
819 // when the circuit closes, or when the keep_intro_established() future
820 // is dropped.
821 let session = IntroPtSession {
822 intro_tunnel: tunnel.into(),
823 };
824 Ok((session, ipt_details))
825 }
826}
827
828impl IntroPtSession {
829 /// Wait for this introduction point session to be closed.
830 fn wait_for_close(&self) -> impl Future<Output = ()> + use<> {
831 self.intro_tunnel.wait_for_close()
832 }
833}
834
835/// MsgHandler type to implement a conversation with an introduction point.
836///
837/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
838/// to handle otherwise unrecognized message types.
839struct IptMsgHandler {
840 /// A oneshot sender used to report our IntroEstablished message.
841 ///
842 /// If this is None, then we already sent an IntroEstablished and we shouldn't
843 /// send any more.
844 established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,
845
846 /// A channel used to report Introduce2 messages.
847 introduce_tx: mpsc::Sender<RendRequest>,
848
849 /// Keys that we'll need to answer the introduction requests.
850 request_context: Arc<RendRequestContext>,
851
852 /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
853 state: Arc<Mutex<EstablisherState>>,
854
855 /// Unique identifier for the introduction point (including the current
856 /// keys). Used to tag requests.
857 lid: IptLocalId,
858
859 /// A replay log used to detect replayed introduction requests.
860 replay_log: futures::lock::OwnedMutexGuard<IptReplayLog>,
861}
862
863impl tor_proto::MsgHandler for IptMsgHandler {
864 fn handle_msg(&mut self, any_msg: AnyRelayMsg) -> tor_proto::Result<MetaCellDisposition> {
865 let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
866 if let Some(tx) = self.established_tx.take() {
867 let _ = tx.send(Err(IptError::BadMessage(format!(
868 "Invalid message type {}",
869 m.cmd()
870 ))
871 .into()));
872 }
873 // TODO: It's not completely clear whether CircProto is the right
874 // type for use in this function (here and elsewhere);
875 // possibly, we should add a different tor_proto::Error type
876 // for protocol violations at a higher level than the circuit
877 // protocol.
878 //
879 // For now, however, this error type is fine: it will cause the
880 // circuit to be shut down, which is what we want.
881 tor_proto::Error::CircProto(format!(
882 "Invalid message type {} on introduction circuit",
883 m.cmd()
884 ))
885 })?;
886
887 if match msg {
888 IptMsg::IntroEstablished(established) => match self.established_tx.take() {
889 Some(tx) => {
890 // TODO: Once we want to enforce any properties on the
891 // intro_established message (like checking for correct
892 // extensions) we should do it here.
893 let established = Ok(established);
894 tx.send(established).map_err(|_| ())
895 }
896 None => {
897 return Err(tor_proto::Error::CircProto(
898 "Received a redundant INTRO_ESTABLISHED".into(),
899 ));
900 }
901 },
902 IptMsg::Introduce2(introduce2) => {
903 if let Some(tx) = self.established_tx.take() {
904 let _ = tx.send(Err(IptError::BadMessage(
905 "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
906 )
907 .into()));
908 return Err(tor_proto::Error::CircProto(
909 "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
910 ));
911 }
912 let disp = self.state.lock().expect("poisoned lock").accepting_requests;
913 match disp {
914 RequestDisposition::NotAdvertised => {
915 return Err(tor_proto::Error::CircProto(
916 "Received an INTRODUCE2 message before we were accepting requests!"
917 .into(),
918 ));
919 }
920 RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
921 RequestDisposition::Advertised => {}
922 }
923 match self.replay_log.check_for_replay(&introduce2) {
924 Ok(()) => {}
925 Err(ReplayError::AlreadySeen) => {
926 // This is probably a replay, but maybe an accident. We
927 // just drop the request.
928
929 // TODO (#1233): Log that this has occurred, with a rate
930 // limit. Possibly, we should allow it to fail once or
931 // twice per circuit before we log, since we expect
932 // a nonzero false-positive rate.
933 //
934 // Note that we should NOT close the circuit in this
935 // case: the repeated message could come from a hostile
936 // introduction point trying to do traffic analysis, but
937 // it could also come from a user trying to make it look
938 // like the intro point is doing traffic analysis.
939 return Ok(MetaCellDisposition::Consumed);
940 }
941 Err(ReplayError::Log(_)) => {
942 // Uh-oh! We failed to write the data persistently!
943 //
944 // TODO (#1226): We need to decide what to do here. Right
945 // now we close the circuit, which is wrong.
946 return Ok(MetaCellDisposition::CloseCirc);
947 }
948 }
949
950 let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
951 let send_outcome = self.introduce_tx.try_send(request);
952
953 // We only want to report full-stream problems as errors here.
954 // Disconnected streams are expected.
955 let report_outcome = match &send_outcome {
956 Err(e) if e.is_full() => Err(StreamWasFull {}),
957 _ => Ok(()),
958 };
959 // TODO: someday we might want to start tracking this by
960 // introduction or service point separately, though we would
961 // expect their failures to be correlated.
962 log_ratelim!("sending rendezvous request to handler task"; report_outcome);
963
964 match send_outcome {
965 Ok(()) => Ok(()),
966 Err(e) => {
967 if e.is_disconnected() {
968 // The receiver is disconnected, meaning that
969 // messages from this intro point are no longer
970 // wanted. Close the circuit.
971 Err(())
972 } else {
973 // The receiver is full; we have no real option but
974 // to drop the request like C-tor does when the
975 // backlog is too large.
976 //
977 // See discussion at
978 // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
979 Ok(())
980 }
981 }
982 }
983 }
984 } == Err(())
985 {
986 // If the above return an error, we failed to send. That means that
987 // we need to close the circuit, since nobody is listening on the
988 // other end of the tx.
989 return Ok(MetaCellDisposition::CloseCirc);
990 }
991
992 Ok(MetaCellDisposition::Consumed)
993 }
994}
995
996/// We failed to send a rendezvous request onto the handler test that should
997/// have handled it, because it was not handling requests fast enough.
998///
999/// (This is a separate type so that we can have it implement Clone.)
1000#[derive(Clone, Debug, thiserror::Error)]
1001#[error("Could not send request; stream was full.")]
1002struct StreamWasFull {}