//! IPT set - the principal API between the IPT manager and publisher

use crate::internal_prelude::*;

/// Handle for a suitable persistent storage manager
pub(crate) type IptSetStorageHandle = tor_persist::state_dir::StorageHandle<StateRecord>;

/// Information shared between the IPT manager and the IPT publisher
///
/// The principal information is `ipts`, which is calculated by the IPT Manager.
/// See
/// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish)
/// for more detailed information about how this is calculated.
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct PublishIptSet {
    /// Set of introduction points to be advertised in a descriptor (if we are to publish)
    ///
    /// If `Some`, the publisher will try to maintain a published descriptor,
    /// of lifetime `lifetime`, listing `ipts`.
    ///
    /// If `None`, the publisher will not try to publish.
    /// (Already-published descriptors will not be deleted.)
    ///
    /// These instructions ultimately come from
    /// [`IptManager::compute_iptsetstatus_publish`](crate::ipt_mgr::IptManager::compute_iptsetstatus_publish).
    pub(crate) ipts: Option<IptSet>,

    /// Record of publication attempts
    ///
    /// Time until which the manager ought to try to maintain each ipt,
    /// even after we stop publishing it.
    ///
    /// This is a ceiling on:
    ///
    ///   * The last time we *finished* publishing the descriptor
    ///     (we can estimate this by taking the time we *started* to publish
    ///     plus our timeout on the publication attempt).
    ///
    ///   * Plus the `lifetime` that was used for publication.
    ///
    ///   * Plus the length of time between a client obtaining the descriptor
    ///     and its introduction request reaching us through the intro point
    ///     ([`IPT_PUBLISH_EXPIRY_SLOP`])
    ///
    /// This field is updated by the publisher, using
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt),
    /// and read by the manager.
    ///
    /// A separate copy of the information is stored by the manager,
    /// in `ipt_mgr::Ipt::last_descriptor_expiry_including_slop`.
    ///
    /// There may be entries in this table that don't
    /// correspond to introduction points in `ipts`.
    /// The publisher mustn't create such entries
    /// (since that would imply publishing IPTs contrary to the manager's instructions),
    /// but they can occur, for example, on restart.
    ///
    /// It is the manager's job to remove expired entries.
    //
    // This is a separate field, rather than being part of IptSet, so that during startup,
    // we can load information about previously-published IPTs, even though we don't want,
    // at that stage, to publish anything.
    //
    // The publication information is stored in a separate on-disk file, so that the
    // IPT publisher can record publication attempts without having to interact with the
    // IPT manager's main data structure.
    //
    // (The publisher needs to update the on-disk state synchronously, before publication,
    // since otherwise there could be a bug scenario where we succeed in publishing,
    // but don't succeed in recording that we published, and then, on restart,
    // don't know that we need to (re)establish this IPT.)
    pub(crate) last_descriptor_expiry_including_slop: HashMap<IptLocalId, Instant>,

    /// The on-disk state storage handle.
    #[educe(Debug(ignore))]
    storage: IptSetStorageHandle,
}

/// A set of introduction points for publication
///
/// This is shared between the manager and the publisher.
/// Each leaf field says who sets it.
///
/// This is not `Clone` and its contents should not be cloned.
/// When its contents are copied out into a descriptor by the publisher,
/// this should be accompanied by a call to
/// [`note_publication_attempt`](PublishIptSet::note_publication_attempt).
#[derive(Debug)]
pub(crate) struct IptSet {
    /// The actual introduction points
    pub(crate) ipts: Vec<IptInSet>,

    /// When to make the descriptor expire
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) lifetime: Duration,
}

/// Introduction point as specified to publisher by manager
#[derive(Debug)]
pub(crate) struct IptInSet {
    /// Details of the introduction point
    ///
    /// Set by the manager and read by the publisher.
    pub(crate) ipt: Ipt,

    /// Local identifier for this introduction point
    ///
    /// Set and used by the manager, to correlate this data structure with the manager's.
    /// May also be read by the publisher.
    pub(crate) lid: IptLocalId,
}

/// Actual introduction point details as specified to publisher by manager
///
/// Convenience type alias.
pub(crate) type Ipt = tor_netdoc::doc::hsdesc::IntroPointDesc;

/// Descriptor expiry time slop
///
/// How long after our descriptor expired should we continue to maintain an old IPT?
/// This is an allowance for:
///
///   - Various RTTs and delays in clients setting up circuits
///     (we can't really measure this ourselves properly,
///     since what matters is the client's latency)
///
///   - Clock skew
///
// TODO: This is something we might want to tune based on experience.
//
// TODO: We'd like to use "+" here, but it isn't const yet.
const IPT_PUBLISH_EXPIRY_SLOP: Duration =
    Duration::from_secs(10 * 60).saturating_add(crate::publish::OVERALL_UPLOAD_TIMEOUT);

/// Shared view of introduction points - IPT manager's view
///
/// This is the manager's end of a bidirectional "channel",
/// containing a shared `PublishIptSet` (principally, an `Option<IptSet>`).
#[derive(Debug)]
pub(crate) struct IptsManagerView {
    /// Actual shared data
    shared: Shared,

    /// Notification sender
    ///
    /// We don't wrap the state in a postage::watch,
    /// because the publisher needs to be able to mutably borrow the data
    /// without re-notifying itself when it drops the guard.
    notify: mpsc::Sender<()>,
}

/// Shared view of introduction points - IPT publisher's view
///
/// This is the publisher's end of a bidirectional "channel",
/// containing a shared `PublishIptSet` (principally, an `Option<IptSet>`).
pub(crate) struct IptsPublisherView {
    /// Actual shared data
    shared: Shared,

    /// Notification receiver
    notify: mpsc::Receiver<()>,
}

/// Shared view of introduction points - IPT publisher's publication-only view
///
/// This is a restricted version of [`IptsPublisherView`],
/// which, via the
/// [`.borrow_for_publish()`](IptsPublisherUploadView::borrow_for_publish) method,
/// can be used only to:
///
///   - check that a publication attempt should still continue; and
///   - note publication attempts.
///
/// This is useful because multiple `IptsPublisherUploadView`s
/// can exist (so, for example, it is `Clone`),
/// unlike `IptsPublisherView`, of which there is one per IPTs channel.
/// So the publisher's individual upload tasks can each have one.
///
/// Obtained from [`IptsPublisherView::upload_view`].
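///
/// For illustration only, a sketch of parallel upload tasks each taking a
/// clone (`spawn_upload_task` and `hsdirs` are hypothetical; the real upload
/// tasks live in the publisher):
///
/// ```rust,ignore
/// let uv = pub_view.upload_view();
/// for hsdir in hsdirs {
///     let uv = uv.clone();
///     spawn_upload_task(async move {
///         // Each task can lock the shared state independently.
///         let ipts = uv.borrow_for_publish();
///         // ... check the attempt should still continue, copy out the IPTs ...
///     });
/// }
/// ```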
#[derive(Debug, Clone)]
pub(crate) struct IptsPublisherUploadView {
    /// Actual shared data
    shared: Shared,
}

/// Core shared state
type Shared = Arc<Mutex<PublishIptSet>>;

/// Mutex guard that will notify when dropped
///
/// Returned by [`IptsManagerView::borrow_for_update`]
#[derive(Deref, DerefMut)]
struct NotifyingBorrow<'v, R: SleepProvider> {
    /// Lock guard
    #[deref(forward)]
    #[deref_mut(forward)]
    guard: MutexGuard<'v, PublishIptSet>,

    /// To be notified on drop
    notify: &'v mut mpsc::Sender<()>,

    /// For saving!
    runtime: R,
}

/// Create a new shared state channel for the publication instructions
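///
/// A sketch of how the two ends are split (hypothetical variable names;
/// the real wiring is done by the service startup code):
///
/// ```rust,ignore
/// let (mgr_view, pub_view) = ipts_channel(&runtime, storage_handle)?;
/// // `mgr_view` goes to the IPT manager; `pub_view` goes to the publisher.
/// ```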
pub(crate) fn ipts_channel(
    runtime: &impl SleepProvider,
    storage: IptSetStorageHandle,
) -> Result<(IptsManagerView, IptsPublisherView), StartupError> {
    let initial_state = PublishIptSet::load(storage, runtime)?;
    let shared = Arc::new(Mutex::new(initial_state));
    // Zero buffer is right.  Docs for `mpsc::channel` say:
    //   each sender gets a guaranteed slot in the channel capacity,
    //   and on top of that there are buffer “first come, first serve” slots
    // We only have one sender and only ever want one outstanding,
    // since we can (and would like to) coalesce notifications.
    //
    // Internally-generated instructions, no need for mq.
    let (tx, rx) = mpsc_channel_no_memquota(0);
    let r = (
        IptsManagerView {
            shared: shared.clone(),
            notify: tx,
        },
        IptsPublisherView { shared, notify: rx },
    );
    Ok(r)
}

/// Lock the shared state and obtain a lock guard
///
/// Does not do any notification.
fn lock_shared(shared: &Shared) -> MutexGuard<PublishIptSet> {
    // Propagating panics is fine since if either the manager or the publisher crashes,
    // the other one cannot survive.
    shared.lock().expect("IPT set shared state poisoned")
}

impl IptsManagerView {
    /// Arrange to be able to update the list of introduction points
    ///
    /// The manager may add new ipts, or delete old ones.
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    /// The publisher will be notified when it is dropped.
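    ///
    /// A sketch of a typical update (the surrounding code is hypothetical;
    /// compare the test case in this module):
    ///
    /// ```rust,ignore
    /// let mut mg = mgr_view.borrow_for_update(runtime.clone());
    /// mg.ipts = Some(IptSet { ipts, lifetime });
    /// drop(mg); // the publisher is notified here
    /// ```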
    pub(crate) fn borrow_for_update(
        &mut self,
        runtime: impl SleepProvider,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        let guard = lock_shared(&self.shared);
        NotifyingBorrow {
            guard,
            notify: &mut self.notify,
            runtime,
        }
    }

    /// Peek at the list of introduction points we are providing to the publisher
    ///
    /// (Used for testing and during startup.)
    pub(crate) fn borrow_for_read(&mut self) -> impl std::ops::Deref<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl<R: SleepProvider> Drop for NotifyingBorrow<'_, R> {
    fn drop(&mut self) {
        // Channel full?  Well, then the receiver is indeed going to wake up, so fine.
        // Channel disconnected?  The publisher has crashed or terminated,
        // but we are not in a position to fail and shut down the establisher.
        // If our HS is shutting down, the manager will be shut down by other means.
        let _: Result<(), mpsc::TrySendError<_>> = self.notify.try_send(());

        let save_outcome = self.guard.save(&self.runtime);
        log_ratelim!(
            // This message is a true description for the following reasons:
            //
            // "until" times can only be extended by the *publisher*.
            // The manager won't ever shorten them either, but if they are in the past,
            // it might delete them if it has decided to retire the IPT.
            // Leaving them undeleted is not ideal from a privacy pov,
            // but it doesn't prevent us continuing to operate correctly.
            //
            // It is therefore OK to just log the error here.
            //
            // In practice, we're likely to try to save as a result of the publisher's
            // operation, too.  That's going to be more of a problem, but it's handled
            // by other code paths.
            //
            // We *don't* include the HS nickname in the activity
            // because this is probably not HS instance specific.
            "possibly deleting expiry times for old HSS IPTs";
            save_outcome;
        );

        // Now the fields will be dropped, including `guard`.
        // I.e. the mutex gets unlocked.  This means we notify the publisher
        // (which might make it wake up on another thread) just *before*
        // we release the lock, rather than just after.
        // This is slightly suboptimal but doesn't matter here.
        // To do better, we'd need to make the guard into an Option.
    }
}

impl IptsPublisherView {
    /// Wait until the IPT set has changed (or may have)
    ///
    /// After this returns, to find out what the new IPT set is,
    /// the publisher calls `borrow_for_publish`.
    ///
    /// Will complete immediately if the IPT set has
    /// changed since the last call to `await_update`.
    ///
    /// Returns:
    ///  * `Some(Ok(()))` if the IPT set was (or may have been) updated
    ///  * `None` if the manager is shutting down and the publisher should shut down too
    ///  * `Some(Err(..))` if a fatal error occurred
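    ///
    /// A sketch of a publisher-side consumer loop (hypothetical surrounding
    /// code; the real loop lives in the publisher):
    ///
    /// ```rust,ignore
    /// while let Some(r) = pub_view.await_update().await {
    ///     r?; // bail out on fatal errors
    ///     let ipts = pub_view.borrow_for_publish();
    ///     // ... compare with what is already published, maybe re-upload ...
    /// }
    /// ```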
    //
    // TODO: make this return Result<ShutdownStatus, FatalError> instead
    // (this is what we do in other places, e.g. in ipt_mgr, publisher).
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1812#note_2976758
    pub(crate) async fn await_update(&mut self) -> Option<Result<(), crate::FatalError>> {
        // Cancellation safety:
        //
        // We're using mpsc::Receiver's implementation of Stream, via StreamExt.
        // Stream::next() must be cancellation safe or it would be lossy everywhere.
        // So it is OK to create the future from next, here, and possibly discard it
        // before it becomes Ready.
        let () = self.notify.next().await?;
        Some(Ok(()))
    }

    /// Look at the list of introduction points to publish
    ///
    /// Whenever a publication attempt is started,
    /// [`note_publication_attempt`](PublishIptSet::note_publication_attempt)
    /// must be called on this same [`PublishIptSet`].
    ///
    /// The returned value is a lock guard.
    /// (It is not `Send` so cannot be held across await points.)
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }

    /// Obtain an [`IptsPublisherUploadView`], for use just prior to a publication attempt
    pub(crate) fn upload_view(&self) -> IptsPublisherUploadView {
        let shared = self.shared.clone();
        IptsPublisherUploadView { shared }
    }
}

impl IptsPublisherUploadView {
    /// Look at the list of introduction points to publish
    ///
    /// See [`IptsPublisherView::borrow_for_publish`].
    pub(crate) fn borrow_for_publish(
        &self,
    ) -> impl std::ops::DerefMut<Target = PublishIptSet> + '_ {
        lock_shared(&self.shared)
    }
}

impl PublishIptSet {
    /// Update all the `last_descriptor_expiry_including_slop` for a publication attempt
    ///
    /// Called by the publisher when it starts a publication attempt
    /// which will advertise this set of introduction points.
    ///
    /// When calling this, the publisher promises that the publication attempt
    /// will either complete, or be abandoned, before `worst_case_end`.
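    ///
    /// Conceptually, the new expiry recorded for each listed IPT is:
    ///
    /// ```text
    /// worst_case_end + lifetime + IPT_PUBLISH_EXPIRY_SLOP
    /// ```
    ///
    /// except that an already-recorded *later* expiry is kept instead.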
    pub(crate) fn note_publication_attempt(
        &mut self,
        runtime: &impl SleepProvider,
        worst_case_end: Instant,
    ) -> Result<(), IptStoreError> {
        let ipts = self
            .ipts
            .as_ref()
            .ok_or_else(|| internal!("publishing None!"))?;

        let new_value = (|| {
            worst_case_end
                .checked_add(ipts.lifetime)?
                .checked_add(IPT_PUBLISH_EXPIRY_SLOP)
        })()
        .ok_or_else(
            // Clock overflow on the monotonic clock.  Everything is terrible.
            // We will have no idea when we can stop publishing the descriptor!
            // I guess we'll return an error and cause the publisher to bail out?
            // An ErrorKind of ClockSkew is wrong, since this is a purely local problem,
            // and should be impossible if we properly checked our parameters.
            || internal!("monotonic clock overflow"),
        )?;

        for ipt in &ipts.ipts {
            use std::collections::hash_map::Entry;
            let entry = self.last_descriptor_expiry_including_slop.entry(ipt.lid);

            // Open-coding a hypothetical Entry::value()
            let old_value = match &entry {
                Entry::Occupied(oe) => Some(*oe.get()),
                Entry::Vacant(_) => None,
            };

            let to_store = chain!(
                //
                old_value,
                [new_value],
            )
            .max()
            .expect("max of known-non-empty iterator was None");

            // Open-coding Entry::insert(); the unstable insert_entry() would do
            match entry {
                Entry::Occupied(mut oe) => {
                    oe.insert(to_store);
                }
                Entry::Vacant(ve) => {
                    ve.insert(to_store);
                }
            };
        }

        self.save(runtime)?;

        Ok(())
    }
}

//---------- On disk data structures, done with serde ----------

/// Record of intro point publications
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct StateRecord {
    /// Ipts
    ipts: Vec<IptRecord>,
    /// Reference time
    stored: time_store::Reference,
}

/// Record of publication of one intro point
#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq)]
struct IptRecord {
    /// Which ipt?
    lid: IptLocalId,
    /// Maintain until, `last_descriptor_expiry_including_slop`
    // We use a shorter variable name so the on-disk files aren't silly
    until: time_store::FutureTimestamp,
}
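
// For orientation: the stored record might serialize to something like the
// following (a sketch, assuming the storage handle uses a JSON-like serde
// format; the exact encodings of `IptLocalId` and
// `time_store::FutureTimestamp` are whatever their `Serialize` impls produce):
//
//     { "ipts": [ { "lid": "...", "until": "..." } ], "stored": "..." }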

impl PublishIptSet {
    /// Save the publication times to the persistent state
    fn save(&mut self, runtime: &impl SleepProvider) -> Result<(), IptStoreError> {
        // Throughout, we use exhaustive struct patterns on the in-memory data,
        // so we avoid missing any of the data.
        let PublishIptSet {
            ipts,
            last_descriptor_expiry_including_slop,
            storage,
        } = self;

        let tstoring = time_store::Storing::start(runtime);

        // we don't save the instructions to the publisher; on reload that becomes None
        let _: &Option<IptSet> = ipts;

        let mut ipts = last_descriptor_expiry_including_slop
            .iter()
            .map(|(&lid, &until)| {
                let until = tstoring.store_future(until);
                IptRecord { lid, until }
            })
            .collect_vec();
        ipts.sort(); // normalise

        let on_disk = StateRecord {
            ipts,
            stored: tstoring.store_ref(),
        };

        Ok(storage.store(&on_disk)?)
    }

    /// Load the publication times from the persistent state
    fn load(
        storage: IptSetStorageHandle,
        runtime: &impl SleepProvider,
    ) -> Result<PublishIptSet, StartupError> {
        let on_disk = storage.load().map_err(StartupError::LoadState)?;
        let last_descriptor_expiry_including_slop = on_disk
            .map(|record| {
                // Throughout, we use exhaustive struct patterns on the data we got from disk,
                // so we avoid missing any of the data.
                let StateRecord { ipts, stored } = record;
                let tloading = time_store::Loading::start(runtime, stored);
                ipts.into_iter()
                    .map(|ipt| {
                        let IptRecord { lid, until } = ipt;
                        let until = tloading.load_future(until);
                        (lid, until)
                    })
                    .collect()
            })
            .unwrap_or_default();
        Ok(PublishIptSet {
            ipts: None,
            last_descriptor_expiry_including_slop,
            storage,
        })
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::test::create_storage_handles;
    use crate::FatalError;
    use futures::{pin_mut, poll};
    use std::task::Poll::{self, *};
    use test_temp_dir::test_temp_dir;
    use tor_rtcompat::ToplevelBlockOn as _;

    fn test_intro_point() -> Ipt {
        use tor_netdoc::doc::hsdesc::test_data;
        test_data::test_parsed_hsdesc().unwrap().intro_points()[0].clone()
    }

    async fn pv_poll_await_update(
        pv: &mut IptsPublisherView,
    ) -> Poll<Option<Result<(), FatalError>>> {
        let fut = pv.await_update();
        pin_mut!(fut);
        poll!(fut)
    }

    async fn pv_expect_one_await_update(pv: &mut IptsPublisherView) {
        assert!(matches!(
            pv_poll_await_update(pv).await,
            Ready(Some(Ok(())))
        ));
        assert!(pv_poll_await_update(pv).await.is_pending());
    }

    fn pv_note_publication_attempt(
        runtime: &impl SleepProvider,
        pv: &IptsPublisherView,
        worst_case_end: Instant,
    ) {
        pv.borrow_for_publish()
            .note_publication_attempt(runtime, worst_case_end)
            .unwrap();
    }

    fn mv_get_0_expiry(mv: &mut IptsManagerView) -> Instant {
        let g = mv.borrow_for_read();
        let lid = g.ipts.as_ref().unwrap().ipts[0].lid;
        *g.last_descriptor_expiry_including_slop.get(&lid).unwrap()
    }

    #[test]
    fn test() {
        // We don't bother with MockRuntime::test_with_various
        // since this test case doesn't spawn tasks
        let runtime = tor_rtmock::MockRuntime::new();

        let temp_dir_owned = test_temp_dir!();
        let temp_dir = temp_dir_owned.as_path_untracked();

        runtime.clone().block_on(async move {
            // make a channel; it should have no updates yet

            let (_state_mgr, iptpub_state_handle) = create_storage_handles(temp_dir);
            let (mut mv, mut pv) = ipts_channel(&runtime, iptpub_state_handle).unwrap();
            assert!(pv_poll_await_update(&mut pv).await.is_pending());

            // borrowing the publisher view for publish doesn't cause an update

            let pg = pv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            let uv = pv.upload_view();
            let pg = uv.borrow_for_publish();
            assert!(pg.ipts.is_none());
            drop(pg);

            // borrowing the manager view for update *does* cause one update

            let mut mg = mv.borrow_for_update(runtime.clone());
            mg.ipts = Some(IptSet {
                ipts: vec![],
                lifetime: Duration::ZERO,
            });
            drop(mg);

            pv_expect_one_await_update(&mut pv).await;

            // borrowing the manager view for update twice causes only one (coalesced) update

            const LIFETIME: Duration = Duration::from_secs(1800);
            const PUBLISH_END_TIMEOUT: Duration = Duration::from_secs(300);

            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .lifetime = LIFETIME;
            mv.borrow_for_update(runtime.clone())
                .ipts
                .as_mut()
                .unwrap()
                .ipts
                .push(IptInSet {
                    ipt: test_intro_point(),
                    lid: [42; 32].into(),
                });

            pv_expect_one_await_update(&mut pv).await;

            // noting a publication attempt records the expected expiry time

            pv_note_publication_attempt(&runtime, &pv, runtime.now() + PUBLISH_END_TIMEOUT);

            let expected_expiry =
                runtime.now() + PUBLISH_END_TIMEOUT + LIFETIME + IPT_PUBLISH_EXPIRY_SLOP;
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);

            // noting an attempt with an *earlier* end time doesn't shorten the expiry

            pv_note_publication_attempt(&runtime, &pv, runtime.now() - Duration::from_secs(10));
            assert_eq!(mv_get_0_expiry(&mut mv), expected_expiry);
        });

        drop(temp_dir_owned); // prove it's still live
    }
}