tor_hsservice/
ipt_establish.rs

//! IPT Establisher
//!
//! Responsible for maintaining and establishing one introduction point.
//!
//! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
//!
//! See the docs for
//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
//! for details of our algorithm.
10
11use crate::internal_prelude::*;
12
13use tor_cell::relaycell::{
14    hs::est_intro::{self, EstablishIntroDetails},
15    msg::IntroEstablished,
16};
17
18/// Handle onto the task which is establishing and maintaining one IPT
19pub(crate) struct IptEstablisher {
20    /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
21    /// down.
22    _terminate_tx: oneshot::Sender<Void>,
23
24    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
25    state: Arc<Mutex<EstablisherState>>,
26}
27
28/// When the `IptEstablisher` is dropped it is torn down
29///
30/// Synchronously
31///
32///  * No rendezvous requests will be accepted
33///    that arrived after `Drop::drop` returns.
34///
35/// Asynchronously
36///
37///  * Circuits constructed for this IPT are torn down
38///  * The `rend_reqs` sink is closed (dropped)
39///  * `IptStatusStatus::Faulty` will be indicated
40impl Drop for IptEstablisher {
41    fn drop(&mut self) {
42        // Make sure no more requests are accepted once this returns.
43        //
44        // (Note that if we didn't care about the "no more rendezvous
45        // requests will be accepted" requirement, we could do away with this
46        // code and the corresponding check for `RequestDisposition::Shutdown` in
47        // `IptMsgHandler::handle_msg`.)
48        self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
49
50        // Tell the reactor to shut down... by doing nothing.
51        //
52        // (When terminate_tx is dropped, it will send an error to the
53        // corresponding terminate_rx.)
54    }
55}
56
57/// An error from trying to work with an IptEstablisher.
58#[derive(Clone, Debug, thiserror::Error)]
59pub(crate) enum IptEstablisherError {
60    /// We encountered a faulty IPT.
61    #[error("{0}")]
62    Ipt(#[from] IptError),
63
64    /// The network directory provider is shutting down without giving us the
65    /// netdir we asked for.
66    #[error("{0}")]
67    NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),
68
69    /// We encountered an error while building a circuit to an intro point.
70    #[error("Unable to build circuit to introduction point")]
71    BuildCircuit(#[source] tor_circmgr::Error),
72
73    /// We encountered an error while querying the circuit state.
74    #[error("Unable to query circuit state")]
75    CircuitState(#[source] tor_proto::Error),
76
77    /// We encountered an error while building and signing our establish_intro
78    /// message.
79    #[error("Unable to construct signed ESTABLISH_INTRO message")]
80    CreateEstablishIntro(#[source] tor_cell::Error),
81
82    /// We encountered a timeout after building the circuit.
83    #[error("Timeout during ESTABLISH_INTRO handshake.")]
84    EstablishTimeout,
85
86    /// We encountered an error while sending our establish_intro
87    /// message.
88    #[error("Unable to send an ESTABLISH_INTRO message")]
89    SendEstablishIntro(#[source] tor_proto::Error),
90
91    /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
92    /// circuit was closed.
93    #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
94    ClosedWithoutAck,
95
96    /// We encountered a programming error.
97    #[error("Internal error")]
98    Bug(#[from] tor_error::Bug),
99}
100
101/// An error caused by a faulty IPT.
102#[derive(Clone, Debug, thiserror::Error)]
103#[non_exhaustive]
104pub enum IptError {
105    /// When we tried to establish this introduction point, we found that the
106    /// netdir didn't list it.
107    ///
108    /// This introduction point is not strictly "faulty", but unlisted in the directory means we
109    /// can't use the introduction point.
110    #[error("Introduction point not listed in network directory")]
111    IntroPointNotListed,
112
113    /// We received an invalid INTRO_ESTABLISHED message.
114    ///
115    /// This is definitely the introduction point's fault: it sent us
116    /// an authenticated message, but the contents of that message were
117    /// definitely wrong.
118    #[error("Got an invalid INTRO_ESTABLISHED message")]
119    // Eventually, once we expect intro_established extensions, we will make
120    // sure that they are well-formed.
121    #[allow(dead_code)]
122    BadEstablished,
123
124    /// We received a message that not a valid part of the introduction-point
125    /// protocol.
126    #[error("Invalid message: {0}")]
127    BadMessage(String),
128
129    /// This introduction point has gone too long without a success.
130    #[error("introduction point has gone too long without a success")]
131    Timeout,
132}
133
134impl tor_error::HasKind for IptEstablisherError {
135    fn kind(&self) -> tor_error::ErrorKind {
136        use tor_error::ErrorKind as EK;
137        use IptEstablisherError as E;
138        match self {
139            E::Ipt(e) => e.kind(),
140            E::NetdirProviderShutdown(e) => e.kind(),
141            E::BuildCircuit(e) => e.kind(),
142            E::CircuitState(e) => e.kind(),
143            E::EstablishTimeout => EK::TorNetworkTimeout,
144            E::SendEstablishIntro(e) => e.kind(),
145            E::ClosedWithoutAck => EK::CircuitCollapse,
146            E::CreateEstablishIntro(_) => EK::Internal,
147            E::Bug(e) => e.kind(),
148        }
149    }
150}
151
152impl tor_error::HasKind for IptError {
153    fn kind(&self) -> tor_error::ErrorKind {
154        use tor_error::ErrorKind as EK;
155        use IptError as E;
156        match self {
157            E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
158            E::BadEstablished => EK::RemoteProtocolViolation,
159            E::BadMessage(_) => EK::RemoteProtocolViolation,
160            E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
161        }
162    }
163}
164
165impl IptEstablisherError {
166    /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
167    ///
168    /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`,
169    /// it means that we should try another relay as an introduction point,
170    /// though we don't necessarily need to give up on this one.
171    ///
172    /// Note that the intro point may be to blame even if we return `None`;
173    /// we only return `true` when we are certain that the intro point is
174    /// unlisted, unusable, or misbehaving.
175    fn ipt_failure(&self) -> Option<&IptError> {
176        use IptEstablisherError as IE;
177        match self {
178            // If we don't have a netdir, then no intro point is better than any other.
179            IE::NetdirProviderShutdown(_) => None,
180            IE::Ipt(e) => Some(e),
181            // This _might_ be the introduction point's fault, but it might not.
182            // We can't be certain.
183            IE::BuildCircuit(_) => None,
184            IE::CircuitState(_) => None,
185            IE::EstablishTimeout => None,
186            IE::ClosedWithoutAck => None,
187            // These are, most likely, not the introduction point's fault,
188            // though they might or might not be survivable.
189            IE::CreateEstablishIntro(_) => None,
190            IE::SendEstablishIntro(_) => None,
191            IE::Bug(_) => None,
192        }
193    }
194}
195
196/// Parameters for an introduction point
197///
198/// Consumed by `IptEstablisher::new`.
199/// Primarily serves as a convenient way to bundle the many arguments required.
200///
201/// Does not include:
202///  * The runtime (which would force this struct to have a type parameter)
203///  * The circuit builder (leaving this out makes it possible to use this
204///    struct during mock execution, where we don't call `IptEstablisher::new`).
205#[derive(Educe)]
206#[educe(Debug)]
207pub(crate) struct IptParameters {
208    /// A receiver that we can use to tell us about updates in our configuration.
209    ///
210    /// Configuration changes may tell us to change our introduction points or build new
211    /// circuits to them.
212    //
213    // TODO (#1209):
214    //
215    // We want to make a new introduction circuit if our dos parameters change,
216    // which means that we should possibly be watching for changes in our
217    // configuration.  Right now, though, we only copy out the configuration
218    // on startup.
219    #[educe(Debug(ignore))]
220    pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
221    /// A `NetDirProvider` that we'll use to find changes in the network
222    /// parameters, and to look up information about routers.
223    #[educe(Debug(ignore))]
224    pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
225    /// A shared sender that we'll use to report incoming INTRODUCE2 requests
226    /// for rendezvous circuits.
227    #[educe(Debug(ignore))]
228    pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
229    /// Opaque local ID for this introduction point.
230    ///
231    /// This ID does not change within the lifetime of an [`IptEstablisher`].
232    /// See [`IptLocalId`] for information about what changes would require a
233    /// new ID (and hence a new `IptEstablisher`).
234    pub(crate) lid: IptLocalId,
235    /// Persistent log for INTRODUCE2 requests.
236    ///
237    /// We use this to record the requests that we see, and to prevent replays.
238    #[educe(Debug(ignore))]
239    pub(crate) replay_log: IptReplayLog,
240    /// A set of identifiers for the Relay that we intend to use as the
241    /// introduction point.
242    ///
243    /// We use this to identify the relay within a `NetDir`, and to make sure
244    /// we're connecting to the right introduction point.
245    pub(crate) target: RelayIds,
246    /// Keypair used to authenticate and identify ourselves to this introduction
247    /// point.
248    ///
249    /// Later, we publish the public component of this keypair in our HsDesc,
250    /// and clients use it to tell the introduction point which introduction circuit
251    /// should receive their requests.
252    ///
253    /// This is the `K_hs_ipt_sid` keypair.
254    pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
255    /// Whether this `IptEstablisher` should begin by accepting requests, or
256    /// wait to be told that requests are okay.
257    pub(crate) accepting_requests: RequestDisposition,
258    /// Keypair used to decrypt INTRODUCE2 requests from clients.
259    ///
260    /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
261    /// form a shared key set of keys with the client, and decrypt information
262    /// about the client's chosen rendezvous point and extensions.
263    pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
264}
265
266impl IptEstablisher {
267    /// Try to set up, and maintain, an IPT at `target`.
268    ///
269    /// Rendezvous requests will be rejected or accepted
270    /// depending on the value of `accepting_requests`
271    /// (which must be `Advertised` or `NotAdvertised`).
272    ///
273    /// Also returns a stream of events that is produced whenever we have a
274    /// change in the IptStatus for this intro point.  Note that this stream is
275    /// potentially lossy.
276    ///
277    /// The returned `watch::Receiver` will yield `Faulty` if the IPT
278    /// establisher is shut down (or crashes).
279    ///
280    /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
281    /// and close all circuits used to establish this introduction point.
282    pub(crate) fn launch<R: Runtime>(
283        runtime: &R,
284        params: IptParameters,
285        pool: Arc<HsCircPool<R>>,
286        keymgr: &Arc<KeyMgr>,
287    ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
288        // This exhaustive deconstruction ensures that we don't
289        // accidentally forget to handle any of our inputs.
290        let IptParameters {
291            config_rx,
292            netdir_provider,
293            introduce_tx,
294            lid,
295            target,
296            k_sid,
297            k_ntor,
298            accepting_requests,
299            replay_log,
300        } = params;
301        let config = Arc::clone(&config_rx.borrow());
302        let nickname = config.nickname().clone();
303
304        if matches!(accepting_requests, RequestDisposition::Shutdown) {
305            return Err(bad_api_usage!(
306                "Tried to create a IptEstablisher that that was already shutting down?"
307            )
308            .into());
309        }
310
311        let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
312
313        let request_context = Arc::new(RendRequestContext {
314            nickname: nickname.clone(),
315            keymgr: Arc::clone(keymgr),
316            kp_hss_ntor: Arc::clone(&k_ntor),
317            kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
318            filter: config.filter_settings(),
319            netdir_provider: netdir_provider.clone(),
320            circ_pool: pool.clone(),
321        });
322
323        let reactor = Reactor {
324            runtime: runtime.clone(),
325            nickname,
326            pool,
327            netdir_provider,
328            lid,
329            target,
330            k_sid,
331            introduce_tx,
332            extensions: EstIntroExtensionSet {
333                // Updates to this are handled by the IPT manager: when it changes,
334                // this IPT will be replaced with one with the correct parameters.
335                dos_params: config.dos_extension()?,
336            },
337            state: state.clone(),
338            request_context,
339            replay_log: Arc::new(replay_log.into()),
340        };
341
342        let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
343        let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
344        let status_tx = DropNotifyWatchSender::new(status_tx);
345
346        // Spawn a task to keep the intro established.  The task will shut down
347        // when terminate_tx is dropped.
348        runtime
349            .spawn(async move {
350                futures::select_biased!(
351                    terminated = terminate_rx => {
352                        // Only Err is possible, but the compiler can't tell that.
353                        let oneshot::Canceled = terminated.void_unwrap_err();
354                    }
355                    outcome = reactor.keep_intro_established(status_tx).fuse() =>  {
356                      warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
357                    }
358                );
359            })
360            .map_err(|e| FatalError::Spawn {
361                spawning: "introduction point establisher",
362                cause: Arc::new(e),
363            })?;
364        let establisher = IptEstablisher {
365            _terminate_tx: terminate_tx,
366            state,
367        };
368        Ok((establisher, status_rx))
369    }
370
371    /// Begin accepting requests from this introduction point.
372    ///
373    /// If any introduction requests are sent before we have called this method,
374    /// they are treated as an error and our connection to this introduction
375    /// point is closed.
376    pub(crate) fn start_accepting(&self) {
377        self.state.lock().expect("poisoned lock").accepting_requests =
378            RequestDisposition::Advertised;
379    }
380}
381
382/// The current status of an introduction point, as defined in
383/// `hssvc-ipt-algorithms.md`.
384///
385/// TODO (#1235) Make that file unneeded.
386#[derive(Clone, Debug)]
387pub(crate) enum IptStatusStatus {
388    /// We are (re)establishing our connection to the IPT
389    ///
390    /// But we don't think there's anything wrong with it.
391    ///
392    /// The IPT manager should *not* arrange to include this in descriptors.
393    Establishing,
394
395    /// The IPT is established and ready to accept rendezvous requests
396    ///
397    /// Also contains information about the introduction point
398    /// necessary for making descriptors,
399    /// including information from the netdir about the relay
400    ///
401    /// The IPT manager *should* arrange to include this in descriptors.
402    Good(GoodIptDetails),
403
404    /// We don't have the IPT and it looks like it was the IPT's fault
405    ///
406    /// This should be used whenever trying another IPT relay is likely to work better;
407    /// regardless of whether attempts to establish *this* IPT can continue.
408    ///
409    /// The IPT manager should *not* arrange to include this in descriptors.
410    /// If this persists, the IPT manager should replace this IPT
411    /// with a new IPT at a different relay.
412    Faulty(Option<IptError>),
413}
414
415/// Details of a good introduction point
416///
417/// This struct contains similar information to
418/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
419/// However, that insists that the contained `T` is a [`CircTarget`],
420/// which `<NtorPublicKey>` isn't.
421/// And, we don't use this as a circuit target (at least, not here -
422/// the client will do so, as a result of us publishing the information).
423///
424/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
425#[derive(Clone, Debug, Eq, PartialEq)]
426pub(crate) struct GoodIptDetails {
427    /// The link specifiers to be used in the descriptor
428    ///
429    /// As obtained and converted from the netdir.
430    pub(crate) link_specifiers: LinkSpecs,
431
432    /// The introduction point relay's ntor key (from the netdir)
433    pub(crate) ipt_kp_ntor: NtorPublicKey,
434}
435
436impl GoodIptDetails {
437    /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
438    fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
439        Ok(Self {
440            link_specifiers: relay
441                .linkspecs()
442                .map_err(into_internal!("Unable to encode relay link specifiers"))?,
443            ipt_kp_ntor: *relay.ntor_onion_key(),
444        })
445    }
446}
447
/// Marker carried in `Err(IptWantsToRetire)` when the IPT Establisher wants this IPT retired
///
/// That is the case once the IPT has had (too) many rendezvous requests.
///
/// Do *not* use this to report *errors*: it makes the IPT manager start
/// replacing the IPT *immediately*, regardless of rate limits etc.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
456
457/// State shared between the IptEstablisher and the Reactor.
458struct EstablisherState {
459    /// True if we are accepting requests right now.
460    accepting_requests: RequestDisposition,
461}
462
/// What an introduction point should currently do with any incoming
/// messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// Not advertised yet: if any request arrives, the message handler
    /// should complain and shut down.
    NotAdvertised,
    /// Advertised: the message handler should pass any requests along.
    Advertised,
    /// Shutting down cleanly: the message handler should exit, without complaining.
    Shutdown,
}
475
476/// The current status of an introduction point.
477#[derive(Clone, Debug)]
478pub(crate) struct IptStatus {
479    /// The current state of this introduction point as defined by
480    /// `hssvc-ipt-algorithms.md`.
481    ///
482    /// TODO (#1235): Make that file unneeded.
483    pub(crate) status: IptStatusStatus,
484
485    /// The current status of whether this introduction point circuit wants to be
486    /// retired based on having processed too many requests.
487    pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
488
489    /// If Some, a time after which all attempts have been unsuccessful.
490    pub(crate) failing_since: Option<Instant>,
491}
492
/// How long every attempt to reach an introduction point has to keep
/// failing before we declare that introduction point faulty.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(15 * 60);
499
500impl IptStatus {
501    /// Record that we have successfully connected to an introduction point.
502    fn note_open(&mut self, ipt_details: GoodIptDetails) {
503        self.status = IptStatusStatus::Good(ipt_details);
504        self.failing_since = None;
505    }
506
507    /// Record that we are trying to connect to an introduction point.
508    fn note_attempt(&mut self) {
509        use IptStatusStatus::*;
510        self.status = match &self.status {
511            Establishing | Good(..) => Establishing,
512            Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
513        }
514    }
515
516    /// Record that an error has occurred.
517    fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
518        use IptStatusStatus::*;
519        let failing_since = *self.failing_since.get_or_insert(now);
520        #[allow(clippy::if_same_then_else)]
521        if let Some(ipt_err) = err.ipt_failure() {
522            // This error always indicates a faulty introduction point.
523            self.status = Faulty(Some(ipt_err.clone()));
524        } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
525            // This introduction point has gone too long without a success.
526            self.status = Faulty(Some(IptError::Timeout));
527        }
528    }
529
530    /// Return an `IptStatus` representing an establisher that has not yet taken
531    /// any action.
532    fn new() -> Self {
533        Self {
534            status: IptStatusStatus::Establishing,
535            wants_to_retire: Ok(()),
536            failing_since: None,
537        }
538    }
539
540    /// Produce an `IptStatus` representing a shut down or crashed establisher
541    fn new_terminated() -> Self {
542        IptStatus {
543            status: IptStatusStatus::Faulty(None),
544            // If we're broken, we simply tell the manager that that is the case.
545            // It will decide for itself whether it wants to replace us.
546            wants_to_retire: Ok(()),
547            failing_since: None,
548        }
549    }
550}
551
552impl Default for IptStatus {
553    fn default() -> Self {
554        Self::new()
555    }
556}
557
558impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
559    fn eof() -> IptStatus {
560        IptStatus::new_terminated()
561    }
562}
563
564tor_cell::restricted_msg! {
565    /// An acceptable message to receive from an introduction point.
566     enum IptMsg : RelayMsg {
567         IntroEstablished,
568         Introduce2,
569     }
570}
571
572/// A set of extensions to send with our `ESTABLISH_INTRO` message.
573///
574/// NOTE: we eventually might want to support unrecognized extensions.  But
575/// that's potentially troublesome, since the set of extensions we sent might
576/// have an affect on how we validate the reply.
577#[derive(Clone, Debug)]
578pub(crate) struct EstIntroExtensionSet {
579    /// Parameters related to rate-limiting to prevent denial-of-service
580    /// attacks.
581    dos_params: Option<est_intro::DosParams>,
582}
583
584/// Implementation structure for the task that implements an IptEstablisher.
585struct Reactor<R: Runtime> {
586    /// A copy of our runtime, used for timeouts and sleeping.
587    runtime: R,
588    /// The nickname of the onion service we're running. Used when logging.
589    nickname: HsNickname,
590    /// A pool used to create circuits to the introduction point.
591    pool: Arc<HsCircPool<R>>,
592    /// A provider used to select the other relays in the circuit.
593    netdir_provider: Arc<dyn NetDirProvider>,
594    /// Identifier for the intro point.
595    lid: IptLocalId,
596    /// The target introduction point.
597    target: RelayIds,
598    /// The keypair to use when establishing the introduction point.
599    ///
600    /// Knowledge of this private key prevents anybody else from impersonating
601    /// us to the introduction point.
602    k_sid: Arc<HsIntroPtSessionIdKeypair>,
603    /// The extensions to use when establishing the introduction point.
604    ///
605    /// TODO (#1209): This should be able to change over time as we re-establish
606    /// the intro point.
607    extensions: EstIntroExtensionSet,
608
609    /// The stream that will receive INTRODUCE2 messages.
610    introduce_tx: mpsc::Sender<RendRequest>,
611
612    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
613    state: Arc<Mutex<EstablisherState>>,
614
615    /// Context information that we'll need to answer rendezvous requests.
616    request_context: Arc<RendRequestContext>,
617
618    /// Introduction request replay log
619    ///
620    /// Shared between multiple IPT circuit control message handlers -
621    /// [`IptMsgHandler`] contains the lock guard.
622    ///
623    /// Has to be an async mutex since it's locked for a long time,
624    /// so we mustn't block the async executor thread on it.
625    replay_log: Arc<futures::lock::Mutex<IptReplayLog>>,
626}
627
628/// An open session with a single introduction point.
629//
630// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
631pub(crate) struct IntroPtSession {
632    /// The circuit to the introduction point, on which we're receiving
633    /// Introduce2 messages.
634    intro_circ: Arc<ClientCirc>,
635}
636
637impl<R: Runtime> Reactor<R> {
638    /// Run forever, keeping an introduction point established.
639    #[allow(clippy::blocks_in_conditions)]
640    async fn keep_intro_established(
641        &self,
642        mut status_tx: DropNotifyWatchSender<IptStatus>,
643    ) -> Result<Void, IptEstablisherError> {
644        let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
645        loop {
646            status_tx.borrow_mut().note_attempt();
647            match self.establish_intro_once().await {
648                Ok((session, good_ipt_details)) => {
649                    // TODO (#1239): we need to monitor the netdir for changes to this relay
650                    // Eg,
651                    //   - if it becomes unlisted, we should declare the IPT faulty
652                    //     (until it perhaps reappears)
653                    //
654                    //     TODO SPEC  Continuing to use an unlisted relay is dangerous
655                    //     It might be malicious.  We should withdraw our IPT then,
656                    //     and hope that clients find another, working, IPT.
657                    //
658                    //   - if it changes its ntor key or link specs,
659                    //     we need to update the GoodIptDetails in our status report,
660                    //     so that the updated info can make its way to the descriptor
661                    //
662                    // Possibly some this could/should be done by the IPT Manager instead,
663                    // but Diziet thinks it is probably cleanest to do it here.
664
665                    status_tx.borrow_mut().note_open(good_ipt_details);
666
667                    debug!(
668                        "{}: Successfully established introduction point with {}",
669                        &self.nickname,
670                        self.target.display_relay_ids().redacted()
671                    );
672                    // Now that we've succeeded, we can stop backing off for our
673                    // next attempt.
674                    retry_delay.reset();
675
676                    // Wait for the session to be closed.
677                    session.wait_for_close().await;
678                }
679                Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
680                    // The network directory didn't include this relay.  Wait
681                    // until it does.
682                    //
683                    // Note that this `note_error` will necessarily mark the
684                    // ipt as Faulty. That's important, since we may be about to
685                    // wait indefinitely when we call wait_for_netdir_to_list.
686                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
687                    self.netdir_provider
688                        .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
689                        .await?;
690                }
691                Err(e) => {
692                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
693                    debug_report!(
694                        e,
695                        "{}: Problem establishing introduction point with {}",
696                        &self.nickname,
697                        self.target.display_relay_ids().redacted()
698                    );
699                    let retry_after = retry_delay.next_delay(&mut rand::rng());
700                    self.runtime.sleep(retry_after).await;
701                }
702            }
703        }
704    }
705
706    /// Try, once, to make a circuit to a single relay and establish an introduction
707    /// point there.
708    ///
709    /// Does not retry.  Does not time out except via `HsCircPool`.
710    async fn establish_intro_once(
711        &self,
712    ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
713        let (protovers, circuit, ipt_details) = {
714            let netdir = self
715                .netdir_provider
716                .wait_for_netdir(tor_netdir::Timeliness::Timely)
717                .await?;
718            let circ_target = netdir
719                .by_ids(&self.target)
720                .ok_or(IptError::IntroPointNotListed)?;
721            let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;
722
723            let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
724            let protovers = circ_target.protovers().clone();
725            let circuit = self
726                .pool
727                .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
728                .await
729                .map_err(IptEstablisherError::BuildCircuit)?;
730            // note that netdir is dropped here, to avoid holding on to it any
731            // longer than necessary.
732            (protovers, circuit, ipt_details)
733        };
734        let intro_pt_hop = circuit
735            .last_hop_num()
736            .map_err(into_internal!("Somehow built a circuit with no hops!?"))?;
737
738        let establish_intro = {
739            let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
740            let mut details = EstablishIntroDetails::new(ipt_sid_id);
741            if let Some(dos_params) = &self.extensions.dos_params {
742                // We only send the Dos extension when the relay is known to
743                // support it.
744                use tor_protover::named::HSINTRO_RATELIM;
745                if protovers.supports_named_subver(HSINTRO_RATELIM) {
746                    details.set_extension_dos(dos_params.clone());
747                }
748            }
749            let circuit_binding_key = circuit
750                .binding_key(intro_pt_hop)
751                .map_err(IptEstablisherError::CircuitState)?
752                .ok_or(internal!("No binding key for introduction point!?"))?;
753            let body: Vec<u8> = details
754                .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
755                .map_err(IptEstablisherError::CreateEstablishIntro)?;
756
757            // TODO: This is ugly, but it is the sensible way to munge the above
758            // body into a format that AnyRelayMsgOuter will accept without doing a
759            // redundant parse step.
760            //
761            // One alternative would be allowing start_conversation to take an `impl
762            // RelayMsg` rather than an AnyRelayMsg.
763            //
764            // Or possibly, when we feel like it, we could rename one or more of
765            // these "Unrecognized"s to Unparsed or Uninterpreted.  If we do that, however, we'll
766            // potentially face breaking changes up and down our crate stack.
767            AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
768                tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
769                body,
770            ))
771        };
772
773        let (established_tx, established_rx) = oneshot::channel();
774
775        // In theory there ought to be only one IptMsgHandler in existence at any one time,
776        // for any one IptLocalId (ie for any one IptReplayLog).  However, the teardown
777        // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
778        // (so we need to synchronise).  Therefore:
779        //
780        // Make sure we don't start writing to the replay log until any previous
781        // IptMsgHandler has been torn down.  (Using an async mutex means we
782        // don't risk blocking the whole executor even if we have teardown bugs.)
783        let replay_log = self.replay_log.clone().lock_owned().await;
784
785        let handler = IptMsgHandler {
786            established_tx: Some(established_tx),
787            introduce_tx: self.introduce_tx.clone(),
788            state: self.state.clone(),
789            lid: self.lid,
790            request_context: self.request_context.clone(),
791            replay_log,
792        };
793        let _conversation = circuit
794            .start_conversation(Some(establish_intro), handler, intro_pt_hop)
795            .await
796            .map_err(IptEstablisherError::SendEstablishIntro)?;
797        // At this point, we have `await`ed for the Conversation to exist, so we know
798        // that the message was sent.  We have to wait for any actual `established`
799        // message, though.
800
801        let length = circuit
802            .n_hops()
803            .map_err(into_internal!("failed to get circuit length"))?;
804        let ack_timeout = self
805            .pool
806            .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip { length });
807        let _established: IntroEstablished = self
808            .runtime
809            .timeout(ack_timeout, established_rx)
810            .await
811            .map_err(|_| IptEstablisherError::EstablishTimeout)?
812            .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;
813
814        // This session will be owned by keep_intro_established(), and dropped
815        // when the circuit closes, or when the keep_intro_established() future
816        // is dropped.
817        let session = IntroPtSession {
818            intro_circ: circuit,
819        };
820        Ok((session, ipt_details))
821    }
822}
823
824impl IntroPtSession {
825    /// Wait for this introduction point session to be closed.
826    fn wait_for_close(&self) -> impl Future<Output = ()> {
827        self.intro_circ.wait_for_close()
828    }
829}
830
/// MsgHandler type to implement a conversation with an introduction point.
///
/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
/// to handle otherwise unrecognized message types.
///
/// It is constructed by `establish_intro_once`, which passes it to
/// `start_conversation` together with the ESTABLISH_INTRO message.
struct IptMsgHandler {
    /// A oneshot sender used to report our IntroEstablished message.
    ///
    /// If this is None, then we already sent an IntroEstablished and we shouldn't
    /// send any more.
    ///
    /// The sender is also consumed to report an error if the first message
    /// we see is not an INTRO_ESTABLISHED.
    established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,

