1
//! IPT set - the principal API between the IPT manager and publisher
2

            
3
use crate::internal_prelude::*;
4

            
5
/// Handle for a suitable persistent storage manager
6
pub(crate) type IptSetStorageHandle = tor_persist::state_dir::StorageHandle<StateRecord>;
7

            
8
/// Information shared between the IPT manager and the IPT publisher
9
///
10
/// The principal information is `ipts`, which is calculated by the IPT Manager.
11
/// See
12
/// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish)
13
/// for more detailed information about how this is calculated.
14
#[derive(Educe)]
15
#[educe(Debug)]
16
pub(crate) struct PublishIptSet {
17
    /// Set of introduction points to be advertised in a descriptor (if we are to publish)
18
    ///
19
    /// If `Some`, the publisher will try to maintain a published descriptor,
20
    /// of lifetime `lifetime`, listing `ipts`.
21
    ///
22
    /// If `None`, the publisher will not try to publish.
23
    /// (Already-published descriptors will not be deleted.)
24
    ///
25
    /// These instructions ultimately come from
26
    /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish).
27
    pub(crate) ipts: Option<IptSet>,
28

            
29
    /// Record of publication attempts
30
    ///
31
    /// Time until which the manager ought we to try to maintain each ipt,
32
    /// even after we stop publishing it.
33
    ///
34
    /// This is a ceiling on:
35
    ///
36
    ///   * The last time we *finished* publishing the descriptor
37
    ///     (we can estimate this by taking the time we *started* to publish
38
    ///     plus our timeout on the publication attempt).
39
    ///
40
    ///   * Plus the `lifetime` that was used for publication.
41
    ///
42
    ///   * Plus the length of time between a client obtaining the descriptor
43
    ///     and its introduction request reaching us through the intro point
44
    ///     ([`IPT_PUBLISH_EXPIRY_SLOP`])
45
    ///
46
    /// This field is updated by the publisher, using
47
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt),
48
    /// and read by the manager.
49
    ///
50
    /// A separate copy of the information is stored by the manager,
51
    /// in `ipt_mgr::Ipt::last_descriptor_expiry_including_slop`.
52
    ///
53
    /// There may be entries in this table that don't
54
    /// correspond to introduction points in `ipts`.
55
    /// The publisher mustn't create such entries
56
    /// (since that would imply publishing IPTs contrary to the manager's instructions)
57
    /// but it can occur, for example, on restart.
58
    ///
59
    /// It is the manager's job to remove expired entries.
60
    //
61
    // This is a separate field, rather than being part of IptSet, so that during startup,
62
    // we can load information about previously-published IPTs, even though we don't want,
63
    // at that stage, to publish anything.
64
    //
65
    // The publication information is stored in a separate on-disk file, so that the
66
    // IPT publisher can record publication attempts without having to interact with the
67
    // IPT manager's main data structure.
68
    //
69
    // (The publisher needs to update the on-disk state synchronously, before publication,
70
    // since otherwise there could be a bug scenario where we succeed in publishing,
71
    // but don't succeed in recording that we published, and then, on restart,
72
    // don't know that we need to (re)establish this IPT.)
73
    pub(crate) last_descriptor_expiry_including_slop: HashMap<IptLocalId, Instant>,
74

            
75
    /// The on-disk state storage handle.
76
    #[educe(Debug(ignore))]
77
    storage: IptSetStorageHandle,
