tor_hsservice/ipt_set.rs
//! IPT set - the principal API between the IPT manager and publisher

use crate::internal_prelude::*;

/// Handle for a suitable persistent storage manager
pub(crate) type IptSetStorageHandle = tor_persist::state_dir::StorageHandle<StateRecord>;

/// Information shared between the IPT manager and the IPT publisher
///
/// The principal information is `ipts`, which is calculated by the IPT Manager.
/// See
/// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish)
/// for more detailed information about how this is calculated.
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct PublishIptSet {
    /// Set of introduction points to be advertised in a descriptor (if we are to publish)
    ///
    /// If `Some`, the publisher will try to maintain a published descriptor,
    /// of lifetime `lifetime`, listing `ipts`.
    ///
    /// If `None`, the publisher will not try to publish.
    /// (Already-published descriptors will not be deleted.)
    ///
    /// These instructions ultimately come from
    /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish).
    pub(crate) ipts: Option<IptSet>,

    /// Record of publication attempts
    ///
    /// Time until which the manager ought to try to maintain each ipt,
    /// even after we stop publishing it.
    ///
    /// This is a ceiling on:
    ///
    /// * The last time we *finished* publishing the descriptor
    ///   (we can estimate this by taking the time we *started* to publish
    ///   plus our timeout on the publication attempt).
    ///
    /// * Plus the `lifetime` that was used for publication.
    ///
    /// * Plus the length of time between a client obtaining the descriptor
    ///   and its introduction request reaching us through the intro point
    ///   ([`IPT_PUBLISH_EXPIRY_SLOP`])
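    ///
    /// Schematically (a sketch of the computation performed in
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt),
    /// where `worst_case_end` bounds the publication attempt):
    ///
    /// ```text
    /// until = worst_case_end + lifetime + IPT_PUBLISH_EXPIRY_SLOP
    /// ```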
    ///
    /// This field is updated by the publisher, using
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt),
    /// and read by the manager.
    ///
    /// A separate copy of the information is stored by the manager,
    /// in `ipt_mgr::Ipt::last_descriptor_expiry_including_slop`.
    ///
    /// There may be entries in this table that don't
    /// correspond to introduction points in `ipts`.
    /// The publisher mustn't create such entries
    /// (since that would imply publishing IPTs contrary to the manager's instructions),
    /// but they can occur, for example, on restart.
    ///
    /// It is the manager's job to remove expired entries.
    //
    // This is a separate field, rather than being part of IptSet, so that during startup,
    // we can load information about previously-published IPTs, even though we don't want,
    // at that stage, to publish anything.
    //
    // The publication information is stored in a separate on-disk file, so that the
    // IPT publisher can record publication attempts without having to interact with the
    // IPT manager's main data structure.
    //
    // (The publisher needs to update the on-disk state synchronously, before publication,
    // since otherwise there could be a bug scenario where we succeed in publishing,
    // but don't succeed in recording that we published, and then, on restart,
    // don't know that we need to (re)establish this IPT.)
    pub(crate) last_descriptor_expiry_including_slop: HashMap<IptLocalId, Instant>,

    /// The on-disk state storage handle.
    #[educe(Debug(ignore))]
    storage: IptSetStorageHandle,
}

/// A set of introduction points for publication
///
/// This is shared between the manager and the publisher.
/// Each leaf field says who sets it.
///
/// This is not `Clone` and its contents should not be cloned.
/// When its contents are copied out into a descriptor by the publisher,
/// this should be accompanied by a call to
/// [`note_publication_attempt`](PublishIptSet::note_publication_attempt).
#[derive(Debug)]
pub(crate) struct IptSet {
    /// The actual introduction points
    pub(crate) ipts: Vec<IptInSet>,

    /// When to make the descriptor expire
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) lifetime: Duration,
}

/// Introduction point as specified to publisher by manager
#[derive(Debug)]
pub(crate) struct IptInSet {
    /// Details of the introduction point
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) ipt: Ipt,

    /// Local identifier for this introduction point
    ///
    /// Set and used by the manager, to correlate this data structure with the manager's.
    /// May also be read by the publisher.
    pub(crate) lid: IptLocalId,
}

/// Actual introduction point details as specified to publisher by manager
///
/// Convenience type alias.
pub(crate) type Ipt = tor_netdoc::doc::hsdesc::IntroPointDesc;

/// Descriptor expiry time slop
///
/// How long after our descriptor expired should we continue to maintain an old IPT?
/// This is an allowance for:
///
/// - Various RTTs and delays in clients setting up circuits
///   (we can't really measure this ourselves properly,
///   since what matters is the client's latency)
///
/// - Clock skew
///
// TODO: This is something we might want to tune based on experience.
//
// TODO: We'd like to use "+" here, but it isn't const yet.
const IPT_PUBLISH_EXPIRY_SLOP: Duration =
    Duration::from_secs(10 * 60).saturating_add(crate::publish::OVERALL_UPLOAD_TIMEOUT);

/// Shared view of introduction points - IPT manager's view
///
/// This is the manager's end of a bidirectional "channel",
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
#[derive(Debug)]
pub(crate) struct IptsManagerView {
    /// Actual shared data
    shared: Shared,

    /// Notification sender
    ///
    /// We don't wrap the state in a postage::watch,
    /// because the publisher needs to be able to mutably borrow the data
    /// without re-notifying itself when it drops the guard.
    notify: mpsc::Sender<()>,
}

/// Shared view of introduction points - IPT publisher's view
///
/// This is the publisher's end of a bidirectional "channel",
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
pub(crate) struct IptsPublisherView {
    /// Actual shared data
    shared: Shared,

    /// Notification receiver
    notify: mpsc::Receiver<()>,
}

/// Shared view of introduction points - IPT publisher's publication-only view
///
/// This is a restricted version of [`IptsPublisherView`]
/// which can only be used to:
///
/// - check that a publication attempt should still continue; and
/// - note publication attempts,
///
/// via the [`.borrow_for_publish()`](IptsPublisherUploadView::borrow_for_publish) method.
///
/// This is useful because multiple `IptsPublisherUploadView`s can exist
/// (so, for example, the type is `Clone`),
/// unlike `IptsPublisherView`, of which there is one per IPTs channel.
/// So the publisher's individual upload tasks can each have one.
///
/// Obtained from [`IptsPublisherView::upload_view`].
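///
/// For example (a sketch: assumes a publisher holding an `IptsPublisherView`
/// in `pv`, and some way of spawning its upload tasks):
///
/// ```ignore
/// let uv = pv.upload_view();
/// spawn_upload_task(async move {
///     // Just before actually uploading:
///     let guard = uv.borrow_for_publish();
///     // ... check the attempt should still continue, copy out the IPTs,
///     // and call guard.note_publication_attempt(..) ...
/// });
/// ```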
#[derive(Debug, Clone)]
pub(crate) struct IptsPublisherUploadView {
    /// Actual shared data
    shared: Shared,
}

/// Core shared state
type Shared = Arc<Mutex<PublishIptSet>>;

/// Mutex guard that will notify when dropped
///
/// Returned by [`IptsManagerView::borrow_for_update`]
#[derive(Deref, DerefMut)]
struct NotifyingBorrow<'v, R: SleepProvider> {
    /// Lock guard
    #[deref(forward)]
    #[deref_mut(forward)]
    guard: MutexGuard<'v, PublishIptSet>,

    /// To be notified on drop
    notify: &'v mut mpsc::Sender<()>,

    /// For saving!
    runtime: R,
}

/// Create a new shared state channel for the publication instructions
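///
/// For example (a sketch: assumes a suitable `runtime` and an
/// already-acquired `storage` handle are in scope):
///
/// ```ignore
/// let (mgr_view, pub_view) = ipts_channel(&runtime, storage)?;
/// // The manager keeps `mgr_view`; the publisher keeps `pub_view`.
/// // They communicate only via this shared state and its notifications.
/// ```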
pub(crate) fn ipts_channel(
    runtime: &impl SleepProvider,
    storage: IptSetStorageHandle,
) -> Result<(IptsManagerView, IptsPublisherView), StartupError> {
    let initial_state = PublishIptSet::load(storage, runtime)?;
    let shared = Arc::new(Mutex::new(initial_state));
    // Zero buffer is right. Docs for `mpsc::channel` say:
    //   each sender gets a guaranteed slot in the channel capacity,
    //   and on top of that there are buffer “first come, first serve” slots
    // We only have one sender and only ever want one outstanding,
    // since we can (and would like to) coalesce notifications.
    //
    // Internally-generated instructions, no need for mq.
    let (tx, rx) = mpsc_channel_no_memquota(0);
    let r = (
        IptsManagerView {
            shared: shared.clone(),
            notify: tx,
        },
        IptsPublisherView { shared, notify: rx },
    );
    Ok(r)
}

/// Lock the shared state and obtain a lock guard
///
/// Does not do any notification.
fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> {
    // Propagating panics is fine since if either the manager or the publisher crashes,
    // the other one cannot survive.
    shared.lock().expect("IPT set shared state poisoned")
}

impl IptsManagerView {
    /// Arrange to be able to update the list of introduction points
    ///
    /// The manager may add new ipts, or delete old ones.
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    /// The publisher will be notified when it is dropped.
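    ///
    /// For example (a sketch: `mgr_view`, `runtime`, and `new_ipt_set` are illustrative):
    ///
    /// ```ignore
    /// let mut guard = mgr_view.borrow_for_update(runtime.clone());
    /// guard.ipts = Some(new_ipt_set);
    /// drop(guard); // saves the state and notifies the publisher
    /// ```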
    pub(crate) fn borrow_for_update(
        &mut self,
        runtime: impl SleepProvider,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        let guard = lock_shared(&self.shared);
        NotifyingBorrow {
            guard,
            notify: &mut self.notify,
            runtime,
        }
    }

    /// Peek at the list of introduction points we are providing to the publisher
    ///
    /// (Used for testing and during startup.)
    pub(crate) fn borrow_for_read(&mut self) -> impl std::ops::Deref<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl<R: SleepProvider> Drop for NotifyingBorrow<'_, R> {
    fn drop(&mut self) {
        // Channel full? Well, then the receiver is indeed going to wake up, so fine.
        // Channel disconnected? The publisher has crashed or terminated,
        // but we are not in a position to fail and shut down the establisher.
        // If our HS is shutting down, the manager will be shut down by other means.
        let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(());

        let save_outcome = self.guard.save(&self.runtime);
        log_ratelim!(
            // This message is a true description for the following reasons:
            //
            // "until" times can only be extended by the *publisher*.
            // The manager won't ever shorten them either, but if they are in the past,
            // it might delete them if it has decided to retire the IPT.
            // Leaving them undeleted is not ideal from a privacy pov,
            // but it doesn't prevent us continuing to operate correctly.
            //
            // It is therefore OK to just log the error here.
            //
            // In practice, we're likely to try to save as a result of the publisher's
            // operation, too. That's going to be more of a problem, but it's handled
            // by other code paths.
            //
            // We *don't* include the HS nickname in the activity
            // because this is probably not HS instance specific.
            "possibly deleting expiry times for old HSS IPTs";
            save_outcome;
        );

        // Now the fields will be dropped, including `guard`.
        // I.e. the mutex gets unlocked. This means we notify the publisher
        // (which might make it wake up on another thread) just *before*
        // we release the lock, rather than just after.
        // This is slightly suboptimal but doesn't matter here.
        // To do better, we'd need to make the guard into an Option.
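        //
        // (A sketch of that hypothetical alternative, for illustration only:
        //     guard: Option<MutexGuard<'v, PublishIptSet>>,
        // and then, at this point in drop(), after saving:
        //     drop(self.guard.take());          // unlock first ...
        //     let _ = self.notify.try_send(()); // ... then notify
        // with the save and notify code above adjusted to match.)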
    }
}

impl IptsPublisherView {
    /// Wait until the IPT set has changed (or may have)
    ///
    /// After this returns, to find out what the new IPT set is,
    /// the publisher calls `borrow_for_publish`.
    ///
    /// Will complete immediately if the IPT set has
    /// changed since the last call to `await_update`.
    ///
    /// Returns:
    /// * `Some(Ok(()))` if the IPT set was (or may have been) updated
    /// * `None` if the manager is shutting down and the publisher should shut down too
    /// * `Some(Err(..))` if a fatal error occurred
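    ///
    /// For example, the publisher's main loop might be shaped like this
    /// (a sketch, not the actual publisher code):
    ///
    /// ```ignore
    /// while let Some(got) = pub_view.await_update().await {
    ///     got?; // a fatal error; bail out
    ///     let guard = pub_view.borrow_for_publish();
    ///     // ... inspect guard.ipts and decide whether to (re)publish ...
    /// }
    /// // `None`: the manager is shutting down, so we shut down too.
    /// ```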
    //
    // TODO: make this return Result<ShutdownStatus, FatalError> instead
    // (this is what we do in other places, e.g. in ipt_mgr, publisher).
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1812#note_2976758
    pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> {
        // Cancellation safety:
        //
        // We're using mpsc::Receiver's implementation of Stream, via StreamExt.
        // Stream::next() must be cancellation safe, or it would be lossy everywhere.
        // So it is OK to create the future from next, here, and possibly discard it
        // before it becomes Ready.
        let () = self.notify.next().await?;
        Some(Ok(()))
    }

    /// Look at the list of introduction points to publish
    ///
    /// Whenever a publication attempt is started,
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt)
    /// must be called on this same [`IptSet`].
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }

    /// Obtain an [`IptsPublisherUploadView`], for use just prior to a publication attempt
    pub(crate) fn upload_view(&self) -> IptsPublisherUploadView {
        let shared = self.shared.clone();
        IptsPublisherUploadView { shared }
    }
}

impl IptsPublisherUploadView {
    /// Look at the list of introduction points to publish
    ///
    /// See [`IptsPublisherView::borrow_for_publish`].
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl PublishIptSet {
    /// Update all the `last_descriptor_expiry_including_slop` for a publication attempt
    ///
    /// Called by the publisher when it starts a publication attempt
    /// which will advertise this set of introduction points.
    ///
    /// When calling this, the publisher promises that the publication attempt
    /// will either complete, or be abandoned, before `worst_case_end`.
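    ///
    /// For example (a sketch: `upload_timeout` is an illustrative bound on the
    /// whole attempt, such as `publish::OVERALL_UPLOAD_TIMEOUT`):
    ///
    /// ```ignore
    /// let worst_case_end = runtime.now() + upload_timeout;
    /// guard.note_publication_attempt(&runtime, worst_case_end)?;
    /// // ... now actually start the upload(s) ...
    /// ```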
    pub(crate) fn note_publication_attempt(
        &mut self,
        runtime: &impl SleepProvider,
        worst_case_end: Instant,
    ) -> Result<(), IptStoreError> {
        let ipts = self
            .ipts
            .as_ref()
            .ok_or_else(|| internal!("publishing None!"))?;

        let new_value = (|| {
            worst_case_end
                .checked_add(ipts.lifetime)?
                .checked_add(IPT_PUBLISH_EXPIRY_SLOP)
        })()
        .ok_or_else(
            // Clock overflow on the monotonic clock. Everything is terrible.
            // We will have no idea when we can stop publishing the descriptor!
            // I guess we'll return an error and cause the publisher to bail out?
            // An ErrorKind of ClockSkew is wrong, since this is a purely local problem,
            // and should be impossible if we properly checked our parameters.
            || internal!("monotonic clock overflow"),
        )?;

        for ipt in &ipts.ipts {
            use std::collections::hash_map::Entry;
            let entry = self.last_descriptor_expiry_including_slop.entry(ipt.lid);

            // Open-coding a hypothetical Entry::value()
            let old_value = match &entry {
                Entry::Occupied(oe) => Some(*oe.get()),
                Entry::Vacant(_) => None,
            };

            let to_store = chain!(
                //
                old_value,
                [new_value],
            )
            .max()
            .expect("max of known-non-empty iterator was None");

            // Open-coding Entry::insert(); the unstable insert_entry() would do
            match entry {
                Entry::Occupied(mut oe) => {
                    oe.insert(to_store);
                }
                Entry::Vacant(ve) => {
                    ve.insert(to_store);
                }
            };
        }

        self.save(runtime)?;

        Ok(())
    }
}

//---------- On disk data structures, done with serde ----------

/// Record of intro point publications
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct StateRecord {
    /// Ipts
    ipts: Vec<IptRecord>,
    /// Reference time
    stored: time_store::Reference,
}

/// Record of publication of one intro point
#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)]
struct IptRecord {
    /// Which ipt?
    lid: IptLocalId,
    /// Maintain until: `last_descriptor_expiry_including_slop`
    // We use a shorter variable name so the on-disk files aren't silly
    until: time_store::FutureTimestamp,
}

impl PublishIptSet {
    /// Save the publication times to the persistent state
    fn save(&mut self, runtime: &impl SleepProvider) -> Result<(), IptStoreError> {
        // Throughout, we use exhaustive struct patterns on the in-memory data,
        // so we avoid missing any of the data.
        let PublishIptSet {
            ipts,
            last_descriptor_expiry_including_slop,
            storage,
        } = self;

        let tstoring = time_store::Storing::start(runtime);

        // We don't save the instructions to the publisher; on reload that becomes None.
        let _: &Option<IptSet> = ipts;

        let mut ipts = last_descriptor_expiry_including_slop
            .iter()
            .map(|(&lid, &until)| {
                let until = tstoring.store_future(until);
                IptRecord { lid, until }
            })
            .collect_vec();
        ipts.sort(); // normalise

        let on_disk = StateRecord {
            ipts,
            stored: tstoring.store_ref(),
        };

        Ok(storage.store(&on_disk)?)
    }

    /// Load the publication times from the persistent state
    fn load(
        storage: IptSetStorageHandle,
        runtime: &impl SleepProvider,
    ) -> Result<PublishIptSet, StartupError> {
        let on_disk = storage.load().map_err(StartupError::LoadState)?;
        let last_descriptor_expiry_including_slop = on_disk
            .map(|record| {
                // Throughout, we use exhaustive struct patterns on the data we got from disk,
                // so we avoid missing any of the data.
                let StateRecord { ipts, stored } = record;
                let tloading = time_store::Loading::start(runtime, stored);
                ipts.into_iter()
                    .map(|ipt| {
                        let IptRecord { lid, until } = ipt;
                        let until = tloading.load_future(until);
                        (lid, until)
                    })
                    .collect()
            })
            .unwrap_or_default();
        Ok(PublishIptSet {
            ipts: None,
            last_descriptor_expiry_including_slop,
            storage,
        })
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::test::create_storage_handles;
    use crate::FatalError;
    use futures::{pin_mut, poll};
    use std::task::Poll::{self, *};
    use test_temp_dir::test_temp_dir;
    use tor_rtcompat::ToplevelBlockOn as _;

    fn test_intro_point() -> Ipt {
        use tor_netdoc::doc::hsdesc::test_data;
        test_data::test_parsed_hsdesc().unwrap().intro_points()[0].clone()
    }

    async fn pv_poll_await_update(
        pv: &mut IptsPublisherView,
    ) -> Poll<Option<Result<(), FatalError>>> {
        let fut = pv.await_update();
        pin_mut!(fut);
        poll!(fut)
    }

    async fn pv_expect_one_await_update(pv: &mut IptsPublisherView) {
        assert!(matches!(
            pv_poll_await_update(pv).await,
            Ready(Some(Ok(())))
        ));
        assert!(pv_poll_await_update(pv).await.is_pending());
    }

    fn pv_note_publication_attempt(
        runtime: &impl SleepProvider,
        pv: &IptsPublisherView,
        worst_case_end: Instant,
    ) {
        pv.borrow_for_publish()
            .note_publication_attempt(runtime, worst_case_end)
            .unwrap();
    }

    fn mv_get_0_expiry(mv: &mut IptsManagerView) -> Instant {
        let g = mv.borrow_for_read();
        let lid = g.ipts.as_ref().unwrap().ipts[0].lid;
        *g.last_descriptor_expiry_including_slop.get(&lid).unwrap()
    }

    #[test]
    fn test() {
        // We don't bother with MockRuntime::test_with_various
        // since this test case doesn't spawn tasks.
        let runtime = tor_rtmock::MockRuntime::new();

        let temp_dir_owned = test_temp_dir!();
        let temp_dir = temp_dir_owned.as_path_untracked();

        runtime.clone().block_on(async move {
            // make a channel; it should have no updates yet

            let (_state_mgr, iptpub_state_handle) = create_storage_handles(temp_dir);
            let (mut mv, mut pv) = ipts_channel(&runtime, iptpub_state_handle).unwrap();
            assert!(pv_poll_await_update(&mut pv).await.is_pending());

            // borrowing publisher view for publish doesn't cause an update

            let pg = pv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            let uv = pv.upload_view();
            let pg = uv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            // borrowing manager view for update *does* cause one update

            let mut mg = mv.borrow_for_update(runtime.clone());
            mg.ipts = Some(IptSet {
                ipts: vec![],
                lifetime: Duration::ZERO,
            });
            drop(mg);

            pv_expect_one_await_update(&mut pv).await;

            // borrowing manager view for update twice causes only one update

            const LIFETIME: Duration = Duration::from_secs(1800);
            const PUBLISH_END_TIMEOUT: Duration = Duration::from_secs(300);

            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .lifetime = LIFETIME;
            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .ipts
                .push(IptInSet {
                    ipt: test_intro_point(),
                    lid: [42; 32].into(),
                });

            pv_expect_one_await_update(&mut pv).await;

            // noting a publication attempt records an expiry of
            // worst_case_end + lifetime + slop

            pv_note_publication_attempt(&runtime, &pv, runtime.now() + PUBLISH_END_TIMEOUT);

            let expected_expiry =
                runtime.now() + PUBLISH_END_TIMEOUT + LIFETIME + IPT_PUBLISH_EXPIRY_SLOP;
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);

            // noting an attempt with an *earlier* worst-case end is ignored

            pv_note_publication_attempt(&runtime, &pv, runtime.now() - Duration::from_secs(10));
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);
        });

        drop(temp_dir_owned); // prove it's still live
    }
}