    /// A channel used to report Introduce2 messages.
    ///
    /// Requests are sent with `try_send`: a full channel causes the request
    /// to be dropped; a disconnected one causes the circuit to be closed.
    introduce_tx: mpsc::Sender<RendRequest>,

    /// Keys that we'll need to answer the introduction requests.
    request_context: Arc<RendRequestContext>,

    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
    ///
    /// The handler reads `accepting_requests` from it to decide whether an
    /// incoming INTRODUCE2 may be processed.
    state: Arc<Mutex<EstablisherState>>,

    /// Unique identifier for the introduction point (including the current
    /// keys).  Used to tag requests.
    lid: IptLocalId,

    /// A replay log used to detect replayed introduction requests.
    ///
    /// This is an owned guard on an async mutex, so the log stays locked
    /// for as long as this handler exists.
    replay_log: futures::lock::OwnedMutexGuard<IptReplayLog>,
}
858
859impl tor_proto::circuit::MsgHandler for IptMsgHandler {
860    fn handle_msg(&mut self, any_msg: AnyRelayMsg) -> tor_proto::Result<MetaCellDisposition> {
861        let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
862            if let Some(tx) = self.established_tx.take() {
863                let _ = tx.send(Err(IptError::BadMessage(format!(
864                    "Invalid message type {}",
865                    m.cmd()
866                ))
867                .into()));
868            }
869            // TODO: It's not completely clear whether CircProto is the right
870            // type for use in this function (here and elsewhere);
871            // possibly, we should add a different tor_proto::Error type
872            // for protocol violations at a higher level than the circuit
873            // protocol.
874            //
875            // For now, however, this error type is fine: it will cause the
876            // circuit to be shut down, which is what we want.
877            tor_proto::Error::CircProto(format!(
878                "Invalid message type {} on introduction circuit",
879                m.cmd()
880            ))
881        })?;
882
883        if match msg {
884            IptMsg::IntroEstablished(established) => match self.established_tx.take() {
885                Some(tx) => {
886                    // TODO: Once we want to enforce any properties on the
887                    // intro_established message (like checking for correct
888                    // extensions) we should do it here.
889                    let established = Ok(established);
890                    tx.send(established).map_err(|_| ())
891                }
892                None => {
893                    return Err(tor_proto::Error::CircProto(
894                        "Received a redundant INTRO_ESTABLISHED".into(),
895                    ));
896                }
897            },
898            IptMsg::Introduce2(introduce2) => {
899                if let Some(tx) = self.established_tx.take() {
900                    let _ = tx.send(Err(IptError::BadMessage(
901                        "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
902                    )
903                    .into()));
904                    return Err(tor_proto::Error::CircProto(
905                        "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
906                    ));
907                }
908                let disp = self.state.lock().expect("poisoned lock").accepting_requests;
909                match disp {
910                    RequestDisposition::NotAdvertised => {
911                        return Err(tor_proto::Error::CircProto(
912                            "Received an INTRODUCE2 message before we were accepting requests!"
913                                .into(),
914                        ))
915                    }
916                    RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
917                    RequestDisposition::Advertised => {}
918                }
919                match self.replay_log.check_for_replay(&introduce2) {
920                    Ok(()) => {}
921                    Err(ReplayError::AlreadySeen) => {
922                        // This is probably a replay, but maybe an accident. We
923                        // just drop the request.
924
925                        // TODO (#1233): Log that this has occurred, with a rate
926                        // limit.  Possibly, we should allow it to fail once or
927                        // twice per circuit before we log, since we expect
928                        // a nonzero false-positive rate.
929                        //
930                        // Note that we should NOT close the circuit in this
931                        // case: the repeated message could come from a hostile
932                        // introduction point trying to do traffic analysis, but
933                        // it could also come from a user trying to make it look
934                        // like the intro point is doing traffic analysis.
935                        return Ok(MetaCellDisposition::Consumed);
936                    }
937                    Err(ReplayError::Log(_)) => {
938                        // Uh-oh! We failed to write the data persistently!
939                        //
940                        // TODO (#1226): We need to decide what to do here.  Right
941                        // now we close the circuit, which is wrong.
942                        return Ok(MetaCellDisposition::CloseCirc);
943                    }
944                }
945
946                let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
947                let send_outcome = self.introduce_tx.try_send(request);
948
949                // We only want to report full-stream problems as errors here.
950                // Disconnected streams are expected.
951                let report_outcome = match &send_outcome {
952                    Err(e) if e.is_full() => Err(StreamWasFull {}),
953                    _ => Ok(()),
954                };
955                // TODO: someday we might want to start tracking this by
956                // introduction or service point separately, though we would
957                // expect their failures to be correlated.
958                log_ratelim!("sending rendezvous request to handler task"; report_outcome);
959
960                match send_outcome {
961                    Ok(()) => Ok(()),
962                    Err(e) => {
963                        if e.is_disconnected() {
964                            // The receiver is disconnected, meaning that
965                            // messages from this intro point are no longer
966                            // wanted.  Close the circuit.
967                            Err(())
968                        } else {
969                            // The receiver is full; we have no real option but
970                            // to drop the request like C-tor does when the
971                            // backlog is too large.
972                            //
973                            // See discussion at
974                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
975                            Ok(())
976                        }
977                    }
978                }
979            }
980        } == Err(())
981        {
982            // If the above return an error, we failed to send.  That means that
983            // we need to close the circuit, since nobody is listening on the
984            // other end of the tx.
985            return Ok(MetaCellDisposition::CloseCirc);
986        }
987
988        Ok(MetaCellDisposition::Consumed)
989    }
990}
991
/// We failed to send a rendezvous request onto the handler task that should
/// have handled it, because it was not handling requests fast enough.
///
/// Used only as the error value passed to `log_ratelim!` when the
/// rendezvous-request channel reports that it is full.
///
/// (This is a separate type so that we can have it implement Clone.)
#[derive(Clone, Debug, thiserror::Error)]
#[error("Could not send request; stream was full.")]
struct StreamWasFull {}