1
//! IPT Establisher
2
//!
3
//! Responsible for maintaining and establishing one introduction point.
4
//!
5
//! TODO (#1235): move docs from `hssvc-ipt-algorithms.md`
6
//!
7
//! See the docs for
8
//! [`IptManager::idempotently_progress_things_now`](crate::ipt_mgr::IptManager::idempotently_progress_things_now)
9
//! for details of our algorithm.
10

            
11
use crate::internal_prelude::*;
12

            
13
use tor_cell::relaycell::{
14
    hs::est_intro::{self, EstablishIntroDetails},
15
    msg::IntroEstablished,
16
};
17
use tor_proto::TargetHop;
18

            
19
/// Handle onto the task which is establishing and maintaining one IPT
20
pub(crate) struct IptEstablisher {
21
    /// A oneshot sender which, when dropped, notifies the running task that it's time to shut
22
    /// down.
23
    _terminate_tx: oneshot::Sender<Void>,
24

            
25
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
26
    state: Arc<Mutex<EstablisherState>>,
27
}
28

            
29
/// When the `IptEstablisher` is dropped it is torn down
30
///
31
/// Synchronously
32
///
33
///  * No rendezvous requests will be accepted
34
///    that arrived after `Drop::drop` returns.
35
///
36
/// Asynchronously
37
///
38
///  * Circuits constructed for this IPT are torn down
39
///  * The `rend_reqs` sink is closed (dropped)
40
///  * `IptStatusStatus::Faulty` will be indicated
41
impl Drop for IptEstablisher {
42
    fn drop(&mut self) {
43
        // Make sure no more requests are accepted once this returns.
44
        //
45
        // (Note that if we didn't care about the "no more rendezvous
46
        // requests will be accepted" requirement, we could do away with this
47
        // code and the corresponding check for `RequestDisposition::Shutdown` in
48
        // `IptMsgHandler::handle_msg`.)
49
        self.state.lock().expect("poisoned lock").accepting_requests = RequestDisposition::Shutdown;
50

            
51
        // Tell the reactor to shut down... by doing nothing.
52
        //
53
        // (When terminate_tx is dropped, it will send an error to the
54
        // corresponding terminate_rx.)
55
    }
56
}
57

            
58
/// An error from trying to work with an IptEstablisher.
59
#[derive(Clone, Debug, thiserror::Error)]
60
pub(crate) enum IptEstablisherError {
61
    /// We encountered a faulty IPT.
62
    #[error("{0}")]
63
    Ipt(#[from] IptError),
64

            
65
    /// The network directory provider is shutting down without giving us the
66
    /// netdir we asked for.
67
    #[error("{0}")]
68
    NetdirProviderShutdown(#[from] tor_netdir::NetdirProviderShutdown),
69

            
70
    /// We encountered an error while building a circuit to an intro point.
71
    #[error("Unable to build circuit to introduction point")]
72
    BuildCircuit(#[source] tor_circmgr::Error),
73

            
74
    /// We encountered an error while querying the circuit state.
75
    #[error("Unable to query circuit state")]
76
    CircuitState(#[source] tor_proto::Error),
77

            
78
    /// We encountered an error while building and signing our establish_intro
79
    /// message.
80
    #[error("Unable to construct signed ESTABLISH_INTRO message")]
81
    CreateEstablishIntro(#[source] tor_cell::Error),
82

            
83
    /// We encountered a timeout after building the circuit.
84
    #[error("Timeout during ESTABLISH_INTRO handshake.")]
85
    EstablishTimeout,
86

            
87
    /// We encountered an error while sending our establish_intro
88
    /// message.
89
    #[error("Unable to send an ESTABLISH_INTRO message")]
90
    SendEstablishIntro(#[source] tor_proto::Error),
91

            
92
    /// We did not receive an INTRO_ESTABLISHED message like we wanted; instead, the
93
    /// circuit was closed.
94
    #[error("Circuit closed during INTRO_ESTABLISHED handshake")]
95
    ClosedWithoutAck,
96

            
97
    /// We encountered a programming error.
98
    #[error("Internal error")]
99
    Bug(#[from] tor_error::Bug),
100
}
101

            
102
/// An error caused by a faulty IPT.
103
#[derive(Clone, Debug, thiserror::Error)]
104
#[non_exhaustive]
105
pub enum IptError {
106
    /// When we tried to establish this introduction point, we found that the
107
    /// netdir didn't list it.
108
    ///
109
    /// This introduction point is not strictly "faulty", but unlisted in the directory means we
110
    /// can't use the introduction point.
111
    #[error("Introduction point not listed in network directory")]
112
    IntroPointNotListed,
113

            
114
    /// We received an invalid INTRO_ESTABLISHED message.
115
    ///
116
    /// This is definitely the introduction point's fault: it sent us
117
    /// an authenticated message, but the contents of that message were
118
    /// definitely wrong.
119
    #[error("Got an invalid INTRO_ESTABLISHED message")]
120
    // Eventually, once we expect intro_established extensions, we will make
121
    // sure that they are well-formed.
122
    #[allow(dead_code)]
123
    BadEstablished,
124

            
125
    /// We received a message that not a valid part of the introduction-point
126
    /// protocol.
127
    #[error("Invalid message: {0}")]
128
    BadMessage(String),
129

            
130
    /// This introduction point has gone too long without a success.
131
    #[error("introduction point has gone too long without a success")]
132
    Timeout,
133
}
134

            
135
impl tor_error::HasKind for IptEstablisherError {
136
    fn kind(&self) -> tor_error::ErrorKind {
137
        use tor_error::ErrorKind as EK;
138
        use IptEstablisherError as E;
139
        match self {
140
            E::Ipt(e) => e.kind(),
141
            E::NetdirProviderShutdown(e) => e.kind(),
142
            E::BuildCircuit(e) => e.kind(),
143
            E::CircuitState(e) => e.kind(),
144
            E::EstablishTimeout => EK::TorNetworkTimeout,
145
            E::SendEstablishIntro(e) => e.kind(),
146
            E::ClosedWithoutAck => EK::CircuitCollapse,
147
            E::CreateEstablishIntro(_) => EK::Internal,
148
            E::Bug(e) => e.kind(),
149
        }
150
    }
151
}
152

            
153
impl tor_error::HasKind for IptError {
154
    fn kind(&self) -> tor_error::ErrorKind {
155
        use tor_error::ErrorKind as EK;
156
        use IptError as E;
157
        match self {
158
            E::IntroPointNotListed => EK::TorDirectoryError, // TODO (#1255) Not correct kind.
159
            E::BadEstablished => EK::RemoteProtocolViolation,
160
            E::BadMessage(_) => EK::RemoteProtocolViolation,
161
            E::Timeout => EK::RemoteProtocolViolation, // TODO: this is not necessarily correct
162
        }
163
    }
164
}
165

            
166
impl IptEstablisherError {
167
    /// Return the underlying [`IptError`] if this error appears to be the introduction point's fault.
168
    ///
169
    /// This corresponds to [`IptStatusStatus::Faulty`]`: when we return `Some`,
170
    /// it means that we should try another relay as an introduction point,
171
    /// though we don't necessarily need to give up on this one.
172
    ///
173
    /// Note that the intro point may be to blame even if we return `None`;
174
    /// we only return `true` when we are certain that the intro point is
175
    /// unlisted, unusable, or misbehaving.
176
    fn ipt_failure(&self) -> Option<&IptError> {
177
        use IptEstablisherError as IE;
178
        match self {
179
            // If we don't have a netdir, then no intro point is better than any other.
180
            IE::NetdirProviderShutdown(_) => None,
181
            IE::Ipt(e) => Some(e),
182
            // This _might_ be the introduction point's fault, but it might not.
183
            // We can't be certain.
184
            IE::BuildCircuit(_) => None,
185
            IE::CircuitState(_) => None,
186
            IE::EstablishTimeout => None,
187
            IE::ClosedWithoutAck => None,
188
            // These are, most likely, not the introduction point's fault,
189
            // though they might or might not be survivable.
190
            IE::CreateEstablishIntro(_) => None,
191
            IE::SendEstablishIntro(_) => None,
192
            IE::Bug(_) => None,
193
        }
194
    }
195
}
196

            
197
/// Parameters for an introduction point
198
///
199
/// Consumed by `IptEstablisher::new`.
200
/// Primarily serves as a convenient way to bundle the many arguments required.
201
///
202
/// Does not include:
203
///  * The runtime (which would force this struct to have a type parameter)
204
///  * The circuit builder (leaving this out makes it possible to use this
205
///    struct during mock execution, where we don't call `IptEstablisher::new`).
206
#[derive(Educe)]
207
#[educe(Debug)]
208
pub(crate) struct IptParameters {
209
    /// A receiver that we can use to tell us about updates in our configuration.
210
    ///
211
    /// Configuration changes may tell us to change our introduction points or build new
212
    /// circuits to them.
213
    //
214
    // TODO (#1209):
215
    //
216
    // We want to make a new introduction circuit if our dos parameters change,
217
    // which means that we should possibly be watching for changes in our
218
    // configuration.  Right now, though, we only copy out the configuration
219
    // on startup.
220
    #[educe(Debug(ignore))]
221
    pub(crate) config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
222
    /// A `NetDirProvider` that we'll use to find changes in the network
223
    /// parameters, and to look up information about routers.
224
    #[educe(Debug(ignore))]
225
    pub(crate) netdir_provider: Arc<dyn NetDirProvider>,
226
    /// A shared sender that we'll use to report incoming INTRODUCE2 requests
227
    /// for rendezvous circuits.
228
    #[educe(Debug(ignore))]
229
    pub(crate) introduce_tx: mpsc::Sender<RendRequest>,
230
    /// Opaque local ID for this introduction point.
231
    ///
232
    /// This ID does not change within the lifetime of an [`IptEstablisher`].
233
    /// See [`IptLocalId`] for information about what changes would require a
234
    /// new ID (and hence a new `IptEstablisher`).
235
    pub(crate) lid: IptLocalId,
236
    /// Persistent log for INTRODUCE2 requests.
237
    ///
238
    /// We use this to record the requests that we see, and to prevent replays.
239
    #[educe(Debug(ignore))]
240
    pub(crate) replay_log: IptReplayLog,
241
    /// A set of identifiers for the Relay that we intend to use as the
242
    /// introduction point.
243
    ///
244
    /// We use this to identify the relay within a `NetDir`, and to make sure
245
    /// we're connecting to the right introduction point.
246
    pub(crate) target: RelayIds,
247
    /// Keypair used to authenticate and identify ourselves to this introduction
248
    /// point.
249
    ///
250
    /// Later, we publish the public component of this keypair in our HsDesc,
251
    /// and clients use it to tell the introduction point which introduction circuit
252
    /// should receive their requests.
253
    ///
254
    /// This is the `K_hs_ipt_sid` keypair.
255
    pub(crate) k_sid: Arc<HsIntroPtSessionIdKeypair>,
256
    /// Whether this `IptEstablisher` should begin by accepting requests, or
257
    /// wait to be told that requests are okay.
258
    pub(crate) accepting_requests: RequestDisposition,
259
    /// Keypair used to decrypt INTRODUCE2 requests from clients.
260
    ///
261
    /// This is the `K_hss_ntor` keypair, used with the "HS_NTOR" handshake to
262
    /// form a shared key set of keys with the client, and decrypt information
263
    /// about the client's chosen rendezvous point and extensions.
264
    pub(crate) k_ntor: Arc<HsSvcNtorKeypair>,
265
}
266

            
267
impl IptEstablisher {
268
    /// Try to set up, and maintain, an IPT at `target`.
269
    ///
270
    /// Rendezvous requests will be rejected or accepted
271
    /// depending on the value of `accepting_requests`
272
    /// (which must be `Advertised` or `NotAdvertised`).
273
    ///
274
    /// Also returns a stream of events that is produced whenever we have a
275
    /// change in the IptStatus for this intro point.  Note that this stream is
276
    /// potentially lossy.
277
    ///
278
    /// The returned `watch::Receiver` will yield `Faulty` if the IPT
279
    /// establisher is shut down (or crashes).
280
    ///
281
    /// When the resulting `IptEstablisher` is dropped, it will cancel all tasks
282
    /// and close all circuits used to establish this introduction point.
283
    pub(crate) fn launch<R: Runtime>(
284
        runtime: &R,
285
        params: IptParameters,
286
        pool: Arc<HsCircPool<R>>,
287
        keymgr: &Arc<KeyMgr>,
288
    ) -> Result<(Self, postage::watch::Receiver<IptStatus>), FatalError> {
289
        // This exhaustive deconstruction ensures that we don't
290
        // accidentally forget to handle any of our inputs.
291
        let IptParameters {
292
            config_rx,
293
            netdir_provider,
294
            introduce_tx,
295
            lid,
296
            target,
297
            k_sid,
298
            k_ntor,
299
            accepting_requests,
300
            replay_log,
301
        } = params;
302
        let config = Arc::clone(&config_rx.borrow());
303
        let nickname = config.nickname().clone();
304

            
305
        if matches!(accepting_requests, RequestDisposition::Shutdown) {
306
            return Err(bad_api_usage!(
307
                "Tried to create a IptEstablisher that that was already shutting down?"
308
            )
309
            .into());
310
        }
311

            
312
        let state = Arc::new(Mutex::new(EstablisherState { accepting_requests }));
313

            
314
        let request_context = Arc::new(RendRequestContext {
315
            nickname: nickname.clone(),
316
            keymgr: Arc::clone(keymgr),
317
            kp_hss_ntor: Arc::clone(&k_ntor),
318
            kp_hs_ipt_sid: k_sid.as_ref().as_ref().verifying_key().into(),
319
            filter: config.filter_settings(),
320
            netdir_provider: netdir_provider.clone(),
321
            circ_pool: pool.clone(),
322
        });
323

            
324
        let reactor = Reactor {
325
            runtime: runtime.clone(),
326
            nickname,
327
            pool,
328
            netdir_provider,
329
            lid,
330
            target,
331
            k_sid,
332
            introduce_tx,
333
            extensions: EstIntroExtensionSet {
334
                // Updates to this are handled by the IPT manager: when it changes,
335
                // this IPT will be replaced with one with the correct parameters.
336
                dos_params: config.dos_extension()?,
337
            },
338
            state: state.clone(),
339
            request_context,
340
            replay_log: Arc::new(replay_log.into()),
341
        };
342

            
343
        let (status_tx, status_rx) = postage::watch::channel_with(IptStatus::new());
344
        let (terminate_tx, mut terminate_rx) = oneshot::channel::<Void>();
345
        let status_tx = DropNotifyWatchSender::new(status_tx);
346

            
347
        // Spawn a task to keep the intro established.  The task will shut down
348
        // when terminate_tx is dropped.
349
        runtime
350
            .spawn(async move {
351
                futures::select_biased!(
352
                    terminated = terminate_rx => {
353
                        // Only Err is possible, but the compiler can't tell that.
354
                        let oneshot::Canceled = terminated.void_unwrap_err();
355
                    }
356
                    outcome = reactor.keep_intro_established(status_tx).fuse() =>  {
357
                      warn_report!(outcome.void_unwrap_err(), "Error from intro-point establisher task");
358
                    }
359
                );
360
            })
361
            .map_err(|e| FatalError::Spawn {
362
                spawning: "introduction point establisher",
363
                cause: Arc::new(e),
364
            })?;
365
        let establisher = IptEstablisher {
366
            _terminate_tx: terminate_tx,
367
            state,
368
        };
369
        Ok((establisher, status_rx))
370
    }
371

            
372
    /// Begin accepting requests from this introduction point.
373
    ///
374
    /// If any introduction requests are sent before we have called this method,
375
    /// they are treated as an error and our connection to this introduction
376
    /// point is closed.
377
    pub(crate) fn start_accepting(&self) {
378
        self.state.lock().expect("poisoned lock").accepting_requests =
379
            RequestDisposition::Advertised;
380
    }
381
}
382

            
383
/// The current status of an introduction point, as defined in
384
/// `hssvc-ipt-algorithms.md`.
385
///
386
/// TODO (#1235) Make that file unneeded.
387
#[derive(Clone, Debug)]
388
pub(crate) enum IptStatusStatus {
389
    /// We are (re)establishing our connection to the IPT
390
    ///
391
    /// But we don't think there's anything wrong with it.
392
    ///
393
    /// The IPT manager should *not* arrange to include this in descriptors.
394
    Establishing,
395

            
396
    /// The IPT is established and ready to accept rendezvous requests
397
    ///
398
    /// Also contains information about the introduction point
399
    /// necessary for making descriptors,
400
    /// including information from the netdir about the relay
401
    ///
402
    /// The IPT manager *should* arrange to include this in descriptors.
403
    Good(GoodIptDetails),
404

            
405
    /// We don't have the IPT and it looks like it was the IPT's fault
406
    ///
407
    /// This should be used whenever trying another IPT relay is likely to work better;
408
    /// regardless of whether attempts to establish *this* IPT can continue.
409
    ///
410
    /// The IPT manager should *not* arrange to include this in descriptors.
411
    /// If this persists, the IPT manager should replace this IPT
412
    /// with a new IPT at a different relay.
413
    Faulty(Option<IptError>),
414
}
415

            
416
/// Details of a good introduction point
417
///
418
/// This struct contains similar information to
419
/// [`tor_linkspec::verbatim::VerbatimLinkSpecCircTarget`].
420
/// However, that insists that the contained `T` is a [`CircTarget`],
421
/// which `<NtorPublicKey>` isn't.
422
/// And, we don't use this as a circuit target (at least, not here -
423
/// the client will do so, as a result of us publishing the information).
424
///
425
/// See <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1559#note_2937974>
426
#[derive(Clone, Debug, Eq, PartialEq)]
427
pub(crate) struct GoodIptDetails {
428
    /// The link specifiers to be used in the descriptor
429
    ///
430
    /// As obtained and converted from the netdir.
431
    pub(crate) link_specifiers: LinkSpecs,
432

            
433
    /// The introduction point relay's ntor key (from the netdir)
434
    pub(crate) ipt_kp_ntor: NtorPublicKey,
435
}
436

            
437
impl GoodIptDetails {
438
    /// Try to copy out the relevant parts of a CircTarget into a GoodIptDetails.
439
    fn try_from_circ_target(relay: &impl CircTarget) -> Result<Self, IptEstablisherError> {
440
        Ok(Self {
441
            link_specifiers: relay
442
                .linkspecs()
443
                .map_err(into_internal!("Unable to encode relay link specifiers"))?,
444
            ipt_kp_ntor: *relay.ntor_onion_key(),
445
        })
446
    }
447
}
448

            
449
/// Used as `Err(IptWantsToRetire)` to say that the IPT Establisher wants to
/// retire this IPT.
///
/// This happens when the IPT has had (too) many rendezvous requests.
///
/// Do *not* use this to report *errors*: it makes the IPT manager start
/// replacing the IPT *immediately*, regardless of rate limits etc.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire;
457

            
458
/// State shared between the IptEstablisher and the Reactor.
459
struct EstablisherState {
460
    /// True if we are accepting requests right now.
461
    accepting_requests: RequestDisposition,
462
}
463

            
464
/// What an introduction point should currently do with incoming messages.
#[derive(Copy, Clone, Debug)]
pub(crate) enum RequestDisposition {
    /// Not yet advertised: if the message handler gets any requests, it
    /// should complain and shut down.
    NotAdvertised,
    /// Advertised: the message handler should pass along any requests.
    Advertised,
    /// Shutting down cleanly: the message handler should exit without
    /// complaining.
    Shutdown,
}
476

            
477
/// The current status of an introduction point.
478
#[derive(Clone, Debug)]
479
pub(crate) struct IptStatus {
480
    /// The current state of this introduction point as defined by
481
    /// `hssvc-ipt-algorithms.md`.
482
    ///
483
    /// TODO (#1235): Make that file unneeded.
484
    pub(crate) status: IptStatusStatus,
485

            
486
    /// The current status of whether this introduction point circuit wants to be
487
    /// retired based on having processed too many requests.
488
    pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
489

            
490
    /// If Some, a time after which all attempts have been unsuccessful.
491
    pub(crate) failing_since: Option<Instant>,
492
}
493

            
494
/// How long every attempt to reach an introduction point may keep failing
/// before we declare that introduction point faulty.
///
/// TODO: This value is more or less arbitrary; we may want to tune it in the
/// future.
const FAULTY_IPT_THRESHOLD: Duration = Duration::from_secs(60 * 15);
500

            
501
impl IptStatus {
502
    /// Record that we have successfully connected to an introduction point.
503
    fn note_open(&mut self, ipt_details: GoodIptDetails) {
504
        self.status = IptStatusStatus::Good(ipt_details);
505
        self.failing_since = None;
506
    }
507

            
508
    /// Record that we are trying to connect to an introduction point.
509
    fn note_attempt(&mut self) {
510
        use IptStatusStatus::*;
511
        self.status = match &self.status {
512
            Establishing | Good(..) => Establishing,
513
            Faulty(e) => Faulty(e.clone()), // We don't change status if we think we're broken.
514
        }
515
    }
516

            
517
    /// Record that an error has occurred.
518
    fn note_error(&mut self, err: &IptEstablisherError, now: Instant) {
519
        use IptStatusStatus::*;
520
        let failing_since = *self.failing_since.get_or_insert(now);
521
        #[allow(clippy::if_same_then_else)]
522
        if let Some(ipt_err) = err.ipt_failure() {
523
            // This error always indicates a faulty introduction point.
524
            self.status = Faulty(Some(ipt_err.clone()));
525
        } else if now.saturating_duration_since(failing_since) >= FAULTY_IPT_THRESHOLD {
526
            // This introduction point has gone too long without a success.
527
            self.status = Faulty(Some(IptError::Timeout));
528
        }
529
    }
530

            
531
    /// Return an `IptStatus` representing an establisher that has not yet taken
532
    /// any action.
533
40
    fn new() -> Self {
534
40
        Self {
535
40
            status: IptStatusStatus::Establishing,
536
40
            wants_to_retire: Ok(()),
537
40
            failing_since: None,
538
40
        }
539
40
    }
540

            
541
    /// Produce an `IptStatus` representing a shut down or crashed establisher
542
    fn new_terminated() -> Self {
543
        IptStatus {
544
            status: IptStatusStatus::Faulty(None),
545
            // If we're broken, we simply tell the manager that that is the case.
546
            // It will decide for itself whether it wants to replace us.
547
            wants_to_retire: Ok(()),
548
            failing_since: None,
549
        }
550
    }
551
}
552

            
553
impl Default for IptStatus {
554
40
    fn default() -> Self {
555
40
        Self::new()
556
40
    }
557
}
558

            
559
impl tor_async_utils::DropNotifyEofSignallable for IptStatus {
    /// The end-of-stream status: a terminated (`Faulty(None)`) establisher.
    fn eof() -> Self {
        Self::new_terminated()
    }
}
564

            
565
tor_cell::restricted_msg! {
    /// A message that we are willing to accept from an introduction point.
    enum IptMsg : RelayMsg {
        IntroEstablished,
        Introduce2,
    }
}
572

            
573
/// A set of extensions to send with our `ESTABLISH_INTRO` message.
574
///
575
/// NOTE: we eventually might want to support unrecognized extensions.  But
576
/// that's potentially troublesome, since the set of extensions we sent might
577
/// have an affect on how we validate the reply.
578
#[derive(Clone, Debug)]
579
pub(crate) struct EstIntroExtensionSet {
580
    /// Parameters related to rate-limiting to prevent denial-of-service
581
    /// attacks.
582
    dos_params: Option<est_intro::DosParams>,
583
}
584

            
585
/// Implementation structure for the task that implements an IptEstablisher.
586
struct Reactor<R: Runtime> {
587
    /// A copy of our runtime, used for timeouts and sleeping.
588
    runtime: R,
589
    /// The nickname of the onion service we're running. Used when logging.
590
    nickname: HsNickname,
591
    /// A pool used to create circuits to the introduction point.
592
    pool: Arc<HsCircPool<R>>,
593
    /// A provider used to select the other relays in the circuit.
594
    netdir_provider: Arc<dyn NetDirProvider>,
595
    /// Identifier for the intro point.
596
    lid: IptLocalId,
597
    /// The target introduction point.
598
    target: RelayIds,
599
    /// The keypair to use when establishing the introduction point.
600
    ///
601
    /// Knowledge of this private key prevents anybody else from impersonating
602
    /// us to the introduction point.
603
    k_sid: Arc<HsIntroPtSessionIdKeypair>,
604
    /// The extensions to use when establishing the introduction point.
605
    ///
606
    /// TODO (#1209): This should be able to change over time as we re-establish
607
    /// the intro point.
608
    extensions: EstIntroExtensionSet,
609

            
610
    /// The stream that will receive INTRODUCE2 messages.
611
    introduce_tx: mpsc::Sender<RendRequest>,
612

            
613
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
614
    state: Arc<Mutex<EstablisherState>>,
615

            
616
    /// Context information that we'll need to answer rendezvous requests.
617
    request_context: Arc<RendRequestContext>,
618

            
619
    /// Introduction request replay log
620
    ///
621
    /// Shared between multiple IPT circuit control message handlers -
622
    /// [`IptMsgHandler`] contains the lock guard.
623
    ///
624
    /// Has to be an async mutex since it's locked for a long time,
625
    /// so we mustn't block the async executor thread on it.
626
    replay_log: Arc<futures::lock::Mutex<IptReplayLog>>,
627
}
628

            
629
/// An open session with a single introduction point.
630
//
631
// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
632
pub(crate) struct IntroPtSession {
633
    /// The circuit to the introduction point, on which we're receiving
634
    /// Introduce2 messages.
635
    intro_circ: Arc<ClientCirc>,
636
}
637

            
638
impl<R: Runtime> Reactor<R> {
639
    /// Run forever, keeping an introduction point established.
640
    #[allow(clippy::blocks_in_conditions)]
641
    #[allow(clippy::cognitive_complexity)]
642
    async fn keep_intro_established(
643
        &self,
644
        mut status_tx: DropNotifyWatchSender<IptStatus>,
645
    ) -> Result<Void, IptEstablisherError> {
646
        let mut retry_delay = tor_basic_utils::retry::RetryDelay::from_msec(1000);
647
        loop {
648
            status_tx.borrow_mut().note_attempt();
649
            match self.establish_intro_once().await {
650
                Ok((session, good_ipt_details)) => {
651
                    // TODO (#1239): we need to monitor the netdir for changes to this relay
652
                    // Eg,
653
                    //   - if it becomes unlisted, we should declare the IPT faulty
654
                    //     (until it perhaps reappears)
655
                    //
656
                    //     TODO SPEC  Continuing to use an unlisted relay is dangerous
657
                    //     It might be malicious.  We should withdraw our IPT then,
658
                    //     and hope that clients find another, working, IPT.
659
                    //
660
                    //   - if it changes its ntor key or link specs,
661
                    //     we need to update the GoodIptDetails in our status report,
662
                    //     so that the updated info can make its way to the descriptor
663
                    //
664
                    // Possibly some this could/should be done by the IPT Manager instead,
665
                    // but Diziet thinks it is probably cleanest to do it here.
666

            
667
                    status_tx.borrow_mut().note_open(good_ipt_details);
668

            
669
                    debug!(
670
                        "{}: Successfully established introduction point with {}",
671
                        &self.nickname,
672
                        self.target.display_relay_ids().redacted()
673
                    );
674
                    // Now that we've succeeded, we can stop backing off for our
675
                    // next attempt.
676
                    retry_delay.reset();
677

            
678
                    // Wait for the session to be closed.
679
                    session.wait_for_close().await;
680
                }
681
                Err(e @ IptEstablisherError::Ipt(IptError::IntroPointNotListed)) => {
682
                    // The network directory didn't include this relay.  Wait
683
                    // until it does.
684
                    //
685
                    // Note that this `note_error` will necessarily mark the
686
                    // ipt as Faulty. That's important, since we may be about to
687
                    // wait indefinitely when we call wait_for_netdir_to_list.
688
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
689
                    self.netdir_provider
690
                        .wait_for_netdir_to_list(&self.target, Timeliness::Timely)
691
                        .await?;
692
                }
693
                Err(e) => {
694
                    status_tx.borrow_mut().note_error(&e, self.runtime.now());
695
                    debug_report!(
696
                        e,
697
                        "{}: Problem establishing introduction point with {}",
698
                        &self.nickname,
699
                        self.target.display_relay_ids().redacted()
700
                    );
701
                    let retry_after = retry_delay.next_delay(&mut rand::rng());
702
                    self.runtime.sleep(retry_after).await;
703
                }
704
            }
705
        }
706
    }
707

            
708
    /// Try, once, to make a circuit to a single relay and establish an introduction
709
    /// point there.
710
    ///
711
    /// Does not retry.  Does not time out except via `HsCircPool`.
712
    async fn establish_intro_once(
713
        &self,
714
    ) -> Result<(IntroPtSession, GoodIptDetails), IptEstablisherError> {
715
        let (protovers, circuit, ipt_details) = {
716
            let netdir = self
717
                .netdir_provider
718
                .wait_for_netdir(tor_netdir::Timeliness::Timely)
719
                .await?;
720
            let circ_target = netdir
721
                .by_ids(&self.target)
722
                .ok_or(IptError::IntroPointNotListed)?;
723
            let ipt_details = GoodIptDetails::try_from_circ_target(&circ_target)?;
724

            
725
            let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
726
            let protovers = circ_target.protovers().clone();
727
            let circuit = self
728
                .pool
729
                .get_or_launch_specific(netdir.as_ref(), kind, circ_target)
730
                .await
731
                .map_err(IptEstablisherError::BuildCircuit)?;
732
            // note that netdir is dropped here, to avoid holding on to it any
733
            // longer than necessary.
734
            (protovers, circuit, ipt_details)
735
        };
736

            
737
        let establish_intro = {
738
            let ipt_sid_id = (*self.k_sid).as_ref().verifying_key().into();
739
            let mut details = EstablishIntroDetails::new(ipt_sid_id);
740
            if let Some(dos_params) = &self.extensions.dos_params {
741
                // We only send the Dos extension when the relay is known to
742
                // support it.
743
                use tor_protover::named::HSINTRO_RATELIM;
744
                if protovers.supports_named_subver(HSINTRO_RATELIM) {
745
                    details.set_extension_dos(dos_params.clone());
746
                }
747
            }
748
            let circuit_binding_key = circuit
749
                .binding_key(TargetHop::LastHop)
750
                .await
751
                .map_err(IptEstablisherError::CircuitState)?
752
                .ok_or(internal!("No binding key for introduction point!?"))?;
753
            let body: Vec<u8> = details
754
                .sign_and_encode((*self.k_sid).as_ref(), circuit_binding_key.hs_mac())
755
                .map_err(IptEstablisherError::CreateEstablishIntro)?;
756

            
757
            // TODO: This is ugly, but it is the sensible way to munge the above
758
            // body into a format that AnyRelayMsgOuter will accept without doing a
759
            // redundant parse step.
760
            //
761
            // One alternative would be allowing start_conversation to take an `impl
762
            // RelayMsg` rather than an AnyRelayMsg.
763
            //
764
            // Or possibly, when we feel like it, we could rename one or more of
765
            // these "Unrecognized"s to Unparsed or Uninterpreted.  If we do that, however, we'll
766
            // potentially face breaking changes up and down our crate stack.
767
            AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
768
                tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
769
                body,
770
            ))
771
        };
772

            
773
        let (established_tx, established_rx) = oneshot::channel();
774

            
775
        // In theory there ought to be only one IptMsgHandler in existence at any one time,
776
        // for any one IptLocalId (ie for any one IptReplayLog).  However, the teardown
777
        // arrangements are (i) complicated (so might have bugs) and (ii) asynchronous
778
        // (so we need to synchronise).  Therefore:
779
        //
780
        // Make sure we don't start writing to the replay log until any previous
781
        // IptMsgHandler has been torn down.  (Using an async mutex means we
782
        // don't risk blocking the whole executor even if we have teardown bugs.)
783
        let replay_log = self.replay_log.clone().lock_owned().await;
784

            
785
        let handler = IptMsgHandler {
786
            established_tx: Some(established_tx),
787
            introduce_tx: self.introduce_tx.clone(),
788
            state: self.state.clone(),
789
            lid: self.lid,
790
            request_context: self.request_context.clone(),
791
            replay_log,
792
        };
793
        let _conversation = circuit
794
            .start_conversation(Some(establish_intro), handler, TargetHop::LastHop)
795
            .await
796
            .map_err(IptEstablisherError::SendEstablishIntro)?;
797
        // At this point, we have `await`ed for the Conversation to exist, so we know
798
        // that the message was sent.  We have to wait for any actual `established`
799
        // message, though.
800

            
801
        let length = circuit
802
            .n_hops()
803
            .map_err(into_internal!("failed to get circuit length"))?;
804
        let ack_timeout = self
805
            .pool
806
            .estimate_timeout(&tor_circmgr::timeouts::Action::RoundTrip { length });
807
        let _established: IntroEstablished = self
808
            .runtime
809
            .timeout(ack_timeout, established_rx)
810
            .await
811
            .map_err(|_| IptEstablisherError::EstablishTimeout)?
812
            .map_err(|_| IptEstablisherError::ClosedWithoutAck)??;
813

            
814
        // This session will be owned by keep_intro_established(), and dropped
815
        // when the circuit closes, or when the keep_intro_established() future
816
        // is dropped.
817
        let session = IntroPtSession {
818
            intro_circ: circuit,
819
        };
820
        Ok((session, ipt_details))
821
    }
822
}
823

            
824
impl IntroPtSession {
825
    /// Wait for this introduction point session to be closed.
826
    fn wait_for_close(&self) -> impl Future<Output = ()> {
827
        self.intro_circ.wait_for_close()
828
    }
829
}
830

            
831
/// MsgHandler type to implement a conversation with an introduction point.
832
///
833
/// This, like all MsgHandlers, is installed at the circuit's reactor, and used
834
/// to handle otherwise unrecognized message types.
835
struct IptMsgHandler {
836
    /// A oneshot sender used to report our IntroEstablished message.
837
    ///
838
    /// If this is None, then we already sent an IntroEstablished and we shouldn't
839
    /// send any more.
840
    established_tx: Option<oneshot::Sender<Result<IntroEstablished, IptEstablisherError>>>,
841

            
842
    /// A channel used to report Introduce2 messages.
843
    introduce_tx: mpsc::Sender<RendRequest>,
844

            
845
    /// Keys that we'll need to answer the introduction requests.
846
    request_context: Arc<RendRequestContext>,
847

            
848
    /// Mutable state shared with the Establisher, Reactor, and MsgHandler.
849
    state: Arc<Mutex<EstablisherState>>,
850

            
851
    /// Unique identifier for the introduction point (including the current
852
    /// keys).  Used to tag requests.
853
    lid: IptLocalId,
854

            
855
    /// A replay log used to detect replayed introduction requests.
856
    replay_log: futures::lock::OwnedMutexGuard<IptReplayLog>,
857
}
858

            
859
impl tor_proto::circuit::MsgHandler for IptMsgHandler {
860
    fn handle_msg(&mut self, any_msg: AnyRelayMsg) -> tor_proto::Result<MetaCellDisposition> {
861
        let msg: IptMsg = any_msg.try_into().map_err(|m: AnyRelayMsg| {
862
            if let Some(tx) = self.established_tx.take() {
863
                let _ = tx.send(Err(IptError::BadMessage(format!(
864
                    "Invalid message type {}",
865
                    m.cmd()
866
                ))
867
                .into()));
868
            }
869
            // TODO: It's not completely clear whether CircProto is the right
870
            // type for use in this function (here and elsewhere);
871
            // possibly, we should add a different tor_proto::Error type
872
            // for protocol violations at a higher level than the circuit
873
            // protocol.
874
            //
875
            // For now, however, this error type is fine: it will cause the
876
            // circuit to be shut down, which is what we want.
877
            tor_proto::Error::CircProto(format!(
878
                "Invalid message type {} on introduction circuit",
879
                m.cmd()
880
            ))
881
        })?;
882

            
883
        if match msg {
884
            IptMsg::IntroEstablished(established) => match self.established_tx.take() {
885
                Some(tx) => {
886
                    // TODO: Once we want to enforce any properties on the
887
                    // intro_established message (like checking for correct
888
                    // extensions) we should do it here.
889
                    let established = Ok(established);
890
                    tx.send(established).map_err(|_| ())
891
                }
892
                None => {
893
                    return Err(tor_proto::Error::CircProto(
894
                        "Received a redundant INTRO_ESTABLISHED".into(),
895
                    ));
896
                }
897
            },
898
            IptMsg::Introduce2(introduce2) => {
899
                if let Some(tx) = self.established_tx.take() {
900
                    let _ = tx.send(Err(IptError::BadMessage(
901
                        "INTRODUCE2 message without INTRO_ESTABLISHED.".to_string(),
902
                    )
903
                    .into()));
904
                    return Err(tor_proto::Error::CircProto(
905
                        "Received an INTRODUCE2 message before INTRO_ESTABLISHED".into(),
906
                    ));
907
                }
908
                let disp = self.state.lock().expect("poisoned lock").accepting_requests;
909
                match disp {
910
                    RequestDisposition::NotAdvertised => {
911
                        return Err(tor_proto::Error::CircProto(
912
                            "Received an INTRODUCE2 message before we were accepting requests!"
913
                                .into(),
914
                        ))
915
                    }
916
                    RequestDisposition::Shutdown => return Ok(MetaCellDisposition::CloseCirc),
917
                    RequestDisposition::Advertised => {}
918
                }
919
                match self.replay_log.check_for_replay(&introduce2) {
920
                    Ok(()) => {}
921
                    Err(ReplayError::AlreadySeen) => {
922
                        // This is probably a replay, but maybe an accident. We
923
                        // just drop the request.
924

            
925
                        // TODO (#1233): Log that this has occurred, with a rate
926
                        // limit.  Possibly, we should allow it to fail once or
927
                        // twice per circuit before we log, since we expect
928
                        // a nonzero false-positive rate.
929
                        //
930
                        // Note that we should NOT close the circuit in this
931
                        // case: the repeated message could come from a hostile
932
                        // introduction point trying to do traffic analysis, but
933
                        // it could also come from a user trying to make it look
934
                        // like the intro point is doing traffic analysis.
935
                        return Ok(MetaCellDisposition::Consumed);
936
                    }
937
                    Err(ReplayError::Log(_)) => {
938
                        // Uh-oh! We failed to write the data persistently!
939
                        //
940
                        // TODO (#1226): We need to decide what to do here.  Right
941
                        // now we close the circuit, which is wrong.
942
                        return Ok(MetaCellDisposition::CloseCirc);
943
                    }
944
                }
945

            
946
                let request = RendRequest::new(self.lid, introduce2, self.request_context.clone());
947
                let send_outcome = self.introduce_tx.try_send(request);
948

            
949
                // We only want to report full-stream problems as errors here.
950
                // Disconnected streams are expected.
951
                let report_outcome = match &send_outcome {
952
                    Err(e) if e.is_full() => Err(StreamWasFull {}),
953
                    _ => Ok(()),
954
                };
955
                // TODO: someday we might want to start tracking this by
956
                // introduction or service point separately, though we would
957
                // expect their failures to be correlated.
958
                log_ratelim!("sending rendezvous request to handler task"; report_outcome);
959

            
960
                match send_outcome {
961
                    Ok(()) => Ok(()),
962
                    Err(e) => {
963
                        if e.is_disconnected() {
964
                            // The receiver is disconnected, meaning that
965
                            // messages from this intro point are no longer
966
                            // wanted.  Close the circuit.
967
                            Err(())
968
                        } else {
969
                            // The receiver is full; we have no real option but
970
                            // to drop the request like C-tor does when the
971
                            // backlog is too large.
972
                            //
973
                            // See discussion at
974
                            // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1465#note_2928349
975
                            Ok(())
976
                        }
977
                    }
978
                }
979
            }
980
        } == Err(())
981
        {
982
            // If the above return an error, we failed to send.  That means that
983
            // we need to close the circuit, since nobody is listening on the
984
            // other end of the tx.
985
            return Ok(MetaCellDisposition::CloseCirc);
986
        }
987

            
988
        Ok(MetaCellDisposition::Consumed)
989
    }
990
}
991

            
992
/// We failed to send a rendezvous request onto the handler test that should
993
/// have handled it, because it was not handling requests fast enough.
994
///
995
/// (This is a separate type so that we can have it implement Clone.)
996
#[derive(Clone, Debug, thiserror::Error)]
997
#[error("Could not send request; stream was full.")]
998
struct StreamWasFull {}