78
}
79

            
80
/// A set of introduction points for publication
81
///
82
/// This is shared between the manager and the publisher.
83
/// Each leaf field says who sets it.
84
///
85
/// This is not `Clone` and its contents should not be cloned.
86
/// When its contents are copied out into a descriptor by the publisher,
87
/// this should be accompanied by a call to
88
/// [`note_publication_attempt`](PublishIptSet::note_publication_attempt).
89
#[derive(Debug)]
90
pub(crate) struct IptSet {
91
    /// The actual introduction points
92
    pub(crate) ipts: Vec<IptInSet>,
93

            
94
    /// When to make the descriptor expire
95
    ///
96
    /// Set by the manager and read by the publisher.
97
    pub(crate) lifetime: Duration,
98
}
99

            
100
/// Introduction point as specified to publisher by manager
101
///
102
/// Convenience type alias.
103
#[derive(Debug)]
104
pub(crate) struct IptInSet {
105
    /// Details of the introduction point
106
    ///
107
    /// Set by the manager and read by the publisher.
108
    pub(crate) ipt: Ipt,
109

            
110
    /// Local identifier for this introduction point
111
    ///
112
    /// Set and used by the manager, to correlate this data structure with the manager's.
113
    /// May also be read by the publisher.
114
    pub(crate) lid: IptLocalId,
115
}
116

            
117
/// Actual introduction point details as specified to publisher by manager
118
///
119
/// Convenience type alias.
120
pub(crate) type Ipt = tor_netdoc::doc::hsdesc::IntroPointDesc;
121

            
122
/// Descriptor expiry time slop
123
///
124
/// How long after our descriptor expired should we continue to maintain an old IPT?
125
/// This is an allowance for:
126
///
127
///   - Various RTTs and delays in clients setting up circuits
128
///     (we can't really measure this ourselves properly,
129
///     since what matters is the client's latency)
130
///
131
///   - Clock skew
132
///
133
// TODO: This is something we might want to tune based on experience.
134
//
135
// TODO: We'd like to use "+" here, but it isn't const yet.
136
const IPT_PUBLISH_EXPIRY_SLOP: Duration =
137
    Duration::from_secs(10 * 60).saturating_add(crate::publish::OVERALL_UPLOAD_TIMEOUT);
138

            
139
/// Shared view of introduction points - IPT manager's view
140
///
141
/// This is the manager's end of a bidirectional "channel",
142
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
143
#[derive(Debug)]
144
pub(crate) struct IptsManagerView {
145
    /// Actual shared data
146
    shared: Shared,
147

            
148
    /// Notification sender
149
    ///
150
    /// We don't wrap the state in a postage::watch,
151
    /// because the publisher needs to be able to mutably borrow the data
152
    /// without re-notifying itself when it drops the guard.
153
    notify: mpsc::Sender<()>,
154
}
155

            
156
/// Shared view of introduction points - IPT publisher's view
157
///
158
/// This is the publishers's end of a bidirectional "channel",
159
/// containing a shared `PublishIptSet`, i.e. an `Option<IptSet>`.
160
pub(crate) struct IptsPublisherView {
161
    /// Actual shared data
162
    shared: Shared,
163

            
164
    /// Notification receiver
165
    notify: mpsc::Receiver<()>,
166
}
167

            
168
/// Shared view of introduction points - IPT publisher's publication-only view
169
///
170
/// This is a restricted version of [`IptsPublisherView`]
171
/// which can only be used to:
172
///
173
///   - check that a publication attempt should still continue; and
174
///   - note publication attempts.
175
///
176
/// via the [`.borrow_for_publish()`](IptsPublisherUploadView::borrow_for_publish) method.
177
///
178
/// This is useful because multiple `IptsPublisherUploadView`
179
/// can exist (so, for example, it is `Clone`);
180
/// unlike `IptsPublisherView`, of which there is one per IPTs channel.
181
/// So the publisher's individual upload tasks can each have one.
182
///
183
/// Obtained from [`IptsPublisherView::upload_view`].
184
#[derive(Debug, Clone)]
185
pub(crate) struct IptsPublisherUploadView {
186
    /// Actual shared data
187
    shared: Shared,
188
}
189

            
190
/// Core shared state
191
type Shared = Arc<Mutex<PublishIptSet>>;
192

            
193
/// Mutex guard that will notify when dropped
194
///
195
/// Returned by [`IptsManagerView::borrow_for_update`]
196
#[derive(Deref, DerefMut)]
197
struct NotifyingBorrow<'v, R: SleepProvider> {
198
    /// Lock guard
199
    #[deref(forward)]
200
    #[deref_mut(forward)]
201
    guard: MutexGuard<'v, PublishIptSet>,
202

            
203
    /// To be notified on drop
204
    notify: &'v mut mpsc::Sender<()>,
205

            
206
    /// For saving!
207
    runtime: R,
