1
//! IPT Establisher
2
//!
3
//! Responsible for maintaining and establishing one introduction point.
4
//!
5
//! TODO (#1235): move docs from `hssvc-ipt-algorithm.md`
6
//!
7
//! See the docs for
8
//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
9
//! for details of our algorithm.
10

            
11
use crate::internal_prelude::*;
12

            
13
use tor_cell::relaycell::{
14
    hs::est_intro::{self, EstablishIntroDetails},
15
    msg::IntroEstablished,
16
};
17

            
18
/// Handle onto the task which is establishing and maintaining one IPT
19
pub(crate) struct IptEstablisher {
20
    /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
21
    /// down.
22
    _terminate_tx: oneshot::Sender<Void>,
23

            
24
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
25
    state: Arc<Mutex<EstablisherState>>,
26
}
27

            
28
/// When the `IptEstablisher` is dropped it is torn down
29
///
30
/// Synchronously
31
///
32
///  * No rendezvous requests will be accepted
33
///    that arrived after `Drop::drop` returns.
34
///
35
/// Asynchronously
36
///
37
///  * Circuits constructed for this IPT are torn down
38
///  * The `rend_reqs` sink is closed (dropped)
39
///  * `IptStatusStatus::Faulty` will be indicated
40
impl Drop for IptEstablisher {
41
    fn drop(&mut self) {
42
        // Make sure no more requests are accepted once this returns.
43
        //
44
        // (Note that if we didn't care about the "no more rendezvous
45
        // requests will be accepted" requirement, we could do away with this
46
        // code and the corresponding check for `RequestDisposition::Shutdown` in
47
        // `IptMsgHandler::handle_msg`.)
48
        self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
49

            
50
        // Tell the reactor to shut down... by doing nothing.
51
        //
52
        // (When terminate_tx is dropped, it will send an error to the
53
        // corresponding terminate_rx.)
54
    }
55
}
56

            
57
/// An error from trying to work with an IptEstablisher.
58
#[derive(Clone, Debug, thiserror::Error)]
59
pub(crate) enum IptEstablisherError {
60
    /// We encountered a faulty IPT.
61
    #[error("{0}")]
62
    Ipt(#[from] IptError),
63

            
64
    /// The network directory provider is shutting down without giving us the
65
    /// netdir we asked for.
66
    #[error("{0}")]
67
    NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),
68

            
69
    /// We encountered an error while building a circuit to an intro point.
70
    #[error("Unable to build circuit to introduction point")]
71
    BuildCircuit(#[source] tor_circmgr::Error),
72

            
73
    /// We encountered an error while querying the circuit state.
74
    #[error("Unable to query circuit state")]
75
    CircuitState(#[source] tor_proto::Error),
76

            
77
    /// We encountered an error while building and signing our establish_intro
78
    /// message.
79
    #[error("Unable to construct signed ESTABLISH_INTRO message")]
80
    CreateEstablishIntro(#[source] tor_cell::Error),
81

            
82
    /// We encountered a timeout after building the circuit.
83
    #[error("Timeout during ESTABLISH_INTRO handshake.")]
84
    EstablishTimeout,
85

            
86
    /// We encountered an error while sending our establish_intro
87
    /// message.
88
    #[error("Unable to send an ESTABLISH_INTRO message")]
89
    SendEstablishIntro(#[source] tor_proto::Error),
90

            
91
    /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
92
    /// circuit was closed.
93
    #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
94
    ClosedWithoutAck,
95

            
96
    /// We encountered a programming error.
97
    #[error("Internal error")]
98
    Bug(#[from] tor_error::Bug),
99
}
100

            
101
/// An error caused by a faulty IPT.
102
#[derive(Clone, Debug, thiserror::Error)]
103
#[non_exhaustive]
104
pub enum IptError {
105
    /// When we tried to establish this introduction point, we found that the
106
    /// netdir didn't list it.
107
    ///
108
    /// This introduction point is not strictly "faulty", but unlisted in the directory means we
109
    /// can't use the introduction point.
110
    #[error("Introduction point not listed in network directory")]
111
    IntroPointNotListed,
112

            
113
    /// We received an invalid INTRO_ESTABLISHED message.
114
    ///
115
    /// This is definitely the introduction point's fault: it sent us
116
    /// an authenticated message, but the contents of that message were
117
    /// definitely wrong.
118
    #[error("Got an invalid INTRO_ESTABLISHED message")]
119
    // Eventually, once we expect intro_established extensions, we will make
120
    // sure that they are well-formed.
121
    #[allow(dead_code)]
122
    BadEstablished,
123

            
124
    /// We received a message that not a valid part of the introduction-point
125
    /// protocol.
126
    #[error("Invalid message: {0}")]
127
    BadMessage(String),
128

            
129
    /// This introduction point has gone too long without a success.
130
    #[error("introduction point has gone too long without a success")]
131
    Timeout,
132
}
133

            
134
impl tor_error::HasKind for IptEstablisherError {
135
    fn kind(&self) -> tor_error::ErrorKind {
136
        use tor_error::ErrorKind as EK;
137
        use IptEstablisherError as E;
138
        match self {
139
            E::Ipt(e) => e.kind(),
140
            E::NetdirProviderShutdown(e) => e.kind(),
141
            E::BuildCircuit(e) => e.kind(),
142
            E::CircuitState(e) => e.kind(),
143
            E::EstablishTimeout => EK::TorNetworkTimeout,
144
            E::SendEstablishIntro(e) => e.kind(),
145
            E::ClosedWithoutAck => EK::CircuitCollapse,
146
            E::CreateEstablishIntro(_) => EK::Internal,
147
            E::Bug(e) => e.kind(),
148
        }
149
    }
150
}
151

            
152
impl tor_error::HasKind for IptError {
153
    fn kind(&self) -> tor_error::ErrorKind {
154
        use tor_error::ErrorKind as EK;
155
        use IptError as E;
156
        match self {
157
            E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
158
            E::BadEstablished => EK::RemoteProtocolViolation,
159
            E::BadMessage(_) => EK::RemoteProtocolViolation,
160
            E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
161
        }
162
    }
163
}
164

            
165
impl IptEstablisherError {
166
    /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
167
    ///
168
    /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`,
169
    /// it means that we should try another relay as an introduction point,
170
    /// though we don't necessarily need to give up on this one.
171
    ///
172
    /// Note that the intro point may be to blame even if we return `None`;
173
    /// we only return `true` when we are certain that the intro point is
174
    /// unlisted, unusable, or misbehaving.
175
    fn ipt_failure(&self) -> Option<&IptError> {
176
        use IptEstablisherError as IE;
177
        match self {
178
            // If we don't have a netdir, then no intro point is better than any other.
179
            IE::NetdirProviderShutdown(_) => None,
180
            IE::Ipt(e) => Some(e),
181
            // This _might_ be the introduction point's fault, but it might not.
182
            // We can't be certain.
183
            IE::BuildCircuit(_) => None,
184
            IE::CircuitState(_) => None,
185
            IE::EstablishTimeout => None,
186
            IE::ClosedWithoutAck => None,
187
            // These are, most likely, not the introduction point's fault,
188
            // though they might or might not be survivable.
189
            IE::CreateEstablishIntro(_) => None,
190
            IE::SendEstablishIntro(_) => None,
191
            IE::Bug(_) => None,
192
        }
193
    }
194
}
195

            
196
/// Parameters for an introduction point
197
///
198
/// Consumed by `IptEstablisher::new`.
199
/// Primarily serves as a convenient way to bundle the many arguments required.
200
///
201
/// Does not include:
202
///  * The runtime (which would force this struct to have a type parameter)
203
///  * The circuit builder (leaving this out makes it possible to use this
204
///    struct during mock execution, where we don't call `IptEstablisher::new`).
205
#[derive(Educe)]
206
#[educe(Debug)]
207
pub(crate) struct IptParameters {
208
    /// A receiver that we can use to tell us about updates in our configuration.
209
    ///
210
    /// Configuration changes may tell us to change our introduction points or build new
211
    /// circuits to them.
212
    //
213
    // TODO (#1209):
214
    //
215
    // We want to make a new introduction circuit if our dos parameters change,
216
    // which means that we should possibly be watching for changes in our
217
    // configuration.  Right now, though, we only copy out the configuration
218
    // on startup.
219
    #[educe(Debug(ignore))]
220
    pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
221
    /// A `NetDirProvider` that we'll use to find changes in the network
222
    /// parameters, and to look up information about routers.
223
    #[educe(Debug(ignore))]
224
    pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
225
    /// A shared sender that we'll use to report incoming INTRODUCE2 requests
226
    /// for rendezvous circuits.
227
    #[educe(Debug(ignore))]
228
    pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
229
    /// Opaque local ID for this introduction point.
230
    ///
231
    /// This ID does not change within the lifetime of an [`IptEstablisher`].
232
    /// See [`IptLocalId`] for information about what changes would require a
233
    /// new ID (and hence a new `IptEstablisher`).
234
    pub(crate) lid: IptLocalId,
235
    /// Persistent log for INTRODUCE2 requests.
236
    ///
237
    /// We use this to record the requests that we see, and to prevent replays.
238
    #[educe(Debug(ignore))]
239
    pub(crate) replay_log: IptReplayLog,
240
    /// A set of identifiers for the Relay that we intend to use as the
241
    /// introduction point.
242
    ///
243
    /// We use this to identify the relay within a `NetDir`, and to make sure
244
    /// we're connecting to the right introduction point.
245
    pub(crate) target: RelayIds,
246
    /// Keypair used to authenticate and identify ourselves to this introduction
247
    /// point.
248
    ///
249
    /// Later, we publish the public component of this keypair in our HsDesc,
250
    /// and clients use it to tell the introduction point which introduction circuit
251
    /// should receive their requests.
252
    ///
253
    /// This is the `K_hs_ipt_sid` keypair.
254
    pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
255
    /// Whether this `IptEstablisher` should begin by accepting requests, or
256
    /// wait to be told that requests are okay.
257
    pub(crate) accepting_requests: RequestDisposition,
258
    /// Keypair used to decrypt INTRODUCE2 requests from clients.
259
    ///
260
    /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
261
    /// form a shared key set of keys with the client, and decrypt information
262
    /// about the client's chosen rendezvous point and extensions.
263
    pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
264
}
265

            
266
impl IptEstablisher {
267
    /// Try to set up, and maintain, an IPT at `target`.
268
    ///
269
    /// Rendezvous requests will be rejected or accepted
270
    /// depending on the value of `accepting_requests`
271
    /// (which must be `Advertised` or `NotAdvertised`).
272
    ///
273
    /// Also returns a stream of events that is produced whenever we have a
274
    /// change in the IptStatus for this intro point.  Note that this stream is
275
    /// potentially lossy.
276
    ///
277
    /// The returned `watch::Receiver` will yield `Faulty` if the IPT
278
    /// establisher is shut down (or crashes).
279
    ///
280
    /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
281
    /// and close all circuits used to establish this introduction point.
282
    pub(crate) fn launch<R: Runtime>(
283
        runtime: &R,
284
        params: IptParameters,
285
        pool: Arc<HsCircPool<R>>,
286
        keymgr: &Arc<KeyMgr>,
287
    ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
288
        // This exhaustive deconstruction ensures that we don't
289
        // accidentally forget to handle any of our inputs.
290
        let IptParameters {
291
            config_rx,
292
            netdir_provider,
293
            introduce_tx,
294
            lid,
295
            target,
296
            k_sid,
297
            k_ntor,
298
            accepting_requests,
299
            replay_log,
300
        } = params;
301
        let config = Arc::clone(&config_rx.borrow());
302
        let nickname = config.nickname().clone();
303

            
304
        if matches!(accepting_requests, RequestDisposition::Shutdown) {
305
            return Err(bad_api_usage!(
306
                "Tried to create a IptEstablisher that that was already shutting down?"
307
            )
308
            .into());
309
        }
310

            
311
        let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
312

            
313
        let request_context = Arc::new(RendRequestContext {
314
            nickname: nickname.clone(),
315
            keymgr: Arc::clone(keymgr),
316
            kp_hss_ntor: Arc::clone(&k_ntor),
317
            kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
318
            filter: config.filter_settings(),
319
            netdir_provider: netdir_provider.clone(),
320
            circ_pool: pool.clone(),
321
        });
322

            
323
        let reactor = Reactor {
324
            runtime: runtime.clone(),
325
            nickname,
326
            pool,
327
            netdir_provider,
328
            lid,
329
            target,
330
            k_sid,
331
            introduce_tx,
332
            extensions: EstIntroExtensionSet {
333
                // Updates to this are handled by the IPT manager: when it changes,
334
                // this IPT will be replaced with one with the correct parameters.
335
                dos_params: config.dos_extension()?,
336
            },
337
            state: state.clone(),
338
            request_context,
339
            replay_log: Arc::new(replay_log.into()),
340
        };
341

            
342
        let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
343
        let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
344
        let status_tx = DropNotifyWatchSender::new(status_tx);
345

            
346
        // Spawn a task to keep the intro established.  The task will shut down
347
        // when terminate_tx is dropped.
348
        runtime
349
            .spawn(async move {
350
                futures::select_biased!(
351
                    terminated = terminate_rx => {
352
                        // Only Err is possible, but the compiler can't tell that.
353
                        let oneshot::Canceled = terminated.void_unwrap_err();
354
                    }
355
                    outcome = reactor.keep_intro_established(status_tx).fuse() =>  {
356
                      warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
357
                    }
358
                );
359
            })
360
            .map_err(|e| FatalError::Spawn {
361
                spawning: "introduction point establisher",
362
                cause: Arc::new(e),
363
            })?;
364
        let establisher = IptEstablisher {
365
            _terminate_tx: terminate_tx,
366
            state,
367
        };
368
        Ok((establisher, status_rx))
369
    }
370

            
371
    /// Begin accepting requests from this introduction point.
372
    ///
373
    /// If any introduction requests are sent before we have called this method,
374
    /// they are treated as an error and our connection to this introduction
375
    /// point is closed.
376
    pub(crate) fn start_accepting(&self) {
377
        self.state.lock().expect("poisoned lock").accepting_requests =
378
            RequestDisposition::Advertised;
379
    }
380
}
381

            
382
/// The current status of an introduction point, as defined in
383
/// `hssvc-ipt-algorithms.md`.
384
///
385
/// TODO (#1235) Make that file unneeded.
386
#[derive(Clone, Debug)]
387
pub(crate) enum IptStatusStatus {
388
    /// We are (re)establishing our connection to the IPT
389
    ///
390
    /// But we don't think there's anything wrong with it.
391
    ///
392
    /// The IPT manager should *not* arrange to include this in descriptors.
393
    Establishing,
394

            
395
    /// The IPT is established and ready to accept rendezvous requests
396
    ///
397
    /// Also contains information about the introduction point
398
    /// necessary for making descriptors,
399
    /// including information from the netdir about the relay
400
    ///
401
    /// The IPT manager *should* arrange to include this in descriptors.
402
    Good(GoodIptDetails),
403

            
404
    /// We don't have the IPT and it looks like it was the IPT's fault
405
    ///
406
    /// This should be used whenever trying another IPT relay is likely to work better;
407
    /// regardless of whether attempts to establish *this* IPT can continue.
408
    ///
409
    /// The IPT manager should *not* arrange to include this in descriptors.
410
    /// If this persists, the IPT manager should replace this IPT
411
    /// with a new IPT at a different relay.
412
    Faulty(Option<IptError>),
413
}
414

            
415
/// Details of a good introduction point
416
///
417
/// This struct contains similar information to
418
/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
419
/// However, that insists that the contained `T` is a [`CircTarget`],
420
/// which `<NtorPublicKey>` isn't.
421
/// And, we don't use this as a circuit target (at least, not here -
422
/// the client will do so, as a result of us publishing the information).
423
///
424
/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
425
#[derive(Clone, Debug, Eq, PartialEq)]
426
pub(crate) struct GoodIptDetails {
427
    /// The link specifiers to be used in the descriptor
428
    ///
429
    /// As obtained and converted from the netdir.
430
    pub(crate) link_specifiers: LinkSpecs,
431

            
432
    /// The introduction point relay's ntor key (from the netdir)
433
    pub(crate) ipt_kp_ntor: NtorPublicKey,
434
}
435

            
436
impl GoodIptDetails {
437
    /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
438
    fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
439
        Ok(Self {
440
            link_specifiers: relay
441
                .linkspecs()
442
                .map_err(into_internal!("Unable to encode relay link specifiers"))?,
443
            ipt_kp_ntor: *relay.ntor_onion_key(),
444
        })
445
    }
446
}
447

            
448
/// `Err(IptWantsToRetire)` signals that the IPT Establisher wants to retire this IPT
///
/// That is the case once the IPT has had (too) many rendezvous requests.
///
/// Do *not* use this for *errors*: it causes the IPT manager to
/// *immediately* start to replace the IPT, regardless of rate limits etc.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
456

            
457
/// State shared between the IptEstablisher and the Reactor.
458
struct EstablisherState {
459
    /// True if we are accepting requests right now.
460
    accepting_requests: RequestDisposition,
461
}
462

            
463
/// Current state of an introduction point; this decides what is done with
/// any incoming messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// Not yet advertised: if the message handler gets any requests it
    /// should complain and shut down.
    NotAdvertised,
    /// Advertised: the message handler should pass along any requests
    Advertised,
    /// Shutting down cleanly: the message handler should exit but not complain.
    Shutdown,
}
475

            
476
/// The current status of an introduction point.
477
#[derive(Clone, Debug)]
478
pub(crate) struct IptStatus {
479
    /// The current state of this introduction point as defined by
480
    /// `hssvc-ipt-algorithms.md`.
481
    ///
482
    /// TODO (#1235): Make that file unneeded.
483
    pub(crate) status: IptStatusStatus,
484

            
485
    /// The current status of whether this introduction point circuit wants to be
486
    /// retired based on having processed too many requests.
487
    pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
488

            
489
    /// If Some, a time after which all attempts have been unsuccessful.
490
    pub(crate) failing_since: Option<Instant>,
491
}
492

            
493
/// How long every attempt to reach an introduction point must have been
/// failing before we declare that introduction point to be faulty.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(15 * 60);
499

            
500
impl IptStatus {
501
    /// Record that we have successfully connected to an introduction point.
502
    fn note_open(&mut self, ipt_details: GoodIptDetails) {
503
        self.status = IptStatusStatus::Good(ipt_details);
504
        self.failing_since = None;
505
    }
506

            
507
    /// Record that we are trying to connect to an introduction point.
508
    fn note_attempt(&mut self) {
509
        use IptStatusStatus::*;
510
        self.status = match &self.status {
511
            Establishing | Good(..) => Establishing,
512
            Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
513
        }
514
    }
515

            
516
    /// Record that an error has occurred.
517
    fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
518
        use IptStatusStatus::*;
519
        let failing_since = *self.failing_since.get_or_insert(now);
520
        #[allow(clippy::if_same_then_else)]
521
        if let Some(ipt_err) = err.ipt_failure() {
522
            // This error always indicates a faulty introduction point.
523
            self.status = Faulty(Some(ipt_err.clone()));
524
        } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
525
            // This introduction point has gone too long without a success.
526
            self.status = Faulty(Some(IptError::Timeout));
527
        }
528
    }
529

            
530
    /// Return an `IptStatus` representing an establisher that has not yet taken
531
    /// any action.
532
40
    fn new() -> Self {
533
40
        Self {
534
40
            status: IptStatusStatus::Establishing,
535
40
            wants_to_retire: Ok(()),
536
40
            failing_since: None,
537
40
        }
538
40
    }
539

            
540
    /// Produce an `IptStatus` representing a shut down or crashed establisher
541
    fn new_terminated() -> Self {
542
        IptStatus {
543
            status: IptStatusStatus::Faulty(None),
544
            // If we're broken, we simply tell the manager that that is the case.
545
            // It will decide for itself whether it wants to replace us.
546
            wants_to_retire: Ok(()),
547
            failing_since: None,
548
        }
549
    }
550
}
551

            
552
impl Default for IptStatus {
553
40
    fn default() -> Self {
554
40
        Self::new()
555
40
    }
556
}
557

            
558
impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
    /// The end-of-stream value is the status of a shut down or crashed
    /// establisher.
    fn eof() -> Self {
        Self::new_terminated()
    }
}
563

            
564
tor_cell::restricted_msg! {
    /// The messages that we are prepared to receive from an introduction point.
    enum IptMsg : RelayMsg {
        IntroEstablished,
        Introduce2,
    }
}
571

            
572
/// A set of extensions to send with our `ESTABLISH_INTRO` message.
573
///
574
/// NOTE: we eventually might want to support unrecognized extensions.  But
575
/// that's potentially troublesome, since the set of extensions we sent might
576
/// have an affect on how we validate the reply.
577
#[derive(Clone, Debug)]
578
pub(crate) struct EstIntroExtensionSet {
579
    /// Parameters related to rate-limiting to prevent denial-of-service
580
    /// attacks.
581
    dos_params: Option<est_intro::DosParams>,
582
}
583

            
584
/// Implementation structure for the task that implements an IptEstablisher.
585
struct Reactor<R: Runtime> {
586
    /// A copy of our runtime, used for timeouts and sleeping.
587
    runtime: R,
588
    /// The nickname of the onion service we're running. Used when logging.
589
    nickname: HsNickname,
590
    /// A pool used to create circuits to the introduction point.
591
    pool: Arc<HsCircPool<R>>,
592
    /// A provider used to select the other relays in the circuit.
593
    netdir_provider: Arc<dyn NetDirProvider>,
594
    /// Identifier for the intro point.
595
    lid: IptLocalId,
596
    /// The target introduction point.
597
    target: RelayIds,
598
    /// The keypair to use when establishing the introduction point.
599
    ///
600
    /// Knowledge of this private key prevents anybody else from impersonating
601
    /// us to the introduction point.
602
    k_sid: Arc<HsIntroPtSessionIdKeypair>,
603
    /// The extensions to use when establishing the introduction point.
604
    ///
605
    /// TODO (#1209): This should be able to change over time as we re-establish
606
    /// the intro point.
607
    extensions: EstIntroExtensionSet,
608

            
609
    /// The stream that will receive INTRODUCE2 messages.
610
    introduce_tx: mpsc::Sender<RendRequest>,
611

            
612
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
613
    state: Arc<Mutex<EstablisherState>>,
614

            
615
    /// Context information that we'll need to answer rendezvous requests.
616
    request_context: Arc<RendRequestContext>,
617

            
618
    /// Introduction request replay log
619
    ///
620
    /// Shared between multiple IPT circuit control message handlers -
621
    /// [`IptMsgHandler`] contains the lock guard.
622
    ///
623
    /// Has to be an async mutex since it's locked for a long time,
624
    /// so we mustn't block the async executor thread on it.
625
    replay_log: Arc<futures::lock::Mutex<IptReplayLog>>,
626
}
627

            
628
/// An open session with a single introduction point.
629
//
630
// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
631
pub(crate) struct IntroPtSession {
632
    /// The circuit to the introduction point, on which we're receiving
633
    /// Introduce2 messages.
634
    intro_circ: Arc<ClientCirc>,
635
}
636

            
637
impl<R: Runtime> Reactor<R> {
638
    /// Run forever, keeping an introduction point established.
639
    #[allow(clippy::blocks_in_conditions)]
640
    #[allow(clippy::cognitive_complexity)]
641
    async fn keep_intro_established(
642
        &self,
643
        mut status_tx: DropNotifyWatchSender<IptStatus>,
644
    ) -> Result<Void, IptEstablisherError> {
645
        let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
646
        loop {
647
            status_tx.borrow_mut().note_attempt();
648
            match self.establish_intro_once().await {
649
                Ok((session, good_ipt_details)) => {
650
                    // TODO (#1239): we need to monitor the netdir for changes to this relay
651
                    // Eg,
652
                    //   - if it becomes unlisted, we should declare the IPT faulty
653
                    //     (until it perhaps reappears)
654
                    //
655
                    //     TODO SPEC  Continuing to use an unlisted relay is dangerous
656
                    //     It might be malicious.  We should withdraw our IPT then,
657
                    //     and hope that clients find another, working, IPT.
658
                    //
659
                    //   - if it changes its ntor key or link specs,
660
                    //     we need to update the GoodIptDetails in our status report,
661
                    //     so that the updated info can make its way to the descriptor
662
                    //
663
                    // Possibly some this could/should be done by the IPT Manager instead,
664
                    // but Diziet thinks it is probably cleanest to do it here.
665

            
666
                    status_tx.borrow_mut().note_open(good_ipt_details);
667

            
668
                    debug!(
669
                        "{}: Successfully established introduction point with {}",
670
                        &self.nickname,
671
                        self.target.display_relay_ids().redacted()
672
                    );
673
                    // Now that we've succeeded, we can stop backing off for our
674
                    // next attempt.
675
                    retry_delay.reset();
676

            
677
                    // Wait for the session to be closed.
678
                    session.wait_for_close().await;
679
                }
680
                Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
681
                    // The network directory didn't include this relay.  Wait
682
                    // until it does.
683
                    //
684
                    // Note that this `note_error` will necessarily mark the
685
                    // ipt as Faulty. That's important, since we may be about to
686
                    // wait indefinitely when we call wait_for_netdir_to_list.
687
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
688
                    self.netdir_provider
689
                        .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
690
                        .await?;
691
                }
692
                Err(e) => {
693
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
694
                    debug_report!(
695
                        e,
696
                        "{}: Problem establishing introduction point with {}",
697
                        &self.nickname,
698
                        self.target.display_relay_ids().redacted()
699
                    );
700
                    let retry_after = retry_delay.next_delay(&mut rand::rng());
701
                    self.runtime.sleep(retry_after).await;
702
                }
703
            }
704
        }
705
    }
706

            
707
    /// Try, once, to make a circuit to a single relay and establish an introduction
708
    /// point there.
709
    ///
710
    /// Does not retry.  Does not time out except via `HsCircPool`.
711
    async fn establish_intro_once(
712
        &self,
713
    ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
714
        let (protovers, circuit, ipt_details) = {
715
            let netdir = self
716
                .netdir_provider
717
                .wait_for_netdir(tor_netdir::Timeliness::Timely)
718
                .await?;
719
            let circ_target = netdir
720
                .by_ids(&self.target)
721
                .ok_or(IptError::IntroPointNotListed)?;
722
            let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;
723

            
724
            let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
725
            let protovers = circ_target.protovers().clone();
726
            let circuit = self
727
                .pool
728
                .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
729
                .await
730
                .map_err(IptEstablisherError::BuildCircuit)?;
731
            // note that netdir is dropped here, to avoid holding on to it any
732
            // longer than necessary.
733
            (protovers, circuit, ipt_details)
734
        };
735
        let intro_pt_hop = circuit
736
            .last_hop_num()
737
            .map_err(into_internal!("Somehow built a circuit with no hops!?"))?;
738

            
739
        let establish_intro = {
740
            let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
741
            let mut details = EstablishIntroDetails::new(ipt_sid_id);
742
            if let Some(dos_params) = &self.extensions.dos_params {
743
                // We only send the Dos extension when the relay is known to
744
                // support it.
745
                use tor_protover::named::HSINTRO_RATELIM;
746
                if protovers.supports_named_subver(HSINTRO_RATELIM) {
747
                    details.set_extension_dos(dos_params.clone());
748
                }
749
            }
750
            let circuit_binding_key = circuit
751
                .binding_key(intro_pt_hop)
752
                .map_err(IptEstablisherError::CircuitState)?
753
                .ok_or(internal!("No binding key for introduction point!?"))?;
754
            let body: Vec<u8> = details
755
                .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
756
                .map_err(IptEstablisherError::CreateEstablishIntro)?;
757

            
758
            // TODO: This is ugly, but it is the sensible way to munge the above
759
            // body into a format that AnyRelayMsgOuter will accept without doing a
760
            // redundant parse step.
761
            //
762
            // One alternative would be allowing start_conversation to take an `impl
763
            // RelayMsg` rather than an AnyRelayMsg.
764
            //
765
            // Or possibly, when we feel like it, we could rename one or more of
766
            // these "Unrecognized"s to Unparsed or Uninterpreted.  If we do that, however, we'll
767
            // potentially face breaking changes up and down our crate stack.
768
            AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
769
                tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
770
                body,
771
            ))
772
        };
773

            
774
        let (established_tx, established_rx) = oneshot::channel();
775

            
776
        // In theory there ought to be only one IptMsgHandler in existence at any one time,
777
        // for any one IptLocalId (ie for any one IptReplayLog).  However, the teardown
778
        // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
779
        // (so we need to synchronise).  Therefore:
780
        //
781
        // Make sure we don't start writing to the replay log until any previous
782
        // IptMsgHandler has been torn down.  (Using an async mutex means we
783
        // don't risk blocking the whole executor even if we have teardown bugs.)
784
        let replay_log = self.replay_log.clone().lock_owned().await;
785

            
786
        let handler = IptMsgHandler {
787
            established_tx: Some(established_tx),
788
            introduce_tx: self.introduce_tx.clone(),
789
            state: self.state.clone(),
790
            lid: self.lid,
791
            request_context: self.request_context.clone(),
792
            replay_log,
793
        };
794
        let _conversation = circuit
795
            .start_conversation(Some(establish_intro), handler, intro_pt_hop)
796
            .await
797
            .map_err(IptEstablisherError::SendEstablishIntro)?;
798
        // At this point, we have `await`ed for the Conversation to exist, so we know
799
        // that the message was sent.  We have to wait for any actual `established`
800
        // message, though.
801

            
802
        let length = circuit
803
            .n_hops()
804
            .map_err(into_internal!("failed to get circuit length"))?;
805
        let ack_timeout = self
806
            .pool
807
            .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip { length });
808
        let _established: IntroEstablished = self
809
            .runtime
810
            .timeout(ack_timeout, established_rx)
811
            .await
812
            .map_err(|_| IptEstablisherError::EstablishTimeout)?
813
            .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;
814

            
815
        // This session will be owned by keep_intro_established(), and dropped
816
        // when the circuit closes, or when the keep_intro_established() future
817
        // is dropped.
818
        let session = IntroPtSession {
819
            intro_circ: circuit,
820
        };
821
        Ok((session, ipt_details))
822
    }
823
}
824

            
825
impl IntroPtSession {
826
    /// Wait for this introduction point session to be closed.
827
    fn wait_for_close(&self) -> impl Future<Output = ()> {
828
        self.intro_circ.wait_for_close()
829
    }
830
}
831

            
832
/// MsgHandler type to implement a conversation with an introduction point.
833
///
834
/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
835
/// to handle otherwise unrecognized message types.
836
struct IptMsgHandler {
837
    /// A oneshot sender used to report our IntroEstablished message.
838
    ///
839
    /// If this is None, then we already sent an IntroEstablished and we shouldn't
840
    /// send any more.
841
    established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,
842

            
843
    /// A channel used to report Introduce2 messages.
844
    introduce_tx: mpsc::Sender<RendRequest>,
845

            
846
    /// Keys that we'll need to answer the introduction requests.
847
    request_context: Arc<RendRequestContext>,
848

            
849
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
850
    state: Arc<Mutex<EstablisherState>>,
851

            
852
    /// Unique identifier for the introduction point (including the current
853
    /// keys).  Used to tag requests.
854
    lid: IptLocalId,
855

            
856
    /// A replay log used to detect replayed introduction requests.
857
    replay_log: futures::lock::OwnedMutexGuard<IptReplayLog>,
858
}
859

            
860
impl tor_proto::circuit::MsgHandler for IptMsgHandler {
861
    fn handle_msg(&mut self, any_msg: AnyRelayMsg) -> tor_proto::Result<MetaCellDisposition> {
862
        let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
863
            if let Some(tx) = self.established_tx.take() {
864
                let _ = tx.send(Err(IptError::BadMessage(format!(
865
                    "Invalid message type {}",
866
                    m.cmd()
867
                ))
868
                .into()));
869
            }
870
            // TODO: It's not completely clear whether CircProto is the right
871
            // type for use in this function (here and elsewhere);
872
            // possibly, we should add a different tor_proto::Error type
873
            // for protocol violations at a higher level than the circuit
874
            // protocol.
875
            //
876
            // For now, however, this error type is fine: it will cause the
877
            // circuit to be shut down, which is what we want.
878
            tor_proto::Error::CircProto(format!(
879
                "Invalid message type {} on introduction circuit",
880
                m.cmd()
881
            ))
882
        })?;
