// tor_hsservice/ipt_mgr.rs

//! IPT Manager
//!
//! Maintains introduction points and publishes descriptors.
//! Provides a stream of rendezvous requests.
//!
//! See [`IptManager::run_once`] for discussion of the implementation approach.

use crate::internal_prelude::*;

use tor_relay_selection::{RelayExclusion, RelaySelector, RelayUsage};
use IptStatusStatus as ISS;
use TrackedStatus as TS;

mod persist;
pub(crate) use persist::IptStorageHandle;

pub use crate::ipt_establish::IptError;

/// Expiry time to put on an interim descriptor (IPT publication set Uncertain)
///
/// (Note that we use the same value in both cases, since it doesn't actually do
/// much good to have a short expiration time. This expiration time only affects
/// caches, and we can supersede an old descriptor just by publishing it. Thus,
/// we pick a uniform expiry time, as the C tor implementation does.)
const IPT_PUBLISH_UNCERTAIN: Duration = Duration::from_secs(3 * 60 * 60); // 3 hours
/// Expiry time to put on a final descriptor (IPT publication set Certain)
const IPT_PUBLISH_CERTAIN: Duration = IPT_PUBLISH_UNCERTAIN;

//========== data structures ==========

/// IPT Manager (for one hidden service)
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct IptManager<R, M> {
    /// Immutable contents
    imm: Immutable<R>,

    /// Mutable state
    state: State<R, M>,
}

/// Immutable contents of an IPT Manager
///
/// Contains things inherent to our identity, and
/// handles to services that we'll be using.
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct Immutable<R> {
    /// Runtime
    #[educe(Debug(ignore))]
    runtime: R,

    /// Netdir provider
    #[educe(Debug(ignore))]
    dirprovider: Arc<dyn NetDirProvider>,

    /// Nickname
    nick: HsNickname,

    /// Output MPSC for rendezvous requests
    ///
    /// Passed to IPT Establishers we create
    output_rend_reqs: mpsc::Sender<RendRequest>,

    /// Internal channel for updates from IPT Establishers (sender)
    ///
    /// When we make a new `IptEstablisher` we use this to arrange for
    /// its status updates to arrive, appropriately tagged, via `status_recv`.
    status_send: mpsc::Sender<(IptLocalId, IptStatus)>,

    /// The key manager.
    #[educe(Debug(ignore))]
    keymgr: Arc<KeyMgr>,

    /// Replay log directory
    ///
    /// Files are named after the (bare) IptLocalId
    #[educe(Debug(ignore))]
    replay_log_dir: tor_persist::state_dir::InstanceRawSubdir,

    /// A sender for updating the status of the onion service.
    #[educe(Debug(ignore))]
    status_tx: IptMgrStatusSender,
}

/// State of an IPT Manager
#[derive(Educe)]
#[educe(Debug(bound))]
pub(crate) struct State<R, M> {
    /// Source of configuration updates
    //
    // TODO #1209 reject reconfigurations we can't cope with
    // for example, state dir changes will go quite wrong
    new_configs: watch::Receiver<Arc<OnionServiceConfig>>,

    /// Last configuration update we received
    ///
    /// This is the snapshot of the config we are currently using.
    /// (Doing it this way avoids running our algorithms
    /// with a mixture of old and new config.)
    current_config: Arc<OnionServiceConfig>,

    /// Channel for updates from IPT Establishers (receiver)
    ///
    /// We arrange for all the updates to be multiplexed,
    /// as that makes handling them easy in our event loop.
    status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>,

    /// State: selected relays
    ///
    /// We append to this, and call `retain` on it,
    /// so these are in chronological order of selection.
    irelays: Vec<IptRelay>,

    /// Did we fail to select a relay last time?
    ///
    /// This can only be caused (or triggered) by a busted netdir or config.
    last_irelay_selection_outcome: Result<(), ()>,

    /// Have we removed any IPTs but not yet cleaned up keys and logfiles?
    #[educe(Debug(ignore))]
    ipt_removal_cleanup_needed: bool,

    /// Signal for us to shut down
    shutdown: broadcast::Receiver<Void>,

    /// The on-disk state storage handle.
    #[educe(Debug(ignore))]
    storage: IptStorageHandle,

    /// Mockable state, normally [`Real`]
    ///
    /// This is in `State` so it can be passed mutably to tests,
    /// even though the main code doesn't need `mut`
    /// since `HsCircPool` is a service with interior mutability.
    mockable: M,

    /// Runtime (to placate compiler)
    runtime: PhantomData<R>,
}

/// One selected relay, at which we are establishing (or relevantly advertised) IPTs
struct IptRelay {
    /// The actual relay
    relay: RelayIds,

    /// The retirement time we selected for this relay
    planned_retirement: Instant,

    /// IPTs at this relay
    ///
    /// At most one will have [`IsCurrent`].
    ///
    /// We append to this, and call `retain` on it,
    /// so these are in chronological order of selection.
    ipts: Vec<Ipt>,
}

/// One introduction point, representation in memory
#[derive(Debug)]
struct Ipt {
    /// Local persistent identifier
    lid: IptLocalId,

    /// Handle for the establisher; we keep this here just for its `Drop` action
    establisher: Box<ErasedIptEstablisher>,

    /// `KS_hs_ipt_sid`, `KP_hs_ipt_sid`
    ///
    /// This is an `Arc` because:
    ///  * The manager needs a copy so that it can save it to disk.
    ///  * The establisher needs a copy to actually use.
    ///  * The underlying secret key type is not `Clone`.
    k_sid: Arc<HsIntroPtSessionIdKeypair>,

    /// `KS_hss_ntor`, `KP_hss_ntor`
    k_hss_ntor: Arc<HsSvcNtorKeypair>,

    /// Last information about how it's doing including timing info
    status_last: TrackedStatus,

    /// Until when ought we to try to maintain it
    ///
    /// For introduction points we are publishing,
    /// this is a copy of the value set by the publisher
187    ///
    ///
    /// (`None` means the IPT has not been advertised at all yet.)
    ///
    /// We must duplicate the information because:
    ///
    ///  * We can't have it just live in the shared `IptSet`
    ///    because we need to retain it for no-longer-being-published IPTs.
    ///
    ///  * We can't have it just live here because the publisher needs to update it.
    ///
    /// (An alternative would be to more seriously entangle the manager and publisher.)
    last_descriptor_expiry_including_slop: Option<Instant>,

    /// Is this IPT current - should we include it in descriptors?
    ///
    /// `None` might mean:
    ///  * WantsToRetire
    ///  * We have >N IPTs and we have been using this IPT so long we want to rotate it out
    ///    (the [`IptRelay`] has reached its `planned_retirement` time)
    ///  * The IPT has wrong parameters of some kind, and needs to be replaced
    ///    (Eg, we set it up with the wrong DOS_PARAMS extension)
    is_current: Option<IsCurrent>,
}

/// Last information from establisher about an IPT, with timing info added by us
#[derive(Debug)]
enum TrackedStatus {
    /// Corresponds to [`IptStatusStatus::Faulty`]
    Faulty {
        /// When we were first told this started to establish, if we know it
        ///
        /// This might be an early estimate, which would give an overestimate
        /// of the establishment time, which is fine.
        /// Or it might be `Err` meaning we don't know.
        started: Result<Instant, ()>,

        /// The error, if any.
        error: Option<IptError>,
    },

    /// Corresponds to [`IptStatusStatus::Establishing`]
    Establishing {
        /// When we were told we started to establish, for calculating `time_to_establish`
        started: Instant,
    },

    /// Corresponds to [`IptStatusStatus::Good`]
    Good {
        /// How long it took to establish (if we could determine that information)
        ///
        /// Can only be `Err` in strange situations.
        time_to_establish: Result<Duration, ()>,

        /// Details, from the Establisher
        details: ipt_establish::GoodIptDetails,
    },
}

/// Token indicating that this introduction point is current (not Retiring)
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
struct IsCurrent;

//---------- related to mockability ----------

/// Type-erased version of `Box<IptEstablisher>`
///
/// The real type is `M::IptEstablisher`.
/// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc.
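///
/// (Illustrative sketch, not code from this file: an implementation of the
/// mockable `start_accepting`, which is called on this erased type later in
/// this file, could recover the concrete type with `Any::downcast_ref`,
/// roughly like this.)
///
/// ```ignore
/// fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
///     // Recover the concrete establisher from the type-erased `dyn Any`
///     let establisher: &IptEstablisher = establisher
///         .downcast_ref()
///         .expect("establisher is not the concrete type this Mockable expects");
///     // ... tell the establisher to start accepting introduction requests ...
/// }
/// ```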
type ErasedIptEstablisher = dyn Any + Send + Sync + 'static;

/// Mockable state in an IPT Manager - real version
#[derive(Educe)]
#[educe(Debug)]
pub(crate) struct Real<R: Runtime> {
    /// Circuit pool for circuits we need to make
    ///
    /// Passed to each new Establisher
    #[educe(Debug(ignore))]
    pub(crate) circ_pool: Arc<HsCircPool<R>>,
}

//---------- errors ----------

/// An error that happened while trying to select a relay
///
/// Used only within the IPT manager.
/// Can only be caused by bad netdir or maybe bad config.
#[derive(Debug, Error)]
enum ChooseIptError {
    /// Bad or insufficient netdir
    #[error("bad or insufficient netdir")]
    NetDir(#[from] tor_netdir::Error),
    /// Too few suitable relays
    #[error("too few suitable relays")]
    TooFewUsableRelays,
    /// Time overflow
    #[error("time overflow (system clock set wrong?)")]
    TimeOverflow,
    /// Internal error
    #[error("internal error")]
    Bug(#[from] Bug),
}

/// An error that happened while trying to create an IPT (at a selected relay)
///
/// Used only within the IPT manager.
#[derive(Debug, Error)]
pub(crate) enum CreateIptError {
    /// Fatal error
    #[error("fatal error")]
    Fatal(#[from] FatalError),

    /// Error accessing keystore
    #[error("problems with keystores")]
    Keystore(#[from] tor_keymgr::Error),

    /// Error opening the intro request replay log
    #[error("unable to open the intro req replay log: {file:?}")]
    OpenReplayLog {
        /// What filesystem object we tried to do it to
        file: PathBuf,
        /// What happened
        #[source]
        error: Arc<io::Error>,
    },
}

//========== Relays we've chosen, and IPTs ==========

impl IptRelay {
    /// Get a reference to this IPT relay's current intro point state (if any)
    ///
    /// `None` means this IPT has no current introduction points.
    /// That might be, briefly, because a new intro point needs to be created;
    /// or it might be because we are retiring the relay.
    fn current_ipt(&self) -> Option<&Ipt> {
        self.ipts
            .iter()
            .find(|ipt| ipt.is_current == Some(IsCurrent))
    }

    /// Get a mutable reference to this IPT relay's current intro point state (if any)
    fn current_ipt_mut(&mut self) -> Option<&mut Ipt> {
        self.ipts
            .iter_mut()
            .find(|ipt| ipt.is_current == Some(IsCurrent))
    }

    /// Should this IPT Relay be retired?
    ///
    /// This is determined by our IPT relay rotation time.
    fn should_retire(&self, now: &TrackingNow) -> bool {
        now > &self.planned_retirement
    }

    /// Make a new introduction point at this relay
    ///
    /// It becomes the current IPT.
    fn make_new_ipt<R: Runtime, M: Mockable<R>>(
        &mut self,
        imm: &Immutable<R>,
        new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
        mockable: &mut M,
    ) -> Result<(), CreateIptError> {
        let lid: IptLocalId = mockable.thread_rng().random();

        let ipt = Ipt::start_establisher(
            imm,
            new_configs,
            mockable,
            &self.relay,
            lid,
            Some(IsCurrent),
            None::<IptExpectExistingKeys>,
            // None is precisely right: the descriptor hasn't been published.
            PromiseLastDescriptorExpiryNoneIsGood {},
        )?;

        self.ipts.push(ipt);

        Ok(())
    }
}

/// Token, representing promise by caller of `start_establisher`
///
/// Caller who makes one of these structs promises that it is OK for `start_establisher`
/// to set `last_descriptor_expiry_including_slop` to `None`.
struct PromiseLastDescriptorExpiryNoneIsGood {}

/// Token telling [`Ipt::start_establisher`] to expect existing keys in the keystore
#[derive(Debug, Clone, Copy)]
struct IptExpectExistingKeys;

impl Ipt {
    /// Start a new IPT establisher, and create and return an `Ipt`
    #[allow(clippy::too_many_arguments)] // There are only two call sites
    fn start_establisher<R: Runtime, M: Mockable<R>>(
        imm: &Immutable<R>,
        new_configs: &watch::Receiver<Arc<OnionServiceConfig>>,
        mockable: &mut M,
        relay: &RelayIds,
        lid: IptLocalId,
        is_current: Option<IsCurrent>,
        expect_existing_keys: Option<IptExpectExistingKeys>,
        _: PromiseLastDescriptorExpiryNoneIsGood,
    ) -> Result<Ipt, CreateIptError> {
        let mut rng = tor_llcrypto::rng::CautiousRng;

        /// Load (from disk) or generate an IPT key with role IptKeyRole::$role
        ///
        /// Ideally this would be a closure, but it has to be generic over the
        /// returned key type.  So it's a macro.  (A proper function would have
        /// many type parameters and arguments and be quite annoying.)
        macro_rules! get_or_gen_key { { $Keypair:ty, $role:ident } => { (||{
            let spec = IptKeySpecifier {
                nick: imm.nick.clone(),
                role: IptKeyRole::$role,
                lid,
            };
            // Our desired behaviour:
            //  expect_existing_keys == None
            //     The keys shouldn't exist.  Generate and insert.
            //     If they do exist then things are badly messed up
            //     (we're creating a new IPT with a fresh lid).
            //     So, then, crash.
            //  expect_existing_keys == Some(IptExpectExistingKeys)
            //     The keys are supposed to exist.  Load them.
            //     We ought to have stored them before recording in our on-disk records
            //     that this IPT exists.  But keys could be missing due to file deletion
            //     or something.  We could recover by creating fresh keys, although maybe
            //     some clients would find the previous keys in old descriptors.
            //     So if the keys are missing, make and store new ones, logging an error msg.
            let k: Option<$Keypair> = imm.keymgr.get(&spec)?;
            let arti_path = || {
                spec
                    .arti_path()
                    .map_err(|e| {
                        CreateIptError::Fatal(
                            into_internal!("bad ArtiPath from IPT key spec")(e).into()
                        )
                    })
            };
            match (expect_existing_keys, k) {
                (None, None) => { }
                (Some(_), Some(k)) => return Ok(Arc::new(k)),
                (None, Some(_)) => {
                    return Err(FatalError::IptKeysFoundUnexpectedly(arti_path()?).into())
                },
                (Some(_), None) => {
                    error!("bug: HS service {} missing previous key {:?}. Regenerating.",
                           &imm.nick, arti_path()?);
                }
            }

            let res = imm.keymgr.generate::<$Keypair>(
                &spec,
                tor_keymgr::KeystoreSelector::Primary,
                &mut rng,
                false, /* overwrite */
            );

            match res {
                Ok(k) => Ok::<_, CreateIptError>(Arc::new(k)),
                Err(tor_keymgr::Error::KeyAlreadyExists) => {
                    Err(FatalError::KeystoreRace { action: "generate", path: arti_path()? }.into())
                },
                Err(e) => Err(e.into()),
            }
        })() } }

        let k_hss_ntor = get_or_gen_key!(HsSvcNtorKeypair, KHssNtor)?;
        let k_sid = get_or_gen_key!(HsIntroPtSessionIdKeypair, KSid)?;

        // we'll treat it as Establishing until we find otherwise
        let status_last = TS::Establishing {
            started: imm.runtime.now(),
        };

        // TODO #1186 Support ephemeral services (without persistent replay log)
        let replay_log = IptReplayLog::new_logged(&imm.replay_log_dir, &lid)?;

        let params = IptParameters {
            replay_log,
            config_rx: new_configs.clone(),
            netdir_provider: imm.dirprovider.clone(),
            introduce_tx: imm.output_rend_reqs.clone(),
            lid,
            target: relay.clone(),
            k_sid: k_sid.clone(),
            k_ntor: Arc::clone(&k_hss_ntor),
            accepting_requests: ipt_establish::RequestDisposition::NotAdvertised,
        };
        let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, params)?;

        // This task will shut down when self.establisher is dropped, causing
        // watch_tx to close.
        imm.runtime
            .spawn({
                let mut status_send = imm.status_send.clone();
                async move {
                    loop {
                        let Some(status) = watch_rx.next().await else {
                            trace!("HS service IPT status task: establisher went away");
                            break;
                        };
                        match status_send.send((lid, status)).await {
                            Ok(()) => {}
                            Err::<_, mpsc::SendError>(e) => {
                                // Not using trace_report because SendError isn't HasKind
                                trace!("HS service IPT status task: manager went away: {e}");
                                break;
                            }
                        }
                    }
                }
            })
            .map_err(|cause| FatalError::Spawn {
                spawning: "IPT establisher watch status task",
                cause: cause.into(),
            })?;

        let ipt = Ipt {
            lid,
            establisher: Box::new(establisher),
            k_hss_ntor,
            k_sid,
            status_last,
            is_current,
            last_descriptor_expiry_including_slop: None,
        };

        debug!(
            "HS service {}: {lid:?} establishing {} IPT at relay {}",
            &imm.nick,
            match expect_existing_keys {
                None => "new",
                Some(_) => "previous",
            },
            &relay,
        );

        Ok(ipt)
    }

    /// Returns `true` if this IPT has status Good (and should perhaps be published)
    fn is_good(&self) -> bool {
        match self.status_last {
            TS::Good { .. } => true,
            TS::Establishing { .. } | TS::Faulty { .. } => false,
        }
    }

    /// Returns the error, if any, we are currently encountering at this IPT.
    fn error(&self) -> Option<&IptError> {
        match &self.status_last {
            TS::Good { .. } | TS::Establishing { .. } => None,
            TS::Faulty { error, .. } => error.as_ref(),
        }
    }

    /// Construct the information needed by the publisher for this intro point
    fn for_publish(&self, details: &ipt_establish::GoodIptDetails) -> Result<ipt_set::Ipt, Bug> {
        let k_sid: &ed25519::Keypair = (*self.k_sid).as_ref();
        tor_netdoc::doc::hsdesc::IntroPointDesc::builder()
            .link_specifiers(details.link_specifiers.clone())
            .ipt_kp_ntor(details.ipt_kp_ntor)
            .kp_hs_ipt_sid(k_sid.verifying_key().into())
            .kp_hss_ntor(self.k_hss_ntor.public().clone())
            .build()
            .map_err(into_internal!("failed to construct IntroPointDesc"))
    }
}

impl HasKind for ChooseIptError {
    fn kind(&self) -> ErrorKind {
        use ChooseIptError as E;
        use ErrorKind as EK;
        match self {
            E::NetDir(e) => e.kind(),
            E::TooFewUsableRelays => EK::TorDirectoryUnusable,
            E::TimeOverflow => EK::ClockSkew,
            E::Bug(e) => e.kind(),
        }
    }
}

// This is somewhat abbreviated but it is legible and enough for most purposes.
impl Debug for IptRelay {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(f, "IptRelay {}", self.relay)?;
        write!(
            f,
            "          planned_retirement: {:?}",
            self.planned_retirement
        )?;
        for ipt in &self.ipts {
            write!(
                f,
                "\n          ipt {} {} {:?} ldeis={:?}",
                match ipt.is_current {
                    Some(IsCurrent) => "cur",
                    None => "old",
                },
                &ipt.lid,
                &ipt.status_last,
                &ipt.last_descriptor_expiry_including_slop,
            )?;
        }
        Ok(())
    }
}

//========== impls on IptManager and State ==========

impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
    //
    //---------- constructor and setup ----------

    /// Create a new IptManager
    #[allow(clippy::too_many_arguments)] // this is an internal function with 1 call site
    pub(crate) fn new(
        runtime: R,
        dirprovider: Arc<dyn NetDirProvider>,
        nick: HsNickname,
        config: watch::Receiver<Arc<OnionServiceConfig>>,
        output_rend_reqs: mpsc::Sender<RendRequest>,
        shutdown: broadcast::Receiver<Void>,
        state_handle: &tor_persist::state_dir::InstanceStateHandle,
        mockable: M,
        keymgr: Arc<KeyMgr>,
        status_tx: IptMgrStatusSender,
    ) -> Result<Self, StartupError> {
        let irelays = vec![]; // See TODO near persist::load call, in launch_background_tasks

        // We don't need buffering, since this is written to by dedicated tasks which
        // are reading watches.
        //
        // Internally-generated status updates (hopefully rate limited?), no need for memquota.
        let (status_send, status_recv) = mpsc_channel_no_memquota(0);

        let storage = state_handle
            .storage_handle("ipts")
            .map_err(StartupError::StateDirectoryInaccessible)?;

        let replay_log_dir = state_handle
            .raw_subdir("iptreplay")
            .map_err(StartupError::StateDirectoryInaccessible)?;

        let imm = Immutable {
            runtime,
            dirprovider,
            nick,
            status_send,
            output_rend_reqs,
            keymgr,
            replay_log_dir,
            status_tx,
        };
        let current_config = config.borrow().clone();

        let state = State {
            current_config,
            new_configs: config,
            status_recv,
            storage,
            mockable,
            shutdown,
            irelays,
            last_irelay_selection_outcome: Ok(()),
            ipt_removal_cleanup_needed: false,
            runtime: PhantomData,
        };
        let mgr = IptManager { imm, state };

        Ok(mgr)
    }

    /// Send the IPT manager off to run and establish intro points
    pub(crate) fn launch_background_tasks(
        mut self,
        mut publisher: IptsManagerView,
    ) -> Result<(), StartupError> {
        // TODO maybe this should be done in new(), so we don't have this dummy irelays
        // but then new() would need the IptsManagerView
        assert!(self.state.irelays.is_empty());
        self.state.irelays = persist::load(
            &self.imm,
            &self.state.storage,
            &self.state.new_configs,
            &mut self.state.mockable,
            &publisher.borrow_for_read(),
        )?;

        // Now that we've populated `irelays` and its `ipts` from the on-disk state,
        // we should check any leftover disk files from previous runs.  Make a note.
        self.state.ipt_removal_cleanup_needed = true;

        let runtime = self.imm.runtime.clone();

        self.imm.status_tx.send(IptMgrState::Bootstrapping, None);

        // This task will shut down when the RunningOnionService is dropped, causing
        // self.state.shutdown to become ready.
        runtime
            .spawn(self.main_loop_task(publisher))
            .map_err(|cause| StartupError::Spawn {
                spawning: "ipt manager",
                cause: cause.into(),
            })?;
        Ok(())
    }

    //---------- internal utility and helper methods ----------

    /// Iterate over *all* the IPTs we know about
    ///
    /// Yields each `Ipt` exactly once; an `IptRelay` with several IPTs
    /// is yielded once per IPT.
    fn all_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.state
            .irelays
            .iter()
            .flat_map(|ir| ir.ipts.iter().map(move |ipt| (ir, ipt)))
    }

    /// Iterate over the *current* IPTs
    ///
    /// Yields each `IptRelay` at most once.
    fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.state
            .irelays
            .iter()
            .filter_map(|ir| Some((ir, ir.current_ipt()?)))
    }

    /// Iterate over the *current* IPTs in `Good` state
    fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
        self.current_ipts().filter(|(_ir, ipt)| ipt.is_good())
    }

    /// Iterate over the current IPT errors.
    ///
    /// Used when reporting our state as [`Recovering`](crate::status::State::Recovering).
    fn ipt_errors(&self) -> impl Iterator<Item = &IptError> {
        self.all_ipts().filter_map(|(_ir, ipt)| ipt.error())
    }

    /// Target number of intro points
    pub(crate) fn target_n_intro_points(&self) -> usize {
        self.state.current_config.num_intro_points.into()
    }

    /// Maximum number of concurrent intro point relays
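    ///
    /// (Illustrative arithmetic, not normative: assuming `num_intro_points` is 3
    /// and the `hs_intro_num_extra_intropoints` network parameter is 2,
    /// this is 3 + 2 = 5.)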
    pub(crate) fn max_n_intro_relays(&self) -> usize {
        let params = self.imm.dirprovider.params();
        let num_extra = (*params).as_ref().hs_intro_num_extra_intropoints.get() as usize;
        self.target_n_intro_points() + num_extra
    }

    //---------- main implementation logic ----------

    /// Make some progress, if possible, and say when to wake up again
    ///
    /// Examines the current state and attempts to improve it.
    ///
    /// If `idempotently_progress_things_now` makes any changes,
    /// it will return `None`.
    /// It should then be called again immediately.
    ///
    /// Otherwise, it returns the time in the future when further work ought to be done:
    /// i.e., the time of the earliest timeout or planned future state change -
    /// as a [`TrackingNow`].
    ///
    /// In that case, the caller must call `compute_iptsetstatus_publish`,
    /// since the IPT set etc. may have changed.
    ///
    /// ### Goals and algorithms
    ///
    /// We attempt to maintain a pool of N established and verified IPTs,
    /// at N IPT Relays.
    ///
    /// When we have fewer than N IPT Relays
    /// that have `Establishing` or `Good` IPTs (see below)
    /// and fewer than k*N IPT Relays overall,
    /// we choose a new IPT Relay at random from the consensus
    /// and try to establish an IPT on it.
    ///
    /// (Rationale for the k*N limit:
    /// we do want to try to replace faulty IPTs, but
    /// we don't want an attacker to be able to provoke us into
    /// rapidly churning through IPT candidates.)
    ///
    /// When we select a new IPT Relay, we randomly choose a planned replacement time,
    /// after which it becomes `Retiring`.
    ///
    /// Additionally, any IPT becomes `Retiring`
    /// after it has been used for a certain number of introductions
    /// (c.f. C Tor `#define INTRO_POINT_MIN_LIFETIME_INTRODUCTIONS 16384`.)
    /// When this happens we retain the IPT Relay,
    /// and make new parameters to make a new IPT at the same Relay.
    ///
    /// An IPT is removed from our records, and we give up on it,
    /// when it is no longer `Good` or `Establishing`
    /// and all descriptors that mentioned it have expired.
    ///
    /// (Until all published descriptors mentioning an IPT expire,
    /// we consider ourselves bound by those previously-published descriptors,
    /// and try to maintain the IPT.
    /// TODO: Allegedly this is unnecessary, but I don't see how it could be.)
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// When handling state changes relating to a particular IPT (or IPT relay)
    /// it needs at most O(1) calls to progress that one IPT to its proper new state.
    ///
    /// See the performance note on [`run_once()`](Self::run_once).
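    ///
    /// ### Example driver loop
    ///
    /// A sketch of how a caller is expected to drive this method
    /// (simplified and hypothetical; the real `run_once` also handles
    /// config changes, status updates, and shutdown):
    ///
    /// ```ignore
    /// let now = loop {
    ///     match self.idempotently_progress_things_now()? {
    ///         None => continue,       // something changed; immediately try again
    ///         Some(now) => break now, // nothing to do before `now`'s earliest timeout
    ///     }
    /// };
    /// self.compute_iptsetstatus_publish(&now, &mut publish_set)?;
    /// ```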
    #[allow(clippy::redundant_closure_call)]
    fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> {
        /// Return value which means "we changed something, please run me again"
        ///
        /// In each case, if we make any changes which indicate we might
        /// want to restart, we `return CONTINUE`, and
        /// our caller will just call us again.
        ///
        /// This approach simplifies the logic: everything here is idempotent.
        /// (It does mean the algorithm can be quadratic in the number of intro points,
        /// but that number is reasonably small for a modern computer and the constant
        /// factor is small too.)
        const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None);

        // This tracks everything we compare it to, using interior mutability,
        // so that if there is no work to do and no timeouts have expired,
        // we know when we will want to wake up.
        let now = TrackingNow::now(&self.imm.runtime);

        // ---------- collect garbage ----------

        // Rotate out old IPTs
        for ir in &mut self.state.irelays {
            if ir.should_retire(&now) {
                if let Some(ipt) = ir.current_ipt_mut() {
                    ipt.is_current = None;
                    return CONTINUE;
                }
            }
        }

        // Forget old IPTs (after the last descriptor mentioning them has expired)
        for ir in &mut self.state.irelays {
            // When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point
            ir.ipts.retain(|ipt| {
                let keep = ipt.is_current.is_some()
                    || match ipt.last_descriptor_expiry_including_slop {
                        None => false,
                        Some(last) => now < last,
                    };
                // This is the only place in the manager where an IPT is dropped,
                // other than when the whole service is dropped.
                self.state.ipt_removal_cleanup_needed |= !keep;
                keep
            });
            // No need to return CONTINUE, since there is no other future work implied
            // by discarding a non-current IPT.
        }

        // Forget retired IPT relays (all their IPTs are gone)
        self.state
            .irelays
            .retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty()));
        // If we deleted relays, we might want to select new ones.  That happens below.

        // ---------- make progress ----------
        //
        // Consider selecting new relays and setting up new IPTs.

        // Create new IPTs at already-chosen relays
        for ir in &mut self.state.irelays {
            if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() {
                // We don't have a current IPT at this relay, but we should.
                match ir.make_new_ipt(&self.imm, &self.state.new_configs, &mut self.state.mockable)
                {
                    Ok(()) => return CONTINUE,
                    Err(CreateIptError::Fatal(fatal)) => return Err(fatal),
                    Err(
                        e @ (CreateIptError::Keystore(_) | CreateIptError::OpenReplayLog { .. }),
                    ) => {
                        error_report!(e, "HS {}: failed to prepare new IPT", &self.imm.nick);
                        // Let's not try any more of this.
                        // We'll run the rest of our "make progress" algorithms,
                        // presenting them with possibly-suboptimal state.  That's fine.
                        // At some point we'll be poked to run again and then we'll retry.
                        /// Retry no later than this:
                        const STORAGE_RETRY: Duration = Duration::from_secs(60);
                        now.update(STORAGE_RETRY);
                        break;
                    }
                }
            }
        }

        // Consider choosing a new IPT relay
        {
            // block {} prevents use of `n_good_ish_relays` for other (wrong) purposes

            // We optimistically count an Establishing IPT as good-ish;
            // specifically, for the purposes of deciding whether to select a new
            // relay because we don't have enough good-looking ones.
            let n_good_ish_relays = self
                .current_ipts()
                .filter(|(_ir, ipt)| match ipt.status_last {
                    TS::Good { .. } | TS::Establishing { .. } => true,
                    TS::Faulty { .. } => false,
                })
                .count();

            #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)] // in map_err
            if n_good_ish_relays < self.target_n_intro_points()
                && self.state.irelays.len() < self.max_n_intro_relays()
                && self.state.last_irelay_selection_outcome.is_ok()
            {
                self.state.last_irelay_selection_outcome = self
                    .state
                    .choose_new_ipt_relay(&self.imm, now.instant().get_now_untracked())
                    .map_err(|error| {
                        /// Call $report! with the message.
                        // The macros are annoying and want a cost argument.
                        macro_rules! report { { $report:ident } => {
                            $report!(
                                error,
                                "HS service {} failed to select IPT relay",
                                &self.imm.nick,
                            )
                        }}
                        use ChooseIptError as E;
                        match &error {
                            E::NetDir(_) => report!(info_report),
                            _ => report!(error_report),
                        };
                        ()
                    });
                return CONTINUE;
            }
        }

        //---------- caller (run_once) will update publisher, and wait ----------

        Ok(Some(now))
    }

    /// Import publisher's updates to latest descriptor expiry times
    ///
    /// Copies the `last_descriptor_expiry_including_slop` field
    /// from each ipt in `publish_set` to the corresponding ipt in `self`.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn import_new_expiry_times(irelays: &mut [IptRelay], publish_set: &PublishIptSet) {
        // Every entry in the PublishIptSet ought to correspond to an ipt in self.
        //
        // If there are IPTs in publish_set.last_descriptor_expiry_including_slop
        // that aren't in self, those are IPTs that we know were published,
        // but can't establish since we have forgotten their details.
        //
        // We are not supposed to allow that to happen:
        // we save IPTs to disk before we allow them to be published.
        //
        // (This invariant is across two data structures:
        // `ipt_mgr::State` (specifically, `Ipt`) which is modified only here,
        // and `ipt_set::PublishIptSet` which is shared with the publisher.
        // See the comments in PublishIptSet.)

        let all_ours = irelays.iter_mut().flat_map(|ir| ir.ipts.iter_mut());

        for ours in all_ours {
            if let Some(theirs) = publish_set
                .last_descriptor_expiry_including_slop
                .get(&ours.lid)
            {
                ours.last_descriptor_expiry_including_slop = Some(*theirs);
            }
        }
    }

    /// Expire old entries in publish_set.last_descriptor_expiry_including_slop
    ///
    /// Deletes entries where `now` > `last_descriptor_expiry_including_slop`,
    /// i.e., entries where the publication's validity time has expired,
    /// meaning we don't need to maintain that IPT any more,
    /// at least, not just because we've published it.
    ///
    /// We may expire even entries for IPTs that we, the manager, still want to maintain.
    /// That's fine: this is (just) the information about what we have previously published.
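    ///
    /// (Illustrative example: with entries `{a: now + 10s, b: now - 10s}`,
    /// entry `b` is deleted and entry `a` is kept.)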
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn expire_old_expiry_times(&self, publish_set: &mut PublishIptSet, now: &TrackingNow) {
        // We don't want to bother waking up just to expire things,
        // so use an untracked comparison.
        let now = now.instant().get_now_untracked();

        publish_set
            .last_descriptor_expiry_including_slop
            .retain(|_lid, expiry| *expiry > now);
    }

    /// Compute the IPT set to publish, and update the data shared with the publisher
    ///
    /// `now` is current time and also the earliest wakeup,
    /// which we are in the process of planning.
    /// The noted earliest wakeup can be updated by this function,
    /// for example, with a future time at which the IPT set ought to be published
    /// (eg, the status goes from Unknown to Uncertain).
    ///
    /// ## IPT sets and lifetimes
    ///
    /// We remember every IPT we have published that is still valid.
    ///
    /// At each point in time we have an idea of the set of IPTs we want to publish.
    /// The possibilities are:
    ///
    ///  * `Certain`:
    ///    We are sure of which IPTs we want to publish.
    ///    We try to do so, talking to hsdirs as necessary,
    ///    updating any existing information.
    ///    (We also republish to an hsdir if its descriptor will expire soon,
    ///    or we haven't published there since Arti was restarted.)
    ///
    ///  * `Unknown`:
    ///    We have no idea which IPTs to publish.
    ///    We leave whatever is on the hsdirs as-is.
    ///
    ///  * `Uncertain`:
    ///    We have some IPTs we could publish,
    ///    but we're not confident about them.
    ///    We publish these to a particular hsdir if:
    ///     - our last-published descriptor has expired
    ///     - or it will expire soon
    ///     - or if we haven't published since Arti was restarted.
    ///
    /// The idea of what to publish is calculated as follows:
    ///
    ///  * If we have at least N `Good` IPTs: `Certain`.
    ///    (We publish the "best" N IPTs for some definition of "best".
    ///    TODO: should we use the fault count?  recency?)
    ///
    ///  * Otherwise, if we have no `Good` IPTs: `Unknown`.
    ///
    ///  * Otherwise: if there are IPTs in `Establishing`,
    ///    and they have been in `Establishing` only a short time \[1\]:
    ///    `Unknown`; otherwise `Uncertain`.
    ///
    /// The effect is that we delay publishing an initial descriptor
    /// by at most 1x the fastest IPT setup time,
    /// at most doubling the initial setup time.
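    ///
    /// Expressed as a sketch (illustrative only; the real logic below also
    /// sends status updates and chooses descriptor lifetimes):
    ///
    /// ```ignore
    /// let status = if n_good >= n_target {
    ///     Certain
    /// } else if n_good == 0 {
    ///     Unknown
    /// } else if started_establishing_very_recently() {
    ///     Unknown // hold off briefly, hoping more IPTs become Good
    /// } else {
    ///     Uncertain
    /// };
    /// ```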
    ///
    /// Each update to the IPT set that isn't `Unknown` comes with a
    /// proposed descriptor expiry time,
    /// which is used if the descriptor is to be actually published.
    /// The proposed descriptor lifetime for `Uncertain`
    /// is the minimum (30 minutes).
    /// Otherwise, we double the lifetime each time,
    /// unless any IPT in the previous descriptor was declared `Faulty`,
    /// in which case we reset it back to the minimum.
    /// TODO: Perhaps we should just pick fixed short and long lifetimes instead,
    /// to limit distinguishability.
    ///
    /// (Rationale: if IPTs are regularly misbehaving,
    /// we should be cautious and limit our exposure to the damage.)
    ///
    /// \[1\] NOTE: We wait a "short time" between establishing our first IPT,
    /// and publishing an incomplete (<N) descriptor -
    /// this is a compromise between
    /// availability (publishing as soon as we have any working IPT)
    /// and
    /// exposure and hsdir load
    /// (which would suggest publishing only when our IPT set is stable).
    /// One possible strategy is to wait as long again
    /// as the time it took to establish our first IPT.
    /// Another is to somehow use our circuit timing estimator.
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    #[allow(clippy::unnecessary_wraps)] // for regularity
    #[allow(clippy::cognitive_complexity)] // this function is in fact largely linear
    fn compute_iptsetstatus_publish(
        &mut self,
        now: &TrackingNow,
        publish_set: &mut PublishIptSet,
    ) -> Result<(), IptStoreError> {
        //---------- tell the publisher what to announce ----------

        let very_recently: Option<(TrackingInstantOffsetNow, Duration)> = (|| {
            // on time overflow, don't treat any as started establishing very recently

            let fastest_good_establish_time = self
                .current_ipts()
                .filter_map(|(_ir, ipt)| match ipt.status_last {
                    TS::Good {
                        time_to_establish, ..
                    } => Some(time_to_establish.ok()?),
                    TS::Establishing { .. } | TS::Faulty { .. } => None,
                })
                .min()?;

            // Rationale:
            // we could use circuit timings etc., but arguably the actual time to establish
            // our fastest IPT is a better estimator here (and we want an optimistic,
            // rather than pessimistic estimate).
            //
            // This algorithm has potential to publish too early and frequently,
            // but our overall rate-limiting should keep it from getting out of hand.
            //
            // TODO: We might want to make this "1" tuneable, and/or tune the
            // algorithm as a whole based on experience.
            let wait_more = fastest_good_establish_time * 1;
            let very_recently = fastest_good_establish_time.checked_add(wait_more)?;

            let very_recently = now.checked_sub(very_recently)?;
            Some((very_recently, wait_more))
        })();

        let started_establishing_very_recently = || {
            let (very_recently, wait_more) = very_recently?;
            let lid = self
                .current_ipts()
                .filter_map(|(_ir, ipt)| {
                    let started = match ipt.status_last {
                        TS::Establishing { started } => Some(started),
                        TS::Good { .. } | TS::Faulty { .. } => None,
                    }?;

                    (started > very_recently).then_some(ipt.lid)
                })
                .next()?;
            Some((lid, wait_more))
        };

        let n_good_ipts = self.good_ipts().count();
        let publish_lifetime = if n_good_ipts >= self.target_n_intro_points() {
            // "Certain" - we are sure of which IPTs we want to publish
            debug!(
                "HS service {}: {} good IPTs, >= target {}, publishing",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points()
            );

            self.imm.status_tx.send(IptMgrState::Running, None);

            Some(IPT_PUBLISH_CERTAIN)
        } else if self.good_ipts().next().is_none()
        /* !... .is_empty() */
        {
            // "Unknown" - we have no idea which IPTs to publish.
            debug!("HS service {}: no good IPTs", &self.imm.nick);

            self.imm
                .status_tx
                .send_recovering(self.ipt_errors().cloned().collect_vec());

            None
        } else if let Some((wait_for, wait_more)) = started_establishing_very_recently() {
            // "Unknown" - we say we have no idea which IPTs to publish:
            // although we have *some* idea, we hold off a bit to see if things improve.
            // The wait_more period started counting when the fastest IPT became ready,
            // so the printed value isn't an offset from the message timestamp.
            debug!(
                "HS service {}: {} good IPTs, < target {}, waiting up to {}ms for {:?}",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points(),
                wait_more.as_millis(),
                wait_for
            );

            self.imm
                .status_tx
                .send_recovering(self.ipt_errors().cloned().collect_vec());

            None
        } else {
            // "Uncertain" - we have some IPTs we could publish, but we're not confident
            debug!(
                "HS service {}: {} good IPTs, < target {}, publishing what we have",
                &self.imm.nick,
                n_good_ipts,
                self.target_n_intro_points()
            );

            // We are close to being Running -- we just need more IPTs!
            let errors = self.ipt_errors().cloned().collect_vec();
            let errors = if errors.is_empty() {
                None
            } else {
                Some(errors)
            };

            self.imm
                .status_tx
                .send(IptMgrState::DegradedReachable, errors.map(|e| e.into()));

            Some(IPT_PUBLISH_UNCERTAIN)
        };

        publish_set.ipts = if let Some(lifetime) = publish_lifetime {
            let selected = self.publish_set_select();
            for ipt in &selected {
                self.state.mockable.start_accepting(&*ipt.establisher);
            }
            Some(Self::make_publish_set(selected, lifetime)?)
        } else {
            None
        };

        //---------- store persistent state ----------

        persist::store(&self.imm, &mut self.state)?;

        Ok(())
    }

    /// Select IPTs to publish, given that we have decided to publish *something*
    ///
    /// Calculates the set of IPTs to publish, selecting up to the target `N`
    /// from the available good current IPTs.
    /// (Old, non-current IPTs, that we are trying to retire, are never published.)
    ///
    /// The returned list is in the same order as our data structure:
    /// firstly, by the ordering in `State.irelays`, and then within each relay,
    /// by the ordering in `IptRelay.ipts`.  Both of these are stable.
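    ///
    /// (Illustrative example: with good current candidates `[r1, r2, r3, r4]`,
    /// oldest-selected first, and a target of 3, `r1` is dropped from the front
    /// and `[r2, r3, r4]` is published.)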
    ///
    /// ### Performance
    ///
    /// This function is at worst O(N) where N is the number of IPTs.
    /// See the performance note on [`run_once()`](Self::run_once).
    fn publish_set_select(&self) -> VecDeque<&Ipt> {
        /// Good candidate introduction point for publication
        type Candidate<'i> = &'i Ipt;

        let target_n = self.target_n_intro_points();

        let mut candidates: VecDeque<_> = self
            .state
            .irelays
            .iter()
            .filter_map(|ir: &_| -> Option<Candidate<'_>> {
                let current_ipt = ir.current_ipt()?;
                if !current_ipt.is_good() {
                    return None;
                }
                Some(current_ipt)
            })
            .collect();

        // Take the last N good IPT relays
        //
        // The way we manage irelays means that this is always
        // the ones we selected most recently.
        //
        // TODO SPEC  Publication strategy when we have more than N IPTs
        //
        // We could have a number of strategies here.  We could take some timing
        // measurements, or use the establishment time, or something; but we don't
        // want to add distinguishability.
        //
        // Another concern is manipulability; but we can't be forced to churn,
        // because we don't remove relays
        // from our list of relays to try to use, other than on our own schedule.
        // And we probably won't want to be too reactive to the network environment.
        //
        // Since we only choose new relays when old ones are to retire, or are faulty,
        // choosing the most recently selected, rather than the least recently,
        // has the effect of preferring relays we don't know to be faulty,
        // to ones we have considered faulty at least once.
        //
        // That's better than the opposite.  Also, choosing more recently selected relays
        // for publication may slightly bring forward the time at which all descriptors
        // mentioning that relay have expired, and then we can forget about it.
        while candidates.len() > target_n {
            // WTB: VecDeque::truncate_front
            let _: Candidate = candidates.pop_front().expect("empty?!");
        }

        candidates
    }

    /// Produce a `publish::IptSet`, from a list of IPTs selected for publication
    ///
    /// Updates each chosen `Ipt`'s `last_descriptor_expiry_including_slop`
    ///
    /// The returned `IptSet` is in the same order as `selected`.
    ///
1286    /// ### Performance
1287    ///
1288    /// This function is at worst O(N) where N is the number of IPTs.
1289    /// See the performance note on [`run_once()`](Self::run_once).
1290    fn make_publish_set<'i>(
1291        selected: impl IntoIterator<Item = &'i Ipt>,
1292        lifetime: Duration,
1293    ) -> Result<ipt_set::IptSet, FatalError> {
1294        let ipts = selected
1295            .into_iter()
1296            .map(|current_ipt| {
1297                let TS::Good { details, .. } = &current_ipt.status_last else {
1298                    return Err(internal!("was good but now isn't?!").into());
1299                };
1300
1301                let publish = current_ipt.for_publish(details)?;
1302
1303                // last_descriptor_expiry_including_slop was earlier merged in from
1304                // the previous IptSet, and here we copy it back
                let publish = ipt_set::IptInSet {
                    ipt: publish,
                    lid: current_ipt.lid,
                };

                Ok::<_, FatalError>(publish)
            })
            .collect::<Result<_, _>>()?;

        Ok(ipt_set::IptSet { ipts, lifetime })
    }

    /// Delete persistent on-disk data (including keys) for old IPTs
    ///
    /// More precisely, scan places where per-IPT data files live,
    /// and delete anything that doesn't correspond to
    /// one of the IPTs in our main in-memory data structure.
    ///
    /// Does *not* deal with deletion of data handled via storage handles
    /// (`state_dir::StorageHandle`), `ipt_mgr/persist.rs` etc.;
    /// those are one file for each service, so old data is removed as we rewrite them.
    ///
    /// Does *not* deal with deletion of entire old hidden services.
    ///
    /// (This function works on the basis of the invariant that every IPT
    /// in [`ipt_set::PublishIptSet`] is also an [`Ipt`] in [`ipt_mgr::State`](State).
    /// See the comment in [`IptManager::import_new_expiry_times`].
    /// If that invariant is violated, we would delete on-disk files for the affected IPTs.
    /// That's fine since we couldn't re-establish them anyway.)
    #[allow(clippy::cognitive_complexity)] // Splitting this up would make it worse
    fn expire_old_ipts_external_persistent_state(&self) -> Result<(), StateExpiryError> {
        self.state
            .mockable
            .expire_old_ipts_external_persistent_state_hook();

        let all_ipts: HashSet<_> = self.all_ipts().map(|(_, ipt)| &ipt.lid).collect();

        // Keys

        let pat = IptKeySpecifierPattern {
            nick: Some(self.imm.nick.clone()),
            role: None,
            lid: None,
        }
        .arti_pattern()?;

        let found = self.imm.keymgr.list_matching(&pat)?;

        for entry in found {
            let path = entry.key_path();
            // Try to identify this key (including its IptLocalId)
            match IptKeySpecifier::try_from(path) {
                Ok(spec) if all_ipts.contains(&spec.lid) => continue,
                Ok(_) => trace!("deleting key for old IPT: {path}"),
                Err(bad) => info!("deleting unrecognised IPT key: {path} ({})", bad.report()),
            };
            // Not known, remove it
            self.imm.keymgr.remove_entry(&entry)?;
        }

        // IPT replay logs

        let handle_rl_err = |operation, path: &Path| {
            let path = path.to_owned();
            move |source| StateExpiryError::ReplayLog {
                operation,
                path,
                source: Arc::new(source),
            }
        };

        // fs-mistrust doesn't offer CheckedDir::read_this_directory.
        // But, we probably don't mind that we're not doing many checks here.
        let replay_logs = self.imm.replay_log_dir.as_path();
        let replay_logs_dir =
            fs::read_dir(replay_logs).map_err(handle_rl_err("open dir", replay_logs))?;

        for ent in replay_logs_dir {
            let ent = ent.map_err(handle_rl_err("read dir", replay_logs))?;
            let leaf = ent.file_name();
            // Try to identify this replay logfile (including its IptLocalId)
            match IptReplayLog::parse_log_leafname(&leaf) {
                Ok(lid) if all_ipts.contains(&lid) => continue,
                Ok(_) => trace!(
                    leaf = leaf.to_string_lossy().as_ref(),
                    "deleting replay log for old IPT"
                ),
                Err(bad) => info!(
                    "deleting garbage in IPT replay log dir: {} ({})",
                    leaf.to_string_lossy(),
                    bad
                ),
            }
            // Not known, remove it
            let path = ent.path();
            fs::remove_file(&path).map_err(handle_rl_err("remove", &path))?;
        }

        Ok(())
    }

    /// Run one iteration of the loop
    ///
    /// Either do some work, making changes to our state,
    /// or, if there's nothing to be done, wait until there *is* something to do.
    ///
    /// ### Implementation approach
    ///
    /// Every time we wake up we idempotently make progress
    /// by searching our whole state machine, looking for something to do.
    /// If we find something to do, we do that one thing, and search again.
    /// When we're done, we unconditionally recalculate the IPTs to publish, and sleep.
    ///
    /// This approach avoids the need for complicated reasoning about
    /// which state updates need to trigger other state updates,
    /// and thereby avoids several classes of potential bugs.
    /// However, it has some performance implications:
    ///
    /// ### Performance
    ///
    /// Events relating to an IPT occur, at worst,
    /// at a rate proportional to the current number of IPTs,
    /// times the maximum flap rate of any one IPT.
    ///
    /// [`idempotently_progress_things_now`](Self::idempotently_progress_things_now)
    /// can be called more than once for each such event,
    /// but only a finite number of times per IPT.
    ///
    /// Therefore, overall, our work rate is O(N^2) where N is the number of IPTs.
    /// We think this is tolerable,
    /// but it does mean that the principal functions should be written
    /// with an eye to avoiding "accidentally quadratic" algorithms,
    /// because that would make the whole manager cubic.
    /// Ideally we would avoid O(N.log(N)) algorithms.
    ///
    /// (Note that the number of IPTs can be significantly larger than
    /// the maximum target of 20, if the service is very busy so the intro points
    /// are cycling rapidly due to the need to replace the replay database.)
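    ///
    /// ### Sketch of the control flow
    ///
    /// Schematically (a simplified sketch only; the real code below also
    /// handles the publish set, error logging, and state cleanup):
    ///
    /// ```ignore
    /// let now = loop {
    ///     match self.idempotently_progress_things_now()? {
    ///         // We made one state change; search the state machine again.
    ///         None => continue,
    ///         // Nothing left to do; `now` tracks when next to wake up.
    ///         Some(now) => break now,
    ///     }
    /// };
    /// // Unconditionally recompute the set of IPTs to publish ...
    /// self.compute_iptsetstatus_publish(&now, &mut publish_set)?;
    /// // ... and then sleep until a timeout expires or an input event arrives.
    /// ```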
    async fn run_once(
        &mut self,
        // This is a separate argument for borrowck reasons
        publisher: &mut IptsManagerView,
    ) -> Result<ShutdownStatus, FatalError> {
        let now = {
            // Block to persuade borrow checker that publish_set isn't
            // held over an await point.

            let mut publish_set = publisher.borrow_for_update(self.imm.runtime.clone());

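            // Merge the publisher's record of each IPT's last descriptor
            // expiry time (including slop) into our own `Ipt` structures.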
            Self::import_new_expiry_times(&mut self.state.irelays, &publish_set);

            let mut loop_limit = 0..(
                // Work we do might be O(number of intro points),
                // but we might also have cycled the intro points due to many requests.
                // 10K is a guess at a stupid upper bound on the number of times we
                // might cycle ipts during a descriptor lifetime.
                // We don't need a tight bound; if we're going to crash, we can spin a bit first.
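                // (For example, with a target of 3 intro points this
                // allows (3 + 1) * 10_000 = 40_000 iterations.)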
                (self.target_n_intro_points() + 1) * 10_000
            );
            let now = loop {
                let _: usize = loop_limit.next().expect("IPT manager is looping");

                if let Some(now) = self.idempotently_progress_things_now()? {
                    break now;
                }
            };

            // TODO #1214 Maybe something at level Error or Info, for example
            // log an error if everything is terrible:
            //   - we have >=N Faulty IPTs?
            //   - we have only Faulty IPTs and can't select another due to the 2N limit?
            // Log at info if and when we publish?  Maybe the publisher should do that?

            if let Err(operr) = self.compute_iptsetstatus_publish(&now, &mut publish_set) {
                // This is not good, is it.
                publish_set.ipts = None;
                let wait = operr.log_retry_max(&self.imm.nick)?;
                now.update(wait);
            };

            self.expire_old_expiry_times(&mut publish_set, &now);

            drop(publish_set); // release lock, and notify publisher of any changes

            if self.state.ipt_removal_cleanup_needed {
                let outcome = self.expire_old_ipts_external_persistent_state();
                log_ratelim!("removing state for old IPT(s)"; outcome);
                match outcome {
                    Ok(()) => self.state.ipt_removal_cleanup_needed = false,
                    Err(_already_logged) => {}
                }
            }

            now
        };

        assert_ne!(
            now.clone().shortest(),
            Some(Duration::ZERO),
            "IPT manager zero timeout, would loop"
        );

        let mut new_configs = self.state.new_configs.next().fuse();

        select_biased! {
            () = now.wait_for_earliest(&self.imm.runtime).fuse() => {},
            shutdown = self.state.shutdown.next().fuse() => {
                info!("HS service {}: terminating due to shutdown signal", &self.imm.nick);
                // We shouldn't be receiving anything on this channel.
                assert!(shutdown.is_none());
                return Ok(ShutdownStatus::Terminate)
            },

            update = self.state.status_recv.next() => {
                let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?;
                self.state.handle_ipt_status_update(&self.imm, lid, update);
            }

            _dir_event = async {
                match self.state.last_irelay_selection_outcome {
                    Ok(()) => future::pending().await,
                    // This boxes needlessly but it shouldn't really happen
                    Err(()) => self.imm.dirprovider.events().next().await,
                }
            }.fuse() => {
                self.state.last_irelay_selection_outcome = Ok(());
            }

            new_config = new_configs => {
                let Some(new_config) = new_config else {
                    trace!("HS service {}: terminating due to EOF on config updates stream",
                           &self.imm.nick);
                    return Ok(ShutdownStatus::Terminate);
                };
                if let Err(why) = (|| {
                    let dos = |config: &OnionServiceConfig| config.dos_extension()
                        .map_err(|e| e.report().to_string());
                    if dos(&self.state.current_config)? != dos(&new_config)? {
                        return Err("DOS parameters (rate limit) changed".to_string());
                    }
                    Ok(())
                })() {
                    // We need new IPTs with the new parameters.  (The previously-published
                    // IPTs will automatically be retained so long as needed, by the
                    // rest of our algorithm.)
                    info!("HS service {}: replacing IPTs: {}", &self.imm.nick, &why);
                    for ir in &mut self.state.irelays {
                        for ipt in &mut ir.ipts {
                            ipt.is_current = None;
                        }
                    }
                }
                self.state.current_config = new_config;
                self.state.last_irelay_selection_outcome = Ok(());
            }
        }

        Ok(ShutdownStatus::Continue)
    }

    /// IPT Manager main loop, runs as a task
    ///
    /// Contains the error handling, including catching panics.
    async fn main_loop_task(mut self, mut publisher: IptsManagerView) {
        loop {
            match async {
                AssertUnwindSafe(self.run_once(&mut publisher))
                    .catch_unwind()
                    .await
                    .map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))?
            }
            .await
            {
                Err(crash) => {
                    error!("bug: HS service {} crashed! {}", &self.imm.nick, crash);

                    self.imm.status_tx.send_broken(crash);
                    break;
                }
                Ok(ShutdownStatus::Continue) => continue,
                Ok(ShutdownStatus::Terminate) => {
                    self.imm.status_tx.send_shutdown();

                    break;
                }
            }
        }
    }
}

impl<R: Runtime, M: Mockable<R>> State<R, M> {
    /// Find the `Ipt` with persistent local id `lid`
    fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> {
        self.irelays
            .iter_mut()
            .find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle))
    }

    /// Choose a new relay to use for IPTs
    fn choose_new_ipt_relay(
        &mut self,
        imm: &Immutable<R>,
        now: Instant,
    ) -> Result<(), ChooseIptError> {
        let netdir = imm.dirprovider.timely_netdir()?;

        let mut rng = self.mockable.thread_rng();

        let relay = {
            let exclude_ids = self
                .irelays
                .iter()
                .flat_map(|e| e.relay.identities())
                .map(|id| id.to_owned())
                .collect();
            let selector = RelaySelector::new(
                RelayUsage::new_intro_point(),
                RelayExclusion::exclude_identities(exclude_ids),
            );
            selector
                .select_relay(&mut rng, &netdir)
                .0 // TODO: Someday we might want to report why we rejected everything on failure.
                .ok_or(ChooseIptError::TooFewUsableRelays)?
        };

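        // Plan this relay's retirement: pick a duration uniformly at random
        // from the consensus's allowed intro point lifetime range.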
        let lifetime_low = netdir
            .params()
            .hs_intro_min_lifetime
            .try_into()
            .expect("Could not convert param to duration.");
        let lifetime_high = netdir
            .params()
            .hs_intro_max_lifetime
            .try_into()
            .expect("Could not convert param to duration.");
        let lifetime_range: std::ops::RangeInclusive<Duration> = lifetime_low..=lifetime_high;
        let retirement = rng
            .gen_range_checked(lifetime_range)
            // If the range from the consensus is invalid, just pick the high-bound.
            .unwrap_or(lifetime_high);
        let retirement = now
            .checked_add(retirement)
            .ok_or(ChooseIptError::TimeOverflow)?;

        let new_irelay = IptRelay {
            relay: RelayIds::from_relay_ids(&relay),
            planned_retirement: retirement,
            ipts: vec![],
        };
        self.irelays.push(new_irelay);

        debug!(
            "HS service {}: choosing new IPT relay {}",
            &imm.nick,
            relay.display_relay_ids()
        );

        Ok(())
    }

    /// Update `self`'s status tracking for one introduction point
    fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
        let Some(ipt) = self.ipt_by_lid_mut(lid) else {
            // Update from a now-withdrawn IPT; ignore it.  (This can happen
            // because the establisher runs as a separate task.)
            return;
        };

        debug!("HS service {}: {lid:?} status update {update:?}", &imm.nick);

        let IptStatus {
            status: update,
            wants_to_retire,
            ..
        } = update;

        #[allow(clippy::single_match)] // want to be explicit about the Ok type
        match wants_to_retire {
            Err(IptWantsToRetire) => ipt.is_current = None,
            Ok(()) => {}
        }

        let now = || imm.runtime.now();

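        // Work out when the current establishment attempt started.
        // Err(()) means we have no relevant start time (e.g. we were Good),
        // so a new Establishing attempt is treated as starting now.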
        let started = match &ipt.status_last {
            TS::Establishing { started, .. } => Ok(*started),
            TS::Faulty { started, .. } => *started,
            TS::Good { .. } => Err(()),
        };

        ipt.status_last = match update {
            ISS::Establishing => TS::Establishing {
                started: started.unwrap_or_else(|()| now()),
            },
            ISS::Good(details) => {
                let time_to_establish = started.and_then(|started| {
                    // return () at end of ok_or_else closure, for clarity
                    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
                    now().checked_duration_since(started).ok_or_else(|| {
                        warn!("monotonic clock went backwards! (HS IPT)");
                        ()
                    })
                });
                TS::Good {
                    time_to_establish,
                    details,
                }
            }
            ISS::Faulty(error) => TS::Faulty { started, error },
        };
    }
}

//========== mockability ==========

/// Mockable state for the IPT Manager
///
/// This allows us to use a fake IPT Establisher and IPT Publisher,
/// so that we can unit test the Manager.
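///
/// For example, a minimal test double might look like this
/// (a hypothetical sketch; the mocks actually used by the unit tests
/// are in the `test` module at the end of this file):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NoopMocks;
///
/// impl<R: Runtime> Mockable<R> for NoopMocks {
///     type IptEstablisher = ();
///     type Rng<'m> = rand::rngs::ThreadRng;
///
///     fn thread_rng(&mut self) -> Self::Rng<'_> {
///         rand::rng()
///     }
///
///     fn make_new_ipt(
///         &mut self,
///         _imm: &Immutable<R>,
///         _params: IptParameters,
///     ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
///         // Report no status updates; the "establisher" itself is a unit.
///         let (_st_tx, st_rx) = watch::channel();
///         Ok(((), st_rx))
///     }
///
///     fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}
///
///     fn expire_old_ipts_external_persistent_state_hook(&self) {}
/// }
/// ```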
pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
    /// IPT establisher type
    type IptEstablisher: Send + Sync + 'static;

    /// A random number generator
    type Rng<'m>: rand::Rng + rand::CryptoRng + 'm;

    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_>;

    /// Call `IptEstablisher::new`
    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;

    /// Call `IptEstablisher::start_accepting`
    fn start_accepting(&self, establisher: &ErasedIptEstablisher);

    /// Allow tests to see when [`IptManager::expire_old_ipts_external_persistent_state`]
    /// is called.
    ///
    /// This lets tests see that it gets called at the right times,
    /// and not the wrong ones.
    fn expire_old_ipts_external_persistent_state_hook(&self);
}

impl<R: Runtime> Mockable<R> for Real<R> {
    type IptEstablisher = IptEstablisher;

    /// A random number generator
    type Rng<'m> = rand::rngs::ThreadRng;

    /// Return a random number generator
    fn thread_rng(&mut self) -> Self::Rng<'_> {
        rand::rng()
    }

    fn make_new_ipt(
        &mut self,
        imm: &Immutable<R>,
        params: IptParameters,
    ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
        IptEstablisher::launch(&imm.runtime, params, self.circ_pool.clone(), &imm.keymgr)
    }

    fn start_accepting(&self, establisher: &ErasedIptEstablisher) {
        let establisher: &IptEstablisher = <dyn Any>::downcast_ref(establisher)
            .expect("downcast failure, ErasedIptEstablisher is not IptEstablisher!");
        establisher.start_accepting();
    }

    fn expire_old_ipts_external_persistent_state_hook(&self) {}
}

// TODO #1213 add more unit tests for IptManager
// Especially, we want to exercise all code paths in idempotently_progress_things_now

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    #![allow(clippy::match_single_binding)] // false positives, need the lifetime extension
    use super::*;

    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_establish::GoodIptDetails;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::{create_keymgr, create_storage_handles_from_state_dir};
    use rand::SeedableRng as _;
    use slotmap_careful::DenseSlotMap;
    use std::collections::BTreeMap;
    use std::sync::Mutex;
    use test_temp_dir::{test_temp_dir, TestTempDir};
    use tor_basic_utils::test_rng::TestingRng;
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_rtmock::MockRuntime;
    use tracing_test::traced_test;
    use walkdir::WalkDir;

    slotmap_careful::new_key_type! {
        struct MockEstabId;
    }

    type MockEstabs = Arc<Mutex<DenseSlotMap<MockEstabId, MockEstabState>>>;

    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    #[derive(Debug)]
    struct Mocks {
        rng: TestingRng,
        estabs: MockEstabs,
        expect_expire_ipts_calls: Arc<Mutex<usize>>,
    }

    #[derive(Debug)]
    struct MockEstabState {
        st_tx: watch::Sender<IptStatus>,
        params: IptParameters,
    }

    #[derive(Debug)]
    struct MockEstab {
        esid: MockEstabId,
        estabs: MockEstabs,
    }

    impl Mockable<MockRuntime> for Mocks {
        type IptEstablisher = MockEstab;
        type Rng<'m> = &'m mut TestingRng;

        fn thread_rng(&mut self) -> Self::Rng<'_> {
            &mut self.rng
        }

        fn make_new_ipt(
            &mut self,
            _imm: &Immutable<MockRuntime>,
            params: IptParameters,
        ) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
            let (st_tx, st_rx) = watch::channel();
            let estab = MockEstabState { st_tx, params };
            let esid = self.estabs.lock().unwrap().insert(estab);
            let estab = MockEstab {
                esid,
                estabs: self.estabs.clone(),
            };
            Ok((estab, st_rx))
        }

        fn start_accepting(&self, _establisher: &ErasedIptEstablisher) {}

        fn expire_old_ipts_external_persistent_state_hook(&self) {
            let mut expect = self.expect_expire_ipts_calls.lock().unwrap();
            eprintln!("expire_old_ipts_external_persistent_state_hook, expect={expect}");
            *expect = expect.checked_sub(1).expect("unexpected expiry");
        }
    }

    impl Drop for MockEstab {
        fn drop(&mut self) {
            let mut estabs = self.estabs.lock().unwrap();
            let _: MockEstabState = estabs
                .remove(self.esid)
                .expect("dropping non-recorded MockEstab");
        }
    }

    struct MockedIptManager<'d> {
        estabs: MockEstabs,
        pub_view: ipt_set::IptsPublisherView,
        shut_tx: broadcast::Sender<Void>,
        #[allow(dead_code)]
        cfg_tx: watch::Sender<Arc<OnionServiceConfig>>,
        #[allow(dead_code)] // ensures temp dir lifetime; paths stored in self
        temp_dir: &'d TestTempDir,
        expect_expire_ipts_calls: Arc<Mutex<usize>>, // use usize::MAX to not mind
    }

    impl<'d> MockedIptManager<'d> {
        fn startup(
            runtime: MockRuntime,
            temp_dir: &'d TestTempDir,
            seed: u64,
            expect_expire_ipts_calls: usize,
        ) -> Self {
            let dir: TestNetDirProvider = tor_netdir::testnet::construct_netdir()
                .unwrap_if_sufficient()
                .unwrap()
                .into();

            let nick: HsNickname = "nick".to_string().try_into().unwrap();

            let cfg = OnionServiceConfigBuilder::default()
                .nickname(nick.clone())
                .build()
                .unwrap();

            let (cfg_tx, cfg_rx) = watch::channel_with(Arc::new(cfg));

            let (rend_tx, _rend_rx) = mpsc::channel(10);
            let (shut_tx, shut_rx) = broadcast::channel::<Void>(0);

            let estabs: MockEstabs = Default::default();
            let expect_expire_ipts_calls = Arc::new(Mutex::new(expect_expire_ipts_calls));

            let mocks = Mocks {
                rng: TestingRng::seed_from_u64(seed),
                estabs: estabs.clone(),
                expect_expire_ipts_calls: expect_expire_ipts_calls.clone(),
            };

            // Don't provide a subdir; the ipt_mgr is supposed to add any needed subdirs
            let state_dir = temp_dir
                // untracked is OK because our return value captures 'd
                .subdir_untracked("state_dir");

            let (state_handle, iptpub_state_handle) =
                create_storage_handles_from_state_dir(&state_dir, &nick);

            let (mgr_view, pub_view) =
                ipt_set::ipts_channel(&runtime, iptpub_state_handle).unwrap();

            let keymgr = create_keymgr(temp_dir);
            let keymgr = keymgr.into_untracked(); // OK because our return value captures 'd
            let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
            let mgr = IptManager::new(
                runtime.clone(),
                Arc::new(dir),
                nick,
                cfg_rx,
                rend_tx,
                shut_rx,
                &state_handle,
                mocks,
                keymgr,
                status_tx,
            )
            .unwrap();

            mgr.launch_background_tasks(mgr_view).unwrap();

            MockedIptManager {
                estabs,
                pub_view,
                shut_tx,
                cfg_tx,
                temp_dir,
                expect_expire_ipts_calls,
            }
        }

        async fn shutdown_check_no_tasks(self, runtime: &MockRuntime) {
            drop(self.shut_tx);
            runtime.progress_until_stalled().await;
            assert_eq!(runtime.mock_task().n_tasks(), 1); // just us
        }

        fn estabs_inventory(&self) -> impl Eq + Debug + 'static {
            let estabs = self.estabs.lock().unwrap();
            let estabs = estabs
                .values()
                .map(|MockEstabState { params: p, .. }| {
                    (
                        p.lid,
                        (
                            p.target.clone(),
                            // We want to check the key values, but they're very hard to get at
                            // in a way we can compare.  Especially the private keys, for which
                            // we can't get a clone or copy of the private key material out of the Arc.
                            // They're keypairs, so we can use the debug rep, which shows the public half.
                            // That will have to do.
                            format!("{:?}", p.k_sid),
                            format!("{:?}", p.k_ntor),
                        ),
                    )
                })
                .collect::<BTreeMap<_, _>>();
            estabs
        }
    }

    #[test]
    #[traced_test]
    fn test_mgr_lifecycle() {
        MockRuntime::test_with_various(|runtime| async move {
            let temp_dir = test_temp_dir!();

            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 0, 1);
            runtime.progress_until_stalled().await;

            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);

            // We expect it to try to establish 3 IPTs
            const EXPECT_N_IPTS: usize = 3;
            const EXPECT_MAX_IPTS: usize = EXPECT_N_IPTS + 2 /* num_extra */;
            assert_eq!(m.estabs.lock().unwrap().len(), EXPECT_N_IPTS);
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());

            // Advance time a bit; it still shouldn't publish anything
            runtime.advance_by(ms(500)).await;
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());

            let good = GoodIptDetails {
                link_specifiers: vec![],
                ipt_kp_ntor: [0x55; 32].into(),
            };

            // Imagine that one of our IPTs becomes good
            m.estabs
                .lock()
                .unwrap()
                .values_mut()
                .next()
                .unwrap()
                .st_tx
                .borrow_mut()
                .status = IptStatusStatus::Good(good.clone());

            // TODO #1213 test that we haven't called start_accepting

            // It won't publish until a further fastest-establish time has passed,
            // i.e. until a further 500ms, 1000ms in total
            runtime.progress_until_stalled().await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(499)).await;
            assert!(m.pub_view.borrow_for_publish().ipts.is_none());
            runtime.advance_by(ms(1)).await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), 1);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_UNCERTAIN);
                }
            };

            // TODO #1213 test that we have called start_accepting on the right IPTs

            // Set the other IPTs to be Good too
            for e in m.estabs.lock().unwrap().values_mut().skip(1) {
                e.st_tx.borrow_mut().status = IptStatusStatus::Good(good.clone());
            }
            runtime.progress_until_stalled().await;
            match m.pub_view.borrow_for_publish().ipts.as_mut().unwrap() {
                pub_view => {
                    assert_eq!(pub_view.ipts.len(), EXPECT_N_IPTS);
                    assert_eq!(pub_view.lifetime, IPT_PUBLISH_CERTAIN);
                }
            };

            // TODO #1213 test that we have called start_accepting on the right IPTs

            let estabs_inventory = m.estabs_inventory();

            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;

            // ---------- restart! ----------
            info!("*** Restarting ***");

            let m = MockedIptManager::startup(runtime.clone(), &temp_dir, 1, 1);
            runtime.progress_until_stalled().await;
            assert_eq!(*m.expect_expire_ipts_calls.lock().unwrap(), 0);

            assert_eq!(estabs_inventory, m.estabs_inventory());

            // TODO #1213 test that we have called start_accepting on all the old IPTs

            // ---------- New IPT relay selection ----------

            let old_lids: Vec<String> = m
                .estabs
                .lock()
                .unwrap()
                .values()
                .map(|ess| ess.params.lid.to_string())
                .collect();
            eprintln!("IPTs to rotate out: {old_lids:?}");

            let old_lid_files = || {
                WalkDir::new(temp_dir.as_path_untracked())
                    .into_iter()
                    .map(|ent| {
                        ent.unwrap()
                            .into_path()
                            .into_os_string()
                            .into_string()
                            .unwrap()
                    })
                    .filter(|path| old_lids.iter().any(|lid| path.contains(lid)))
                    .collect_vec()
            };

            let no_files: [String; 0] = [];

            assert_ne!(old_lid_files(), no_files);

            // It might call the expiry function once, or once per IPT.
            // The latter is quadratic but this is quite rare, so that's fine.
            *m.expect_expire_ipts_calls.lock().unwrap() = EXPECT_MAX_IPTS;

            // wait 2 days, > hs_intro_max_lifetime
            runtime.advance_by(ms(48 * 60 * 60 * 1_000)).await;
            runtime.progress_until_stalled().await;

            // It must have called it at least once.
            assert_ne!(*m.expect_expire_ipts_calls.lock().unwrap(), EXPECT_MAX_IPTS);

            // There should now be no files named after old IptLocalIds.
            assert_eq!(old_lid_files(), no_files);

            // Shut down
            m.shutdown_check_no_tasks(&runtime).await;
        });
    }
}