208
}
209

            
210
/// Create a new shared state channel for the publication instructions
211
18
pub(crate) fn ipts_channel(
212
18
    runtime: &impl SleepProvider,
213
18
    storage: IptSetStorageHandle,
214
18
) -> Result<(IptsManagerView, IptsPublisherView), StartupError> {
215
18
    let initial_state = PublishIptSet::load(storage, runtime)?;
216
18
    let shared = Arc::new(Mutex::new(initial_state));
217
18
    // Zero buffer is right.  Docs for `mpsc::channel` say:
218
18
    //   each sender gets a guaranteed slot in the channel capacity,
219
18
    //   and on top of that there are buffer “first come, first serve” slots
220
18
    // We only have one sender and only ever want one outstanding,
221
18
    // since we can (and would like to) coalesce notifications.
222
18
    //
223
18
    // Internally-generated instructions, no need for mq.
224
18
    let (tx, rx) = mpsc_channel_no_memquota(0);
225
18
    let r = (
226
18
        IptsManagerView {
227
18
            shared: shared.clone(),
228
18
            notify: tx,
229
18
        },
230
18
        IptsPublisherView { shared, notify: rx },
231
18
    );
232
18
    Ok(r)
233
18
}
234

            
235
/// Lock the shared state and obtain a lock guard
236
///
237
/// Does not do any notification.
238
286
fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> {
239
286
    // Propagating panics is fine since if either the manager or the publisher crashes,
240
286
    // the other one cannot survive.
241
286
    shared.lock().expect("IPT set shared state poisoned")
242
286
}
243

            
244
impl IptsManagerView {
245
    /// Arrange to be able to update the list of introduction points
246
    ///
247
    /// The manager may add new ipts, or delete old ones.
248
    ///
249
    /// The returned value is a lock guard.
250
    /// (It is not `Send` so cannot be held across await points.)
251
    /// The publisher will be notified when it is dropped.
252
106
    pub(crate) fn borrow_for_update(
253
106
        &mut self,
254
106
        runtime: impl SleepProvider,
255
106
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
256
106
        let guard = lock_shared(&self.shared);
257
106
        NotifyingBorrow {
258
106
            guard,
259
106
            notify: &mut self.notify,
260
106
            runtime,
261
106
        }
262
106
    }
263

            
264
    /// Peek at the list of introduction points we are providing to the publisher
265
    ///
266
    /// (Used for testing and during startup.)
267
12
    pub(crate) fn borrow_for_read(&mut self) -> impl std::ops::Deref<Target = PublishIptSet> + '_ {
268
12
        lock_shared(&self.shared)
269
12
    }
270
}
271

            
272
impl<R: SleepProvider> Drop for NotifyingBorrow<'_, R> {
273
106
    fn drop(&mut self) {
274
106
        // Channel full?  Well, then the receiver is indeed going to wake up, so fine
275
106
        // Channel disconnected?  The publisher has crashed or terminated,
276
106
        // but we are not in a position to fail and shut down the establisher.
277
106
        // If our HS is shutting down, the manager will be shut down by other means.
278
106
        let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(());
279
106

            
280
106
        let save_outcome = self.guard.save(&self.runtime);
281
106
        log_ratelim!(
282
106
            // This message is a true description for the following reasons:
283
106
            //
284
106
            // "until" times can only be extended by the *publisher*.
285
106
            // The manager won't ever shorten them either, but if they are in the past,
286
106
            // it might delete them if it has decided to retire the IPT.
287
106
            // Leaving them undeleted is not ideal from a privacy pov,
288
106
            // but it doesn't prevent us continuing to operate correctly.
289
106
            //
290
106
            // It is therefore OK to just log the error here.
291
106
            //
292
106
            // In practice, we're likely to try to save as a result of the publisher's
293
106
            // operation, too.  That's going to be more of a problem, but it's handled
294
106
            // by other code paths.
295
106
            //
296
106
            // We *don't* include the HS nickname in the activity
297
106
            // because this is probably not HS instance specific.
298
106
            "possibly deleting expiry times for old HSS IPTs";
299
106
            save_outcome;
300
106
        );
301
106

            
302
106
        // Now the fields will be dropped, including `guard`.
303
106
        // I.e. the mutex gets unlocked.  This means we notify the publisher
304
106
        // (which might make it wake up on another thread) just *before*
305
106
        // we release the lock, rather than just after.
306
106
        // This is slightly suboptimal but doesn't matter here.
307
106
        // To do better, we'd need to make the guard into an Option.
308
106
    }
