tor_hsservice/
ipt_set.rs

//! IPT set - the principal API between the IPT manager and publisher

use crate::internal_prelude::*;

/// Handle for a suitable persistent storage manager
pub(crate) type IptSetStorageHandle = tor_persist::state_dir::StorageHandle<StateRecord>;

/// Information shared between the IPT manager and the IPT publisher
///
/// The principal information is `ipts`, which is calculated by the IPT Manager.
/// See
/// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish)
/// for more detailed information about how this is calculated.
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct PublishIptSet {
    /// Set of introduction points to be advertised in a descriptor (if we are to publish)
    ///
    /// If `Some`, the publisher will try to maintain a published descriptor,
    /// of lifetime `lifetime`, listing `ipts`.
    ///
    /// If `None`, the publisher will not try to publish.
    /// (Already-published descriptors will not be deleted.)
    ///
    /// These instructions ultimately come from
    /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish).
    pub(crate) ipts: Option<IptSet>,

    /// Record of publication attempts
    ///
    /// Time until which the manager ought to try to maintain each ipt,
    /// even after we stop publishing it.
    ///
    /// This is a ceiling on:
    ///
    ///   * The last time we *finished* publishing the descriptor
    ///     (we can estimate this by taking the time we *started* to publish
    ///     plus our timeout on the publication attempt).
    ///
    ///   * Plus the `lifetime` that was used for publication.
    ///
    ///   * Plus the length of time between a client obtaining the descriptor
    ///     and its introduction request reaching us through the intro point
    ///     ([`IPT_PUBLISH_EXPIRY_SLOP`])
    ///
    /// This field is updated by the publisher, using
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt),
    /// and read by the manager.
    ///
    /// A separate copy of the information is stored by the manager,
    /// in `ipt_mgr::Ipt::last_descriptor_expiry_including_slop`.
    ///
    /// There may be entries in this table that don't
    /// correspond to introduction points in `ipts`.
    /// The publisher mustn't create such entries
    /// (since that would imply publishing IPTs contrary to the manager's instructions)
    /// but it can occur, for example, on restart.
    ///
    /// It is the manager's job to remove expired entries.
    //
    // This is a separate field, rather than being part of IptSet, so that during startup,
    // we can load information about previously-published IPTs, even though we don't want,
    // at that stage, to publish anything.
    //
    // The publication information is stored in a separate on-disk file, so that the
    // IPT publisher can record publication attempts without having to interact with the
    // IPT manager's main data structure.
    //
    // (The publisher needs to update the on-disk state synchronously, before publication,
    // since otherwise there could be a bug scenario where we succeed in publishing,
    // but don't succeed in recording that we published, and then, on restart,
    // don't know that we need to (re)establish this IPT.)
    pub(crate) last_descriptor_expiry_including_slop: HashMap<IptLocalId, Instant>,

    /// The on-disk state storage handle.
    #[educe(Debug(ignore))]
    storage: IptSetStorageHandle,
}

/// A set of introduction points for publication
///
/// This is shared between the manager and the publisher.
/// Each leaf field says who sets it.
///
/// This is not `Clone` and its contents should not be cloned.
/// When its contents are copied out into a descriptor by the publisher,
/// this should be accompanied by a call to
/// [`note_publication_attempt`](PublishIptSet::note_publication_attempt).
#[derive(Debug)]
pub(crate) struct IptSet {
    /// The actual introduction points
    pub(crate) ipts: Vec<IptInSet>,

    /// When to make the descriptor expire
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) lifetime: Duration,
}

/// Introduction point as specified to publisher by manager
#[derive(Debug)]
pub(crate) struct IptInSet {
    /// Details of the introduction point
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) ipt: Ipt,

    /// Local identifier for this introduction point
    ///
    /// Set and used by the manager, to correlate this data structure with the manager's.
    /// May also be read by the publisher.
    pub(crate) lid: IptLocalId,
}

/// Actual introduction point details as specified to publisher by manager
///
/// Convenience type alias.
pub(crate) type Ipt = tor_netdoc::doc::hsdesc::IntroPointDesc;

/// Descriptor expiry time slop
///
/// How long after our descriptor expired should we continue to maintain an old IPT?
/// This is an allowance for:
///
///   - Various RTTs and delays in clients setting up circuits
///     (we can't really measure this ourselves properly,
///     since what matters is the client's latency)
///
///   - Clock skew
///
// TODO: This is something we might want to tune based on experience.
//
// TODO: We'd like to use "+" here, but it isn't const yet.
const IPT_PUBLISH_EXPIRY_SLOP: Duration =
    Duration::from_secs(10 * 60).saturating_add(crate::publish::OVERALL_UPLOAD_TIMEOUT);

/// Shared view of introduction points - IPT manager's view
///
/// This is the manager's end of a bidirectional "channel",
/// containing a shared `PublishIptSet` (principally, an `Option<IptSet>`).
#[derive(Debug)]
pub(crate) struct IptsManagerView {
    /// Actual shared data
    shared: Shared,

    /// Notification sender
    ///
    /// We don't wrap the state in a postage::watch,
    /// because the publisher needs to be able to mutably borrow the data
    /// without re-notifying itself when it drops the guard.
    notify: mpsc::Sender<()>,
}

/// Shared view of introduction points - IPT publisher's view
///
/// This is the publisher's end of a bidirectional "channel",
/// containing a shared `PublishIptSet` (principally, an `Option<IptSet>`).
pub(crate) struct IptsPublisherView {
    /// Actual shared data
    shared: Shared,

    /// Notification receiver
    notify: mpsc::Receiver<()>,
}

/// Shared view of introduction points - IPT publisher's publication-only view
///
/// This is a restricted version of [`IptsPublisherView`]
/// which can only be used, via the
/// [`.borrow_for_publish()`](IptsPublisherUploadView::borrow_for_publish) method, to:
///
///   - check that a publication attempt should still continue; and
///   - note publication attempts.
///
/// This is useful because, unlike `IptsPublisherView`
/// (of which there is one per IPTs channel),
/// multiple `IptsPublisherUploadView`s can exist (it is `Clone`),
/// so the publisher's individual upload tasks can each have one.
///
/// Obtained from [`IptsPublisherView::upload_view`].
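///
/// # Example
///
/// A sketch (not compiled here) of how each concurrent upload task can carry
/// its own view; `spawn` and `n_parallel_uploads` are illustrative assumptions:
///
/// ```ignore
/// let uv = publisher_view.upload_view();
/// for _ in 0..n_parallel_uploads {
///     let uv = uv.clone(); // one clone per upload task
///     spawn(async move {
///         // Take the lock only briefly; the guard is not `Send`,
///         // so it must not be held across an `.await`.
///         let still_wanted = uv.borrow_for_publish().ipts.is_some();
///         if still_wanted { /* ... upload the descriptor ... */ }
///     });
/// }
/// ```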
#[derive(Debug, Clone)]
pub(crate) struct IptsPublisherUploadView {
    /// Actual shared data
    shared: Shared,
}

/// Core shared state
type Shared = Arc<Mutex<PublishIptSet>>;

/// Mutex guard that will notify when dropped
///
/// Returned by [`IptsManagerView::borrow_for_update`]
#[derive(Deref, DerefMut)]
struct NotifyingBorrow<'v, R: SleepProvider> {
    /// Lock guard
    #[deref(forward)]
    #[deref_mut(forward)]
    guard: MutexGuard<'v, PublishIptSet>,

    /// To be notified on drop
    notify: &'v mut mpsc::Sender<()>,

    /// Runtime, needed for saving the state to disk
    runtime: R,
}

/// Create a new shared state channel for the publication instructions
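///
/// # Example
///
/// A minimal sketch of wiring up and exercising the two ends
/// (the construction of `runtime` and `storage`, and the `ipt_set` value,
/// are elided; see the tests for a concrete setup):
///
/// ```ignore
/// let (mut manager_view, mut publisher_view) = ipts_channel(&runtime, storage)?;
/// // The manager end writes instructions:
/// manager_view.borrow_for_update(runtime.clone()).ipts = Some(ipt_set);
/// // The publisher end observes them:
/// let _ = publisher_view.await_update().await;
/// let ipts = publisher_view.borrow_for_publish();
/// ```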
pub(crate) fn ipts_channel(
    runtime: &impl SleepProvider,
    storage: IptSetStorageHandle,
) -> Result<(IptsManagerView, IptsPublisherView), StartupError> {
    let initial_state = PublishIptSet::load(storage, runtime)?;
    let shared = Arc::new(Mutex::new(initial_state));
    // Zero buffer is right.  Docs for `mpsc::channel` say:
    //   each sender gets a guaranteed slot in the channel capacity,
    //   and on top of that there are buffer “first come, first serve” slots
    // We only have one sender and only ever want one outstanding,
    // since we can (and would like to) coalesce notifications.
    //
    // Internally-generated instructions, so no need for memquota tracking.
    let (tx, rx) = mpsc_channel_no_memquota(0);
    let r = (
        IptsManagerView {
            shared: shared.clone(),
            notify: tx,
        },
        IptsPublisherView { shared, notify: rx },
    );
    Ok(r)
}

/// Lock the shared state and obtain a lock guard
///
/// Does not do any notification.
fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> {
    // Propagating panics is fine since if either the manager or the publisher crashes,
    // the other one cannot survive.
    shared.lock().expect("IPT set shared state poisoned")
}

impl IptsManagerView {
    /// Arrange to be able to update the list of introduction points
    ///
    /// The manager may add new ipts, or delete old ones.
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    /// The publisher will be notified when it is dropped.
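    ///
    /// # Example
    ///
    /// A sketch of the intended usage pattern (names are illustrative):
    ///
    /// ```ignore
    /// {
    ///     let mut guard = manager_view.borrow_for_update(runtime.clone());
    ///     guard.ipts = Some(new_ipt_set);
    /// } // dropping the guard saves the state and notifies the publisher
    /// ```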
    pub(crate) fn borrow_for_update(
        &mut self,
        runtime: impl SleepProvider,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        let guard = lock_shared(&self.shared);
        NotifyingBorrow {
            guard,
            notify: &mut self.notify,
            runtime,
        }
    }

    /// Peek at the list of introduction points we are providing to the publisher
    ///
    /// (Used for testing and during startup.)
    pub(crate) fn borrow_for_read(&mut self) -> impl std::ops::Deref<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl<R: SleepProvider> Drop for NotifyingBorrow<'_, R> {
    fn drop(&mut self) {
        // Channel full?  Well, then the receiver is indeed going to wake up, so fine
        // Channel disconnected?  The publisher has crashed or terminated,
        // but we are not in a position to fail and shut down the establisher.
        // If our HS is shutting down, the manager will be shut down by other means.
        let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(());

        let save_outcome = self.guard.save(&self.runtime);
        log_ratelim!(
            // This message is a true description for the following reasons:
            //
            // "until" times can only be extended by the *publisher*.
            // The manager won't ever shorten them either, but if they are in the past,
            // it might delete them if it has decided to retire the IPT.
            // Leaving them undeleted is not ideal from a privacy pov,
            // but it doesn't prevent us continuing to operate correctly.
            //
            // It is therefore OK to just log the error here.
            //
            // In practice, we're likely to try to save as a result of the publisher's
            // operation, too.  That's going to be more of a problem, but it's handled
            // by other code paths.
            //
            // We *don't* include the HS nickname in the activity
            // because this is probably not HS instance specific.
            "possibly deleting expiry times for old HSS IPTs";
            save_outcome;
        );

        // Now the fields will be dropped, including `guard`.
        // I.e. the mutex gets unlocked.  This means we notify the publisher
        // (which might make it wake up on another thread) just *before*
        // we release the lock, rather than just after.
        // This is slightly suboptimal but doesn't matter here.
        // To do better, we'd need to make the guard into an Option.
    }
}

impl IptsPublisherView {
    /// Wait until the IPT set has changed (or may have)
    ///
    /// After this returns, to find out what the new IPT set is,
    /// the publisher calls `borrow_for_publish`.
    ///
    /// Will complete immediately if the IPT set has
    /// changed since the last call to `await_update`.
    ///
    /// Returns:
    ///  * `Some(Ok(()))` if the IPT set was (or may have been) updated
    ///  * `None` if the manager is shutting down and the publisher should shut down too
    ///  * `Some(Err(..))` if a fatal error occurred
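    ///
    /// # Example
    ///
    /// A sketch of a publisher main loop built on this method
    /// (error handling and the publishing itself are elided):
    ///
    /// ```ignore
    /// while let Some(result) = publisher_view.await_update().await {
    ///     result?;
    ///     let ipts = publisher_view.borrow_for_publish();
    ///     /* ... decide whether, and what, to publish ... */
    /// }
    /// // `None`: the manager is shutting down, so shut down too.
    /// ```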
    //
    // TODO: make this return Result<ShutdownStatus, FatalError> instead
    // (this is what we do in other places, e.g. in ipt_mgr, publisher).
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1812#note_2976758
    pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> {
        // Cancellation safety:
        //
        // We're using mpsc::Receiver's implementation of Stream, via StreamExt.
        // Stream::next() must be cancellation safe or it would be lossy everywhere.
        // So it is OK to create the future from next, here, and possibly discard it
        // before it becomes Ready.
        let () = self.notify.next().await?;
        Some(Ok(()))
    }

    /// Look at the list of introduction points to publish
    ///
    /// Whenever a publication attempt is started
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt)
    /// must be called on this same [`IptSet`].
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }

    /// Obtain an [`IptsPublisherUploadView`], for use just prior to a publication attempt
    pub(crate) fn upload_view(&self) -> IptsPublisherUploadView {
        let shared = self.shared.clone();
        IptsPublisherUploadView { shared }
    }
}

impl IptsPublisherUploadView {
    /// Look at the list of introduction points to publish
    ///
    /// See [`IptsPublisherView::borrow_for_publish`].
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl PublishIptSet {
    /// Update all the `last_descriptor_expiry_including_slop` for a publication attempt
    ///
    /// Called by the publisher when it starts a publication attempt
    /// which will advertise this set of introduction points.
    ///
    /// When calling this, the publisher promises that the publication attempt
    /// will either complete, or be abandoned, before `worst_case_end`.
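    ///
    /// For each IPT in `ipts`, the recorded expiry time becomes at least:
    ///
    /// ```ignore
    /// worst_case_end + lifetime + IPT_PUBLISH_EXPIRY_SLOP
    /// ```
    ///
    /// where `lifetime` is the descriptor lifetime from the `IptSet`;
    /// any existing, later, recorded expiry is retained.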
    pub(crate) fn note_publication_attempt(
        &mut self,
        runtime: &impl SleepProvider,
        worst_case_end: Instant,
    ) -> Result<(), IptStoreError> {
        let ipts = self
            .ipts
            .as_ref()
            .ok_or_else(|| internal!("publishing None!"))?;

        let new_value = (|| {
            worst_case_end
                .checked_add(ipts.lifetime)?
                .checked_add(IPT_PUBLISH_EXPIRY_SLOP)
        })()
        .ok_or_else(
            // Clock overflow on the monotonic clock.  Everything is terrible.
            // We will have no idea when we can stop publishing the descriptor!
            // I guess we'll return an error and cause the publisher to bail out?
            // An ErrorKind of ClockSkew is wrong, since this is a purely local problem,
            // and should be impossible if we properly checked our parameters.
            || internal!("monotonic clock overflow"),
        )?;

        for ipt in &ipts.ipts {
            use std::collections::hash_map::Entry;
            let entry = self.last_descriptor_expiry_including_slop.entry(ipt.lid);

            // Open-coding a hypothetical Entry::value()
            let old_value = match &entry {
                Entry::Occupied(oe) => Some(*oe.get()),
                Entry::Vacant(_) => None,
            };

            let to_store = chain!(
                //
                old_value,
                [new_value],
            )
            .max()
            .expect("max of known-non-empty iterator was None");

            // Open-coding Entry::insert(); unstable insert_entry() would do
            match entry {
                Entry::Occupied(mut oe) => {
                    oe.insert(to_store);
                }
                Entry::Vacant(ve) => {
                    ve.insert(to_store);
                }
            };
        }

        self.save(runtime)?;

        Ok(())
    }
}

//---------- On disk data structures, done with serde ----------

/// Record of intro point publications
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct StateRecord {
    /// Ipts
    ipts: Vec<IptRecord>,
    /// Reference time
    stored: time_store::Reference,
}

/// Record of publication of one intro point
#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)]
struct IptRecord {
    /// Which ipt?
    lid: IptLocalId,
    /// Maintain until; this is `last_descriptor_expiry_including_slop`
    // We use a shorter field name so the on-disk files aren't silly
    until: time_store::FutureTimestamp,
}
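//
// For orientation: a sketch of the resulting on-disk record, *assuming* the
// storage handle serializes to JSON (the exact encodings of `IptLocalId` and
// the `time_store` types are defined by their own serde implementations):
//
//     { "ipts": [ { "lid": "...", "until": "..." } ], "stored": "..." }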

impl PublishIptSet {
    /// Save the publication times to the persistent state
    fn save(&mut self, runtime: &impl SleepProvider) -> Result<(), IptStoreError> {
        // Throughout, we use exhaustive struct patterns on the in-memory data,
        // so we avoid missing any of the data.
        let PublishIptSet {
            ipts,
            last_descriptor_expiry_including_slop,
            storage,
        } = self;

        let tstoring = time_store::Storing::start(runtime);

        // we don't save the instructions to the publisher; on reload that becomes None
        let _: &Option<IptSet> = ipts;

        let mut ipts = last_descriptor_expiry_including_slop
            .iter()
            .map(|(&lid, &until)| {
                let until = tstoring.store_future(until);
                IptRecord { lid, until }
            })
            .collect_vec();
        ipts.sort(); // normalise

        let on_disk = StateRecord {
            ipts,
            stored: tstoring.store_ref(),
        };

        Ok(storage.store(&on_disk)?)
    }

    /// Load the publication times from the persistent state
    fn load(
        storage: IptSetStorageHandle,
        runtime: &impl SleepProvider,
    ) -> Result<PublishIptSet, StartupError> {
        let on_disk = storage.load().map_err(StartupError::LoadState)?;
        let last_descriptor_expiry_including_slop = on_disk
            .map(|record| {
                // Throughout, we use exhaustive struct patterns on the data we got from disk,
                // so we avoid missing any of the data.
                let StateRecord { ipts, stored } = record;
                let tloading = time_store::Loading::start(runtime, stored);
                ipts.into_iter()
                    .map(|ipt| {
                        let IptRecord { lid, until } = ipt;
                        let until = tloading.load_future(until);
                        (lid, until)
                    })
                    .collect()
            })
            .unwrap_or_default();
        Ok(PublishIptSet {
            ipts: None,
            last_descriptor_expiry_including_slop,
            storage,
        })
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::test::create_storage_handles;
    use crate::FatalError;
    use futures::{pin_mut, poll};
    use std::task::Poll::{self, *};
    use test_temp_dir::test_temp_dir;
    use tor_rtcompat::ToplevelBlockOn as _;

    fn test_intro_point() -> Ipt {
        use tor_netdoc::doc::hsdesc::test_data;
        test_data::test_parsed_hsdesc().unwrap().intro_points()[0].clone()
    }

    async fn pv_poll_await_update(
        pv: &mut IptsPublisherView,
    ) -> Poll<Option<Result<(), FatalError>>> {
        let fut = pv.await_update();
        pin_mut!(fut);
        poll!(fut)
    }

    async fn pv_expect_one_await_update(pv: &mut IptsPublisherView) {
        assert!(matches!(
            pv_poll_await_update(pv).await,
            Ready(Some(Ok(())))
        ));
        assert!(pv_poll_await_update(pv).await.is_pending());
    }

    fn pv_note_publication_attempt(
        runtime: &impl SleepProvider,
        pv: &IptsPublisherView,
        worst_case_end: Instant,
    ) {
        pv.borrow_for_publish()
            .note_publication_attempt(runtime, worst_case_end)
            .unwrap();
    }

    fn mv_get_0_expiry(mv: &mut IptsManagerView) -> Instant {
        let g = mv.borrow_for_read();
        let lid = g.ipts.as_ref().unwrap().ipts[0].lid;
        *g.last_descriptor_expiry_including_slop.get(&lid).unwrap()
    }

    #[test]
    fn test() {
        // We don't bother with MockRuntime::test_with_various
        // since this test case doesn't spawn tasks
        let runtime = tor_rtmock::MockRuntime::new();

        let temp_dir_owned = test_temp_dir!();
        let temp_dir = temp_dir_owned.as_path_untracked();

        runtime.clone().block_on(async move {
            // make a channel; it should have no updates yet

            let (_state_mgr, iptpub_state_handle) = create_storage_handles(temp_dir);
            let (mut mv, mut pv) = ipts_channel(&runtime, iptpub_state_handle).unwrap();
            assert!(pv_poll_await_update(&mut pv).await.is_pending());

            // borrowing publisher view for publish doesn't cause an update

            let pg = pv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            let uv = pv.upload_view();
            let pg = uv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            // borrowing manager view for update *does* cause one update

            let mut mg = mv.borrow_for_update(runtime.clone());
            mg.ipts = Some(IptSet {
                ipts: vec![],
                lifetime: Duration::ZERO,
            });
            drop(mg);

            pv_expect_one_await_update(&mut pv).await;

            // borrowing manager view for update twice causes only one (coalesced) update

            const LIFETIME: Duration = Duration::from_secs(1800);
            const PUBLISH_END_TIMEOUT: Duration = Duration::from_secs(300);

            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .lifetime = LIFETIME;
            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .ipts
                .push(IptInSet {
                    ipt: test_intro_point(),
                    lid: [42; 32].into(),
                });

            pv_expect_one_await_update(&mut pv).await;

            // noting a publication attempt sets the expiry (timeout + lifetime + slop)

            pv_note_publication_attempt(&runtime, &pv, runtime.now() + PUBLISH_END_TIMEOUT);

            let expected_expiry =
                runtime.now() + PUBLISH_END_TIMEOUT + LIFETIME + IPT_PUBLISH_EXPIRY_SLOP;
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);

            // a publication attempt yielding an *earlier* expiry is ignored

            pv_note_publication_attempt(&runtime, &pv, runtime.now() - Duration::from_secs(10));
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);
        });

        drop(temp_dir_owned); // prove it's still live
    }
}