tor_hsservice/publish/
reactor.rs

1//! The onion service publisher reactor.
2//!
3//! Generates and publishes hidden service descriptors in response to various events.
4//!
5//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
6//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
7//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
8//! any of the channels the reactor is receiving events from is closed
9//! (i.e. when the senders are dropped).
10//!
11//! ## Publisher status
12//!
13//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
14//! which is used for onion service status reporting.
15//!
16//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
17//! and responds by generating and publishing a new descriptor if needed.
18//!
19//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
20//!
21//! ## When do we publish?
22//!
23//! We generate and publish a new descriptor if
24//!   * the introduction points have changed
25//!   * the onion service configuration has changed in a meaningful way (for example,
26//!     if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
27//!     has changed. See [`OnionServiceConfigPublisherView`]).
28//!   * there is a new consensus
29//!   * it is time to republish the descriptor (after we upload a descriptor,
30//!     we schedule it for republishing at a random time between 60 minutes and 120 minutes
31//!     in the future; see the sketch below)
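//!
//! For illustration, the republish delay in the last bullet is chosen roughly
//! like this (a simplified sketch of what `handle_upload_results` below does,
//! not the exact code):
//!
//! ```ignore
//! let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
//! let reupload_when = runtime.now() + Duration::from_secs(minutes * 60);
//! reupload_timers.push(ReuploadTimer { period, when: reupload_when });
//! ```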
32//!
33//! ## Onion service status
34//!
35//! With respect to [`OnionServiceStatus`] reporting,
36//! the following state transitions are possible:
37//!
38//!
39//! ```ignore
40//!
41//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
42//!                +---------------------------------------+
43//!                |                                       |
44//!                |                                       v
45//!                |                               +---------------+
46//!                |                               | Bootstrapping |
47//!                |                               +---------------+
48//!                |                                       |
49//!                |                                       |           uploaded to at least
50//!                |  not enough HsDir uploads succeeded   |        some HsDirs from each ring
51//!                |         +-----------------------------+-----------------------+
52//!                |         |                             |                       |
53//!                |         |              all HsDir uploads succeeded            |
54//!                |         |                             |                       |
55//!                |         v                             v                       v
56//!                |  +---------------------+         +---------+        +---------------------+
57//!                |  | DegradedUnreachable |         | Running |        |  DegradedReachable  |
58//! +----------+   |  +---------------------+         +---------+        +---------------------+
59//! | Shutdown |-- |         |                           |                        |
60//! +----------+   |         |                           |                        |
61//!                |         |                           |                        |
62//!                |         |                           |                        |
63//!                |         +---------------------------+------------------------+
64//!                |                                     |   invalid authorized_clients
65//!                |                                     |      after handling config change
66//!                |                                     |
67//!                |                                     v
68//!                |     run_once() returns an error +--------+
69//!                +-------------------------------->| Broken |
70//!                                                  +--------+
71//! ```
72//!
73//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
74//! back to `Bootstrapping` (those transitions were omitted for brevity).
75
76use tor_config::file_watcher::{
77    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
78};
79use tor_config_path::{CfgPath, CfgPathResolver};
80use tor_dirclient::SourceInfo;
81use tor_netdir::{DirEvent, NetDir};
82
83use crate::config::restricted_discovery::{
84    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
85};
86use crate::config::OnionServiceConfigPublisherView;
87use crate::status::{DescUploadRetryError, Problem};
88
89use super::*;
90
91// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
92// for rate-limiting the publish jobs triggered by a change in the config?
93//
94// Currently the descriptor publish tasks triggered by changes in the config
95// are rate-limited via the usual rate limiting mechanism
96// (which rate-limits the uploads for 1m).
97//
98// I think this is OK for now, but we might need to rethink this if it becomes problematic
99// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
100// each time the config is modified).
101
102/// The upload rate-limiting threshold.
103///
104/// Before initiating an upload, the reactor checks if the last upload was at least
105/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
106/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
107/// current time.
108//
109// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
110const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
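
// A simplified sketch of the rate-limiting check described above (not the
// exact code); the timestamp of the last upload is kept in
// `Inner::last_uploaded`:
//
//     if last_uploaded.map_or(true, |last| now - last >= UPLOAD_RATE_LIM_THRESHOLD) {
//         // upload the descriptor to every HsDir that still needs it
//     } else {
//         // defer: set PublishStatus::RateLimited(now + UPLOAD_RATE_LIM_THRESHOLD)
//         // and try again once the rate-limit expires
//     }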
111
112/// The maximum number of concurrent upload tasks per time period.
113//
114// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
115// will have no effect, since the current number of replicas is far less than
116// this value.
117//
118// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
119// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
120// (currently 2, which means the concurrency limit will, in fact, be 32).
121//
122// We should try to decouple this value from the TP parameters.
123const MAX_CONCURRENT_UPLOADS: usize = 16;
124
125/// The maximum time allowed for uploading a descriptor to a single HSDir,
126/// across all attempts.
127pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
128
129/// A reactor for the HsDir [`Publisher`]
130///
131/// The entrypoint is [`Reactor::run`].
132#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
133pub(super) struct Reactor<R: Runtime, M: Mockable> {
134    /// The immutable, shared inner state.
135    imm: Arc<Immutable<R, M>>,
136    /// A source for new network directories that we use to determine
137    /// our HsDirs.
138    dir_provider: Arc<dyn NetDirProvider>,
139    /// The mutable inner state.
140    inner: Arc<Mutex<Inner>>,
141    /// A channel for receiving IPT change notifications.
142    ipt_watcher: IptsPublisherView,
143    /// A channel for receiving onion service config change notifications.
144    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
145    /// A channel for receiving restricted discovery key_dirs change notifications.
146    key_dirs_rx: FileEventReceiver,
147    /// A channel for sending restricted discovery key_dirs change notifications.
148    ///
149    /// A copy of this sender is handed out to every `FileWatcher` created.
150    key_dirs_tx: FileEventSender,
151    /// A channel for receiving updates regarding our [`PublishStatus`].
152    ///
153    /// The main loop of the reactor watches for updates on this channel.
154    ///
155    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
156    /// we can start publishing descriptors.
157    ///
158    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
159    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
160    /// established some introduction points.
161    publish_status_rx: watch::Receiver<PublishStatus>,
162    /// A sender for updating our [`PublishStatus`].
163    ///
164    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
165    /// we can start publishing descriptors.
166    publish_status_tx: watch::Sender<PublishStatus>,
167    /// A channel for receiving upload completion notifications.
168    ///
169    /// This channel is polled in the main loop of the reactor.
170    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
171    /// A channel for sending upload completion notifications.
172    ///
173    /// A copy of this sender is handed to each upload task.
174    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
175    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
176    ///
177    /// Receivers can use this channel to find out when the reactor is dropped.
178    ///
179    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
180    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
181    ///
182    /// Closing this channel will cause any pending upload tasks to be dropped.
183    shutdown_tx: broadcast::Sender<Void>,
184    /// Path resolver for configuration files.
185    path_resolver: Arc<CfgPathResolver>,
186    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
187    /// rotated and thus we need to republish the descriptor for a particular time period.
188    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
189}
190
191/// The immutable, shared state of the descriptor publisher reactor.
192#[derive(Clone)]
193struct Immutable<R: Runtime, M: Mockable> {
194    /// The runtime.
195    runtime: R,
196    /// Mockable state.
197    ///
198    /// This is used for launching circuits and for obtaining random number generators.
199    mockable: M,
200    /// The service for which we're publishing descriptors.
201    nickname: HsNickname,
202    /// The key manager.
203    keymgr: Arc<KeyMgr>,
204    /// A sender for updating the status of the onion service.
205    status_tx: PublisherStatusSender,
206    /// Proof-of-work state.
207    pow_manager: Arc<PowManager<R>>,
208}
209
210impl<R: Runtime, M: Mockable> Immutable<R, M> {
211    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
212    /// with the specified [`TimePeriod`].
213    ///
214    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
215    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
216    /// descriptor signing key.
217    ///
218    /// Returns an error if the service is running in offline mode and the descriptor signing
219    /// keypair of the specified `period` is not available.
220    //
221    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
222    // built from the blinded id key
223    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
224        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
225            Some(key) => {
226                let key: ed25519::ExpandedKeypair = key.into();
227                key.to_secret_key_bytes()[0..32]
228                    .try_into()
229                    .expect("Wrong length on slice")
230            }
231            None => {
232                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
233                // is unreachable (for now).
234                let desc_sign_key_spec =
235                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
236                let key: ed25519::Keypair = self
237                    .keymgr
238                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
239                    // TODO (#1194): internal! is not the right type for this error (we need an
240                    // error type for the case where a hidden service running in offline mode has
241                    // run out of its pre-provisioned keys).
242                    //
243                    // This will be addressed when we add support for offline hs_id mode
244                    .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
245                    .into();
246                key.to_bytes()
247            }
248        };
249
250        Ok(AesOpeKey::from_secret(&ope_key))
251    }
252
253    /// Generate a revision counter for a descriptor associated with the specified
254    /// [`TimePeriod`].
255    ///
256    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
257    ///
258    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
259    fn generate_revision_counter(
260        &self,
261        params: &HsDirParams,
262        now: SystemTime,
263    ) -> Result<RevisionCounter, FatalError> {
264        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
265        // to each time we generate a new descriptor), for performance reasons.
266        let ope_key = self.create_ope_key(params.time_period())?;
267
268        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
269        let srv_start = params.start_of_shard_rand_period();
270        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
271            internal!(
272                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
273                now,
274                srv_start
275            )
276        })?;
277        let rev = ope_key.encrypt(offset);
278
279        Ok(RevisionCounter::from(rev))
280    }
281}
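
// A worked example of the scheme above (illustrative numbers only): if the
// current SRV protocol run started at 00:00 UTC and `now` is 13:20 UTC on the
// same day, the offset within the SRV period is
//
//     13 * 3600 + 20 * 60 = 48_000 seconds,
//
// and the revision counter is the AES-OPE encryption of 48_000 under the key
// built in `create_ope_key`. Because the encryption is order-preserving,
// descriptors generated later in the same SRV period carry monotonically
// increasing revision counters, so HsDirs treat them as newer.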
282
283/// Mockable state for the descriptor publisher reactor.
284///
285/// This enables us to mock parts of the [`Reactor`] for testing purposes.
286#[async_trait]
287pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
288    /// The type of random number generator.
289    type Rng: rand::Rng + rand::CryptoRng;
290
291    /// The type of client circuit.
292    type ClientCirc: MockableClientCirc;
293
294    /// Return a random number generator.
295    fn thread_rng(&self) -> Self::Rng;
296
297    /// Create a circuit of the specified `kind` to `target`.
298    async fn get_or_launch_specific<T>(
299        &self,
300        netdir: &NetDir,
301        kind: HsCircKind,
302        target: T,
303    ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
304    where
305        T: CircTarget + Send + Sync;
306
307    /// Return an estimate-based value for how long we should allow a single
308    /// directory upload operation to complete.
309    ///
310    /// Includes circuit construction, stream opening, upload, and waiting for a
311    /// response.
312    fn estimate_upload_timeout(&self) -> Duration;
313}
314
315/// Mockable client circuit
316#[async_trait]
317pub(crate) trait MockableClientCirc: Send + Sync {
318    /// The data stream type.
319    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
320
321    /// Start a new stream to the last relay in the circuit, using
322    /// a BEGIN_DIR cell.
323    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
324
325    /// Try to get a SourceInfo for this circuit, for using it in a directory request.
326    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>>;
327}
328
329#[async_trait]
330impl MockableClientCirc for ClientCirc {
331    type DataStream = tor_proto::stream::DataStream;
332
333    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
334        ClientCirc::begin_dir_stream(self).await
335    }
336
337    fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
338        SourceInfo::from_circuit(self)
339    }
340}
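
// For illustration, a test double for these traits might look roughly like
// the sketch below. `FakeCircPool`, `FakeCirc`, and the use of
// `futures::io::Cursor` as a stand-in data stream are assumptions made for
// this example; they are not existing APIs of this crate (the real tests use
// their own mocks).
//
//     #[derive(Clone)]
//     struct FakeCircPool;
//     struct FakeCirc;
//
//     #[async_trait]
//     impl MockableClientCirc for FakeCirc {
//         // An in-memory buffer stands in for the BEGIN_DIR stream.
//         type DataStream = futures::io::Cursor<Vec<u8>>;
//
//         async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
//             Ok(futures::io::Cursor::new(Vec::new()))
//         }
//
//         fn source_info(&self) -> tor_proto::Result<Option<SourceInfo>> {
//             Ok(None)
//         }
//     }
//
//     #[async_trait]
//     impl Mockable for FakeCircPool {
//         type Rng = rand::rngs::ThreadRng;
//         type ClientCirc = FakeCirc;
//
//         fn thread_rng(&self) -> Self::Rng {
//             rand::rng()
//         }
//
//         async fn get_or_launch_specific<T>(
//             &self,
//             _netdir: &NetDir,
//             _kind: HsCircKind,
//             _target: T,
//         ) -> Result<Arc<FakeCirc>, tor_circmgr::Error>
//         where
//             T: CircTarget + Send + Sync,
//         {
//             // Pretend circuit construction always succeeds.
//             Ok(Arc::new(FakeCirc))
//         }
//
//         fn estimate_upload_timeout(&self) -> Duration {
//             Duration::from_secs(30)
//         }
//     }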
341
342/// The real version of the mockable state of the reactor.
343#[derive(Clone, From, Into)]
344pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
345
346#[async_trait]
347impl<R: Runtime> Mockable for Real<R> {
348    type Rng = rand::rngs::ThreadRng;
349    type ClientCirc = ClientCirc;
350
351    fn thread_rng(&self) -> Self::Rng {
352        rand::rng()
353    }
354
355    async fn get_or_launch_specific<T>(
356        &self,
357        netdir: &NetDir,
358        kind: HsCircKind,
359        target: T,
360    ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
361    where
362        T: CircTarget + Send + Sync,
363    {
364        self.0.get_or_launch_specific(netdir, kind, target).await
365    }
366
367    fn estimate_upload_timeout(&self) -> Duration {
368        use tor_circmgr::timeouts::Action;
369        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
370        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
371        // We assume that in the worst case we'll have to wait for an entire
372        // circuit construction and two round-trips to the hsdir.
373        let est_total = est_build + est_roundtrip * 2;
374        // We always allow _at least_ this much time, in case our estimate is
375        // ridiculously low.
376        let min_timeout = Duration::from_secs(30);
377        max(est_total, min_timeout)
378    }
379}
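
// For example (illustrative numbers only): if the circuit-build estimate is
// 20s and the round-trip estimate is 10s, `estimate_upload_timeout` above
// allows max(20 + 2 * 10, 30) = 40 seconds for the whole upload.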
380
381/// The mutable state of a [`Reactor`].
382struct Inner {
383    /// The onion service config.
384    config: Arc<OnionServiceConfigPublisherView>,
385    /// Watcher for key_dirs.
386    ///
387    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
388    ///
389    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
390    file_watcher: Option<FileWatcher>,
391    /// The relevant time periods.
392    ///
393    /// This includes the current time period, as well as any other time periods we need to be
394    /// publishing descriptors for.
395    ///
396    /// This is empty until we fetch our first netdir in [`Reactor::run`].
397    time_periods: Vec<TimePeriodContext>,
398    /// Our most up to date netdir.
399    ///
400    /// This is initialized in [`Reactor::run`].
401    netdir: Option<Arc<NetDir>>,
402    /// The timestamp of our last upload.
403    ///
404    /// This is the time when the last upload was _initiated_ (rather than completed), to prevent
405    /// the publisher from spawning multiple upload tasks at once in response to multiple external
406    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
407    /// notifications in a short time frame (#1142), or an IPT change notification that's
408    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
409    /// inefficient, but it also causes the publisher to generate two different descriptors with
410    /// the same revision counter (the revision counter is derived from the current timestamp),
411    /// which ultimately causes the slower upload task to fail (see #1142).
412    ///
413    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
414    /// used for retrying failed uploads (these are handled internally by
415    /// [`Reactor::upload_descriptor_with_retries`]).
416    last_uploaded: Option<Instant>,
417    /// A max-heap containing the time periods for which we need to reupload the descriptor.
418    // TODO: we are currently reuploading more than necessary.
419    // Ideally, this shouldn't contain duplicate TimePeriods,
420    // because we only need to retain the latest reupload time for each time period.
421    //
422    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
423    // we will end up with multiple ReuploadTimer entries for that TP,
424    // each of which will (eventually) result in a reupload.
425    //
426    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
427    //
428    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
429    reupload_timers: BinaryHeap<ReuploadTimer>,
430    /// The restricted discovery authorized clients.
431    ///
432    /// `None`, unless the service is running in restricted discovery mode.
433    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
434}
435
436/// The part of the reactor state that changes with every time period.
437struct TimePeriodContext {
438    /// The HsDir params.
439    params: HsDirParams,
440    /// The HsDirs to use in this time period.
441    ///
442    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
443    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
444    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
445    // `NetDir` and the cached relays, and to convince Rust that what we're doing is sound)
446    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
447    /// The revision counter of the last successful upload, if any.
448    last_successful: Option<RevisionCounter>,
449    /// The outcome of the last upload, if any.
450    upload_results: Vec<HsDirUploadStatus>,
451}
452
453impl TimePeriodContext {
454    /// Create a new `TimePeriodContext`.
455    ///
456    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
457    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
458    fn new<'r>(
459        params: HsDirParams,
460        blind_id: HsBlindId,
461        netdir: &Arc<NetDir>,
462        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
463        old_upload_results: Vec<HsDirUploadStatus>,
464    ) -> Result<Self, FatalError> {
465        let period = params.time_period();
466        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
467        let upload_results = old_upload_results
468            .into_iter()
469            .filter(|res|
470                // Check if the HsDir of this result still exists
471                hs_dirs
472                    .iter()
473                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
474            .collect();
475
476        Ok(Self {
477            params,
478            hs_dirs,
479            last_successful: None,
480            upload_results,
481        })
482    }
483
484    /// Recompute the HsDirs for this time period.
485    fn compute_hsdirs<'r>(
486        period: TimePeriod,
487        blind_id: HsBlindId,
488        netdir: &Arc<NetDir>,
489        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
490    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
491        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
492
493        Ok(hs_dirs
494            .map(|hs_dir| {
495                let mut builder = RelayIds::builder();
496                if let Some(ed_id) = hs_dir.ed_identity() {
497                    builder.ed_identity(*ed_id);
498                }
499
500                if let Some(rsa_id) = hs_dir.rsa_identity() {
501                    builder.rsa_identity(*rsa_id);
502                }
503
504                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
505
506                // Have we uploaded the descriptor to this relay before? If so, we don't need to
507                // reupload it unless it was already dirty and due for a reupload.
508                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
509                    Some((_, status)) => *status,
510                    None => DescriptorStatus::Dirty,
511                };
512
513                (relay_id, status)
514            })
515            .collect::<Vec<_>>())
516    }
517
518    /// Mark the descriptor dirty for all HSDirs of this time period.
519    fn mark_all_dirty(&mut self) {
520        self.hs_dirs
521            .iter_mut()
522            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
523    }
524
525    /// Update the upload result for this time period.
526    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
527        self.upload_results = upload_results;
528    }
529}
530
531/// An error that occurs while trying to upload a descriptor.
532#[derive(Clone, Debug, thiserror::Error)]
533#[non_exhaustive]
534pub enum UploadError {
535    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
536    #[error("descriptor upload request failed: {}", _0.error)]
537    Request(#[from] RequestFailedError),
538
539    /// Failed to establish circuit to hidden service directory
540    #[error("could not build circuit to HsDir")]
541    Circuit(#[from] tor_circmgr::Error),
542
543    /// Failed to establish stream to hidden service directory
544    #[error("failed to establish directory stream to HsDir")]
545    Stream(#[source] tor_proto::Error),
546
547    /// An internal error.
548    #[error("Internal error")]
549    Bug(#[from] tor_error::Bug),
550}
551define_asref_dyn_std_error!(UploadError);
552
553impl UploadError {
554    /// Return true if this error is one that we should report as a suspicious event,
555    /// along with the dirserver, and description of the relevant document.
556    pub(crate) fn should_report_as_suspicious(&self) -> bool {
557        match self {
558            UploadError::Request(e) => e.error.should_report_as_suspicious_if_anon(),
559            UploadError::Circuit(_) => false, // TODO prop360
560            UploadError::Stream(_) => false,  // TODO prop360
561            UploadError::Bug(_) => false,
562        }
563    }
564}
565
566impl<R: Runtime, M: Mockable> Reactor<R, M> {
567    /// Create a new `Reactor`.
568    #[allow(clippy::too_many_arguments)]
569    pub(super) fn new(
570        runtime: R,
571        nickname: HsNickname,
572        dir_provider: Arc<dyn NetDirProvider>,
573        mockable: M,
574        config: &OnionServiceConfig,
575        ipt_watcher: IptsPublisherView,
576        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
577        status_tx: PublisherStatusSender,
578        keymgr: Arc<KeyMgr>,
579        path_resolver: Arc<CfgPathResolver>,
580        pow_manager: Arc<PowManager<R>>,
581        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
582    ) -> Self {
583        /// The maximum size of the upload completion notifier channel.
584        ///
585        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
586        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
587        /// each sender will send exactly one message.
588        const UPLOAD_CHAN_BUF_SIZE: usize = 0;
589
590        // Internally-generated instructions, no need for mq.
591        let (upload_task_complete_tx, upload_task_complete_rx) =
592            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);
593
594        let (publish_status_tx, publish_status_rx) = watch::channel();
595        // Setting the buffer size to zero here is OK,
596        // since we never actually send anything on this channel.
597        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
598
599        let authorized_clients =
600            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);
601
602        // Create a channel for watching for changes in the configured
603        // restricted_discovery.key_dirs.
604        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
605
606        let imm = Immutable {
607            runtime,
608            mockable,
609            nickname,
610            keymgr,
611            status_tx,
612            pow_manager,
613        };
614
615        let inner = Inner {
616            time_periods: vec![],
617            config: Arc::new(config.into()),
618            file_watcher: None,
619            netdir: None,
620            last_uploaded: None,
621            reupload_timers: Default::default(),
622            authorized_clients,
623        };
624
625        Self {
626            imm: Arc::new(imm),
627            inner: Arc::new(Mutex::new(inner)),
628            dir_provider,
629            ipt_watcher,
630            config_rx,
631            key_dirs_rx,
632            key_dirs_tx,
633            publish_status_rx,
634            publish_status_tx,
635            upload_task_complete_rx,
636            upload_task_complete_tx,
637            shutdown_tx,
638            path_resolver,
639            update_from_pow_manager_rx,
640        }
641    }
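
    // Purely illustrative sketch of how this reactor is constructed and driven
    // (the real call site lives in the `Publisher` launch code, which is not
    // shown here; the variable names below are placeholders):
    //
    //     let reactor = Reactor::new(
    //         runtime.clone(), nickname, dir_provider, Real::from(circ_pool),
    //         &config, ipt_watcher, config_rx, status_tx, keymgr,
    //         path_resolver, pow_manager, pow_rx,
    //     );
    //     runtime.spawn(async move {
    //         // `run` only returns on shutdown or on a fatal error.
    //         let _ = reactor.run().await;
    //     }).expect("failed to spawn publisher reactor task");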
642
643    /// Start the reactor.
644    ///
645    /// Under normal circumstances, this function runs indefinitely.
646    ///
647    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
648    /// upload fails or is rate-limited.
649    pub(super) async fn run(mut self) -> Result<(), FatalError> {
650        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
651
652        {
653            let netdir = self
654                .dir_provider
655                .wait_for_netdir(Timeliness::Timely)
656                .await?;
657            let time_periods = self.compute_time_periods(&netdir, &[])?;
658
659            let mut inner = self.inner.lock().expect("poisoned lock");
660
661            inner.netdir = Some(netdir);
662            inner.time_periods = time_periods;
663        }
664
665        // Create the initial key_dirs watcher.
666        self.update_file_watcher();
667
668        loop {
669            match self.run_once().await {
670                Ok(ShutdownStatus::Continue) => continue,
671                Ok(ShutdownStatus::Terminate) => {
672                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
673
674                    self.imm.status_tx.send_shutdown();
675                    return Ok(());
676                }
677                Err(e) => {
678                    error_report!(
679                        e,
680                        "HS service {}: descriptor publisher crashed!",
681                        self.imm.nickname
682                    );
683
684                    self.imm.status_tx.send_broken(e.clone());
685
686                    return Err(e);
687                }
688            }
689        }
690    }
691
692    /// Run one iteration of the reactor loop.
693    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
694    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
695        let mut netdir_events = self.dir_provider.events();
696
697        // Note: TrackingNow tracks the values it is compared with.
698        // This is equivalent to sleeping for (until - now) units of time.
699        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
700        if let PublishStatus::RateLimited(until) = self.status() {
701            if upload_rate_lim > until {
702                // We are no longer rate-limited
703                self.expire_rate_limit().await?;
704            }
705        }
706
707        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
708        let mut reupload_periods = vec![];
709        {
710            let mut inner = self.inner.lock().expect("poisoned lock");
711            let inner = &mut *inner;
712            while let Some(reupload) = inner.reupload_timers.peek().copied() {
713                // First, extract all the timeouts that already elapsed.
714                if reupload.when <= reupload_tracking {
715                    inner.reupload_timers.pop();
716                    reupload_periods.push(reupload.period);
717                } else {
718                    // We are not ready to schedule any more reuploads.
719                    //
720                    // How much we need to sleep is implicitly
721                    // tracked in reupload_tracking (through
722                    // the TrackingNow implementation)
723                    break;
724                }
725            }
726        }
727
728        // Check if it's time to schedule any reuploads.
729        for period in reupload_periods {
730            if self.mark_dirty(&period) {
731                debug!(
732                    time_period=?period,
733                    "descriptor reupload timer elapsed; scheduling reupload",
734                );
735                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
736                    .await?;
737            }
738        }
739
740        select_biased! {
741            res = self.upload_task_complete_rx.next().fuse() => {
742                let Some(upload_res) = res else {
743                    return Ok(ShutdownStatus::Terminate);
744                };
745
746                self.handle_upload_results(upload_res);
747                self.upload_result_to_svc_status()?;
748            },
749            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
750                self.expire_rate_limit().await?;
751            },
752            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
753                // Run another iteration, executing run_once again. This time, we will remove the
754                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
755                // relevant HsDirs, and schedule the upload by setting our status to
756                // UploadScheduled.
757                return Ok(ShutdownStatus::Continue);
758            },
759            netdir_event = netdir_events.next().fuse() => {
760                let Some(netdir_event) = netdir_event else {
761                    debug!("netdir event stream ended");
762                    return Ok(ShutdownStatus::Terminate);
763                };
764
765                if !matches!(netdir_event, DirEvent::NewConsensus) {
766                    return Ok(ShutdownStatus::Continue);
767                };
768
769                // The consensus changed. Grab a new NetDir.
770                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
771                    Ok(y) => y,
772                    Err(e) => {
773                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
774                        // Hopefully a netdir will appear in the future.
775                        // In the meantime, suspend operations.
776                        //
777                        // TODO (#1218): there is a bug here: we stop reading on our inputs
778                        // including eg publish_status_rx, but it is our job to log some of
779                        // these things.  While we are waiting for a netdir, all those messages
780                        // are "stuck"; they'll appear later, with misleading timestamps.
781                        //
782                        // Probably this should be fixed by moving the logging
783                        // out of the reactor, where it won't be blocked.
784                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
785                            .await?
786                    }
787                };
788                let relevant_periods = netdir.hs_all_time_periods();
789                self.handle_consensus_change(netdir).await?;
790                expire_publisher_keys(
791                    &self.imm.keymgr,
792                    &self.imm.nickname,
793                    &relevant_periods,
794                ).unwrap_or_else(|e| {
795                    error_report!(e, "failed to remove expired keys");
796                });
797            }
798            update = self.ipt_watcher.await_update().fuse() => {
799                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
800                    return Ok(ShutdownStatus::Terminate);
801                }
802            },
803            config = self.config_rx.next().fuse() => {
804                let Some(config) = config else {
805                    return Ok(ShutdownStatus::Terminate);
806                };
807
808                self.handle_svc_config_change(&config).await?;
809            },
810            res = self.key_dirs_rx.next().fuse() => {
811                let Some(event) = res else {
812                    return Ok(ShutdownStatus::Terminate);
813                };
814
815                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
816                    // Discard other events, so that we only reload once.
817                }
818
819                self.handle_key_dirs_change(event).await?;
820            }
821            should_upload = self.publish_status_rx.next().fuse() => {
822                let Some(should_upload) = should_upload else {
823                    return Ok(ShutdownStatus::Terminate);
824                };
825
826                // Our PublishStatus changed -- are we ready to publish?
827                if should_upload == PublishStatus::UploadScheduled {
828                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
829                    self.upload_all().await?;
830                }
831            }
832            update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
833                debug!("Update PoW seed for TP!");
834                let Some(time_period) = update_tp_pow_seed else {
835                    return Ok(ShutdownStatus::Terminate);
836                };
837                self.mark_dirty(&time_period);
838                self.upload_all().await?;
839            }
840        }
841
842        Ok(ShutdownStatus::Continue)
843    }
844
845    /// Returns the current status of the publisher
846    fn status(&self) -> PublishStatus {
847        *self.publish_status_rx.borrow()
848    }
849
850    /// Handle a batch of upload outcomes,
851    /// possibly updating the status of the descriptor for the corresponding HSDirs.
852    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
853        let mut inner = self.inner.lock().expect("poisoned lock");
854        let inner = &mut *inner;
855
856        // Check which time period these uploads pertain to.
857        let period = inner
858            .time_periods
859            .iter_mut()
860            .find(|ctx| ctx.params.time_period() == results.time_period);
861
862        let Some(period) = period else {
863            // The uploads were for a time period that is no longer relevant, so we
864            // can ignore the result.
865            return;
866        };
867
868        // We will need to reupload this descriptor at some point, so we pick
869        // a random time between 60 minutes and 120 minutes in the future.
870        //
871        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
872        let mut rng = self.imm.mockable.thread_rng();
873        // TODO SPEC: Control republish period using a consensus parameter?
874        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
875        let duration = Duration::from_secs(minutes * 60);
876        let reupload_when = self.imm.runtime.now() + duration;
877        let time_period = period.params.time_period();
878
879        info!(
880            time_period=?time_period,
881            "reuploading descriptor in {}",
882            humantime::format_duration(duration),
883        );
884
885        inner.reupload_timers.push(ReuploadTimer {
886            period: time_period,
887            when: reupload_when,
888        });
889
890        let mut upload_results = vec![];
891        for upload_res in results.hsdir_result {
892            let relay = period
893                .hs_dirs
894                .iter_mut()
895                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
896
897            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
898                // This HSDir went away, so the result doesn't matter.
899                // Continue processing the rest of the results
900                continue;
901            };
902
903            if upload_res.upload_res.is_ok() {
904                let update_last_successful = match period.last_successful {
905                    None => true,
906                    Some(counter) => counter <= upload_res.revision_counter,
907                };
908
909                if update_last_successful {
910                    period.last_successful = Some(upload_res.revision_counter);
911                    // TODO (#1098): Is it possible that this won't update the statuses promptly
912                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
913                    // and start an upload task for a descriptor that has already been uploaded (or is
914                    // being uploaded) in another task, but whose upload results have not yet been
915                    // processed.
916                    //
917                    // This is probably made worse by the fact that the statuses are updated in
918                    // batches (grouped by time period), rather than one by one as the upload tasks
919                    // complete (updating the status involves locking the inner mutex, and I wanted
920                    // to minimize the locking/unlocking overheads). I'm not sure handling the
921                    // updates in batches was the correct decision here.
922                    *status = DescriptorStatus::Clean;
923                }
924            }
925
926            upload_results.push(upload_res);
927        }
928
929        period.set_upload_results(upload_results);
930    }
931
932    /// Maybe update our list of HsDirs.
933    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
934        trace!("the consensus has changed; recomputing HSDirs");
935
936        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
937
938        self.recompute_hs_dirs()?;
939        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
940            .await?;
941
942        // If the time period has changed, some of our upload results may now be irrelevant,
943        // so we might need to update our status (for example, if our uploads are
944        // for a no-longer-relevant time period, it means we might be able to update
945            // our status from "degraded" to "running").
946        self.upload_result_to_svc_status()?;
947
948        Ok(())
949    }
950
951    /// Recompute the HsDirs for all relevant time periods.
952    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
953        let mut inner = self.inner.lock().expect("poisoned lock");
954        let inner = &mut *inner;
955
956        let netdir = Arc::clone(
957            inner
958                .netdir
959                .as_ref()
960                .ok_or_else(|| internal!("started upload task without a netdir"))?,
961        );
962
963        // Update our list of relevant time periods.
964        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
965        inner.time_periods = new_time_periods;
966
967        Ok(())
968    }
969
970    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
971    ///
972    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
973    /// HsDirs where possible.
974    fn compute_time_periods(
975        &self,
976        netdir: &Arc<NetDir>,
977        time_periods: &[TimePeriodContext],
978    ) -> Result<Vec<TimePeriodContext>, FatalError> {
979        netdir
980            .hs_all_time_periods()
981            .iter()
982            .map(|params| {
983                let period = params.time_period();
984                let blind_id_kp =
985                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
986                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
987                        // It's supposed to return Ok(None) if we're in offline hsid mode,
988                        // but that might change when we do #1194
989                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;
990
991                let blind_id: HsBlindIdKey = (&blind_id_kp).into();
992
993                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
994                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
995                // publishing the descriptor to the HsDirs that already have it (the ones that are
996                // marked with DescriptorStatus::Clean).
997                //
998                // In other words, we only want to publish to those HsDirs that
999                //   * are part of a new time period (which we have never published the descriptor
1000                //   for), or
1001                //   * have just been added to the ring of a time period we already knew about
1002                if let Some(ctx) = time_periods
1003                    .iter()
1004                    .find(|ctx| ctx.params.time_period() == period)
1005                {
1006                    TimePeriodContext::new(
1007                        params.clone(),
1008                        blind_id.into(),
1009                        netdir,
1010                        ctx.hs_dirs.iter(),
1011                        ctx.upload_results.clone(),
1012                    )
1013                } else {
1014                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
1015                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
1016                    TimePeriodContext::new(
1017                        params.clone(),
1018                        blind_id.into(),
1019                        netdir,
1020                        iter::empty(),
1021                        vec![],
1022                    )
1023                }
1024            })
1025            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
1026    }
1027
1028    /// Replace the old netdir with the new, returning the old.
1029    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
1030        self.inner
1031            .lock()
1032            .expect("poisoned lock")
1033            .netdir
1034            .replace(new_netdir)
1035    }
1036
1037    /// Replace our view of the service config with `new_config` if `new_config` contains changes
1038    /// that would cause us to generate a new descriptor.
1039    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
1040        let mut inner = self.inner.lock().expect("poisoned lock");
1041        let old_config = &mut inner.config;
1042
1043        // The fields we're interested in haven't changed, so there's no need to update
1044        // `inner.config`.
1045        if *old_config == new_config {
1046            return false;
1047        }
1048
1049        let log_change = match (
1050            old_config.restricted_discovery.enabled,
1051            new_config.restricted_discovery.enabled,
1052        ) {
1053            (true, false) => Some("Disabling restricted discovery mode"),
1054            (false, true) => Some("Enabling restricted discovery mode"),
1055            _ => None,
1056        };
1057
1058        if let Some(msg) = log_change {
1059            info!(nickname=%self.imm.nickname, "{}", msg);
1060        }
1061
1062        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
1063
1064        true
1065    }
1066
1067    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
1068    fn update_file_watcher(&self) {
1069        let mut inner = self.inner.lock().expect("poisoned lock");
1070        if inner.config.restricted_discovery.watch_configuration() {
1071            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
1072            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
1073
1074            let dirs = inner.config.restricted_discovery.key_dirs().clone();
1075
1076            watch_dirs(&mut watcher, &dirs, &self.path_resolver);
1077
1078            let watcher = watcher
1079                .start_watching(self.key_dirs_tx.clone())
1080                .map_err(|e| {
1081                    // TODO: update the publish status (see also the module-level TODO about this).
1082                    error_report!(e, "Cannot set file watcher");
1083                })
1084                .ok();
1085            inner.file_watcher = watcher;
1086        } else {
1087            if inner.file_watcher.is_some() {
1088                debug!("removing key_dirs watcher");
1089            }
1090            inner.file_watcher = None;
1091        }
1092    }
1093
1094    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
1095    /// uploading.
1096    fn note_ipt_change(&self) -> PublishStatus {
1097        let mut ipts = self.ipt_watcher.borrow_for_publish();
1098        match ipts.ipts.as_mut() {
1099            Some(_ipts) => PublishStatus::UploadScheduled,
1100            None => PublishStatus::AwaitingIpts,
1101        }
1102    }
1103
1104    /// Update our list of introduction points.
1105    async fn handle_ipt_change(
1106        &mut self,
1107        update: Option<Result<(), crate::FatalError>>,
1108    ) -> Result<ShutdownStatus, FatalError> {
1109        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
1110        match update {
1111            Some(Ok(())) => {
1112                let should_upload = self.note_ipt_change();
1113                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
1114
1115                self.mark_all_dirty();
1116                self.update_publish_status_unless_rate_lim(should_upload)
1117                    .await?;
1118                Ok(ShutdownStatus::Continue)
1119            }
1120            Some(Err(e)) => Err(e),
1121            None => {
1122                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
1123                Ok(ShutdownStatus::Terminate)
1124            }
1125        }
1126    }
1127
1128    /// Update the `PublishStatus` of the reactor with `new_state`,
1129    /// unless the current state is `AwaitingIpts`.
1130    async fn update_publish_status_unless_waiting(
1131        &mut self,
1132        new_state: PublishStatus,
1133    ) -> Result<(), FatalError> {
1134        // Only update the state if we're not waiting for intro points.
1135        if self.status() != PublishStatus::AwaitingIpts {
1136            self.update_publish_status(new_state).await?;
1137        }
1138
1139        Ok(())
1140    }
1141
1142    /// Update the `PublishStatus` of the reactor with `new_state`,
1143    /// unless the current state is `RateLimited`.
1144    async fn update_publish_status_unless_rate_lim(
1145        &mut self,
1146        new_state: PublishStatus,
1147    ) -> Result<(), FatalError> {
1148        // We can't exit this state until the rate-limit expires.
1149        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1150            self.update_publish_status(new_state).await?;
1151        }
1152
1153        Ok(())
1154    }
1155
1156    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
1157    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
1158        let onion_status = match new_state {
1159            PublishStatus::Idle => None,
1160            PublishStatus::UploadScheduled
1161            | PublishStatus::AwaitingIpts
1162            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
1163        };
1164
1165        if let Some(onion_status) = onion_status {
1166            self.imm.status_tx.send(onion_status, None);
1167        }
1168
1169        trace!(
1170            "publisher reactor status change: {:?} -> {:?}",
1171            self.status(),
1172            new_state
1173        );
1174
1175        self.publish_status_tx.send(new_state).await.map_err(
1176            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
1177        )?;
1178
1179        Ok(())
1180    }
1181
1182    /// Update the onion svc status based on the results of the last descriptor uploads.
1183    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
1184        let inner = self.inner.lock().expect("poisoned lock");
1185        let netdir = inner
1186            .netdir
1187            .as_ref()
1188            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
1189
1190        let (state, err) = upload_result_state(netdir, &inner.time_periods);
1191        self.imm.status_tx.send(state, err);
1192
1193        Ok(())
1194    }
1195
1196    /// Update the descriptors based on the config change.
1197    async fn handle_svc_config_change(
1198        &mut self,
1199        config: &OnionServiceConfig,
1200    ) -> Result<(), FatalError> {
1201        let new_config = Arc::new(config.into());
1202        if self.replace_config_if_changed(Arc::clone(&new_config)) {
1203            self.update_file_watcher();
1204            self.update_authorized_clients_if_changed().await?;
1205
1206            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
1207            self.mark_all_dirty();
1208
1209            // Schedule an upload, unless we're still waiting for IPTs.
1210            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1211                .await?;
1212        }
1213
1214        Ok(())
1215    }
1216
1217    /// Update the descriptors based on a restricted discovery key_dirs change.
1218    ///
1219    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
1220    /// this marks the descriptor as dirty for all time periods,
1221    /// and schedules a reupload.
1222    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
1223        debug!("The configured key_dirs have changed");
1224        match event {
1225            FileEvent::Rescan | FileEvent::FileChanged => {
1226                // These events are handled in the same way, by re-reading the keys from disk
1227                // and republishing the descriptor if necessary
1228            }
1229            _ => return Err(internal!("file watcher event {event:?}").into()),
1230        };
1231
1232        // Update the file watcher, in case the change was triggered by a key_dir move.
1233        self.update_file_watcher();
1234
1235        if self.update_authorized_clients_if_changed().await? {
1236            self.mark_all_dirty();
1237
1238            // Schedule an upload, unless we're still waiting for IPTs.
1239            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1240                .await?;
1241        }
1242
1243        Ok(())
1244    }
1245
1246    /// Recreate the authorized_clients based on the current config.
1247    ///
1248    /// Returns `true` if the authorized clients have changed.
1249    async fn update_authorized_clients_if_changed(&mut self) -> Result<bool, FatalError> {
1250        let mut inner = self.inner.lock().expect("poisoned lock");
1251        let authorized_clients =
1252            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
1253
1254        let clients = &mut inner.authorized_clients;
1255        let changed = clients.as_ref() != authorized_clients.as_ref();
1256
1257        if changed {
1258            info!("The restricted discovery mode authorized clients have changed");
1259            *clients = authorized_clients;
1260        }
1261
1262        Ok(changed)
1263    }
1264
1265    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
1266    fn read_authorized_clients(
1267        config: &RestrictedDiscoveryConfig,
1268        path_resolver: &CfgPathResolver,
1269    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
1270        let authorized_clients = config.read_keys(path_resolver);
1271
1272        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
1273            warn!(
1274                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
1275            );
1276        }
1277
1278        authorized_clients.map(Arc::new)
1279    }
1280
1281    /// Mark the descriptor dirty for all time periods.
1282    fn mark_all_dirty(&self) {
1283        trace!("marking the descriptor dirty for all time periods");
1284
1285        self.inner
1286            .lock()
1287            .expect("poisoned lock")
1288            .time_periods
1289            .iter_mut()
1290            .for_each(|tp| tp.mark_all_dirty());
1291    }
1292
1293    /// Mark the descriptor dirty for the specified time period.
1294    ///
1295    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
1296    fn mark_dirty(&self, period: &TimePeriod) -> bool {
1297        let mut inner = self.inner.lock().expect("poisoned lock");
1298        let period_ctx = inner
1299            .time_periods
1300            .iter_mut()
1301            .find(|tp| tp.params.time_period() == *period);
1302
1303        match period_ctx {
1304            Some(ctx) => {
1305                trace!(time_period=?period, "marking the descriptor dirty");
1306                ctx.mark_all_dirty();
1307                true
1308            }
1309            None => false,
1310        }
1311    }
1312
1313    /// Try to upload our descriptor to the HsDirs that need it.
1314    ///
1315    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
1316    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
1317    ///
1318    /// Failed uploads are retried
1319    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1320    ///
1321    /// If restricted discovery mode is enabled and there are no authorized clients,
1322    /// we abort the upload and set our status to [`State::Broken`].
1323    //
1324    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
1325    // (for example if the IPTs change),
1326    // which can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
1327    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
1328    //
1329    /// For each current time period, we spawn a task that uploads the descriptor to
1330    /// all the HsDirs on the HsDir ring of that time period.
1331    /// Each task shuts down on completion, or when the reactor is dropped.
1332    ///
1333    /// Each task reports its upload results (`TimePeriodUploadResult`)
1334    /// via the `upload_task_complete_tx` channel.
1335    /// The results are received and processed in the main loop of the reactor.
1336    ///
1337    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
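    ///
    /// In outline (a simplified sketch of the code below, not the exact control flow;
    /// `hsdirs_needing_upload` is an illustrative stand-in):
    ///
    /// ```ignore
    /// for period in time_periods {
    ///     // HsDirs whose copy of the descriptor is still marked Dirty.
    ///     let dirty_hsdirs = hsdirs_needing_upload(period);
    ///     // Each spawned task reports back via upload_task_complete_tx.
    ///     spawn(upload_for_time_period(dirty_hsdirs, /* ... */));
    /// }
    /// ```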
1338    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
1339    async fn upload_all(&mut self) -> Result<(), FatalError> {
1340        trace!("starting descriptor upload task...");
1341
1342        // Abort the upload entirely if we have an empty list of authorized clients
1343        let authorized_clients = match self.authorized_clients() {
1344            Ok(authorized_clients) => authorized_clients,
1345            Err(e) => {
1346                error_report!(e, "aborting upload");
1347                self.imm.status_tx.send_broken(e.clone());
1348
1349                // Returning an error would shut down the reactor, so we have to return Ok here.
1350                return Ok(());
1351            }
1352        };
1353
1354        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
1355        let now = self.imm.runtime.now();
1356        // Check if we should rate-limit this upload.
1357        if let Some(ts) = last_uploaded {
1358            let duration_since_upload = now.duration_since(ts);
1359
1360            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
1361                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
1362            }
1363        }
1364
1365        let mut inner = self.inner.lock().expect("poisoned lock");
1366        let inner = &mut *inner;
1367
1368        let _ = inner.last_uploaded.insert(now);
1369
1370        for period_ctx in inner.time_periods.iter_mut() {
1371            let upload_task_complete_tx = self.upload_task_complete_tx.clone();
1372
1373            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
1374            // have our latest descriptor, so we filter them out).
1375            let hs_dirs = period_ctx
1376                .hs_dirs
1377                .iter()
1378                .filter_map(|(relay_id, status)| {
1379                    if *status == DescriptorStatus::Dirty {
1380                        Some(relay_id.clone())
1381                    } else {
1382                        None
1383                    }
1384                })
1385                .collect::<Vec<_>>();
1386
1387            if hs_dirs.is_empty() {
1388                trace!("the descriptor is clean for all HSDirs. Nothing to do");
1389                return Ok(());
1390            }
1391
1392            let time_period = period_ctx.params.time_period();
1393            // Clone the netdir handle here, so that the upload task spawned below
1394            // can own a reference to it for the duration of the upload.
1395            let netdir = Arc::clone(
1396                inner
1397                    .netdir
1398                    .as_ref()
1399                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
1400            );
1401
1402            let imm = Arc::clone(&self.imm);
1403            let ipt_upload_view = self.ipt_watcher.upload_view();
1404            let config = Arc::clone(&inner.config);
1405            let authorized_clients = authorized_clients.clone();
1406
1407            trace!(nickname=%self.imm.nickname, time_period=?time_period,
1408                "spawning upload task"
1409            );
1410
1411            let params = period_ctx.params.clone();
1412            let shutdown_rx = self.shutdown_tx.subscribe();
1413
1414            // Spawn a task to upload the descriptor to all HsDirs of this time period.
1415            //
1416            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
1417            // dropped).
1418            let _handle: () = self
1419                .imm
1420                .runtime
1421                .spawn(async move {
1422                    if let Err(e) = Self::upload_for_time_period(
1423                        hs_dirs,
1424                        &netdir,
1425                        config,
1426                        params,
1427                        Arc::clone(&imm),
1428                        ipt_upload_view.clone(),
1429                        authorized_clients.clone(),
1430                        upload_task_complete_tx,
1431                        shutdown_rx,
1432                    )
1433                    .await
1434                    {
1435                        error_report!(
1436                            e,
1437                            "descriptor upload failed for HS service {} and time period {:?}",
1438                            imm.nickname,
1439                            time_period
1440                        );
1441                    }
1442                })
1443                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
1444        }
1445
1446        Ok(())
1447    }
1448
1449    /// Upload the descriptor for the time period specified in `params`.
1450    ///
1451    /// Failed uploads are retried
1452    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1453    #[allow(clippy::too_many_arguments)] // TODO: refactor
1454    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
1455    async fn upload_for_time_period(
1456        hs_dirs: Vec<RelayIds>,
1457        netdir: &Arc<NetDir>,
1458        config: Arc<OnionServiceConfigPublisherView>,
1459        params: HsDirParams,
1460        imm: Arc<Immutable<R, M>>,
1461        ipt_upload_view: IptsPublisherUploadView,
1462        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
1463        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
1464        shutdown_rx: broadcast::Receiver<Void>,
1465    ) -> Result<(), FatalError> {
1466        let time_period = params.time_period();
1467        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
1468
1469        let hsdir_count = hs_dirs.len();
1470
1471        /// An error returned from an upload future.
1472        //
1473        // Exhaustive, because this is a private type.
1474        #[derive(Clone, Debug, thiserror::Error)]
1475        enum PublishError {
1476            /// The upload was aborted because there are no IPTs.
1477            ///
1478            /// This happens because of an inevitable TOCTOU race, where after being notified by
1479            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
1480            /// we find out there actually are no IPTs, so we can't build the descriptor.
1481            ///
1482            /// This is a special kind of error that interrupts the current upload task, and is
1483            /// logged at `debug!` level rather than `warn!` or `error!`.
1484            ///
1485            /// Ideally, this shouldn't happen very often (if at all).
1486            #[error("No IPTs")]
1487            NoIpts,
1488
1489            /// The reactor has shut down
1490            #[error("The reactor has shut down")]
1491            Shutdown,
1492
1493            /// A fatal error.
1494            #[error("{0}")]
1495            Fatal(#[from] FatalError),
1496        }
1497
1498        let max_hsdesc_len: usize = netdir
1499            .params()
1500            .hsdir_max_desc_size
1501            .try_into()
1502            .expect("Unable to convert positive int32 to usize!?");
1503
1504        let upload_results = futures::stream::iter(hs_dirs)
1505            .map(|relay_ids| {
1506                let netdir = netdir.clone();
1507                let config = Arc::clone(&config);
1508                let imm = Arc::clone(&imm);
1509                let ipt_upload_view = ipt_upload_view.clone();
1510                let authorized_clients = authorized_clients.clone();
1511                let params = params.clone();
1512                let mut shutdown_rx = shutdown_rx.clone();
1513
1514                let ed_id = relay_ids
1515                    .ed_identity()
1516                    .map(|id| id.to_string())
1517                    .unwrap_or_else(|| "unknown".into());
1518                let rsa_id = relay_ids
1519                    .rsa_identity()
1520                    .map(|id| id.to_string())
1521                    .unwrap_or_else(|| "unknown".into());
1522
1523                async move {
1524                    let run_upload = |desc| async {
1525                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
1526                            // This should never happen (all of our relay_ids are from the stored
1527                            // netdir).
1528                            let err =
1529                                "tried to upload descriptor to relay not found in consensus?!";
1530                            warn!(
1531                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1532                                "{err}"
1533                            );
1534                            return Err(internal!("{err}").into());
1535                        };
1536
1537                        Self::upload_descriptor_with_retries(
1538                            desc,
1539                            &netdir,
1540                            &hsdir,
1541                            &ed_id,
1542                            &rsa_id,
1543                            Arc::clone(&imm),
1544                        )
1545                        .await
1546                    };
1547
1548                    // How long until we're supposed to time out?
1549                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
1550                    // We generate a new descriptor before _each_ HsDir upload. This means each
1551                    // HsDir could, in theory, receive a different descriptor (not just in terms of
1552                    // revision-counters, but also with a different set of IPTs). It may seem like
1553                    // this could lead to some HsDirs being left with an outdated descriptor, but
1554                    // that's not the case: after the upload completes, the publisher will be
1555                    // notified by the ipt_watcher of the IPT change event (if there was one to
1556                    // begin with), which will trigger another upload job.
1557                    let hsdesc = {
1558                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
1559                        // needs to fall out of scope before the await point below
1560                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
1561
1562                        // If there are no IPTs, we abort the upload. At this point, we might have
1563                        // uploaded the descriptor to some, but not all, HSDirs from the specified
1564                        // time period.
1565                        //
1566                        // Returning an error here means the upload completion task is never
1567                        // notified of the outcome of any of these uploads (which means the
1568                        // descriptor is not marked clean). This is OK, because if we suddenly find
1569                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
1570                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
1571                        // and generate a fresh descriptor anyway.
1572                        //
1573                        // Ideally, this shouldn't happen very often (if at all).
1574                        let Some(ipts) = ipt_set.ipts.as_mut() else {
1575                            return Err(PublishError::NoIpts);
1576                        };
1577
1578                        let hsdesc = {
1579                            trace!(
1580                                nickname=%imm.nickname, time_period=?time_period,
1581                                "building descriptor"
1582                            );
1583                            let mut rng = imm.mockable.thread_rng();
1584                            let mut key_rng = tor_llcrypto::rng::CautiousRng;
1585
1586                            // We're about to generate a new version of the descriptor,
1587                            // so let's generate a new revision counter.
1588                            let now = imm.runtime.wallclock();
1589                            let revision_counter = imm.generate_revision_counter(&params, now)?;
1590
1591                            build_sign(
1592                                &imm.keymgr,
1593                                &imm.pow_manager,
1594                                &config,
1595                                authorized_clients.as_deref(),
1596                                ipts,
1597                                time_period,
1598                                revision_counter,
1599                                &mut rng,
1600                                &mut key_rng,
1601                                imm.runtime.wallclock(),
1602                                max_hsdesc_len,
1603                            )?
1604                        };
1605
1606                        if let Err(e) =
1607                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
1608                        {
1609                            let wait = e.log_retry_max(&imm.nickname)?;
1610                            // TODO (#1226): retry instead of this
1611                            return Err(FatalError::Bug(internal!(
1612                                "ought to retry after {wait:?}, crashing instead"
1613                            ))
1614                            .into());
1615                        }
1616
1617                        hsdesc
1618                    };
1619
1620                    let VersionedDescriptor {
1621                        desc,
1622                        revision_counter,
1623                    } = hsdesc;
1624
1625                    trace!(
1626                        nickname=%imm.nickname, time_period=?time_period,
1627                        revision_counter=?revision_counter,
1628                        "generated new descriptor for time period",
1629                    );
1630
1631                    // (Actually launch the upload attempt. No timeout is needed
1632                    // here, since the backoff::Runner code will handle that for us.)
1633                    let upload_res: UploadResult = select_biased! {
1634                        shutdown = shutdown_rx.next().fuse() => {
1635                            // This will always be None, since Void is uninhabited.
1636                            let _: Option<Void> = shutdown;
1637
1638                            // It looks like the reactor has shut down,
1639                            // so there is no point in uploading the descriptor anymore.
1640                            //
1641                            // Let's shut down the upload task too.
1642                            trace!(
1643                                nickname=%imm.nickname, time_period=?time_period,
1644                                "upload task received shutdown signal"
1645                            );
1646
1647                            return Err(PublishError::Shutdown);
1648                        },
1649                        res = run_upload(desc.clone()).fuse() => res,
1650                    };
1651
1652                    // Note: an `Err` upload result is only returned when
1653                    // upload_descriptor_with_retries fails, i.e. if all of our retry
1654                    // attempts have failed.
1655                    Ok(HsDirUploadStatus {
1656                        relay_ids,
1657                        upload_res,
1658                        revision_counter,
1659                    })
1660                }
1661            })
1662            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1663            .boxed()
1664            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1665            .try_collect::<Vec<_>>()
1666            .await;
1667
1668        let upload_results = match upload_results {
1669            Ok(v) => v,
1670            Err(PublishError::Fatal(e)) => return Err(e),
1671            Err(PublishError::NoIpts) => {
1672                debug!(
1673                    nickname=%imm.nickname, time_period=?time_period,
1674                     "no introduction points; skipping upload"
1675                );
1676
1677                return Ok(());
1678            }
1679            Err(PublishError::Shutdown) => {
1680                debug!(
1681                    nickname=%imm.nickname, time_period=?time_period,
1682                     "the reactor has shut down; aborting upload"
1683                );
1684
1685                return Ok(());
1686            }
1687        };
1688
1689        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1690            .iter()
1691            .partition(|res| res.upload_res.is_ok());
1692
1693        debug!(
1694            nickname=%imm.nickname, time_period=?time_period,
1695            "descriptor uploaded successfully to {}/{} HSDirs",
1696            succeeded.len(), hsdir_count
1697        );
1698
1699        if upload_task_complete_tx
1700            .send(TimePeriodUploadResult {
1701                time_period,
1702                hsdir_result: upload_results,
1703            })
1704            .await
1705            .is_err()
1706        {
1707            return Err(internal!(
1708                "failed to notify reactor of upload completion (reactor shut down)"
1709            )
1710            .into());
1711        }
1712
1713        Ok(())
1714    }
1715
1716    /// Upload a descriptor to the specified HSDir.
1717    ///
1718    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1719    /// to the caller to retry on failure.
1720    ///
1721    /// This function does not handle timeouts.
1722    async fn upload_descriptor(
1723        hsdesc: String,
1724        netdir: &Arc<NetDir>,
1725        hsdir: &Relay<'_>,
1726        imm: Arc<Immutable<R, M>>,
1727    ) -> Result<(), UploadError> {
1728        let request = HsDescUploadRequest::new(hsdesc);
1729
1730        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1731            "starting descriptor upload",
1732        );
1733
1734        let circuit = imm
1735            .mockable
1736            .get_or_launch_specific(
1737                netdir,
1738                HsCircKind::SvcHsDir,
1739                OwnedCircTarget::from_circ_target(hsdir),
1740            )
1741            .await?;
1742        let source: Option<SourceInfo> = circuit
1743            .source_info()
1744            .map_err(into_internal!("Couldn't get SourceInfo for circuit"))?;
1745        let mut stream = circuit
1746            .begin_dir_stream()
1747            .await
1748            .map_err(UploadError::Stream)?;
1749
1750        let _response: String = send_request(&imm.runtime, &request, &mut stream, source)
1751            .await
1752            .map_err(|dir_error| -> UploadError {
1753                match dir_error {
1754                    DirClientError::RequestFailed(e) => e.into(),
1755                    DirClientError::CircMgr(e) => into_internal!(
1756                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1757                    )(e)
1758                    .into(),
1759                    e => into_internal!("unexpected error")(e).into(),
1760                }
1761            })?
1762            .into_output_string()?; // This returns an error if we received an error response
1763
1764        Ok(())
1765    }
1766
1767    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1768    ///
1769    /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1770    /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1771    /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
1772    /// according to [`BackoffSchedule::single_attempt_timeout`].
1773    /// This function gives up after the overall timeout elapses,
1774    /// declaring the upload a failure, and never retrying it again.
1775    ///
1776    /// See also [`BackoffSchedule`].
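    ///
    /// A rough sketch of the retry behavior this describes (illustrative only:
    /// `upload_once`, `next_delay`, and the timeout helpers are stand-ins, not real APIs):
    ///
    /// ```ignore
    /// // All names below are illustrative stand-ins.
    /// let deadline = now() + overall_timeout;           // BackoffSchedule::overall_timeout
    /// let mut delay = base_delay;                       // starts at BASE_DELAY_MSEC
    /// loop {
    ///     match timeout(single_attempt_timeout, upload_once()).await {
    ///         Ok(Ok(())) => break Ok(()),               // the upload succeeded
    ///         Ok(Err(e)) if !e.should_retry() => break Err(e),
    ///         _ if now() >= deadline => break Err(gave_up()),
    ///         _ => {
    ///             sleep(delay).await;                   // back off before the next attempt
    ///             delay = next_delay(delay);            // e.g. randomized, growing delays
    ///         }
    ///     }
    /// }
    /// ```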
1777    async fn upload_descriptor_with_retries(
1778        hsdesc: String,
1779        netdir: &Arc<NetDir>,
1780        hsdir: &Relay<'_>,
1781        ed_id: &str,
1782        rsa_id: &str,
1783        imm: Arc<Immutable<R, M>>,
1784    ) -> UploadResult {
1785        /// The base delay to use for the backoff schedule.
1786        const BASE_DELAY_MSEC: u32 = 1000;
1787        let schedule = PublisherBackoffSchedule {
1788            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1789            mockable: imm.mockable.clone(),
1790        };
1791
1792        let runner = Runner::new(
1793            "upload a hidden service descriptor".into(),
1794            schedule.clone(),
1795            imm.runtime.clone(),
1796        );
1797
1798        let fallible_op = || async {
1799            let r = Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm)).await;
1800
1801            if let Err(e) = &r {
1802                if e.should_report_as_suspicious() {
1803                    // Note that not every protocol violation is suspicious:
1804                    // we only warn on the protocol violations that look like attempts
1805                    // to do a traffic tagging attack via hsdir inflation.
1806                    // (See proposal 360.)
1807                    warn_report!(
1808                        &e.clone(),
1809                        "Suspicious error while uploading descriptor to {}/{}",
1810                        ed_id,
1811                        rsa_id
1812                    );
1813                }
1814            }
1815            r
1816        };
1817
1818        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1819        match outcome {
1820            Ok(()) => {
1821                debug!(
1822                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1823                    "successfully uploaded descriptor to HSDir",
1824                );
1825
1826                Ok(())
1827            }
1828            Err(e) => {
1829                warn_report!(
1830                    e,
1831                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1832                    imm.nickname,
1833                    ed_id,
1834                    rsa_id
1835                );
1836
1837                Err(e.into())
1838            }
1839        }
1840    }
1841
1842    /// Stop publishing descriptors until the specified delay elapses.
1843    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1844        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1845            debug!(
1846                "We are rate-limited for {}; pausing descriptor publication",
1847                humantime::format_duration(delay)
1848            );
1849            let until = self.imm.runtime.now() + delay;
1850            self.update_publish_status(PublishStatus::RateLimited(until))
1851                .await?;
1852        }
1853
1854        Ok(())
1855    }
1856
1857    /// Handle the upload rate-limit being lifted.
1858    async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1859        debug!("We are no longer rate-limited; resuming descriptor publication");
1860        self.update_publish_status(PublishStatus::UploadScheduled)
1861            .await?;
1862        Ok(())
1863    }
1864
1865    /// Return the authorized clients, if restricted discovery mode is enabled.
1866    ///
1867    /// Returns `Ok(None)` if restricted discovery mode is disabled.
1868    ///
1869    /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
1870    #[cfg_attr(
1871        not(feature = "restricted-discovery"),
1872        allow(clippy::unnecessary_wraps)
1873    )]
1874    fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1875        cfg_if::cfg_if! {
1876            if #[cfg(feature = "restricted-discovery")] {
1877                let authorized_clients = self
1878                    .inner
1879                    .lock()
1880                    .expect("poisoned lock")
1881                    .authorized_clients
1882                    .clone();
1883
1884                if authorized_clients.as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1885                    return Err(FatalError::RestrictedDiscoveryNoClients);
1886                }
1887
1888                Ok(authorized_clients)
1889            } else {
1890                Ok(None)
1891            }
1892        }
1893    }
1894}
1895
1896/// Try to expand a path, logging a warning on failure.
1897fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1898    // map_err returns unit for clarity
1899    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1900    p.path(r)
1901        .map_err(|e| {
1902            tor_error::warn_report!(e, "invalid path");
1903            ()
1904        })
1905        .ok()
1906}
1907
1908/// Add `path` to the specified `watcher`.
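///
/// For example (mirroring the calls in `watch_dirs` below; any extra arguments are
/// forwarded to the chosen watch function as-is):
///
/// ```ignore
/// watch_path!(watcher, &path, watch_dir, "auth",);
/// watch_path!(watcher, &path, watch_path,);
/// ```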
1909macro_rules! watch_path {
1910    ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
1911        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args)*) {
1912            warn_report!(e, "failed to watch path {:?}", $path);
1913        } else {
1914            debug!("watching path {:?}", $path);
1915        }
1916    }}
1917}
1918
1919/// Add the specified directories to the watcher.
1920#[allow(clippy::cognitive_complexity)]
1921fn watch_dirs<R: Runtime>(
1922    watcher: &mut FileWatcherBuilder<R>,
1923    dirs: &DirectoryKeyProviderList,
1924    path_resolver: &CfgPathResolver,
1925) {
1926    for path in dirs {
1927        let path = path.path();
1928        let Some(path) = maybe_expand_path(path, path_resolver) else {
1929            warn!("failed to expand key_dir path {:?}", path);
1930            continue;
1931        };
1932
1933        // If the path doesn't exist, the notify watcher will return an error if we attempt to watch it,
1934        // so we skip over paths that don't exist at this time
1935        // (this obviously suffers from a TOCTOU race, but most of the time,
1936        // it is good enough at preventing the watcher from failing to watch.
1937        // If the race *does* happen it is not disastrous, i.e. the reactor won't crash,
1938        // but it will fail to set the watcher).
1939        if matches!(path.try_exists(), Ok(true)) {
1940            watch_path!(watcher, &path, watch_dir, "auth",);
1941        }
1942        // FileWatcher::watch_path causes the parent dir of the path to be watched.
1943        if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1944            watch_path!(watcher, &path, watch_path,);
1945        }
1946    }
1947}
1948
1949/// Try to read the blinded identity key for a given `TimePeriod`.
1950///
1951/// Returns `None` if the service is running in "offline" mode.
1952///
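/// A minimal usage sketch (assuming `keymgr`, `nickname`, and `period` are already
/// in scope; the `expect` relies on "offline" mode not being supported yet):
///
/// ```ignore
/// let blind_id_kp = read_blind_id_keypair(&keymgr, &nickname, period)?
///     .expect("offline mode is not supported yet, so this is always Some");
/// ```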
1953// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1954// `Ok(None)`.
1955pub(super) fn read_blind_id_keypair(
1956    keymgr: &Arc<KeyMgr>,
1957    nickname: &HsNickname,
1958    period: TimePeriod,
1959) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1960    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1961    let hsid_kp = keymgr
1962        .get::<HsIdKeypair>(&svc_key_spec)?
1963        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1964
1965    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1966
1967    // TODO: make the keystore selector configurable
1968    let keystore_selector = Default::default();
1969    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1970        Some(kp) => Ok(Some(kp)),
1971        None => {
1972            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1973                .compute_blinded_key(period)
1974                .map_err(|_| internal!("failed to compute blinded key"))?;
1975
1976            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1977            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1978            // which assumes keys are randomly generated, rather than derived from existing keys).
1979
1980            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1981
1982            let arti_path = |spec: &dyn KeySpecifier| {
1983                spec.arti_path()
1984                    .map_err(into_internal!("invalid key specifier?!"))
1985            };
1986
1987            Ok(Some(
1988                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1989                    FatalError::KeystoreRace {
1990                        action: "read",
1991                        path: arti_path(&blind_id_key_spec)?,
1992                    },
1993                )?,
1994            ))
1995        }
1996    }
1997}
1998
1999/// Determine the [`State`] of the publisher based on the upload results
2000/// from the current `time_periods`.
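///
/// Roughly (matching the logic below):
///   * `Running`: the descriptor was uploaded to HsDirs on both the primary and
///     secondary ring, and no uploads failed
///   * `DegradedReachable`: both rings have successful uploads, but some uploads failed
///   * `DegradedUnreachable`: one of the rings has no successful uploads
///     (or we don't have a context for both rings)
///   * `Bootstrapping`: we have no results yet for one or both rings, and nothing has failed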
2001fn upload_result_state(
2002    netdir: &NetDir,
2003    time_periods: &[TimePeriodContext],
2004) -> (State, Option<Problem>) {
2005    let current_period = netdir.hs_time_period();
2006    let current_period_res = time_periods
2007        .iter()
2008        .find(|ctx| ctx.params.time_period() == current_period);
2009
2010    let succeeded_current_tp = current_period_res
2011        .iter()
2012        .flat_map(|res| &res.upload_results)
2013        .filter(|res| res.upload_res.is_ok())
2014        .collect_vec();
2015
2016    let secondary_tp_res = time_periods
2017        .iter()
2018        .filter(|ctx| ctx.params.time_period() != current_period)
2019        .collect_vec();
2020
2021    let succeeded_secondary_tp = secondary_tp_res
2022        .iter()
2023        .flat_map(|res| &res.upload_results)
2024        .filter(|res| res.upload_res.is_ok())
2025        .collect_vec();
2026
2027    // All of the failed uploads (for all TPs)
2028    let failed = time_periods
2029        .iter()
2030        .flat_map(|res| &res.upload_results)
2031        .filter(|res| res.upload_res.is_err())
2032        .collect_vec();
2033    let problems: Vec<DescUploadRetryError> = failed
2034        .iter()
2035        .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
2036        .collect();
2037
2038    let err = match problems.as_slice() {
2039        [_, ..] => Some(problems.into()),
2040        [] => None,
2041    };
2042
2043    if time_periods.len() < 2 {
2044        // We need at least two TP contexts (one for the primary TP,
2045        // and another for the secondary one).
2046        //
2047        // If either is missing, we are unreachable for some or all clients.
2048        return (State::DegradedUnreachable, err);
2049    }
2050
2051    let state = match (
2052        succeeded_current_tp.as_slice(),
2053        succeeded_secondary_tp.as_slice(),
2054    ) {
2055        (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
2056            // We don't have any upload results for one or both TPs.
2057            // We are still bootstrapping.
2058            State::Bootstrapping
2059        }
2060        (&[_, ..], &[_, ..]) if failed.is_empty() => {
2061            // We have uploaded the descriptor to one or more HsDirs from both
2062            // HsDir rings (primary and secondary), and none of the uploads failed.
2063            // We are fully reachable.
2064            State::Running
2065        }
2066        (&[_, ..], &[_, ..]) => {
2067            // We have uploaded the descriptor to one or more HsDirs from both
2068            // HsDir rings (primary and secondary), but some of the uploads failed.
2069            // We are reachable, but we failed to upload the descriptor to all the HsDirs
2070            // that were supposed to have it.
2071            State::DegradedReachable
2072        }
2073        (&[..], &[]) | (&[], &[..]) => {
2074            // We have either
2075            //   * uploaded the descriptor to some of the HsDirs from one of the rings,
2076            //   but haven't managed to upload it to any of the HsDirs on the other ring, or
2077            //   * all of the uploads failed
2078            //
2079            // Either way, we are definitely not reachable by all clients.
2080            State::DegradedUnreachable
2081        }
2082    };
2083
2084    (state, err)
2085}
2086
2087/// Whether the reactor should initiate an upload.
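///
/// Roughly, the expected flow is (a simplified sketch; see the variant docs below):
///
/// ```ignore
/// AwaitingIpts --> UploadScheduled --> Idle --> (back to UploadScheduled on new events)
///                      ^    |
///                      |    v
///                   RateLimited   (if uploads are scheduled too frequently)
/// ```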
2088#[derive(Copy, Clone, Debug, Default, PartialEq)]
2089enum PublishStatus {
2090    /// We need to call upload_all.
2091    UploadScheduled,
2092    /// We are rate-limited until the specified [`Instant`].
2093    ///
2094    /// We have tried to schedule multiple uploads in a short time span,
2095    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
2096    /// channel to unblock us.
2097    RateLimited(Instant),
2098    /// We are idle and waiting for external events.
2099    ///
2100    /// We have enough information to build the descriptor, but since we have already called
2101    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
2102    Idle,
2103    /// We are waiting for the IPT manager to establish some introduction points.
2104    ///
2105    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
2106    /// `UploadScheduled`.
2107    #[default]
2108    AwaitingIpts,
2109}
2110
2111/// The backoff schedule for the task that publishes descriptors.
2112#[derive(Clone, Debug)]
2113struct PublisherBackoffSchedule<M: Mockable> {
2114    /// The delay schedule to use between retries.
2115    retry_delay: RetryDelay,
2116    /// The mockable reactor state, needed for obtaining an rng.
2117    mockable: M,
2118}
2119
2120impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
2121    fn max_retries(&self) -> Option<usize> {
2122        None
2123    }
2124
2125    fn overall_timeout(&self) -> Option<Duration> {
2126        Some(OVERALL_UPLOAD_TIMEOUT)
2127    }
2128
2129    fn single_attempt_timeout(&self) -> Option<Duration> {
2130        Some(self.mockable.estimate_upload_timeout())
2131    }
2132
2133    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
2134        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
2135    }
2136}
2137
2138impl RetriableError for UploadError {
2139    fn should_retry(&self) -> bool {
2140        match self {
2141            UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2142            UploadError::Bug(_) => false,
2143        }
2144    }
2145}
2146
2147/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
2148#[derive(Debug, Clone)]
2149struct TimePeriodUploadResult {
2150    /// The time period.
2151    time_period: TimePeriod,
2152    /// The upload results.
2153    hsdir_result: Vec<HsDirUploadStatus>,
2154}
2155
2156/// The outcome of uploading a descriptor to a particular HsDir.
2157#[derive(Clone, Debug)]
2158struct HsDirUploadStatus {
2159    /// The identity of the HsDir we attempted to upload the descriptor to.
2160    relay_ids: RelayIds,
2161    /// The outcome of this attempt.
2162    upload_res: UploadResult,
2163    /// The revision counter of the descriptor we tried to upload.
2164    revision_counter: RevisionCounter,
2165}
2166
2167/// The outcome of uploading a descriptor.
2168type UploadResult = Result<(), DescUploadRetryError>;
2169
2170impl From<BackoffError<UploadError>> for DescUploadRetryError {
2171    fn from(e: BackoffError<UploadError>) -> Self {
2172        use BackoffError as BE;
2173        use DescUploadRetryError as DURE;
2174
2175        match e {
2176            BE::FatalError(e) => DURE::FatalError(e),
2177            BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2178            BE::Timeout(e) => DURE::Timeout(e),
2179            BE::ExplicitStop(_) => {
2180                DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2181            }
2182        }
2183    }
2184}
2185
2186// NOTE: the rest of the publisher tests live in publish.rs
2187#[cfg(test)]
2188mod test {
2189    // @@ begin test lint list maintained by maint/add_warning @@
2190    #![allow(clippy::bool_assert_comparison)]
2191    #![allow(clippy::clone_on_copy)]
2192    #![allow(clippy::dbg_macro)]
2193    #![allow(clippy::mixed_attributes_style)]
2194    #![allow(clippy::print_stderr)]
2195    #![allow(clippy::print_stdout)]
2196    #![allow(clippy::single_char_pattern)]
2197    #![allow(clippy::unwrap_used)]
2198    #![allow(clippy::unchecked_duration_subtraction)]
2199    #![allow(clippy::useless_vec)]
2200    #![allow(clippy::needless_pass_by_value)]
2201    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
2202    use super::*;
2203    use tor_netdir::testnet;
2204
2205    /// Create a `TimePeriodContext` from the specified upload results.
2206    fn create_time_period_ctx(
2207        params: &HsDirParams,
2208        upload_results: Vec<HsDirUploadStatus>,
2209    ) -> TimePeriodContext {
2210        TimePeriodContext {
2211            params: params.clone(),
2212            hs_dirs: vec![],
2213            last_successful: None,
2214            upload_results,
2215        }
2216    }
2217
2218    /// Create a single `HsDirUploadStatus`
2219    fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
2220        HsDirUploadStatus {
2221            relay_ids: RelayIds::empty(),
2222            upload_res,
2223            revision_counter: RevisionCounter::from(13),
2224        }
2225    }
2226
2227    /// Create a bunch of results, all with the specified `upload_res`.
2228    fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
2229        std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
2230            .take(10)
2231            .collect()
2232    }
2233
2234    fn construct_netdir() -> NetDir {
2235        const SRV1: [u8; 32] = *b"The door refused to open.       ";
2236        const SRV2: [u8; 32] = *b"It said, 'Five cents, please.'  ";
2237
2238        let dir = testnet::construct_custom_netdir(|_, _, bld| {
2239            bld.shared_rand_prev(7, SRV1.into(), None)
2240                .shared_rand_prev(7, SRV2.into(), None);
2241        })
2242        .unwrap();
2243
2244        dir.unwrap_if_sufficient().unwrap()
2245    }
2246
2247    #[test]
2248    fn upload_result_status_bootstrapping() {
2249        let netdir = construct_netdir();
2250        let all_params = netdir.hs_all_time_periods();
2251        let current_period = netdir.hs_time_period();
2252        let primary_params = all_params
2253            .iter()
2254            .find(|param| param.time_period() == current_period)
2255            .unwrap();
2256        let results = [
2257            (vec![], vec![]),
2258            (vec![], create_upload_results(Ok(()))),
2259            (create_upload_results(Ok(())), vec![]),
2260        ];
2261
2262        for (primary_result, secondary_result) in results {
2263            let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2264
2265            let secondary_params = all_params
2266                .iter()
2267                .find(|param| param.time_period() != current_period)
2268                .unwrap();
2269            let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2270
2271            let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2272            assert_eq!(status, State::Bootstrapping);
2273            assert!(err.is_none());
2274        }
2275    }
2276
2277    #[test]
2278    fn upload_result_status_running() {
2279        let netdir = construct_netdir();
2280        let all_params = netdir.hs_all_time_periods();
2281        let current_period = netdir.hs_time_period();
2282        let primary_params = all_params
2283            .iter()
2284            .find(|param| param.time_period() == current_period)
2285            .unwrap();
2286
2287        let secondary_result = create_upload_results(Ok(()));
2288        let secondary_params = all_params
2289            .iter()
2290            .find(|param| param.time_period() != current_period)
2291            .unwrap();
2292        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2293
2294        let primary_result = create_upload_results(Ok(()));
2295        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2296        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2297        assert_eq!(status, State::Running);
2298        assert!(err.is_none());
2299    }
2300
2301    #[test]
2302    fn upload_result_status_reachable() {
2303        let netdir = construct_netdir();
2304        let all_params = netdir.hs_all_time_periods();
2305        let current_period = netdir.hs_time_period();
2306        let primary_params = all_params
2307            .iter()
2308            .find(|param| param.time_period() == current_period)
2309            .unwrap();
2310
2311        let primary_result = create_upload_results(Ok(()));
2312        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2313        let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2314        let secondary_result = create_upload_results(Ok(()))
2315            .into_iter()
2316            .chain(failed_res.iter().cloned())
2317            .collect();
2318        let secondary_params = all_params
2319            .iter()
2320            .find(|param| param.time_period() != current_period)
2321            .unwrap();
2322        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
2323        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2324
2325        // Degraded but reachable (because some of the secondary HsDir uploads failed).
2326        assert_eq!(status, State::DegradedReachable);
2327        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2328    }
2329
2330    #[test]
2331    fn upload_result_status_unreachable() {
2332        let netdir = construct_netdir();
2333        let all_params = netdir.hs_all_time_periods();
2334        let current_period = netdir.hs_time_period();
2335        let primary_params = all_params
2336            .iter()
2337            .find(|param| param.time_period() == current_period)
2338            .unwrap();
2339        let mut primary_result =
2340            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2341        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2342        // No secondary TP (we are unreachable).
2343        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2344        assert_eq!(status, State::DegradedUnreachable);
2345        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2346
2347        // Add a successful result
2348        primary_result.push(create_upload_status(Ok(())));
2349        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2350        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2351        // Still degraded, and unreachable (because we don't have a TimePeriodContext
2352        // for the secondary TP)
2353        assert_eq!(status, State::DegradedUnreachable);
2354        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2355
2356        // If we add another time period where none of the uploads were successful,
2357        // we're *still* unreachable
2358        let secondary_result =
2359            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2360        let secondary_params = all_params
2361            .iter()
2362            .find(|param| param.time_period() != current_period)
2363            .unwrap();
2364        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2365        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2366        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2367        assert_eq!(status, State::DegradedUnreachable);
2368        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2369    }
2370}