309
}
310

            
311
impl IptsPublisherView {
312
    /// Wait until the IPT set has changed (or may have)
313
    ///
314
    /// After this returns, to find out what the new IPT set is,
315
    /// the publisher calls `borrow_for_publish`.
316
    ///
317
    /// Will complete immediately if the IPT set has
318
    /// changed since the last call to `await_update`.
319
    ///
320
    /// Returns:
321
    ///  * `Some(Ok(())` if the IPT set was (or may have been) updated
322
    ///  * `None` if the manager is shutting down and the publisher should shut down too
323
    ///  * `Some(Err(..))` if a fatal error occurred
324
    //
325
    // TODO: make this return Result<ShutdownStatus, FatalError> instead
326
    // (this is what we do in other places, e.g. in ipt_mgr, publisher).
327
    //
328
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1812#note_2976758
329
153
    pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> {
330
102
        // Cancellation safety:
331
102
        //
332
102
        // We're using mpsc::Receiver's implementation of Stream, via StreamExt.
333
102
        // Stream::next() must be cancellation safe or it would be lossy everywhere.
334
102
        // So it is OK to create the future from next, here, and possibly discard it
335
102
        // before it becomes Ready.
336
102
        let () = self.notify.next().await?;
337
12
        Some(Ok(()))
338
12
    }
339

            
340
    /// Look at the list of introduction points to publish
341
    ///
342
    /// Whenever a publication attempt is started
343
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt)
344
    /// must be called on this same [`IptSet`].
345
    ///
346
    /// The returned value is a lock guard.
347
    /// (It is not `Send` so cannot be held across await points.)
348
38
    pub(crate) fn borrow_for_publish(
349
38
        &self,
350
38
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
351
38
        lock_shared(&self.shared)
352
38
    }
353

            
354
    /// Obtain an [`IptsPublisherUploadView`], for use just prior to a publication attempt
355
18
    pub(crate) fn upload_view(&self) -> IptsPublisherUploadView {
356
18
        let shared = self.shared.clone();
357
18
        IptsPublisherUploadView { shared }
358
18
    }
