//! IPT Manager
//!
//! Maintains introduction points and publishes descriptors.
//! Provides a stream of rendezvous requests.
//!
//! See [`IptManager::run_once`] for discussion of the implementation approach.
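//!
//! A rough usage sketch (illustrative only: the real wiring lives elsewhere in this
//! crate, and the bindings shown here - runtime, dirprovider, nick, config_rx,
//! shutdown_rx, state_handle, circ_pool, keymgr, status_tx, publisher_view, and the
//! channel capacity - are placeholders for values the caller must already have):
//!
//! ```ignore
//! // Rendezvous requests from established IPTs will arrive on the receiving
//! // end of this channel.
//! let (output_rend_reqs, rend_req_stream) = mpsc_channel_no_memquota(32);
//!
//! let mgr = IptManager::new(
//!     runtime, dirprovider, nick, config_rx, output_rend_reqs,
//!     shutdown_rx, &state_handle, Real { circ_pool }, keymgr, status_tx,
//! )?;
//! mgr.launch_background_tasks(publisher_view)?;
//! ```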

use crate::internal_prelude::*;

use tor_relay_selection::{RelayExclusion, RelaySelector, RelayUsage};
use IptStatusStatus as ISS;
use TrackedStatus as TS;

mod persist;
pub(crate) use persist::IptStorageHandle;

pub use crate::ipt_establish::IptError;

/// Expiry time to put on an interim descriptor (IPT publication set Uncertain)
///
/// (Note that we use the same value in both cases, since it doesn't actually do
/// much good to have a short expiration time. This expiration time only affects
/// caches, and we can supersede an old descriptor just by publishing it. Thus,
/// we pick a uniform publication time as done by the C tor implementation.)
const IPT_PUBLISH_UNCERTAIN: Duration = Duration::from_secs(3 * 60 * 60); // 3 hours
/// Expiry time to put on a final descriptor (IPT publication set Certain)
const IPT_PUBLISH_CERTAIN: Duration = IPT_PUBLISH_UNCERTAIN;

//========== data structures ==========

/// IPT Manager (for one hidden service)
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct IptManager<R, M> {
    /// Immutable contents
    imm: Immutable<R>,

    /// Mutable state
    state: State<R, M>,
}

/// Immutable contents of an IPT Manager
///
/// Contains things inherent to our identity, and
/// handles to services that we'll be using.
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct Immutable<R> {
    /// Runtime
    #[educe(Debug(ignore))]
    runtime: R,

    /// Netdir provider
    #[educe(Debug(ignore))]
    dirprovider: Arc<dyn NetDirProvider>,

    /// Nickname
    nick: HsNickname,

    /// Output MPSC for rendezvous requests
    ///
    /// Passed to IPT Establishers we create
    output_rend_reqs: mpsc::Sender<RendRequest>,

    /// Internal channel for updates from IPT Establishers (sender)
    ///
    /// When we make a new `IptEstablisher` we use this to arrange for
    /// its status updates to arrive, appropriately tagged, via `status_recv`
    status_send: mpsc::Sender<(IptLocalId, IptStatus)>,

    /// The key manager.
    #[educe(Debug(ignore))]
    keymgr: Arc<KeyMgr>,

    /// Replay log directory
    ///
    /// Files are named after the (bare) IptLocalId
    #[educe(Debug(ignore))]
    replay_log_dir: tor_persist::state_dir::InstanceRawSubdir,

    /// A sender for updating the status of the onion service.
    #[educe(Debug(ignore))]
    status_tx: IptMgrStatusSender,
}

/// State of an IPT Manager
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct State<R, M> {
    /// Source of configuration updates
    //
    // TODO #1209 reject reconfigurations we can't cope with
    // for example, state dir changes will go quite wrong
    new_configs: watch::Receiver<Arc<OnionServiceConfig>>,

    /// Last configuration update we received
    ///
    /// This is the snapshot of the config we are currently using.
    /// (Doing it this way avoids running our algorithms
    /// with a mixture of old and new config.)
    current_config: Arc<OnionServiceConfig>,

    /// Channel for updates from IPT Establishers (receiver)
    ///
    /// We arrange for all the updates to be multiplexed,
    /// as that makes handling them easy in our event loop.
    status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>,

    /// State: selected relays
    ///
    /// We append to this, and call `retain` on it,
    /// so these are in chronological order of selection.
    irelays: Vec<IptRelay>,

    /// Did we fail to select a relay last time?
    ///
    /// This can only be caused (or triggered) by a busted netdir or config.
    last_irelay_selection_outcome: Result<(), ()>,

    /// Have we removed any IPTs but not yet cleaned up keys and logfiles?
    #[educe(Debug(ignore))]
    ipt_removal_cleanup_needed: bool,

    /// Signal for us to shut down
    shutdown: broadcast::Receiver<Void>,

    /// The on-disk state storage handle.
    #[educe(Debug(ignore))]
    storage: IptStorageHandle,

    /// Mockable state, normally [`Real`]
    ///
    /// This is in `State` so it can be passed mutably to tests,
    /// even though the main code doesn't need `mut`
    /// since `HsCircPool` is a service with interior mutability.
    mockable: M,

    /// Runtime (to placate compiler)
    runtime: PhantomData<R>,
}

/// One selected relay, at which we are establishing (or have relevantly advertised) IPTs
struct IptRelay {
    /// The actual relay
    relay: RelayIds,

    /// The retirement time we selected for this relay
    planned_retirement: Instant,

    /// IPTs at this relay
    ///
    /// At most one will have [`IsCurrent`].
    ///
    /// We append to this, and call `retain` on it,
    /// so these are in chronological order of selection.
    ipts: Vec<Ipt>,
}

/// One introduction point, representation in memory
#[derive(Debug)]
struct Ipt {
    /// Local persistent identifier
    lid: IptLocalId,

    /// Handle for the establisher; we keep this here just for its `Drop` action
    establisher: Box<ErasedIptEstablisher>,

    /// `KS_hs_ipt_sid`, `KP_hs_ipt_sid`
    ///
    /// This is an `Arc` because:
    ///  * The manager needs a copy so that it can save it to disk.
    ///  * The establisher needs a copy to actually use.
    ///  * The underlying secret key type is not `Clone`.
    k_sid: Arc<HsIntroPtSessionIdKeypair>,

    /// `KS_hss_ntor`, `KP_hss_ntor`
    k_hss_ntor: Arc<HsSvcNtorKeypair>,

    /// Last information about how it's doing including timing info
    status_last: TrackedStatus,

    /// Until when ought we to try to maintain it
    ///
    /// For introduction points we are publishing,
    /// this is a copy of the value set by the publisher
    /// in the `IptSet` we share with the publisher.
    ///
    /// (`None` means the IPT has not been advertised at all yet.)
    ///
    /// We must duplicate the information because:
    ///
    ///  * We can't have it just live in the shared `IptSet`
    ///    because we need to retain it for no-longer-being-published IPTs.
    ///
    ///  * We can't have it just live here because the publisher needs to update it.
    ///
    /// (An alternative would be to more seriously entangle the manager and publisher.)
    last_descriptor_expiry_including_slop: Option<Instant>,

    /// Is this IPT current - should we include it in descriptors?
    ///
    /// `None` might mean:
    ///  * WantsToRetire
    ///  * We have >N IPTs and we have been using this IPT so long we want to rotate it out
    ///    (the [`IptRelay`] has reached its `planned_retirement` time)
    ///  * The IPT has wrong parameters of some kind, and needs to be replaced
    ///    (Eg, we set it up with the wrong DOS_PARAMS extension)
    is_current: Option<IsCurrent>,
}

/// Last information from establisher about an IPT, with timing info added by us
#[derive(Debug)]
enum TrackedStatus {
    /// Corresponds to [`IptStatusStatus::Faulty`]
    Faulty {
        /// When we were first told this started to establish, if we know it
        ///
        /// This might be an early estimate, which would give an overestimate
        /// of the establishment time, which is fine.
        /// Or it might be `Err` meaning we don't know.
        started: Result<Instant, ()>,

        /// The error, if any.
        error: Option<IptError>,
    },

    /// Corresponds to [`IptStatusStatus::Establishing`]
    Establishing {
        /// When we were told we started to establish, for calculating `time_to_establish`
        started: Instant,
    },

    /// Corresponds to [`IptStatusStatus::Good`]
    Good {
        /// How long it took to establish (if we could determine that information)
        ///
        /// Can only be `Err` in strange situations.
        time_to_establish: Result<Duration, ()>,

        /// Details, from the Establisher
        details: ipt_establish::GoodIptDetails,
    },
}

/// Token indicating that this introduction point is current (not Retiring)
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
struct IsCurrent;

//---------- related to mockability ----------

/// Type-erased version of `Box<IptEstablisher>`
///
/// The real type is `M::IptEstablisher`.
/// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc.
type ErasedIptEstablisher = dyn Any + Send + Sync + 'static;

/// Mockable state in an IPT Manager - real version
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct Real<R: Runtime> {
    /// Circuit pool for circuits we need to make
    ///
    /// Passed to each new Establisher
    #[educe(Debug(ignore))]
    pub(crate) circ_pool: Arc<HsCircPool<R>>,
}

//---------- errors ----------

/// An error that happened while trying to select a relay
///
/// Used only within the IPT manager.
/// Can only be caused by bad netdir or maybe bad config.
#[derive(Debug, Error)]
enum ChooseIptError {
    /// Bad or insufficient netdir
    #[error("bad or insufficient netdir")]
    NetDir(#[from] tor_netdir::Error),
    /// Too few suitable relays
    #[error("too few suitable relays")]
    TooFewUsableRelays,
    /// Time overflow
    #[error("time overflow (system clock set wrong?)")]
    TimeOverflow,
    /// Internal error
    #[error("internal error")]
    Bug(#[from] Bug),
}

/// An error that happened while trying to create an IPT (at a selected relay)
///
/// Used only within the IPT manager.
#[derive(Debug, Error)]
pub(crate) enum CreateIptError {
    /// Fatal error
    #[error("fatal error")]
    Fatal(#[from] FatalError),

    /// Error accessing keystore
    #[error("problems with keystores")]
    Keystore(#[from] tor_keymgr::Error),

    /// Error opening the intro request replay log
    #[error("unable to open the intro req replay log: {file:?}")]
    OpenReplayLog {
        /// What filesystem object we tried to do it to
        file: PathBuf,
        /// What happened
        #[source]
        error: Arc<io::Error>,
    },
}

//========== Relays we've chosen, and IPTs ==========

impl IptRelay {
    /// Get a reference to this IPT relay's current intro point state (if any)
    ///
    /// `None` means this IPT relay has no current introduction points.
    /// That might be, briefly, because a new intro point needs to be created;
    /// or it might be because we are retiring the relay.
    fn current_ipt(&self) -> Option<&Ipt> {
        self.ipts
            .iter()
            .find(|ipt| ipt.is_current == Some(IsCurrent))
    }

    /// Get a mutable reference to this IPT relay's current intro point state (if any)
    fn current_ipt_mut(&mut self) -> Option<&mut Ipt> {
        self.ipts
            .iter_mut()
            .find(|ipt| ipt.is_current == Some(IsCurrent))
    }

    /// Should this IPT Relay be retired?
    ///
    /// This is determined by our IPT relay rotation time.
    fn should_retire(&self, now: &TrackingNow) -> bool {
        now > &self.planned_retirement
    }

    /// Make a new introduction point at this relay
    ///
    /// It becomes the current IPT.
    fn make_new_ipt<R: Runtime, M: Mockable<R>>(
        &mut self,
        imm: &Immutable<R>,
        new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
        mockable: &mut M,
    ) -> Result<(), CreateIptError> {
        let lid: IptLocalId = mockable.thread_rng().random();

        let ipt = Ipt::start_establisher(
            imm,
            new_configs,
            mockable,
            &self.relay,
            lid,
            Some(IsCurrent),
            None::<IptExpectExistingKeys>,
            // None is precisely right: the descriptor hasn't been published.
            PromiseLastDescriptorExpiryNoneIsGood {},
        )?;

        self.ipts.push(ipt);

        Ok(())
    }
}

/// Token representing a promise by the caller of `start_establisher`
///
/// A caller who makes one of these structs promises that it is OK for `start_establisher`
/// to set `last_descriptor_expiry_including_slop` to `None`.
struct PromiseLastDescriptorExpiryNoneIsGood {}

/// Token telling [`Ipt::start_establisher`] to expect existing keys in the keystore
#[derive(Debug, Clone, Copy)]
struct IptExpectExistingKeys;

impl Ipt {
    /// Start a new IPT establisher, and create and return an `Ipt`
    #[allow(clippy::too_many_arguments)] // There's only two call sites
    fn start_establisher<R: Runtime, M: Mockable<R>>(
        imm: &Immutable<R>,
        new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
        mockable: &mut M,
        relay: &RelayIds,
        lid: IptLocalId,
        is_current: Option<IsCurrent>,
        expect_existing_keys: Option<IptExpectExistingKeys>,
        _: PromiseLastDescriptorExpiryNoneIsGood,
    ) -> Result<Ipt, CreateIptError> {
        let mut rng = tor_llcrypto::rng::CautiousRng;

        /// Load (from disk) or generate an IPT key with role IptKeyRole::$role
        ///
        /// Ideally this would be a closure, but it has to be generic over the
        /// returned key type.  So it's a macro.  (A proper function would have
        /// many type parameters and arguments and be quite annoying.)
        macro_rules! get_or_gen_key { { $Keypair:ty, $role:ident } => { (||{
            let spec = IptKeySpecifier {
                nick: imm.nick.clone(),
                role: IptKeyRole::$role,
                lid,
            };
            // Our desired behaviour:
            //  expect_existing_keys == None
            //     The keys shouldn't exist.  Generate and insert.
            //     If they do exist then things are badly messed up
            //     (we're creating a new IPT with a fresh lid).
            //     So, then, crash.
            //  expect_existing_keys == Some(IptExpectExistingKeys)
            //     The keys are supposed to exist.  Load them.
            //     We ought to have stored them before storing in our on-disk records that
            //     this IPT exists.  But this could happen due to file deletion or something.
            //     And we could recover by creating fresh keys, although maybe some clients
            //     would find the previous keys in old descriptors.
            //     So if the keys are missing, make and store new ones, logging an error msg.
            let k: Option<$Keypair> = imm.keymgr.get(&spec)?;
            let arti_path = || {
                spec
                    .arti_path()
                    .map_err(|e| {
                        CreateIptError::Fatal(
                            into_internal!("bad ArtiPath from IPT key spec")(e).into()
                        )
                    })
            };
            match (expect_existing_keys, k) {
                (None, None) => { }
                (Some(_), Some(k)) => return Ok(Arc::new(k)),
                (None, Some(_)) => {
                    return Err(FatalError::IptKeysFoundUnexpectedly(arti_path()?).into())
                },
                (Some(_), None) => {
                    error!("bug: HS service {} missing previous key {:?}. Regenerating.",
                           &imm.nick, arti_path()?);
                }
            }

            let res = imm.keymgr.generate::<$Keypair>(
                &spec,
                tor_keymgr::KeystoreSelector::Primary,
                &mut rng,
                false, /* overwrite */
            );

            match res {
                Ok(k) => Ok::<_, CreateIptError>(Arc::new(k)),
                Err(tor_keymgr::Error::KeyAlreadyExists) => {
                    Err(FatalError::KeystoreRace { action: "generate", path: arti_path()? }.into())
                },
                Err(e) => Err(e.into()),
            }
        })() } }

        let k_hss_ntor = get_or_gen_key!(HsSvcNtorKeypair, KHssNtor)?;
        let k_sid = get_or_gen_key!(HsIntroPtSessionIdKeypair, KSid)?;

        // we'll treat it as Establishing until we find otherwise
        let status_last = TS::Establishing {
            started: imm.runtime.now(),
        };

        // TODO #1186 Support ephemeral services (without persistent replay log)
        let replay_log = IptReplayLog::new_logged(&imm.replay_log_dir, &lid)?;

        let params = IptParameters {
            replay_log,
            config_rx: new_configs.clone(),
            netdir_provider: imm.dirprovider.clone(),
            introduce_tx: imm.output_rend_reqs.clone(),
            lid,
            target: relay.clone(),
            k_sid: k_sid.clone(),
            k_ntor: Arc::clone(&k_hss_ntor),
            accepting_requests: ipt_establish::RequestDisposition::NotAdvertised,
        };
        let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, params)?;

        // This task will shut down when self.establisher is dropped, causing
        // watch_tx to close.
        imm.runtime
            .spawn({
                let mut status_send = imm.status_send.clone();
                async move {
                    loop {
                        let Some(status) = watch_rx.next().await else {
                            trace!("HS service IPT status task: establisher went away");
                            break;
                        };
                        match status_send.send((lid, status)).await {
                            Ok(()) => {}
                            Err::<_, mpsc::SendError>(e) => {
                                // Not using trace_report because SendError isn't HasKind
                                trace!("HS service IPT status task: manager went away: {e}");
                                break;
                            }
                        }
                    }
                }
            })
            .map_err(|cause| FatalError::Spawn {
                spawning: "IPT establisher watch status task",
                cause: cause.into(),
            })?;

        let ipt = Ipt {
            lid,
            establisher: Box::new(establisher),
            k_hss_ntor,
            k_sid,
            status_last,
            is_current,
            last_descriptor_expiry_including_slop: None,
        };

        debug!(
            "HS service {}: {lid:?} establishing {} IPT at relay {}",
            &imm.nick,
            match expect_existing_keys {
                None => "new",
                Some(_) => "previous",
            },
            &relay,
        );

        Ok(ipt)
    }

    /// Returns `true` if this IPT has status Good (and should perhaps be published)
    fn is_good(&self) -> bool {
        match self.status_last {
            TS::Good { .. } => true,
            TS::Establishing { .. } | TS::Faulty { .. } => false,
        }
    }

    /// Returns the error, if any, we are currently encountering at this IPT.
    fn error(&self) -> Option<&IptError> {
        match &self.status_last {
            TS::Good { .. } | TS::Establishing { .. } => None,
            TS::Faulty { error, .. } => error.as_ref(),
        }
    }

    /// Construct the information needed by the publisher for this intro point
    fn for_publish(&self, details: &ipt_establish::GoodIptDetails) -> Result<ipt_set::Ipt, Bug> {
        let k_sid: &ed25519::Keypair = (*self.k_sid).as_ref();
        tor_netdoc::doc::hsdesc::IntroPointDesc::builder()
            .link_specifiers(details.link_specifiers.clone())
            .ipt_kp_ntor(details.ipt_kp_ntor)
            .kp_hs_ipt_sid(k_sid.verifying_key().into())
            .kp_hss_ntor(self.k_hss_ntor.public().clone())
            .build()
            .map_err(into_internal!("failed to construct IntroPointDesc"))
    }
}

impl HasKind for ChooseIptError {
    fn kind(&self) -> ErrorKind {
        use ChooseIptError as E;
        use ErrorKind as EK;
        match self {
            E::NetDir(e) => e.kind(),
            E::TooFewUsableRelays => EK::TorDirectoryUnusable,
            E::TimeOverflow => EK::ClockSkew,
            E::Bug(e) => e.kind(),
        }
    }
}

// This is somewhat abbreviated but it is legible and enough for most purposes.
impl Debug for IptRelay {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(f, "IptRelay {}", self.relay)?;
        write!(
            f,
            "          planned_retirement: {:?}",
            self.planned_retirement
        )?;
        for ipt in &self.ipts {
            write!(
                f,
                "\n          ipt {} {} {:?} ldeis={:?}",
                match ipt.is_current {
                    Some(IsCurrent) => "cur",
                    None => "old",
                },
                &ipt.lid,
                &ipt.status_last,
                &ipt.last_descriptor_expiry_including_slop,
            )?;
        }
        Ok(())
    }
}

//========== impls on IptManager and State ==========

impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
    //
    //---------- constructor and setup ----------

    /// Create a new IptManager
    #[allow(clippy::too_many_arguments)] // this is an internal function with 1 call site
    pub(crate) fn new(
        runtime: R,
        dirprovider: Arc<dyn NetDirProvider>,
        nick: HsNickname,
        config: watch::Receiver<Arc<OnionServiceConfig>>,
        output_rend_reqs: mpsc::Sender<RendRequest>,
        shutdown: broadcast::Receiver<Void>,
        state_handle: &tor_persist::state_dir::InstanceStateHandle,
        mockable: M,
        keymgr: Arc<KeyMgr>,
        status_tx: IptMgrStatusSender,
    ) -> Result<Self, StartupError> {
        let irelays = vec![]; // See TODO near persist::load call, in launch_background_tasks

        // We don't need buffering, since this is written to by dedicated tasks which
        // are reading watches.
        //
        // Internally-generated status updates (hopefully rate limited?), no need for mq.
        let (status_send, status_recv) = mpsc_channel_no_memquota(0);

        let storage = state_handle
            .storage_handle("ipts")
            .map_err(StartupError::StateDirectoryInaccessible)?;

        let replay_log_dir = state_handle
            .raw_subdir("iptreplay")
            .map_err(StartupError::StateDirectoryInaccessible)?;

        let imm = Immutable {
            runtime,
            dirprovider,
            nick,
            status_send,
            output_rend_reqs,
            keymgr,
            replay_log_dir,
            status_tx,
        };
        let current_config = config.borrow().clone();

        let state = State {
            current_config,
            new_configs: config,
            status_recv,
            storage,
            mockable,
            shutdown,
            irelays,
            last_irelay_selection_outcome: Ok(()),
            ipt_removal_cleanup_needed: false,
            runtime: PhantomData,
        };
        let mgr = IptManager { imm, state };

        Ok(mgr)
    }

            
665
    /// Send the IPT manager off to run and establish intro points
666
8
    pub(crate) fn launch_background_tasks(
667
8
        mut self,
668
8
        mut publisher: IptsManagerView,
669
8
    ) -> Result<(), StartupError> {
670
8
        // TODO maybe this should be done in new(), so we don't have this dummy irelays
671
8
        // but then new() would need the IptsManagerView
672
8
        assert!(self.state.irelays.is_empty());
673
8
        self.state.irelays = persist::load(
674
8
            &self.imm,
675
8
            &self.state.storage,
676
8
            &self.state.new_configs,
677
8
            &mut self.state.mockable,
678
8
            &publisher.borrow_for_read(),
679
8
        )?;
680

            
681
        // Now that we've populated `irelays` and its `ipts` from the on-disk state,
682
        // we should check any leftover disk files from previous runs.  Make a note.
683
8
        self.state.ipt_removal_cleanup_needed = true;
684
8

            
685
8
        let runtime = self.imm.runtime.clone();
686
8

            
687
8
        self.imm.status_tx.send(IptMgrState::Bootstrapping, None);
688
8

            
689
8
        // This task will shut down when the RunningOnionService is dropped, causing
690
8
        // self.state.shutdown to become ready.
691
8
        runtime
692
8
            .spawn(self.main_loop_task(publisher))
693
8
            .map_err(|cause| StartupError::Spawn {
694
                spawning: "ipt manager",
695
                cause: cause.into(),
696
8
            })?;
697
8
        Ok(())
698
8
    }

    //---------- internal utility and helper methods ----------

    /// Iterate over *all* the IPTs we know about
    ///
    /// May yield the same `IptRelay` more than once
    /// (once for each IPT at that relay).
    fn all_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.state
            .irelays
            .iter()
            .flat_map(|ir| ir.ipts.iter().map(move |ipt| (ir, ipt)))
    }

    /// Iterate over the *current* IPTs
    ///
    /// Yields each `IptRelay` at most once.
    fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.state
            .irelays
            .iter()
            .filter_map(|ir| Some((ir, ir.current_ipt()?)))
    }

    /// Iterate over the *current* IPTs in `Good` state
    fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.current_ipts().filter(|(_ir, ipt)| ipt.is_good())
    }

    /// Iterate over the current IPT errors.
    ///
    /// Used when reporting our state as [`Recovering`](crate::status::State::Recovering).
    fn ipt_errors(&self) -> impl Iterator<Item = &IptError> {
        self.all_ipts().filter_map(|(_ir, ipt)| ipt.error())
    }

    /// Target number of intro points
    pub(crate) fn target_n_intro_points(&self) -> usize {
        self.state.current_config.num_intro_points.into()
    }

    /// Maximum number of concurrent intro point relays
    pub(crate) fn max_n_intro_relays(&self) -> usize {
        let params = self.imm.dirprovider.params();
        let num_extra = (*params).as_ref().hs_intro_num_extra_intropoints.get() as usize;
        self.target_n_intro_points() + num_extra
    }

            
746
    //---------- main implementation logic ----------
747

            
748
    /// Make some progress, if possible, and say when to wake up again
749
    ///
750
    /// Examines the current state and attempts to improve it.
751
    ///
752
    /// If `idempotently_progress_things_now` makes any changes,
753
    /// it will return `None`.
754
    /// It should then be called again immediately.
755
    ///
756
    /// Otherwise, it returns the time in the future when further work ought to be done:
757
    /// i.e., the time of the earliest timeout or planned future state change -
758
    /// as a [`TrackingNow`].
759
    ///
760
    /// In that case, the caller must call `compute_iptsetstatus_publish`,
761
    /// since the IPT set etc. may have changed.
762
    ///
763
    /// ### Goals and algorithms
764
    ///
765
    /// We attempt to maintain a pool of N established and verified IPTs,
766
    /// at N IPT Relays.
767
    ///
768
    /// When we have fewer than N IPT Relays
769
    /// that have `Establishing` or `Good` IPTs (see below)
770
    /// and fewer than k*N IPT Relays overall,
771
    /// we choose a new IPT Relay at random from the consensus
772
    /// and try to establish an IPT on it.
773
    ///
774
    /// (Rationale for the k*N limit:
775
    /// we do want to try to replace faulty IPTs, but
776
    /// we don't want an attacker to be able to provoke us into
777
    /// rapidly churning through IPT candidates.)
778
    ///
779
    /// When we select a new IPT Relay, we randomly choose a planned replacement time,
780
    /// after which it becomes `Retiring`.
781
    ///
782
    /// Additionally, any IPT becomes `Retiring`
783
    /// after it has been used for a certain number of introductions
784
    /// (c.f. C Tor `#define INTRO_POINT_MIN_LIFETIME_INTRODUCTIONS 16384`.)
785
    /// When this happens we retain the IPT Relay,
786
    /// and make new parameters to make a new IPT at the same Relay.
787
    ///
788
    /// An IPT is removed from our records, and we give up on it,
789
    /// when it is no longer `Good` or `Establishing`
790
    /// and all descriptors that mentioned it have expired.
791
    ///
792
    /// (Until all published descriptors mentioning an IPT expire,
793
    /// we consider ourselves bound by those previously-published descriptors,
794
    /// and try to maintain the IPT.
795
    /// TODO: Allegedly this is unnecessary, but I don't see how it could be.)
796
    ///
797
    /// ### Performance
798
    ///
799
    /// This function is at worst O(N) where N is the number of IPTs.
800
    /// When handling state changes relating to a particular IPT (or IPT relay)
801
    /// it needs at most O(1) calls to progress that one IPT to its proper new state.
802
    ///
803
    /// See the performance note on [`run_once()`](Self::run_once).
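    ///
    /// ### Calling pattern (sketch)
    ///
    /// A rough sketch of how the caller (`run_once`, not shown here) is expected to
    /// drive this method; illustrative only, and the non-method names are placeholders:
    ///
    /// ```ignore
    /// let wakeup = loop {
    ///     match self.idempotently_progress_things_now()? {
    ///         // Something changed: run again immediately.
    ///         None => continue,
    ///         // Quiescent: the earliest time at which further work may be needed.
    ///         Some(now) => break now,
    ///     }
    /// };
    /// // The IPT set may have changed, so recompute what we tell the publisher,
    /// // then wait until `wakeup` (or until some other event arrives).
    /// self.compute_iptsetstatus_publish(&wakeup, &mut publish_set)?;
    /// ```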
    #[allow(clippy::redundant_closure_call)]
    fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> {
        /// Return value which means "we changed something, please run me again"
        ///
        /// In each case, if we make any changes which indicate we might
        /// want to restart, we `return CONTINUE`, and
        /// our caller will just call us again.
        ///
        /// This approach simplifies the logic: everything here is idempotent.
        /// (It does mean the algorithm can be quadratic in the number of intro points,
        /// but that number is reasonably small for a modern computer and the constant
        /// factor is small too.)
        const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None);

        // This tracks everything we compare it to, using interior mutability,
        // so that if there is no work to do and no timeouts have expired,
        // we know when we will want to wake up.
        let now = TrackingNow::now(&self.imm.runtime);

        // ---------- collect garbage ----------

        // Rotate out old IPT(s)
        for ir in &mut self.state.irelays {
            if ir.should_retire(&now) {
                if let Some(ipt) = ir.current_ipt_mut() {
                    ipt.is_current = None;
                    return CONTINUE;
                }
            }
        }

        // Forget old IPTs (after the last descriptor mentioning them has expired)
        for ir in &mut self.state.irelays {
            // When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point
            ir.ipts.retain(|ipt| {
                let keep = ipt.is_current.is_some()
                    || match ipt.last_descriptor_expiry_including_slop {
                        None => false,
                        Some(last) => now < last,
                    };
                // This is the only place in the manager where an IPT is dropped,
                // other than when the whole service is dropped.
                self.state.ipt_removal_cleanup_needed |= !keep;
                keep
            });
            // No need to return CONTINUE, since there is no other future work implied
            // by discarding a non-current IPT.
        }

        // Forget retired IPT relays (all their IPTs are gone)
        self.state
            .irelays
            .retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty()));
        // If we deleted relays, we might want to select new ones.  That happens below.

        // ---------- make progress ----------
        //
        // Consider selecting new relays and setting up new IPTs.

        // Create new IPTs at already-chosen relays
        for ir in &mut self.state.irelays {
            if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() {
                // We don't have a current IPT at this relay, but we should.
                match ir.make_new_ipt(&self.imm, &self.state.new_configs, &mut self.state.mockable)
                {
                    Ok(()) => return CONTINUE,
                    Err(CreateIptError::Fatal(fatal)) => return Err(fatal),
                    Err(
                        e @ (CreateIptError::Keystore(_) | CreateIptError::OpenReplayLog { .. }),
                    ) => {
                        error_report!(e, "HS {}: failed to prepare new IPT", &self.imm.nick);
                        // Let's not try any more of this.
                        // We'll run the rest of our "make progress" algorithms,
                        // presenting them with possibly-suboptimal state.  That's fine.
                        // At some point we'll be poked to run again and then we'll retry.
                        /// Retry no later than this:
                        const STORAGE_RETRY: Duration = Duration::from_secs(60);
                        now.update(STORAGE_RETRY);
                        break;
                    }
                }
            }
        }

        // Consider choosing a new IPT relay
        {
            // block {} prevents use of `n_good_ish_relays` for other (wrong) purposes

            // We optimistically count an Establishing IPT as good-ish;
            // specifically, for the purposes of deciding whether to select a new
            // relay because we don't have enough good-looking ones.
            let n_good_ish_relays = self
                .current_ipts()
                .filter(|(_ir, ipt)| match ipt.status_last {
                    TS::Good { .. } | TS::Establishing { .. } => true,
                    TS::Faulty { .. } => false,
                })
                .count();

            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] // in map_err
            if n_good_ish_relays < self.target_n_intro_points()
                && self.state.irelays.len() < self.max_n_intro_relays()
                && self.state.last_irelay_selection_outcome.is_ok()
            {
                self.state.last_irelay_selection_outcome = self
                    .state
                    .choose_new_ipt_relay(&self.imm, now.instant().get_now_untracked())
                    .map_err(|error| {
                        /// Call $report! with the message.
                        // The macros are annoying and want a cost argument.
                        macro_rules! report { { $report:ident } => {
                            $report!(
                                error,
                                "HS service {} failed to select IPT relay",
                                &self.imm.nick,
                            )
                        }}
                        use ChooseIptError as E;
                        match &error {
                            E::NetDir(_) => report!(info_report),
                            _ => report!(error_report),
                        };
                        ()
                    });
                return CONTINUE;
            }
        }

        //---------- caller (run_once) will update publisher, and wait ----------

        Ok(Some(now))
    }

    /// Import publisher's updates to latest descriptor expiry times
    ///
    /// Copies the `last_descriptor_expiry_including_slop` field
    /// from each ipt in `publish_set` to the corresponding ipt in `self`.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn import_new_expiry_times(irelays: &mut [IptRelay], publish_set: &PublishIptSet) {
        // Every entry in the PublishIptSet ought to correspond to an ipt in self.
        //
        // If there are IPTs in publish_set.last_descriptor_expiry_including_slop
        // that aren't in self, those are IPTs that we know were published,
        // but can't establish since we have forgotten their details.
        //
        // We are not supposed to allow that to happen:
        // we save IPTs to disk before we allow them to be published.
        //
        // (This invariant is across two data structures:
        // `ipt_mgr::State` (specifically, `Ipt`) which is modified only here,
        // and `ipt_set::PublishIptSet` which is shared with the publisher.
        // See the comments in PublishIptSet.)

        let all_ours = irelays.iter_mut().flat_map(|ir| ir.ipts.iter_mut());

        for ours in all_ours {
            if let Some(theirs) = publish_set
                .last_descriptor_expiry_including_slop
                .get(&ours.lid)
            {
                ours.last_descriptor_expiry_including_slop = Some(*theirs);
            }
        }
    }

    /// Expire old entries in publish_set.last_descriptor_expiry_including_slop
    ///
    /// Deletes entries where `now` > `last_descriptor_expiry_including_slop`,
    /// i.e., entries where the publication's validity time has expired,
    /// meaning we don't need to maintain that IPT any more,
    /// at least, not just because we've published it.
    ///
    /// We may expire even entries for IPTs that we, the manager, still want to maintain.
    /// That's fine: this is (just) the information about what we have previously published.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn expire_old_expiry_times(&self, publish_set: &mut PublishIptSet, now: &TrackingNow) {
        // We don't want to bother waking up just to expire things,
        // so use an untracked comparison.
        let now = now.instant().get_now_untracked();

        // Keep only entries whose publication is still valid (expiry not yet reached),
        // as described in the doc comment above.
        publish_set
            .last_descriptor_expiry_including_slop
            .retain(|_lid, expiry| *expiry >= now);
    }

    /// Compute the IPT set to publish, and update the data shared with the publisher
    ///
    /// `now` is the current time and also the earliest wakeup,
    /// which we are in the process of planning.
    /// The noted earliest wakeup can be updated by this function,
    /// for example, with a future time at which the IPT set ought to be published
    /// (e.g., the status goes from Unknown to Uncertain).
    ///
    /// ## IPT sets and lifetimes
    ///
    /// We remember every IPT we have published that is still valid.
    ///
    /// At each point in time we have an idea of the set of IPTs we want to publish.
    /// The possibilities are:
    ///
    ///  * `Certain`:
    ///    We are sure of which IPTs we want to publish.
    ///    We try to do so, talking to hsdirs as necessary,
    ///    updating any existing information.
    ///    (We also republish to an hsdir if its descriptor will expire soon,
    ///    or we haven't published there since Arti was restarted.)
    ///
    ///  * `Unknown`:
    ///    We have no idea which IPTs to publish.
    ///    We leave whatever is on the hsdirs as-is.
    ///
    ///  * `Uncertain`:
    ///    We have some IPTs we could publish,
    ///    but we're not confident about them.
    ///    We publish these to a particular hsdir if:
    ///     - our last-published descriptor has expired
    ///     - or it will expire soon
    ///     - or if we haven't published since Arti was restarted.
    ///
    /// The idea of what to publish is calculated as follows:
    ///
    ///  * If we have at least N `Good` IPTs: `Certain`.
    ///    (We publish the "best" N IPTs for some definition of "best".
    ///    TODO: should we use the fault count?  recency?)
    ///
    ///  * Unless we have at least one `Good` IPT: `Unknown`.
    ///
    ///  * Otherwise: if there are IPTs in `Establishing`,
    ///    and they have been in `Establishing` only a short time \[1\]:
    ///    `Unknown`; otherwise `Uncertain`.
    ///
    /// The effect is that we delay publishing an initial descriptor
    /// by at most 1x the fastest IPT setup time,
    /// at most doubling the initial setup time.
    ///
    /// Each update to the IPT set that isn't `Unknown` comes with a
    /// proposed descriptor expiry time,
    /// which is used if the descriptor is to be actually published.
    /// The proposed descriptor lifetime for `Uncertain`
    /// is the minimum (30 minutes).
    /// Otherwise, we double the lifetime each time,
    /// unless any IPT in the previous descriptor was declared `Faulty`,
    /// in which case we reset it back to the minimum.
    /// TODO: Perhaps we should just pick fixed short and long lifetimes instead,
    /// to limit distinguishability.
    ///
    /// (Rationale: if IPTs are regularly misbehaving,
    /// we should be cautious and limit our exposure to the damage.)
    ///
    /// \[1\] NOTE: We wait a "short time" between establishing our first IPT,
    /// and publishing an incomplete (<N) descriptor -
    /// this is a compromise between
    /// availability (publishing as soon as we have any working IPT)
    /// and
    /// exposure and hsdir load
    /// (which would suggest publishing only when our IPT set is stable).
    /// One possible strategy is to wait as long again
    /// as the time it took to establish our first IPT.
    /// Another is to somehow use our circuit timing estimator.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    #[allow(clippy::unnecessary_wraps)] // for regularity
    #[allow(clippy::cognitive_complexity)] // this function is in fact largely linear
    fn compute_iptsetstatus_publish(
        &mut self,
        now: &TrackingNow,
        publish_set: &mut PublishIptSet,
    ) -> Result<(), IptStoreError> {
        //---------- tell the publisher what to announce ----------

        let very_recently: Option<(TrackingInstantOffsetNow, Duration)> = (|| {
            // on time overflow, don't treat any as started establishing very recently
            let fastest_good_establish_time = self
                .current_ipts()
                .filter_map(|(_ir, ipt)| match ipt.status_last {
                    TS::Good {
                        time_to_establish, ..
                    } => Some(time_to_establish.ok()?),
                    TS::Establishing { .. } | TS::Faulty { .. } => None,
                })
                .min()?;
            // Rationale:
            // we could use circuit timings etc., but arguably the actual time to establish
            // our fastest IPT is a better estimator here (and we want an optimistic,
            // rather than pessimistic estimate).
            //
            // This algorithm has potential to publish too early and frequently,
            // but our overall rate-limiting should keep it from getting out of hand.
            //
            // TODO: We might want to make this "1" tuneable, and/or tune the
            // algorithm as a whole based on experience.
            let wait_more = fastest_good_establish_time * 1;
            let very_recently = fastest_good_establish_time.checked_add(wait_more)?;
            let very_recently = now.checked_sub(very_recently)?;
            Some((very_recently, wait_more))
        })();

        let started_establishing_very_recently = || {
            let (very_recently, wait_more) = very_recently?;
            let lid = self
                .current_ipts()
                .filter_map(|(_ir, ipt)| {
                    let started = match ipt.status_last {
                        TS::Establishing { started } => Some(started),
                        TS::Good { .. } | TS::Faulty { .. } => None,
                    }?;
                    (started > very_recently).then_some(ipt.lid)
                })
                .next()?;
            Some((lid, wait_more))
        };

        let n_good_ipts = self.good_ipts().count();
        let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() {
            // "Certain" - we are sure of which IPTs we want to publish
            debug!(
                "HS service {}: {} good IPTs, >= target {}, publishing",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points()
            );
            self.imm.status_tx.send(IptMgrState::Running, None);

            Some(IPT_PUBLISH_CERTAIN)
        } else if self.good_ipts().next().is_none()
        /* !... .is_empty() */
        {
            // "Unknown" - we have no idea which IPTs to publish.
            debug!("HS service {}: no good IPTs", &self.imm.nick);
            self.imm
                .status_tx
                .send_recovering(self.ipt_errors().cloned().collect_vec());

            None
        } else if let Some((wait_for, wait_more)) = started_establishing_very_recently() {
            // "Unknown" - we say we have no idea which IPTs to publish:
            // although we have *some* idea, we hold off a bit to see if things improve.
            // The wait_more period started counting when the fastest IPT became ready,
            // so the printed value isn't an offset from the message timestamp.
            debug!(
                "HS service {}: {} good IPTs, < target {}, waiting up to {}ms for {:?}",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points(),
                wait_more.as_millis(),
                wait_for
            );
            self.imm
                .status_tx
                .send_recovering(self.ipt_errors().cloned().collect_vec());

            None
        } else {
            // "Uncertain" - we have some IPTs we could publish, but we're not confident
            debug!(
                "HS service {}: {} good IPTs, < target {}, publishing what we have",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points()
            );
            // We are close to being Running -- we just need more IPTs!
            let errors = self.ipt_errors().cloned().collect_vec();
            let errors = if errors.is_empty() {
                None
            } else {
                Some(errors)
            };
            self.imm
                .status_tx
                .send(IptMgrState::DegradedReachable, errors.map(|e| e.into()));

            Some(IPT_PUBLISH_UNCERTAIN)
        };

        publish_set.ipts = if let Some(lifetime) = publish_lifetime {
            let selected = self.publish_set_select();
            for ipt in &selected {
                self.state.mockable.start_accepting(&*ipt.establisher);
            }
            Some(Self::make_publish_set(selected, lifetime)?)
        } else {
            None
        };

        //---------- store persistent state ----------
        persist::store(&self.imm, &mut self.state)?;
        Ok(())
    }

    /// Select IPTs to publish, given that we have decided to publish *something*
    ///
    /// Calculates the set of ipts to publish, selecting up to the target `N`
    /// from the available good current IPTs.
    /// (Old, non-current IPTs, that we are trying to retire, are never published.)
    ///
    /// The returned list is in the same order as our data structure:
    /// firstly, by the ordering in `State.irelays`, and then within each relay,
    /// by the ordering in `IptRelay.ipts`.  Both of these are stable.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn publish_set_select(&self) -> VecDeque<&Ipt> {
        /// Good candidate introduction point for publication
        type Candidate<'i> = &'i Ipt;

        let target_n = self.target_n_intro_points();

        let mut candidates: VecDeque<_> = self
            .state
            .irelays
            .iter()
            .filter_map(|ir: &_| -> Option<Candidate<'_>> {
                let current_ipt = ir.current_ipt()?;
                if !current_ipt.is_good() {
                    return None;
                }
                Some(current_ipt)
            })
            .collect();

        // Take the last N good IPT relays
        //
        // The way we manage irelays means that this is always
        // the ones we selected most recently.
        //
        // TODO SPEC  Publication strategy when we have more than N IPTs
        //
        // We could have a number of strategies here.  We could take some timing
        // measurements, or use the establishment time, or something; but we don't
        // want to add distinguishability.
        //
        // Another concern is manipulability, but
        // we can't be forced to churn, because we don't remove relays
        // from our list of relays to try to use, other than on our own schedule.
        // But we probably won't want to be too reactive to the network environment.
        //
        // Since we only choose new relays when old ones are to retire, or are faulty,
        // choosing the most recently selected, rather than the least recently,
        // has the effect of preferring relays we don't know to be faulty,
        // to ones we have considered faulty at least once.
        //
        // That's better than the opposite.  Also, choosing more recently selected relays
        // for publication may slightly bring forward the time at which all descriptors
        // mentioning that relay have expired, and then we can forget about it.
        while candidates.len() > target_n {
            // WTB: VecDeque::truncate_front
            let _: Candidate = candidates.pop_front().expect("empty?!");
        }

        candidates
    }

    /// Produce a `publish::IptSet`, from a list of IPTs selected for publication
    ///
    /// Updates each chosen `Ipt`'s `last_descriptor_expiry_including_slop`
    ///
    /// The returned `IptSet` is in the same order as `selected`.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn make_publish_set<'i>(
        selected: impl IntoIterator<Item = &'i Ipt>,
        lifetime: Duration,
    ) -> Result<ipt_set::IptSet, FatalError> {
        let ipts = selected
            .into_iter()
            .map(|current_ipt| {
                let TS::Good { details, .. } = &current_ipt.status_last else {
                    return Err(internal!("was good but now isn't?!").into());
                };
                let publish = current_ipt.for_publish(details)?;
                // last_descriptor_expiry_including_slop was earlier merged in from
                // the previous IptSet, and here we copy it back
                let publish = ipt_set::IptInSet {
                    ipt: publish,
                    lid: current_ipt.lid,
                };

                Ok::<_, FatalError>(publish)
            })
            .collect::<Result<_, _>>()?;
        Ok(ipt_set::IptSet { ipts, lifetime })
    }
    /// Delete persistent on-disk data (including keys) for old IPTs
    ///
    /// More precisely, scan places where per-IPT data files live,
    /// and delete anything that doesn't correspond to
    /// one of the IPTs in our main in-memory data structure.
    ///
    /// Does *not* deal with deletion of data handled via storage handles
    /// (`state_dir::StorageHandle`), `ipt_mgr/persist.rs` etc.;
    /// those are one file for each service, so old data is removed as we rewrite them.
    ///
    /// Does *not* deal with deletion of entire old hidden services.
    ///
    /// (This function works on the basis of the invariant that every IPT
    /// in [`ipt_set::PublishIptSet`] is also an [`Ipt`] in [`ipt_mgr::State`](State).
    /// See the comment in [`IptManager::import_new_expiry_times`].
    /// If that invariant is violated, we would delete on-disk files for the affected IPTs.
    /// That's fine since we couldn't re-establish them anyway.)
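    ///
    /// (Concretely, we scan two places: the key manager, for keys matching this
    /// service's IPT key pattern, and this service's IPT replay log directory.)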
    #[allow(clippy::cognitive_complexity)] // Splitting this up would make it worse
    fn expire_old_ipts_external_persistent_state(&self) -> Result<(), StateExpiryError> {
        self.state
            .mockable
            .expire_old_ipts_external_persistent_state_hook();

        let all_ipts: HashSet<_> = self.all_ipts().map(|(_, ipt)| &ipt.lid).collect();

        // Keys
        let pat = IptKeySpecifierPattern {
            nick: Some(self.imm.nick.clone()),
            role: None,
            lid: None,
        }
        .arti_pattern()?;
        let found = self.imm.keymgr.list_matching(&pat)?;
        for entry in found {
            let path = entry.key_path();
            // Try to identify this key (including its IptLocalId)
            match IptKeySpecifier::try_from(path) {
                Ok(spec) if all_ipts.contains(&spec.lid) => continue,
                Ok(_) => trace!("deleting key for old IPT: {path}"),
                Err(bad) => info!("deleting unrecognised IPT key: {path} ({})", bad.report()),
            };
            // Not known, remove it
            self.imm.keymgr.remove_entry(&entry)?;
        }

        // IPT replay logs
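        // (handle_rl_err builds a closure which wraps an IO error from one of the
        // filesystem operations below into a StateExpiryError::ReplayLog,
        // capturing the operation name and the affected path.)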
        let handle_rl_err = |operation, path: &Path| {
            let path = path.to_owned();
            move |source| StateExpiryError::ReplayLog {
                operation,
                path,
                source: Arc::new(source),
            }
        };
        // fs-mistrust doesn't offer CheckedDir::read_this_directory.
        // But, we probably don't mind that we're not doing many checks here.
        let replay_logs = self.imm.replay_log_dir.as_path();
        let replay_logs_dir =
            fs::read_dir(replay_logs).map_err(handle_rl_err("open dir", replay_logs))?;
        for ent in replay_logs_dir {
            let ent = ent.map_err(handle_rl_err("read dir", replay_logs))?;
            let leaf = ent.file_name();
            // Try to identify this replay logfile (including its IptLocalId)
            match IptReplayLog::parse_log_leafname(&leaf) {
                Ok(lid) if all_ipts.contains(&lid) => continue,
                Ok(_) => trace!(
                    leaf = leaf.to_string_lossy().as_ref(),
                    "deleting replay log for old IPT"
                ),
                Err(bad) => info!(
                    "deleting garbage in IPT replay log dir: {} ({})",
                    leaf.to_string_lossy(),
                    bad
                ),
            }
            // Not known, remove it
            let path = ent.path();
            fs::remove_file(&path).map_err(handle_rl_err("remove", &path))?;
        }

        Ok(())
    }

    /// Run one iteration of the loop
    ///
    /// Either do some work, making changes to our state,
    /// or, if there's nothing to be done, wait until there *is* something to do.
    ///
    /// ### Implementation approach
    ///
    /// Every time we wake up we idempotently make progress
    /// by searching our whole state machine, looking for something to do.
    /// If we find something to do, we do that one thing, and search again.
    /// When we're done, we unconditionally recalculate the IPTs to publish, and sleep.
    ///
    /// This approach avoids the need for complicated reasoning about
    /// which state updates need to trigger other state updates,
    /// and thereby avoids several classes of potential bugs.
    /// However, it has some performance implications:
    ///
    /// ### Performance
    ///
    /// Events relating to an IPT occur, at worst,
    /// at a rate proportional to the current number of IPTs,
    /// times the maximum flap rate of any one IPT.
    ///
    /// [`idempotently_progress_things_now`](Self::idempotently_progress_things_now)
    /// can be called more than once for each such event,
    /// but only a finite number of times per IPT.
    ///
    /// Therefore, overall, our work rate is O(N^2) where N is the number of IPTs.
    /// We think this is tolerable,
    /// but it does mean that the principal functions should be written
    /// with an eye to avoiding "accidentally quadratic" algorithms,
    /// because that would make the whole manager cubic.
    /// Ideally we would avoid O(N.log(N)) algorithms.
    ///
    /// (Note that the number of IPTs can be significantly larger than
    /// the maximum target of 20, if the service is very busy, so that the intro points
    /// are cycling rapidly due to the need to replace the replay database.)
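    ///
    /// (Worked example, illustrative numbers only: with N = 20 IPTs,
    /// IPT events arrive at a rate proportional to 20, and handling each one
    /// involves a scan of our state that is itself O(20);
    /// that product is where the overall O(N^2) figure comes from,
    /// and why each individual scan must stay linear.)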
    async fn run_once(
        &mut self,
        // This is a separate argument for borrowck reasons
        publisher: &mut IptsManagerView,
    ) -> Result<ShutdownStatus, FatalError> {
        let now = {
            // Block to persuade borrow checker that publish_set isn't
            // held over an await point.
            let mut publish_set = publisher.borrow_for_update(self.imm.runtime.clone());

            Self::import_new_expiry_times(&mut self.state.irelays, &publish_set);

            let mut loop_limit = 0..(
                // Work we do might be O(number of intro points),
                // but we might also have cycled the intro points due to many requests.
                // 10K is a guess at a stupid upper bound on the number of times we
                // might cycle ipts during a descriptor lifetime.
                // We don't need a tight bound; if we're going to crash, we can spin a bit first.
                (self.target_n_intro_points() + 1) * 10_000
            );
            let now = loop {
                let _: usize = loop_limit.next().expect("IPT manager is looping");
                if let Some(now) = self.idempotently_progress_things_now()? {
                    break now;
                }
            };
            // TODO #1214 Maybe something at level Error or Info, for example
            // Log an error if everything is terrible:
            //   - we have >=N Faulty IPTs ?
            //   - we have only Faulty IPTs and can't select another due to the 2N limit ?
            // Log at info if and when we publish?  Maybe the publisher should do that?
            if let Err(operr) = self.compute_iptsetstatus_publish(&now, &mut publish_set) {
                // This is not good, is it.
                publish_set.ipts = None;
                let wait = operr.log_retry_max(&self.imm.nick)?;
                now.update(wait);
            };

            self.expire_old_expiry_times(&mut publish_set, &now);

            drop(publish_set); // release lock, and notify publisher of any changes

            if self.state.ipt_removal_cleanup_needed {
                let outcome = self.expire_old_ipts_external_persistent_state();
                log_ratelim!("removing state for old IPT(s)"; outcome);
                match outcome {
                    Ok(()) => self.state.ipt_removal_cleanup_needed = false,
                    Err(_already_logged) => {}
                }
            }

            now
        };

        assert_ne!(
            now.clone().shortest(),
            Some(Duration::ZERO),
            "IPT manager zero timeout, would loop"
        );
        let mut new_configs = self.state.new_configs.next().fuse();

        select_biased! {
            () = now.wait_for_earliest(&self.imm.runtime).fuse() => {},
            shutdown = self.state.shutdown.next().fuse() => {
                info!("HS service {}: terminating due to shutdown signal", &self.imm.nick);
                // We shouldn't be receiving anything on this channel.
                assert!(shutdown.is_none());
                return Ok(ShutdownStatus::Terminate)
            },
            update = self.state.status_recv.next() => {
                let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?;
                self.state.handle_ipt_status_update(&self.imm, lid, update);
            }
            _dir_event = async {
                match self.state.last_irelay_selection_outcome {
                    Ok(()) => future::pending().await,
                    // This boxes needlessly but it shouldn't really happen
                    Err(()) => self.imm.dirprovider.events().next().await,
                }
            }.fuse() => {
                self.state.last_irelay_selection_outcome = Ok(());
            }
            new_config = new_configs => {
                let Some(new_config) = new_config else {
                    trace!("HS service {}: terminating due to EOF on config updates stream",
                           &self.imm.nick);
                    return Ok(ShutdownStatus::Terminate);
                };
                if let Err(why) = (|| {
                    let dos = |config: &OnionServiceConfig| config.dos_extension()
                        .map_err(|e| e.report().to_string());
                    if dos(&self.state.current_config)? != dos(&new_config)? {
                        return Err("DOS parameters (rate limit) changed".to_string());
                    }
                    Ok(())
                })() {
                    // We need new IPTs with the new parameters.  (The previously-published
                    // IPTs will automatically be retained so long as needed, by the
                    // rest of our algorithm.)
                    info!("HS service {}: replacing IPTs: {}", &self.imm.nick, &why);
                    for ir in &mut self.state.irelays {
                        for ipt in &mut ir.ipts {
                            ipt.is_current = None;
                        }
                    }
                }
                self.state.current_config = new_config;
                self.state.last_irelay_selection_outcome = Ok(());
            }
        }

        Ok(ShutdownStatus::Continue)
    }

    /// IPT Manager main loop, runs as a task
    ///
    /// Contains the error handling, including catching panics.
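    ///
    /// (A panic in `run_once` is caught with `catch_unwind`, converted into an
    /// internal error, and reported via `status_tx` before the loop exits.)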
    async fn main_loop_task(mut self, mut publisher: IptsManagerView) {
        loop {
            match async {
                AssertUnwindSafe(self.run_once(&mut publisher))
                    .catch_unwind()
                    .await
                    .map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))?
            }
            .await
            {
                Err(crash) => {
                    error!("bug: HS service {} crashed! {}", &self.imm.nick, crash);
                    self.imm.status_tx.send_broken(crash);
                    break;
                }
                Ok(ShutdownStatus::Continue) => continue,
                Ok(ShutdownStatus::Terminate) => {
                    self.imm.status_tx.send_shutdown();

                    break;
                }
            }
        }
    }
}
impl<R: Runtime, M: Mockable<R>> State<R, M> {
    /// Find the `Ipt` with persistent local id `lid`
    fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> {
        self.irelays
            .iter_mut()
            .find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle))
    }

    /// Choose a new relay to use for IPTs
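    ///
    /// We exclude relays already in use as IPT relays, pick a fresh one via the
    /// standard relay-selection machinery, and schedule its retirement at a
    /// random time within the consensus range
    /// `hs_intro_min_lifetime..=hs_intro_max_lifetime`.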
    fn choose_new_ipt_relay(
        &mut self,
        imm: &Immutable<R>,
        now: Instant,
    ) -> Result<(), ChooseIptError> {
        let netdir = imm.dirprovider.timely_netdir()?;
        let mut rng = self.mockable.thread_rng();
        let relay = {
            let exclude_ids = self
                .irelays
                .iter()
                .flat_map(|e| e.relay.identities())
                .map(|id| id.to_owned())
                .collect();
            let selector = RelaySelector::new(
                RelayUsage::new_intro_point(),
                RelayExclusion::exclude_identities(exclude_ids),
            );
            selector
                .select_relay(&mut rng, &netdir)
                .0 // TODO: Someday we might want to report why we rejected everything on failure.
                .ok_or(ChooseIptError::TooFewUsableRelays)?
        };
        let lifetime_low = netdir
            .params()
            .hs_intro_min_lifetime
            .try_into()
            .expect("Could not convert param to duration.");
        let lifetime_high = netdir
            .params()
            .hs_intro_max_lifetime
            .try_into()
            .expect("Could not convert param to duration.");
        let lifetime_range: std::ops::RangeInclusive<Duration> = lifetime_low..=lifetime_high;
        let retirement = rng
            .gen_range_checked(lifetime_range)
            // If the range from the consensus is invalid, just pick the high-bound.
            .unwrap_or(lifetime_high);
        let retirement = now
            .checked_add(retirement)
            .ok_or(ChooseIptError::TimeOverflow)?;
        let new_irelay = IptRelay {
            relay: RelayIds::from_relay_ids(&relay),
            planned_retirement: retirement,
            ipts: vec![],
        };
        self.irelays.push(new_irelay);

        debug!(
            "HS service {}: choosing new IPT relay {}",
            &imm.nick,
            relay.display_relay_ids()
        );
        Ok(())
    }

    /// Update `self`'s status tracking for one introduction point
    fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
        let Some(ipt) = self.ipt_by_lid_mut(lid) else {
            // update from now-withdrawn IPT, ignore it (can happen due to the IPT being a task)
            return;
        };
        debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick);
        let IptStatus {
            status: update,
            wants_to_retire,
            ..
        } = update;

        #[allow(clippy::single_match)] // want to be explicit about the Ok type
        match wants_to_retire {
            Err(IptWantsToRetire) => ipt.is_current = None,
            Ok(()) => {}
        }
        let now = || imm.runtime.now();
        let started = match &ipt.status_last {
            TS::Establishing { started, .. } => Ok(*started),
            TS::Faulty { started, .. } => *started,
            TS::Good { .. } => Err(()),
        };
        ipt.status_last = match update {
            ISS::Establishing => TS::Establishing {
                started: started.unwrap_or_else(|()| now()),
            },
            ISS::Good(details) => {
                let time_to_establish = started.and_then(|started| {
                    // return () at end of ok_or_else closure, for clarity
                    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
                    now().checked_duration_since(started).ok_or_else(|| {
                        warn!("monotonic clock went backwards! (HS IPT)");
                        ()
                    })
                });
                TS::Good {
                    time_to_establish,
                    details,
                }
            }
            ISS::Faulty(error) => TS::Faulty { started, error },
        };
    }
}
//========== mockability ==========
/// Mockable state for the IPT Manager
///
/// This allows us to use a fake IPT Establisher and IPT Publisher,
/// so that we can unit test the Manager.
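///
/// A minimal sketch of a mock implementation (illustrative only; the real one
/// used by this module's tests is the `Mocks` type below, which records each
/// would-be establisher so tests can drive its status by hand):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NullMocks;
///
/// impl Mockable<MockRuntime> for NullMocks {
///     type IptEstablisher = ();
///     type Rng<'m> = rand::rngs::ThreadRng;
///     fn thread_rng(&mut self) -> Self::Rng<'_> {
///         rand::rng()
///     }
///     fn make_new_ipt(
///         &mut self,
///         _imm: &Immutable<MockRuntime>,
///         _params: IptParameters,
///     ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
///         // Hand back a receiver whose status never changes.
///         let (_tx, st_rx) = watch::channel();
///         Ok(((), st_rx))
///     }
///     fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}
///     fn expire_old_ipts_external_persistent_state_hook(&self) {}
/// }
/// ```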
pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
    /// IPT establisher type
    type IptEstablisher: Send + Sync + 'static;
    /// A random number generator
    type Rng<'m>: rand::Rng + rand::CryptoRng + 'm;
    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_>;
    /// Call `IptEstablisher::new`
    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;
    /// Call `IptEstablisher::start_accepting`
    fn start_accepting(&self, establisher: &ErasedIptEstablisher);
    /// Allow tests to see when [`IptManager::expire_old_ipts_external_persistent_state`]
    /// is called.
    ///
    /// This lets tests see that it gets called at the right times,
    /// and not the wrong ones.
    fn expire_old_ipts_external_persistent_state_hook(&self);
}
impl<R: Runtime> Mockable<R> for Real<R> {
    type IptEstablisher = IptEstablisher;
    /// A random number generator
    type Rng<'m> = rand::rngs::ThreadRng;
    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_> {
        rand::rng()
    }
    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
        IptEstablisher::launch(&imm.runtime, params, self.circ_pool.clone(), &imm.keymgr)
    }
    fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
        let establisher: &IptEstablisher = <dyn Any>::downcast_ref(establisher)
            .expect("upcast failure, ErasedIptEstablisher is not IptEstablisher!");
        establisher.start_accepting();
    }
    fn expire_old_ipts_external_persistent_state_hook(&self) {}
}
// TODO #1213 add more unit tests for IptManager
// Especially, we want to exercise all code paths in idempotently_progress_things_now
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::match_single_binding)] // false positives, need the lifetime extension
    use super::*;
    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_establish::GoodIptDetails;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::{create_keymgr, create_storage_handles_from_state_dir};
    use rand::SeedableRng as _;
    use slotmap_careful::DenseSlotMap;
    use std::collections::BTreeMap;
    use std::sync::Mutex;
    use test_temp_dir::{test_temp_dir, TestTempDir};
    use tor_basic_utils::test_rng::TestingRng;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_rtmock::MockRuntime;
    use tracing_test::traced_test;
    use walkdir::WalkDir;
    slotmap_careful::new_key_type! {
        struct MockEstabId;
    }
    type MockEstabs = Arc<Mutex<DenseSlotMap<MockEstabId, MockEstabState>>>;
    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }
    #[derive(Debug)]
    struct Mocks {
        rng: TestingRng,
        estabs: MockEstabs,
        expect_expire_ipts_calls: Arc<Mutex<usize>>,
    }
    #[derive(Debug)]
    struct MockEstabState {
        st_tx: watch::Sender<IptStatus>,
        params: IptParameters,
    }
    #[derive(Debug)]
    struct MockEstab {
        esid: MockEstabId,
        estabs: MockEstabs,
    }
    impl Mockable<MockRuntime> for Mocks {
        type IptEstablisher = MockEstab;
        type Rng<'m> = &'m mut TestingRng;
        fn thread_rng(&mut self) -> Self::Rng<'_> {
            &mut self.rng
        }
        fn make_new_ipt(
            &mut self,
            _imm: &Immutable<MockRuntime>,
            params: IptParameters,
        ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
            let (st_tx, st_rx) = watch::channel();
            let estab = MockEstabState { st_tx, params };
            let esid = self.estabs.lock().unwrap().insert(estab);
            let estab = MockEstab {
                esid,
                estabs: self.estabs.clone(),
            };
            Ok((estab, st_rx))
        }
        fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}
        fn expire_old_ipts_external_persistent_state_hook(&self) {
            let mut expect = self.expect_expire_ipts_calls.lock().unwrap();
            eprintln!("expire_old_ipts_external_persistent_state_hook, expect={expect}");
            *expect = expect.checked_sub(1).expect("unexpected expiry");
        }
    }
    impl Drop for MockEstab {
        fn drop(&mut self) {
            let mut estabs = self.estabs.lock().unwrap();
            let _: MockEstabState = estabs
                .remove(self.esid)
                .expect("dropping non-recorded MockEstab");
        }
    }
    struct MockedIptManager<'d> {
        estabs: MockEstabs,
        pub_view: ipt_set::IptsPublisherView,
        shut_tx: broadcast::Sender<Void>,
        #[allow(dead_code)]
        cfg_tx: watch::Sender<Arc<OnionServiceConfig>>,
        #[allow(dead_code)] // ensures temp dir lifetime; paths stored in self
        temp_dir: &'d TestTempDir,
        expect_expire_ipts_calls: Arc<Mutex<usize>>, // use usize::MAX to not mind
    }
    impl<'d> MockedIptManager<'d> {
        fn startup(
            runtime: MockRuntime,
            temp_dir: &'d TestTempDir,
            seed: u64,
            expect_expire_ipts_calls: usize,
        ) -> Self {
            let dir: TestNetDirProvider = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap()
                .into();
            let nick: HsNickname = "nick".to_string().try_into().unwrap();
            let cfg = OnionServiceConfigBuilder::default()
                .nickname(nick.clone())
                .build()
                .unwrap();
            let (cfg_tx, cfg_rx) = watch::channel_with(Arc::new(cfg));
            let (rend_tx, _rend_rx) = mpsc::channel(10);
            let (shut_tx, shut_rx) = broadcast::channel::<Void>(0);
            let estabs: MockEstabs = Default::default();
            let expect_expire_ipts_calls = Arc::new(Mutex::new(expect_expire_ipts_calls));
            let mocks = Mocks {
                rng: TestingRng::seed_from_u64(seed),
                estabs: estabs.clone(),
                expect_expire_ipts_calls: expect_expire_ipts_calls.clone(),
            };
            // Don't provide a subdir; the ipt_mgr is supposed to add any needed subdirs
            let state_dir = temp_dir
                // untracked is OK because our return value captures 'd
                .subdir_untracked("state_dir");
            let (state_handle, iptpub_state_handle) =
                create_storage_handles_from_state_dir(&state_dir, &nick);
            let (mgr_view, pub_view) =
                ipt_set::ipts_channel(&runtime, iptpub_state_handle).unwrap();
            let keymgr = create_keymgr(temp_dir);
            let keymgr = keymgr.into_untracked(); // OK because our return value captures 'd
            let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
            let mgr = IptManager::new(
                runtime.clone(),
                Arc::new(dir),
                nick,
                cfg_rx,
                rend_tx,
                shut_rx,
                &state_handle,
                mocks,
                keymgr,
                status_tx,
            )
            .unwrap();
            mgr.launch_background_tasks(mgr_view).unwrap();
            MockedIptManager {
                estabs,
                pub_view,
                shut_tx,
                cfg_tx,
                temp_dir,
                expect_expire_ipts_calls,
            }
        }
        async fn shutdown_check_no_tasks(self, runtime: &MockRuntime) {
            drop(self.shut_tx);
            runtime.progress_until_stalled().await;
            assert_eq!(runtime.mock_task().n_tasks(), 1); // just us
        }
        fn estabs_inventory(&self) -> impl Eq + Debug + 'static {
            let estabs = self.estabs.lock().unwrap();
            let estabs = estabs
                .values()
                .map(|MockEstabState { params: p, .. }| {
                    (
                        p.lid,
                        (
                            p.target.clone(),
                            // We want to check the key values, but they're very hard to get at
                            // in a way we can compare.  Especially the private keys, for which
                            // we can't get a clone or copy of the private key material out of the Arc.
                            // They're keypairs, so we can use the debug rep, which shows the public half.
                            // That will have to do.
                            format!("{:?}", p.k_sid),
                            format!("{:?}", p.k_ntor),
                        ),
                    )
                })
                .collect::<BTreeMap<_, _>>();
            estabs
        }
    }
    #[test]
    #[traced_test]
    fn test_mgr_lifecycle() {
        MockRuntime::test_with_various(|runtime| async move {
            let temp_dir = test_temp_dir!();
            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 0, 1);
            runtime.progress_until_stalled().await;
            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
            // We expect it to try to establish 3 IPTs
            const EXPECT_N_IPTS: usize = 3;
            const EXPECT_MAX_IPTS: usize = EXPECT_N_IPTS + 2 /* num_extra */;
            assert_eq!(m.estabs.lock().unwrap().len(), EXPECT_N_IPTS);
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            // Advancing time a bit and it still shouldn't publish anything
            runtime.advance_by(ms(500)).await;
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            let good = GoodIptDetails {
                link_specifiers: vec![],
                ipt_kp_ntor: [0x55; 32].into(),
            };
            // Imagine that one of our IPTs becomes good
            m.estabs
                .lock()
                .unwrap()
                .values_mut()
                .next()
                .unwrap()
                .st_tx
                .borrow_mut()
                .status = IptStatusStatus::Good(good.clone());
            // TODO #1213 test that we haven't called start_accepting
            // It won't publish until a further "fastest establish time" has elapsed,
            // i.e. until a further 500ms has passed, 1000ms in total
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(499)).await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(1)).await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), 1);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_UNCERTAIN);
                }
            };
            // TODO #1213 test that we have called start_accepting on the right IPTs
            // Set the other IPTs to be Good too
            for e in m.estabs.lock().unwrap().values_mut().skip(1) {
                e.st_tx.borrow_mut().status = IptStatusStatus::Good(good.clone());
            }
            runtime.progress_until_stalled().await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), EXPECT_N_IPTS);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_CERTAIN);
                }
            };
            // TODO #1213 test that we have called start_accepting on the right IPTs
            let estabs_inventory = m.estabs_inventory();
            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;
            // ---------- restart! ----------
            info!("*** Restarting ***");
            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 1, 1);
            runtime.progress_until_stalled().await;
            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);
            assert_eq!(estabs_inventory, m.estabs_inventory());
            // TODO #1213 test that we have called start_accepting on all the old IPTs
            // ---------- New IPT relay selection ----------
            let old_lids: Vec<String> = m
                .estabs
                .lock()
                .unwrap()
                .values()
                .map(|ess| ess.params.lid.to_string())
                .collect();
            eprintln!("IPTs to rotate out: {old_lids:?}");
            let old_lid_files = || {
                WalkDir::new(temp_dir.as_path_untracked())
                    .into_iter()
                    .map(|ent| {
                        ent.unwrap()
                            .into_path()
                            .into_os_string()
                            .into_string()
                            .unwrap()
                    })
                    .filter(|path| old_lids.iter().any(|lid| path.contains(lid)))
                    .collect_vec()
            };
            let no_files: [String; 0] = [];
            assert_ne!(old_lid_files(), no_files);
            // It might call the expiry function once, or once per IPT.
            // The latter is quadratic but this is quite rare, so that's fine.
            *m.expect_expire_ipts_calls.lock().unwrap() = EXPECT_MAX_IPTS;
            // wait 2 days, > hs_intro_max_lifetime
            runtime.advance_by(ms(48 * 60 * 60 * 1_000)).await;
            runtime.progress_until_stalled().await;
            // It must have called it at least once.
            assert_ne!(*m.expect_expire_ipts_calls.lock().unwrap(), EXPECT_MAX_IPTS);
            // There should now be no files named after old IptLocalIds.
            assert_eq!(old_lid_files(), no_files);
            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;
        });
    }
}