883

            
884
        if match msg {
885
            IptMsg::IntroEstablished(established) => match self.established_tx.take() {
886
                Some(tx) => {
887
                    // TODO: Once we want to enforce any properties on the
888
                    // intro_established message (like checking for correct
889
                    // extensions) we should do it here.
890
                    let established = Ok(established);
891
                    tx.send(established).map_err(|_| ())
892
                }
893
                None => {
894
                    return Err(tor_proto::Error::CircProto(
895
                        "Received a redundant INTRO_ESTABLISHED".into(),
896
                    ));
897
                }
898
            },
899
            IptMsg::Introduce2(introduce2) => {
900
                if let Some(tx) = self.established_tx.take() {
901
                    let _ = tx.send(Err(IptError::BadMessage(
902
                        "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
903
                    )
904
                    .into()));
905
                    return Err(tor_proto::Error::CircProto(
906
                        "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
907
                    ));
908
                }
909
                let disp = self.state.lock().expect("poisoned lock").accepting_requests;
910
                match disp {
911
                    RequestDisposition::NotAdvertised => {
912
                        return Err(tor_proto::Error::CircProto(
913
                            "Received an INTRODUCE2 message before we were accepting requests!"
914
                                .into(),
915
                        ))
916
                    }
917
                    RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
918
                    RequestDisposition::Advertised => {}
919
                }
920
                match self.replay_log.check_for_replay(&introduce2) {
921
                    Ok(()) => {}
922
                    Err(ReplayError::AlreadySeen) => {
923
                        // This is probably a replay, but maybe an accident. We
924
                        // just drop the request.
925

            
926
                        // TODO (#1233): Log that this has occurred, with a rate
927
                        // limit.  Possibly, we should allow it to fail once or
928
                        // twice per circuit before we log, since we expect
929
                        // a nonzero false-positive rate.
930
                        //
931
                        // Note that we should NOT close the circuit in this
932
                        // case: the repeated message could come from a hostile
933
                        // introduction point trying to do traffic analysis, but
934
                        // it could also come from a user trying to make it look
935
                        // like the intro point is doing traffic analysis.
936
                        return Ok(MetaCellDisposition::Consumed);
937
                    }
938
                    Err(ReplayError::Log(_)) => {
939
                        // Uh-oh! We failed to write the data persistently!
940
                        //
941
                        // TODO (#1226): We need to decide what to do here.  Right
942
                        // now we close the circuit, which is wrong.
943
                        return Ok(MetaCellDisposition::CloseCirc);
944
                    }
945
                }
946

            
947
                let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
948
                let send_outcome = self.introduce_tx.try_send(request);
949

            
950
                // We only want to report full-stream problems as errors here.
951
                // Disconnected streams are expected.
952
                let report_outcome = match &send_outcome {
953
                    Err(e) if e.is_full() => Err(StreamWasFull {}),
954
                    _ => Ok(()),
955
                };
956
                // TODO: someday we might want to start tracking this by
957
                // introduction or service point separately, though we would
958
                // expect their failures to be correlated.
959
                log_ratelim!("sending rendezvous request to handler task"; report_outcome);
960

            
961
                match send_outcome {
962
                    Ok(()) => Ok(()),
963
                    Err(e) => {
964
                        if e.is_disconnected() {
965
                            // The receiver is disconnected, meaning that
966
                            // messages from this intro point are no longer
967
                            // wanted.  Close the circuit.
968
                            Err(())
969
                        } else {
970
                            // The receiver is full; we have no real option but
971
                            // to drop the request like C-tor does when the
972
                            // backlog is too large.
973
                            //
974
                            // See discussion at
975
                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
976
                            Ok(())
977
                        }
978
                    }
979
                }
980
            }
981
        } == Err(())
982
        {
983
            // If the above return an error, we failed to send.  That means that
984
            // we need to close the circuit, since nobody is listening on the
985
            // other end of the tx.
986
            return Ok(MetaCellDisposition::CloseCirc);
987
        }
988

            
989
        Ok(MetaCellDisposition::Consumed)
990
    }
991
}
992

            
993
/// We failed to send a rendezvous request onto the handler test that should
994
/// have handled it, because it was not handling requests fast enough.
995
///
996
/// (This is a separate type so that we can have it implement Clone.)
997
#[derive(Clone, Debug, thiserror::Error)]
998
#[error("Could not send request; stream was full.")]
999
struct StreamWasFull {}