359
}
360

            
361
impl IptsPublisherUploadView {
362
    /// Look at the list of introduction points to publish
363
    ///
364
    /// See [`IptsPublisherView::borrow_for_publish`].
365
130
    pub(crate) fn borrow_for_publish(
366
130
        &self,
367
130
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
368
130
        lock_shared(&self.shared)
369
130
    }
370
}
371

            
372
impl PublishIptSet {
373
    /// Update all the `last_descriptor_expiry_including_slop` for a publication attempt
374
    ///
375
    /// Called by the publisher when it starts a publication attempt
376
    /// which will advertise this set of introduction points.
377
    ///
378
    /// When calling this, the publisher promises that the publication attempt
379
    /// will either complete, or be abandoned, before `worst_case_end`.
380
132
    pub(crate) fn note_publication_attempt(
381
132
        &mut self,
382
132
        runtime: &impl SleepProvider,
383
132
        worst_case_end: Instant,
384
132
    ) -> Result<(), IptStoreError> {
385
132
        let ipts = self
386
132
            .ipts
387
132
            .as_ref()
388
132
            .ok_or_else(|| internal!("publishing None!"))?;
389

            
390
132
        let new_value = (|| {
391
132
            worst_case_end
392
132
                .checked_add(ipts.lifetime)?
393
132
                .checked_add(IPT_PUBLISH_EXPIRY_SLOP)
394
132
        })()
395
132
        .ok_or_else(
396
132
            // Clock overflow on the monotonic clock.  Everything is terrible.
397
132
            // We will have no idea when we can stop publishing the descriptor!
398
132
            // I guess we'll return an error and cause the publisher to bail out?
399
132
            // An ErrorKind of ClockSkew is wrong, since this is a purely local problem,
400
132
            // and should be impossible if we properly checked our parameters.
401
132
            || internal!("monotonic clock overflow"),
402
132
        )?;
403

            
404
520
        for ipt in &ipts.ipts {
405
388
            use std::collections::hash_map::Entry;
406
388
            let entry = self.last_descriptor_expiry_including_slop.entry(ipt.lid);
407

            
408
            // Open-coding a hypothetical Entry::value()
409
388
            let old_value = match &entry {
410
362
                Entry::Occupied(oe) => Some(*oe.get()),
411
26
                Entry::Vacant(_) => None,
412
            };
413

            
414
388
            let to_store = chain!(
415
388
                //
416
388
                old_value,
417
388
                [new_value],
418
388
            )
419
388
            .max()
420
388
            .expect("max of known-non-empty iterator was None");
421
388

            
422
388
            // Open-coding Entry::insert(); unstable insert_netry() would do
423
388
            match entry {
424
362
                Entry::Occupied(mut oe) => {
425
362
                    oe.insert(to_store);
426
362
                }
427
26
                Entry::Vacant(ve) => {
428
26
                    ve.insert(to_store);
429
26
                }
430
            };
431
        }
432

            
433
132
        self.save(runtime)?;
434

            
435
132
        Ok(())
436
132
    }
437
}
438

            
439
//---------- On disk data structures, done with serde ----------
440

            
441
/// Record of intro point publications
442
#[derive(Serialize, Deserialize, Debug)]
443
pub(crate) struct StateRecord {
444
    /// Ipts
445
    ipts: Vec<IptRecord>,
446
    /// Reference time
447
    stored: time_store::Reference,
448
}
449

            
450
/// Record of publication of one intro point
451
#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)]
452
struct IptRecord {
453
    /// Which ipt?
454
    lid: IptLocalId,
455
    /// Maintain until, `last_descriptor_expiry_including_slop`
456
    // We use a shorter variable name so the on disk files aren't silly
457
    until: time_store::FutureTimestamp,
458
}
459

            
460
impl PublishIptSet {
461
    /// Save the publication times to the persistent state
462
238
    fn save(&mut self, runtime: &impl SleepProvider) -> Result<(), IptStoreError> {
463
238
        // Throughout, we use exhaustive struct patterns on the in-memory data,
464
238
        // so we avoid missing any of the data.
465
238
        let PublishIptSet {
466
238
            ipts,
467
238
            last_descriptor_expiry_including_slop,
468
238
            storage,
469
238
        } = self;
470
238

            
471
238
        let tstoring = time_store::Storing::start(runtime);
472
238

            
473
238
        // we don't save the instructions to the publisher; on reload that becomes None
474
238
        let _: &Option<IptSet> = ipts;
475
238

            
476
238
        let mut ipts = last_descriptor_expiry_including_slop
477
238
            .iter()
478
388
            .map(|(&lid, &until)| {
479
388
                let until = tstoring.store_future(until);
480
388
                IptRecord { lid, until }
481
388
            })
482
238
            .collect_vec();
483
238
        ipts.sort(); // normalise
484
238

            
485
238
        let on_disk = StateRecord {
486
238
            ipts,
487
238
            stored: tstoring.store_ref(),
488
238
        };
489
238

            
490
238
        Ok(storage.store(&on_disk)?)
491
238
    }
492

            
493
    /// Load the publication times from the persistent state
494
18
    fn load(
495
18
        storage: IptSetStorageHandle,
496
18
        runtime: &impl SleepProvider,
497
18
    ) -> Result<PublishIptSet, StartupError> {
498
18
        let on_disk = storage.load().map_err(StartupError::LoadState)?;
499
18
        let last_descriptor_expiry_including_slop = on_disk
500
18
            .map(|record| {
501
4
                // Throughout, we use exhaustive struct patterns on the data we got from disk,
502
4
                // so we avoid missing any of the data.
503
4
                let StateRecord { ipts, stored } = record;
504
4
                let tloading = time_store::Loading::start(runtime, stored);
505
4
                ipts.into_iter()
506
4
                    .map(|ipt| {
507
                        let IptRecord { lid, until } = ipt;
508
                        let until = tloading.load_future(until);
509
                        (lid, until)
510
4
                    })
511
4
                    .collect()
512
18
            })
513
18
            .unwrap_or_default();
514
18
        Ok(PublishIptSet {
515
18
            ipts: None,
516
18
            last_descriptor_expiry_including_slop,
517
18
            storage,
518
18
        })
519
18
    }
520
}
521

            
522
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::test::create_storage_handles;
    use crate::FatalError;
    use futures::{pin_mut, poll};
    use std::task::Poll::{self, *};
    use test_temp_dir::test_temp_dir;
    use tor_rtcompat::ToplevelBlockOn as _;

    /// Obtain an introduction point from the test hsdesc data
    fn test_intro_point() -> Ipt {
        use tor_netdoc::doc::hsdesc::test_data;
        test_data::test_parsed_hsdesc().unwrap().intro_points()[0].clone()
    }

    /// Poll `await_update` exactly once, without blocking
    async fn pv_poll_await_update(
        pv: &mut IptsPublisherView,
    ) -> Poll<Option<Result<(), FatalError>>> {
        let fut = pv.await_update();
        pin_mut!(fut);
        poll!(fut)
    }

    /// Assert that exactly one (coalesced) update notification is pending
    async fn pv_expect_one_await_update(pv: &mut IptsPublisherView) {
        assert!(matches!(
            pv_poll_await_update(pv).await,
            Ready(Some(Ok(())))
        ));
        assert!(pv_poll_await_update(pv).await.is_pending());
    }

    /// Record a publication attempt, via the publisher's view
    fn pv_note_publication_attempt(
        runtime: &impl SleepProvider,
        pv: &IptsPublisherView,
        worst_case_end: Instant,
    ) {
        pv.borrow_for_publish()
            .note_publication_attempt(runtime, worst_case_end)
            .unwrap();
    }

    /// Read back the recorded expiry for the 0th ipt, via the manager's view
    fn mv_get_0_expiry(mv: &mut IptsManagerView) -> Instant {
        let g = mv.borrow_for_read();
        let lid = g.ipts.as_ref().unwrap().ipts[0].lid;
        *g.last_descriptor_expiry_including_slop.get(&lid).unwrap()
    }

    #[test]
    fn test() {
        // We don't bother with MockRuntime::test_with_various
        // since this test case doesn't spawn tasks
        let runtime = tor_rtmock::MockRuntime::new();

        let temp_dir_owned = test_temp_dir!();
        let temp_dir = temp_dir_owned.as_path_untracked();

        runtime.clone().block_on(async move {
            // make a channel; it should have no updates yet

            let (_state_mgr, iptpub_state_handle) = create_storage_handles(temp_dir);
            let (mut mv, mut pv) = ipts_channel(&runtime, iptpub_state_handle).unwrap();
            assert!(pv_poll_await_update(&mut pv).await.is_pending());

            // borrowing publisher view for publish doesn't cause an update

            let pg = pv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            let uv = pv.upload_view();
            let pg = uv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            // borrowing manager view for update *does* cause one update

            let mut mg = mv.borrow_for_update(runtime.clone());
            mg.ipts = Some(IptSet {
                ipts: vec![],
                lifetime: Duration::ZERO,
            });
            drop(mg);

            pv_expect_one_await_update(&mut pv).await;

            // borrowing manager view for update twice causes just one update

            const LIFETIME: Duration = Duration::from_secs(1800);
            const PUBLISH_END_TIMEOUT: Duration = Duration::from_secs(300);

            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .lifetime = LIFETIME;
            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .ipts
                .push(IptInSet {
                    ipt: test_intro_point(),
                    lid: [42; 32].into(),
                });

            pv_expect_one_await_update(&mut pv).await;

            // test setting lifetime

            pv_note_publication_attempt(&runtime, &pv, runtime.now() + PUBLISH_END_TIMEOUT);

            let expected_expiry =
                runtime.now() + PUBLISH_END_TIMEOUT + LIFETIME + IPT_PUBLISH_EXPIRY_SLOP;
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);

            // setting an *earlier* lifetime is ignored

            pv_note_publication_attempt(&runtime, &pv, runtime.now() - Duration::from_secs(10));
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);
        });

        drop(temp_dir_owned); // prove it's still live
    }
}