tor_hsservice/publish/reactor.rs

1//! The onion service publisher reactor.
2//!
3//! Generates and publishes hidden service descriptors in response to various events.
4//!
5//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
6//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
7//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
8//! any of the channels the reactor is receiving events from is closed
9//! (i.e. when the senders are dropped).
10//!
11//! ## Publisher status
12//!
13//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
14//! which is used for onion service status reporting.
15//!
16//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
17//! and responds by generating and publishing a new descriptor if needed.
18//!
19//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
20//!
21//! ## When do we publish?
22//!
23//! We generate and publish a new descriptor if
24//!   * the introduction points have changed
//!   * the onion service configuration has changed in a meaningful way (for example,
//!     if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
//!     has changed; see [`OnionServiceConfigPublisherView`]).
28//!   * there is a new consensus
29//!   * it is time to republish the descriptor (after we upload a descriptor,
30//!     we schedule it for republishing at a random time between 60 minutes and 120 minutes
31//!     in the future)
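//!
//! A simplified sketch of how the main loop reacts to these triggers
//! (the channel names match the fields of [`Reactor`]; rate-limiting and
//! error handling are omitted):
//!
//! ```ignore
//! select_biased! {
//!     netdir_event = netdir_events.next().fuse() => {
//!         // New consensus: recompute the HsDirs and consider uploading.
//!     }
//!     update = ipt_watcher.await_update().fuse() => {
//!         // The introduction points changed: mark all descriptors dirty.
//!     }
//!     config = config_rx.next().fuse() => {
//!         // Config change: republish only if the change was meaningful.
//!     }
//!     should_upload = publish_status_rx.next().fuse() => {
//!         // UploadScheduled: upload to every HsDir that still needs the descriptor.
//!     }
//! }
//! ```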
32//!
33//! ## Onion service status
34//!
35//! With respect to [`OnionServiceStatus`] reporting,
36//! the following state transitions are possible:
37//!
38//!
39//! ```ignore
40//!
41//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
42//!                +---------------------------------------+
43//!                |                                       |
44//!                |                                       v
45//!                |                               +---------------+
46//!                |                               | Bootstrapping |
47//!                |                               +---------------+
48//!                |                                       |
49//!                |                                       |           uploaded to at least
50//!                |  not enough HsDir uploads succeeded   |        some HsDirs from each ring
51//!                |         +-----------------------------+-----------------------+
52//!                |         |                             |                       |
53//!                |         |              all HsDir uploads succeeded            |
54//!                |         |                             |                       |
55//!                |         v                             v                       v
56//!                |  +---------------------+         +---------+        +---------------------+
57//!                |  | DegradedUnreachable |         | Running |        |  DegradedReachable  |
58//! +----------+   |  +---------------------+         +---------+        +---------------------+
59//! | Shutdown |-- |         |                           |                        |
60//! +----------+   |         |                           |                        |
61//!                |         |                           |                        |
62//!                |         |                           |                        |
63//!                |         +---------------------------+------------------------+
64//!                |                                     |   invalid authorized_clients
65//!                |                                     |      after handling config change
66//!                |                                     |
67//!                |                                     v
68//!                |     run_once() returns an error +--------+
69//!                +-------------------------------->| Broken |
70//!                                                  +--------+
71//! ```
72//!
73//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
74//! back to `Bootstrapping` (those transitions were omitted for brevity).
75
76use tor_config::file_watcher::{
77    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
78};
79use tor_config_path::{CfgPath, CfgPathResolver};
80use tor_netdir::{DirEvent, NetDir};
81
82use crate::config::restricted_discovery::{
83    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
84};
85use crate::config::OnionServiceConfigPublisherView;
86use crate::status::{DescUploadRetryError, Problem};
87
88use super::*;
89
90// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
91// for rate-limiting the publish jobs triggered by a change in the config?
92//
93// Currently the descriptor publish tasks triggered by changes in the config
94// are rate-limited via the usual rate limiting mechanism
95// (which rate-limits the uploads for 1m).
96//
97// I think this is OK for now, but we might need to rethink this if it becomes problematic
98// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
99// each time the config is modified).
100
101/// The upload rate-limiting threshold.
102///
103/// Before initiating an upload, the reactor checks if the last upload was at least
104/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
105/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
106/// current time.
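///
/// A minimal sketch of the check performed before an upload (the field and
/// variable names are illustrative, not the exact ones used below):
///
/// ```ignore
/// if let Some(last) = inner.last_uploaded {
///     if runtime.now().duration_since(last) < UPLOAD_RATE_LIM_THRESHOLD {
///         // Too soon: note that we are rate-limited and reschedule the
///         // upload for `last + UPLOAD_RATE_LIM_THRESHOLD`.
///         return Ok(());
///     }
/// }
/// // Otherwise, proceed with the upload and update `last_uploaded`.
/// ```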
107//
108// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
109const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
110
111/// The maximum number of concurrent upload tasks per time period.
112//
113// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
114// will have no effect, since the current number of replicas is far less than
115// this value.
116//
117// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
118// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
119// (currently 2, which means the concurrency limit will, in fact, be 32).
120//
121// We should try to decouple this value from the TP parameters.
122const MAX_CONCURRENT_UPLOADS: usize = 16;
123
124/// The maximum time allowed for uploading a descriptor to a single HSDir,
125/// across all attempts.
126pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
127
128/// A reactor for the HsDir [`Publisher`]
129///
130/// The entrypoint is [`Reactor::run`].
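///
/// A sketch of how the publisher is expected to drive it (setup of the
/// individual arguments is abbreviated, and the variable names are illustrative):
///
/// ```ignore
/// let reactor = Reactor::new(
///     runtime.clone(), nickname, dir_provider, mockable, &config,
///     ipt_watcher, config_rx, status_tx, keymgr, path_resolver,
///     pow_manager, update_from_pow_manager_rx,
/// );
/// runtime.spawn(async move {
///     // Runs until a fatal error occurs or all of its event channels are closed.
///     let _ = reactor.run().await;
/// })?;
/// ```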
131#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
132pub(super) struct Reactor<R: Runtime, M: Mockable> {
133    /// The immutable, shared inner state.
134    imm: Arc<Immutable<R, M>>,
135    /// A source for new network directories that we use to determine
136    /// our HsDirs.
137    dir_provider: Arc<dyn NetDirProvider>,
    /// The mutable inner state.
139    inner: Arc<Mutex<Inner>>,
140    /// A channel for receiving IPT change notifications.
141    ipt_watcher: IptsPublisherView,
142    /// A channel for receiving onion service config change notifications.
143    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
144    /// A channel for receiving restricted discovery key_dirs change notifications.
145    key_dirs_rx: FileEventReceiver,
146    /// A channel for sending restricted discovery key_dirs change notifications.
147    ///
148    /// A copy of this sender is handed out to every `FileWatcher` created.
149    key_dirs_tx: FileEventSender,
150    /// A channel for receiving updates regarding our [`PublishStatus`].
151    ///
152    /// The main loop of the reactor watches for updates on this channel.
153    ///
154    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
155    /// we can start publishing descriptors.
156    ///
157    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
158    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
159    /// established some introduction points.
160    publish_status_rx: watch::Receiver<PublishStatus>,
161    /// A sender for updating our [`PublishStatus`].
162    ///
163    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
164    /// we can start publishing descriptors.
165    publish_status_tx: watch::Sender<PublishStatus>,
    /// A channel for receiving upload completion notifications.
167    ///
168    /// This channel is polled in the main loop of the reactor.
169    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
    /// A channel for sending upload completion notifications.
171    ///
172    /// A copy of this sender is handed to each upload task.
173    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
174    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
175    ///
    /// Receivers can use this channel to find out when the reactor is dropped.
177    ///
178    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
179    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
180    ///
181    /// Closing this channel will cause any pending upload tasks to be dropped.
182    shutdown_tx: broadcast::Sender<Void>,
183    /// Path resolver for configuration files.
184    path_resolver: Arc<CfgPathResolver>,
185    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
186    /// rotated and thus we need to republish the descriptor for a particular time period.
187    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
188}
189
190/// The immutable, shared state of the descriptor publisher reactor.
191#[derive(Clone)]
192struct Immutable<R: Runtime, M: Mockable> {
193    /// The runtime.
194    runtime: R,
195    /// Mockable state.
196    ///
197    /// This is used for launching circuits and for obtaining random number generators.
198    mockable: M,
199    /// The service for which we're publishing descriptors.
200    nickname: HsNickname,
    /// The key manager.
202    keymgr: Arc<KeyMgr>,
203    /// A sender for updating the status of the onion service.
204    status_tx: PublisherStatusSender,
205    /// Proof-of-work state.
206    pow_manager: Arc<PowManager<R>>,
207}
208
209impl<R: Runtime, M: Mockable> Immutable<R, M> {
210    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
211    /// with the specified [`TimePeriod`].
212    ///
213    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
214    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
215    /// descriptor signing key.
216    ///
217    /// Returns an error if the service is running in offline mode and the descriptor signing
218    /// keypair of the specified `period` is not available.
219    //
220    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
221    // built from the blinded id key
222    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
223        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
224            Some(key) => {
225                let key: ed25519::ExpandedKeypair = key.into();
226                key.to_secret_key_bytes()[0..32]
227                    .try_into()
228                    .expect("Wrong length on slice")
229            }
230            None => {
231                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
232                // is unreachable (for now).
233                let desc_sign_key_spec =
234                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
235                let key: ed25519::Keypair = self
236                    .keymgr
237                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
238                    // TODO (#1194): internal! is not the right type for this error (we need an
239                    // error type for the case where a hidden service running in offline mode has
                    // run out of its pre-provisioned keys).
241                    //
242                    // This will be addressed when we add support for offline hs_id mode
243                    .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
244                    .into();
245                key.to_bytes()
246            }
247        };
248
249        Ok(AesOpeKey::from_secret(&ope_key))
250    }
251
252    /// Generate a revision counter for a descriptor associated with the specified
253    /// [`TimePeriod`].
254    ///
255    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
256    ///
257    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
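    ///
    /// Because the encryption is order-preserving, counters generated later within
    /// the same SRV period always compare greater than earlier ones, so HsDirs can
    /// use them to pick the most recent descriptor. A sketch (times are illustrative):
    ///
    /// ```ignore
    /// let earlier = imm.generate_revision_counter(&params, now)?;
    /// let later = imm.generate_revision_counter(&params, now + Duration::from_secs(60))?;
    /// assert!(later > earlier);
    /// ```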
258    fn generate_revision_counter(
259        &self,
260        params: &HsDirParams,
261        now: SystemTime,
262    ) -> Result<RevisionCounter, FatalError> {
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
264        // to each time we generate a new descriptor), for performance reasons.
265        let ope_key = self.create_ope_key(params.time_period())?;
266
267        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
268        let srv_start = params.start_of_shard_rand_period();
269        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
270            internal!(
271                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
272                now,
273                srv_start
274            )
275        })?;
276        let rev = ope_key.encrypt(offset);
277
278        Ok(RevisionCounter::from(rev))
279    }
280}
281
282/// Mockable state for the descriptor publisher reactor.
283///
284/// This enables us to mock parts of the [`Reactor`] for testing purposes.
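///
/// A minimal test double might look like this (a sketch; `FakeCircPool` and
/// `FakeCirc` are hypothetical types, with `FakeCirc` implementing
/// [`MockableClientCirc`]):
///
/// ```ignore
/// #[derive(Clone)]
/// struct FakeCircPool;
///
/// #[async_trait]
/// impl Mockable for FakeCircPool {
///     type Rng = rand::rngs::ThreadRng;
///     type ClientCirc = FakeCirc;
///
///     fn thread_rng(&self) -> Self::Rng {
///         rand::rng()
///     }
///
///     async fn get_or_launch_specific<T>(
///         &self,
///         _netdir: &NetDir,
///         _kind: HsCircKind,
///         _target: T,
///     ) -> Result<Arc<FakeCirc>, tor_circmgr::Error>
///     where
///         T: CircTarget + Send + Sync,
///     {
///         Ok(Arc::new(FakeCirc::default()))
///     }
///
///     fn estimate_upload_timeout(&self) -> Duration {
///         // A fixed timeout is good enough for tests.
///         Duration::from_secs(30)
///     }
/// }
/// ```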
285#[async_trait]
286pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
287    /// The type of random number generator.
288    type Rng: rand::Rng + rand::CryptoRng;
289
290    /// The type of client circuit.
291    type ClientCirc: MockableClientCirc;
292
293    /// Return a random number generator.
294    fn thread_rng(&self) -> Self::Rng;
295
296    /// Create a circuit of the specified `kind` to `target`.
297    async fn get_or_launch_specific<T>(
298        &self,
299        netdir: &NetDir,
300        kind: HsCircKind,
301        target: T,
302    ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
303    where
304        T: CircTarget + Send + Sync;
305
306    /// Return an estimate-based value for how long we should allow a single
307    /// directory upload operation to complete.
308    ///
309    /// Includes circuit construction, stream opening, upload, and waiting for a
310    /// response.
311    fn estimate_upload_timeout(&self) -> Duration;
312}
313
314/// Mockable client circuit
315#[async_trait]
316pub(crate) trait MockableClientCirc: Send + Sync {
317    /// The data stream type.
318    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
319
320    /// Start a new stream to the last relay in the circuit, using
321    /// a BEGIN_DIR cell.
322    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
323}
324
325#[async_trait]
326impl MockableClientCirc for ClientCirc {
327    type DataStream = tor_proto::stream::DataStream;
328
329    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
330        ClientCirc::begin_dir_stream(self).await
331    }
332}
333
334/// The real version of the mockable state of the reactor.
335#[derive(Clone, From, Into)]
336pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
337
338#[async_trait]
339impl<R: Runtime> Mockable for Real<R> {
340    type Rng = rand::rngs::ThreadRng;
341    type ClientCirc = ClientCirc;
342
343    fn thread_rng(&self) -> Self::Rng {
344        rand::rng()
345    }
346
347    async fn get_or_launch_specific<T>(
348        &self,
349        netdir: &NetDir,
350        kind: HsCircKind,
351        target: T,
352    ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
353    where
354        T: CircTarget + Send + Sync,
355    {
356        self.0.get_or_launch_specific(netdir, kind, target).await
357    }
358
359    fn estimate_upload_timeout(&self) -> Duration {
360        use tor_circmgr::timeouts::Action;
361        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
362        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
363        // We assume that in the worst case we'll have to wait for an entire
364        // circuit construction and two round-trips to the hsdir.
365        let est_total = est_build + est_roundtrip * 2;
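        // For example, with est_build = 20s and est_roundtrip = 10s this comes to
        // 40s, comfortably above the 30s floor below; with very small estimates,
        // the floor below is what we end up using.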
366        // We always allow _at least_ this much time, in case our estimate is
367        // ridiculously low.
368        let min_timeout = Duration::from_secs(30);
369        max(est_total, min_timeout)
370    }
371}
372
373/// The mutable state of a [`Reactor`].
374struct Inner {
375    /// The onion service config.
376    config: Arc<OnionServiceConfigPublisherView>,
377    /// Watcher for key_dirs.
378    ///
379    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
380    ///
381    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
382    file_watcher: Option<FileWatcher>,
383    /// The relevant time periods.
384    ///
385    /// This includes the current time period, as well as any other time periods we need to be
386    /// publishing descriptors for.
387    ///
388    /// This is empty until we fetch our first netdir in [`Reactor::run`].
389    time_periods: Vec<TimePeriodContext>,
390    /// Our most up to date netdir.
391    ///
392    /// This is initialized in [`Reactor::run`].
393    netdir: Option<Arc<NetDir>>,
394    /// The timestamp of our last upload.
395    ///
    /// This is the time when the last upload was _initiated_ (rather than completed), to prevent
397    /// the publisher from spawning multiple upload tasks at once in response to multiple external
398    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
399    /// notifications in a short time frame (#1142), or an IPT change notification that's
400    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
401    /// inefficient, but it also causes the publisher to generate two different descriptors with
402    /// the same revision counter (the revision counter is derived from the current timestamp),
403    /// which ultimately causes the slower upload task to fail (see #1142).
404    ///
405    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
406    /// used for retrying failed uploads (these are handled internally by
407    /// [`Reactor::upload_descriptor_with_retries`]).
408    last_uploaded: Option<Instant>,
409    /// A max-heap containing the time periods for which we need to reupload the descriptor.
    // TODO: we are currently reuploading more than necessary.
    // Ideally, this shouldn't contain duplicate TimePeriods,
412    // because we only need to retain the latest reupload time for each time period.
413    //
414    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
415    // we will end up with multiple ReuploadTimer entries for that TP,
416    // each of which will (eventually) result in a reupload.
417    //
418    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
419    //
420    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
421    reupload_timers: BinaryHeap<ReuploadTimer>,
422    /// The restricted discovery authorized clients.
423    ///
424    /// `None`, unless the service is running in restricted discovery mode.
425    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
426}
427
428/// The part of the reactor state that changes with every time period.
429struct TimePeriodContext {
430    /// The HsDir params.
431    params: HsDirParams,
432    /// The HsDirs to use in this time period.
433    ///
434    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
435    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
436    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
437    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
438    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
439    /// The revision counter of the last successful upload, if any.
440    last_successful: Option<RevisionCounter>,
441    /// The outcome of the last upload, if any.
442    upload_results: Vec<HsDirUploadStatus>,
443}
444
445impl TimePeriodContext {
446    /// Create a new `TimePeriodContext`.
447    ///
448    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
449    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
450    fn new<'r>(
451        params: HsDirParams,
452        blind_id: HsBlindId,
453        netdir: &Arc<NetDir>,
454        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
455        old_upload_results: Vec<HsDirUploadStatus>,
456    ) -> Result<Self, FatalError> {
457        let period = params.time_period();
458        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
459        let upload_results = old_upload_results
460            .into_iter()
461            .filter(|res|
462                // Check if the HsDir of this result still exists
463                hs_dirs
464                    .iter()
465                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
466            .collect();
467
468        Ok(Self {
469            params,
470            hs_dirs,
471            last_successful: None,
472            upload_results,
473        })
474    }
475
476    /// Recompute the HsDirs for this time period.
477    fn compute_hsdirs<'r>(
478        period: TimePeriod,
479        blind_id: HsBlindId,
480        netdir: &Arc<NetDir>,
481        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
482    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
483        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
484
485        Ok(hs_dirs
486            .map(|hs_dir| {
487                let mut builder = RelayIds::builder();
488                if let Some(ed_id) = hs_dir.ed_identity() {
489                    builder.ed_identity(*ed_id);
490                }
491
492                if let Some(rsa_id) = hs_dir.rsa_identity() {
493                    builder.rsa_identity(*rsa_id);
494                }
495
496                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
497
                // Have we uploaded the descriptor to this relay before? If so, we don't need to
499                // reupload it unless it was already dirty and due for a reupload.
500                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
501                    Some((_, status)) => *status,
502                    None => DescriptorStatus::Dirty,
503                };
504
505                (relay_id, status)
506            })
507            .collect::<Vec<_>>())
508    }
509
510    /// Mark the descriptor dirty for all HSDirs of this time period.
511    fn mark_all_dirty(&mut self) {
512        self.hs_dirs
513            .iter_mut()
514            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
515    }
516
517    /// Update the upload result for this time period.
518    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
519        self.upload_results = upload_results;
520    }
521}
522
523/// An error that occurs while trying to upload a descriptor.
524#[derive(Clone, Debug, thiserror::Error)]
525#[non_exhaustive]
526pub enum UploadError {
527    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
528    #[error("descriptor upload request failed: {}", _0.error)]
529    Request(#[from] RequestFailedError),
530
531    /// Failed to establish circuit to hidden service directory
532    #[error("could not build circuit to HsDir")]
533    Circuit(#[from] tor_circmgr::Error),
534
535    /// Failed to establish stream to hidden service directory
536    #[error("failed to establish directory stream to HsDir")]
537    Stream(#[source] tor_proto::Error),
538
539    /// An internal error.
540    #[error("Internal error")]
541    Bug(#[from] tor_error::Bug),
542}
543define_asref_dyn_std_error!(UploadError);
544
545impl<R: Runtime, M: Mockable> Reactor<R, M> {
546    /// Create a new `Reactor`.
547    #[allow(clippy::too_many_arguments)]
548    pub(super) fn new(
549        runtime: R,
550        nickname: HsNickname,
551        dir_provider: Arc<dyn NetDirProvider>,
552        mockable: M,
553        config: &OnionServiceConfig,
554        ipt_watcher: IptsPublisherView,
555        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
556        status_tx: PublisherStatusSender,
557        keymgr: Arc<KeyMgr>,
558        path_resolver: Arc<CfgPathResolver>,
559        pow_manager: Arc<PowManager<R>>,
560        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
561    ) -> Self {
562        /// The maximum size of the upload completion notifier channel.
563        ///
564        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
565        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
566        /// each sender will send exactly one message.
567        const UPLOAD_CHAN_BUF_SIZE: usize = 0;
568
569        // Internally-generated instructions, no need for mq.
570        let (upload_task_complete_tx, upload_task_complete_rx) =
571            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);
572
573        let (publish_status_tx, publish_status_rx) = watch::channel();
574        // Setting the buffer size to zero here is OK,
575        // since we never actually send anything on this channel.
576        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
577
578        let authorized_clients =
579            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);
580
581        // Create a channel for watching for changes in the configured
582        // restricted_discovery.key_dirs.
583        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
584
585        let imm = Immutable {
586            runtime,
587            mockable,
588            nickname,
589            keymgr,
590            status_tx,
591            pow_manager,
592        };
593
594        let inner = Inner {
595            time_periods: vec![],
596            config: Arc::new(config.into()),
597            file_watcher: None,
598            netdir: None,
599            last_uploaded: None,
600            reupload_timers: Default::default(),
601            authorized_clients,
602        };
603
604        Self {
605            imm: Arc::new(imm),
606            inner: Arc::new(Mutex::new(inner)),
607            dir_provider,
608            ipt_watcher,
609            config_rx,
610            key_dirs_rx,
611            key_dirs_tx,
612            publish_status_rx,
613            publish_status_tx,
614            upload_task_complete_rx,
615            upload_task_complete_tx,
616            shutdown_tx,
617            path_resolver,
618            update_from_pow_manager_rx,
619        }
620    }
621
622    /// Start the reactor.
623    ///
624    /// Under normal circumstances, this function runs indefinitely.
625    ///
626    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
627    /// upload fails or is rate-limited.
628    pub(super) async fn run(mut self) -> Result<(), FatalError> {
629        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
630
631        {
632            let netdir = self
633                .dir_provider
634                .wait_for_netdir(Timeliness::Timely)
635                .await?;
636            let time_periods = self.compute_time_periods(&netdir, &[])?;
637
638            let mut inner = self.inner.lock().expect("poisoned lock");
639
640            inner.netdir = Some(netdir);
641            inner.time_periods = time_periods;
642        }
643
644        // Create the initial key_dirs watcher.
645        self.update_file_watcher();
646
647        loop {
648            match self.run_once().await {
649                Ok(ShutdownStatus::Continue) => continue,
650                Ok(ShutdownStatus::Terminate) => {
651                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
652
653                    self.imm.status_tx.send_shutdown();
654                    return Ok(());
655                }
656                Err(e) => {
657                    error_report!(
658                        e,
659                        "HS service {}: descriptor publisher crashed!",
660                        self.imm.nickname
661                    );
662
663                    self.imm.status_tx.send_broken(e.clone());
664
665                    return Err(e);
666                }
667            }
668        }
669    }
670
671    /// Run one iteration of the reactor loop.
672    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
673    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
674        let mut netdir_events = self.dir_provider.events();
675
        // Note: TrackingNow tracks the values it is compared with.
        // This is equivalent to sleeping for (until - now) units of time:
        // wait_for_earliest() (below) will sleep until the earliest instant
        // that this value was compared against.
678        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
679        if let PublishStatus::RateLimited(until) = self.status() {
680            if upload_rate_lim > until {
681                // We are no longer rate-limited
682                self.expire_rate_limit().await?;
683            }
684        }
685
686        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
687        let mut reupload_periods = vec![];
688        {
689            let mut inner = self.inner.lock().expect("poisoned lock");
690            let inner = &mut *inner;
691            while let Some(reupload) = inner.reupload_timers.peek().copied() {
692                // First, extract all the timeouts that already elapsed.
693                if reupload.when <= reupload_tracking {
694                    inner.reupload_timers.pop();
695                    reupload_periods.push(reupload.period);
696                } else {
697                    // We are not ready to schedule any more reuploads.
698                    //
699                    // How much we need to sleep is implicitly
700                    // tracked in reupload_tracking (through
701                    // the TrackingNow implementation)
702                    break;
703                }
704            }
705        }
706
707        // Check if it's time to schedule any reuploads.
708        for period in reupload_periods {
709            if self.mark_dirty(&period) {
710                debug!(
711                    time_period=?period,
712                    "descriptor reupload timer elapsed; scheduling reupload",
713                );
714                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
715                    .await?;
716            }
717        }
718
719        select_biased! {
720            res = self.upload_task_complete_rx.next().fuse() => {
721                let Some(upload_res) = res else {
722                    return Ok(ShutdownStatus::Terminate);
723                };
724
725                self.handle_upload_results(upload_res);
726                self.upload_result_to_svc_status()?;
727            },
728            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
729                self.expire_rate_limit().await?;
730            },
731            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
732                // Run another iteration, executing run_once again. This time, we will remove the
733                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
734                // relevant HsDirs, and schedule the upload by setting our status to
735                // UploadScheduled.
736                return Ok(ShutdownStatus::Continue);
737            },
738            netdir_event = netdir_events.next().fuse() => {
739                let Some(netdir_event) = netdir_event else {
740                    debug!("netdir event stream ended");
741                    return Ok(ShutdownStatus::Terminate);
742                };
743
744                if !matches!(netdir_event, DirEvent::NewConsensus) {
745                    return Ok(ShutdownStatus::Continue);
746                };
747
748                // The consensus changed. Grab a new NetDir.
749                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
750                    Ok(y) => y,
751                    Err(e) => {
752                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
753                        // Hopefully a netdir will appear in the future.
                        // In the meantime, suspend operations.
755                        //
756                        // TODO (#1218): there is a bug here: we stop reading on our inputs
757                        // including eg publish_status_rx, but it is our job to log some of
758                        // these things.  While we are waiting for a netdir, all those messages
759                        // are "stuck"; they'll appear later, with misleading timestamps.
760                        //
761                        // Probably this should be fixed by moving the logging
762                        // out of the reactor, where it won't be blocked.
763                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
764                            .await?
765                    }
766                };
767                let relevant_periods = netdir.hs_all_time_periods();
768                self.handle_consensus_change(netdir).await?;
769                expire_publisher_keys(
770                    &self.imm.keymgr,
771                    &self.imm.nickname,
772                    &relevant_periods,
773                ).unwrap_or_else(|e| {
774                    error_report!(e, "failed to remove expired keys");
775                });
776            }
777            update = self.ipt_watcher.await_update().fuse() => {
778                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
779                    return Ok(ShutdownStatus::Terminate);
780                }
781            },
782            config = self.config_rx.next().fuse() => {
783                let Some(config) = config else {
784                    return Ok(ShutdownStatus::Terminate);
785                };
786
787                self.handle_svc_config_change(&config).await?;
788            },
789            res = self.key_dirs_rx.next().fuse() => {
790                let Some(event) = res else {
791                    return Ok(ShutdownStatus::Terminate);
792                };
793
794                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
795                    // Discard other events, so that we only reload once.
796                }
797
798                self.handle_key_dirs_change(event).await?;
799            }
800            should_upload = self.publish_status_rx.next().fuse() => {
801                let Some(should_upload) = should_upload else {
802                    return Ok(ShutdownStatus::Terminate);
803                };
804
805                // Our PublishStatus changed -- are we ready to publish?
806                if should_upload == PublishStatus::UploadScheduled {
807                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
808                    self.upload_all().await?;
809                }
810            }
811            update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
812                debug!("Update PoW seed for TP!");
813                let Some(time_period) = update_tp_pow_seed else {
814                    return Ok(ShutdownStatus::Terminate);
815                };
816                self.mark_dirty(&time_period);
817                self.upload_all().await?;
818            }
819        }
820
821        Ok(ShutdownStatus::Continue)
822    }
823
824    /// Returns the current status of the publisher
825    fn status(&self) -> PublishStatus {
826        *self.publish_status_rx.borrow()
827    }
828
829    /// Handle a batch of upload outcomes,
830    /// possibly updating the status of the descriptor for the corresponding HSDirs.
831    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
832        let mut inner = self.inner.lock().expect("poisoned lock");
833        let inner = &mut *inner;
834
835        // Check which time period these uploads pertain to.
836        let period = inner
837            .time_periods
838            .iter_mut()
839            .find(|ctx| ctx.params.time_period() == results.time_period);
840
841        let Some(period) = period else {
842            // The uploads were for a time period that is no longer relevant, so we
843            // can ignore the result.
844            return;
845        };
846
        // We will need to reupload this descriptor at some point, so we pick
848        // a random time between 60 minutes and 120 minutes in the future.
849        //
850        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
851        let mut rng = self.imm.mockable.thread_rng();
852        // TODO SPEC: Control republish period using a consensus parameter?
853        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
854        let duration = Duration::from_secs(minutes * 60);
855        let reupload_when = self.imm.runtime.now() + duration;
856        let time_period = period.params.time_period();
857
858        info!(
859            time_period=?time_period,
860            "reuploading descriptor in {}",
861            humantime::format_duration(duration),
862        );
863
864        inner.reupload_timers.push(ReuploadTimer {
865            period: time_period,
866            when: reupload_when,
867        });
868
869        let mut upload_results = vec![];
870        for upload_res in results.hsdir_result {
871            let relay = period
872                .hs_dirs
873                .iter_mut()
874                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
875
876            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
877                // This HSDir went away, so the result doesn't matter.
878                // Continue processing the rest of the results
879                continue;
880            };
881
882            if upload_res.upload_res.is_ok() {
883                let update_last_successful = match period.last_successful {
884                    None => true,
885                    Some(counter) => counter <= upload_res.revision_counter,
886                };
887
888                if update_last_successful {
889                    period.last_successful = Some(upload_res.revision_counter);
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
                    // and start an upload task for a descriptor that has already been uploaded (or is
893                    // being uploaded) in another task, but whose upload results have not yet been
894                    // processed.
895                    //
896                    // This is probably made worse by the fact that the statuses are updated in
897                    // batches (grouped by time period), rather than one by one as the upload tasks
898                    // complete (updating the status involves locking the inner mutex, and I wanted
899                    // to minimize the locking/unlocking overheads). I'm not sure handling the
900                    // updates in batches was the correct decision here.
901                    *status = DescriptorStatus::Clean;
902                }
903            }
904
905            upload_results.push(upload_res);
906        }
907
908        period.set_upload_results(upload_results);
909    }
910
911    /// Maybe update our list of HsDirs.
912    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
913        trace!("the consensus has changed; recomputing HSDirs");
914
915        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
916
917        self.recompute_hs_dirs()?;
918        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
919            .await?;
920
921        // If the time period has changed, some of our upload results may now be irrelevant,
922        // so we might need to update our status (for example, if our uploads are
923        // for a no-longer-relevant time period, it means we might be able to update
        // our status from "degraded" to "running").
925        self.upload_result_to_svc_status()?;
926
927        Ok(())
928    }
929
930    /// Recompute the HsDirs for all relevant time periods.
931    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
932        let mut inner = self.inner.lock().expect("poisoned lock");
933        let inner = &mut *inner;
934
935        let netdir = Arc::clone(
936            inner
937                .netdir
938                .as_ref()
939                .ok_or_else(|| internal!("started upload task without a netdir"))?,
940        );
941
942        // Update our list of relevant time periods.
943        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
944        inner.time_periods = new_time_periods;
945
946        Ok(())
947    }
948
949    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
950    ///
951    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
952    /// HsDirs where possible.
953    fn compute_time_periods(
954        &self,
955        netdir: &Arc<NetDir>,
956        time_periods: &[TimePeriodContext],
957    ) -> Result<Vec<TimePeriodContext>, FatalError> {
958        netdir
959            .hs_all_time_periods()
960            .iter()
961            .map(|params| {
962                let period = params.time_period();
963                let blind_id_kp =
964                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
965                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
966                        // It's supposed to return Ok(None) if we're in offline hsid mode,
967                        // but that might change when we do #1194
968                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;
969
970                let blind_id: HsBlindIdKey = (&blind_id_kp).into();
971
972                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
973                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
974                // publishing the descriptor to the HsDirs that already have it (the ones that are
975                // marked with DescriptorStatus::Clean).
976                //
977                // In other words, we only want to publish to those HsDirs that
978                //   * are part of a new time period (which we have never published the descriptor
979                //   for), or
980                //   * have just been added to the ring of a time period we already knew about
981                if let Some(ctx) = time_periods
982                    .iter()
983                    .find(|ctx| ctx.params.time_period() == period)
984                {
985                    TimePeriodContext::new(
986                        params.clone(),
987                        blind_id.into(),
988                        netdir,
989                        ctx.hs_dirs.iter(),
990                        ctx.upload_results.clone(),
991                    )
992                } else {
993                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
994                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
995                    TimePeriodContext::new(
996                        params.clone(),
997                        blind_id.into(),
998                        netdir,
999                        iter::empty(),
1000                        vec![],
1001                    )
1002                }
1003            })
1004            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
1005    }
1006
1007    /// Replace the old netdir with the new, returning the old.
1008    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
1009        self.inner
1010            .lock()
1011            .expect("poisoned lock")
1012            .netdir
1013            .replace(new_netdir)
1014    }
1015
1016    /// Replace our view of the service config with `new_config` if `new_config` contains changes
1017    /// that would cause us to generate a new descriptor.
1018    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
1019        let mut inner = self.inner.lock().expect("poisoned lock");
1020        let old_config = &mut inner.config;
1021
1022        // The fields we're interested in haven't changed, so there's no need to update
1023        // `inner.config`.
1024        if *old_config == new_config {
1025            return false;
1026        }
1027
1028        let log_change = match (
1029            old_config.restricted_discovery.enabled,
1030            new_config.restricted_discovery.enabled,
1031        ) {
1032            (true, false) => Some("Disabling restricted discovery mode"),
1033            (false, true) => Some("Enabling restricted discovery mode"),
1034            _ => None,
1035        };
1036
1037        if let Some(msg) = log_change {
1038            info!(nickname=%self.imm.nickname, "{}", msg);
1039        }
1040
1041        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
1042
1043        true
1044    }
1045
1046    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
1047    fn update_file_watcher(&self) {
1048        let mut inner = self.inner.lock().expect("poisoned lock");
1049        if inner.config.restricted_discovery.watch_configuration() {
1050            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
1051            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
1052
1053            let dirs = inner.config.restricted_discovery.key_dirs().clone();
1054
1055            watch_dirs(&mut watcher, &dirs, &self.path_resolver);
1056
1057            let watcher = watcher
1058                .start_watching(self.key_dirs_tx.clone())
1059                .map_err(|e| {
1060                    // TODO: update the publish status (see also the module-level TODO about this).
1061                    error_report!(e, "Cannot set file watcher");
1062                })
1063                .ok();
1064            inner.file_watcher = watcher;
1065        } else {
1066            if inner.file_watcher.is_some() {
1067                debug!("removing key_dirs watcher");
1068            }
1069            inner.file_watcher = None;
1070        }
1071    }
1072
1073    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
1074    /// uploading.
1075    fn note_ipt_change(&self) -> PublishStatus {
1076        let mut ipts = self.ipt_watcher.borrow_for_publish();
1077        match ipts.ipts.as_mut() {
1078            Some(_ipts) => PublishStatus::UploadScheduled,
1079            None => PublishStatus::AwaitingIpts,
1080        }
1081    }
1082
1083    /// Update our list of introduction points.
1084    async fn handle_ipt_change(
1085        &mut self,
1086        update: Option<Result<(), crate::FatalError>>,
1087    ) -> Result<ShutdownStatus, FatalError> {
1088        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
1089        match update {
1090            Some(Ok(())) => {
1091                let should_upload = self.note_ipt_change();
1092                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
1093
1094                self.mark_all_dirty();
1095                self.update_publish_status_unless_rate_lim(should_upload)
1096                    .await?;
1097                Ok(ShutdownStatus::Continue)
1098            }
1099            Some(Err(e)) => Err(e),
1100            None => {
1101                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
1102                Ok(ShutdownStatus::Terminate)
1103            }
1104        }
1105    }
1106
1107    /// Update the `PublishStatus` of the reactor with `new_state`,
1108    /// unless the current state is `AwaitingIpts`.
1109    async fn update_publish_status_unless_waiting(
1110        &mut self,
1111        new_state: PublishStatus,
1112    ) -> Result<(), FatalError> {
1113        // Only update the state if we're not waiting for intro points.
1114        if self.status() != PublishStatus::AwaitingIpts {
1115            self.update_publish_status(new_state).await?;
1116        }
1117
1118        Ok(())
1119    }
1120
1121    /// Update the `PublishStatus` of the reactor with `new_state`,
1122    /// unless the current state is `RateLimited`.
1123    async fn update_publish_status_unless_rate_lim(
1124        &mut self,
1125        new_state: PublishStatus,
1126    ) -> Result<(), FatalError> {
1127        // We can't exit this state until the rate-limit expires.
1128        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1129            self.update_publish_status(new_state).await?;
1130        }
1131
1132        Ok(())
1133    }
1134
1135    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
1136    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
1137        let onion_status = match new_state {
1138            PublishStatus::Idle => None,
1139            PublishStatus::UploadScheduled
1140            | PublishStatus::AwaitingIpts
1141            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
1142        };
1143
1144        if let Some(onion_status) = onion_status {
1145            self.imm.status_tx.send(onion_status, None);
1146        }
1147
1148        trace!(
1149            "publisher reactor status change: {:?} -> {:?}",
1150            self.status(),
1151            new_state
1152        );
1153
1154        self.publish_status_tx.send(new_state).await.map_err(
1155            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
1156        )?;
1157
1158        Ok(())
1159    }
1160
1161    /// Update the onion svc status based on the results of the last descriptor uploads.
1162    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
1163        let inner = self.inner.lock().expect("poisoned lock");
1164        let netdir = inner
1165            .netdir
1166            .as_ref()
1167            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
1168
1169        let (state, err) = upload_result_state(netdir, &inner.time_periods);
1170        self.imm.status_tx.send(state, err);
1171
1172        Ok(())
1173    }
1174
1175    /// Update the descriptors based on the config change.
1176    async fn handle_svc_config_change(
1177        &mut self,
1178        config: &OnionServiceConfig,
1179    ) -> Result<(), FatalError> {
1180        let new_config = Arc::new(config.into());
1181        if self.replace_config_if_changed(Arc::clone(&new_config)) {
1182            self.update_file_watcher();
1183            self.update_authorized_clients_if_changed().await?;
1184
1185            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
1186            self.mark_all_dirty();
1187
1188            // Schedule an upload, unless we're still waiting for IPTs.
1189            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1190                .await?;
1191        }
1192
1193        Ok(())
1194    }
1195
1196    /// Update the descriptors based on a restricted discovery key_dirs change.
1197    ///
1198    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
1199    /// this marks the descriptor as dirty for all time periods,
1200    /// and schedules a reupload.
1201    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
1202        debug!("The configured key_dirs have changed");
1203        match event {
1204            FileEvent::Rescan | FileEvent::FileChanged => {
1205                // These events are handled in the same way, by re-reading the keys from disk
1206                // and republishing the descriptor if necessary
1207            }
1208            _ => return Err(internal!("file watcher event {event:?}").into()),
1209        };
1210
1211        // Update the file watcher, in case the change was triggered by a key_dir move.
1212        self.update_file_watcher();
1213
1214        if self.update_authorized_clients_if_changed().await? {
1215            self.mark_all_dirty();
1216
1217            // Schedule an upload, unless we're still waiting for IPTs.
1218            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1219                .await?;
1220        }
1221
1222        Ok(())
1223    }
1224
1225    /// Recreate the authorized_clients based on the current config.
1226    ///
1227    /// Returns `true` if the authorized clients have changed.
1228    async fn update_authorized_clients_if_changed(&mut self) -> Result<bool, FatalError> {
1229        let mut inner = self.inner.lock().expect("poisoned lock");
1230        let authorized_clients =
1231            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
1232
1233        let clients = &mut inner.authorized_clients;
1234        let changed = clients.as_ref() != authorized_clients.as_ref();
1235
1236        if changed {
1237            info!("The restricted discovery mode authorized clients have changed");
1238            *clients = authorized_clients;
1239        }
1240
1241        Ok(changed)
1242    }
1243
1244    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
1245    fn read_authorized_clients(
1246        config: &RestrictedDiscoveryConfig,
1247        path_resolver: &CfgPathResolver,
1248    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
1249        let authorized_clients = config.read_keys(path_resolver);
1250
1251        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
1252            warn!(
1253                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
1254            );
1255        }
1256
1257        authorized_clients.map(Arc::new)
1258    }
1259
1260    /// Mark the descriptor dirty for all time periods.
1261    fn mark_all_dirty(&self) {
1262        trace!("marking the descriptor dirty for all time periods");
1263
1264        self.inner
1265            .lock()
1266            .expect("poisoned lock")
1267            .time_periods
1268            .iter_mut()
1269            .for_each(|tp| tp.mark_all_dirty());
1270    }
1271
1272    /// Mark the descriptor dirty for the specified time period.
1273    ///
1274    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
1275    fn mark_dirty(&self, period: &TimePeriod) -> bool {
1276        let mut inner = self.inner.lock().expect("poisoned lock");
1277        let period_ctx = inner
1278            .time_periods
1279            .iter_mut()
1280            .find(|tp| tp.params.time_period() == *period);
1281
1282        match period_ctx {
1283            Some(ctx) => {
1284                trace!(time_period=?period, "marking the descriptor dirty");
1285                ctx.mark_all_dirty();
1286                true
1287            }
1288            None => false,
1289        }
1290    }
1291
1292    /// Try to upload our descriptor to the HsDirs that need it.
1293    ///
1294    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
1295    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
1296    ///
1297    /// Failed uploads are retried
1298    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1299    ///
1300    /// If restricted discovery mode is enabled and there are no authorized clients,
1301    /// we abort the upload and set our status to [`State::Broken`].
1302    //
1303    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
1304    // (for example if the IPTs change),
1305    // which can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
1306    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
1307    //
1308    /// For each current time period, we spawn a task that uploads the descriptor to
1309    /// all the HsDirs on the HsDir ring of that time period.
1310    /// Each task shuts down on completion, or when the reactor is dropped.
1311    ///
1312    /// Each task reports its upload results (`TimePeriodUploadResult`)
1313    /// via the `upload_task_complete_tx` channel.
1314    /// The results are received and processed in the main loop of the reactor.
1315    ///
1316    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
1317    #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
1318    async fn upload_all(&mut self) -> Result<(), FatalError> {
1319        trace!("starting descriptor upload task...");
1320
1321        // Abort the upload entirely if we have an empty list of authorized clients
1322        let authorized_clients = match self.authorized_clients() {
1323            Ok(authorized_clients) => authorized_clients,
1324            Err(e) => {
1325                error_report!(e, "aborting upload");
1326                self.imm.status_tx.send_broken(e.clone());
1327
1328                // Returning an error would shut down the reactor, so we have to return Ok here.
1329                return Ok(());
1330            }
1331        };
1332
1333        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
1334        let now = self.imm.runtime.now();
1335        // Check if we should rate-limit this upload.
1336        if let Some(ts) = last_uploaded {
1337            let duration_since_upload = now.duration_since(ts);
1338
1339            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
1340                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
1341            }
1342        }
1343
1344        let mut inner = self.inner.lock().expect("poisoned lock");
1345        let inner = &mut *inner;
1346
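        // Record the time of this upload attempt now, so that any further attempts
        // made within UPLOAD_RATE_LIM_THRESHOLD of it will be rate-limited.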
1347        let _ = inner.last_uploaded.insert(now);
1348
1349        for period_ctx in inner.time_periods.iter_mut() {
1350            let upload_task_complete_tx = self.upload_task_complete_tx.clone();
1351
1352            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
1353            // have our latest descriptor, so we filter them out).
1354            let hs_dirs = period_ctx
1355                .hs_dirs
1356                .iter()
1357                .filter_map(|(relay_id, status)| {
1358                    if *status == DescriptorStatus::Dirty {
1359                        Some(relay_id.clone())
1360                    } else {
1361                        None
1362                    }
1363                })
1364                .collect::<Vec<_>>();
1365
1366            if hs_dirs.is_empty() {
1367                trace!("the descriptor is clean for all HSDirs. Nothing to do");
1368                return Ok(());
1369            }
1370
1371            let time_period = period_ctx.params.time_period();
1372            // This scope exists because rng is not Send, so it needs to fall out of scope before we
1373            // await anything.
1374            let netdir = Arc::clone(
1375                inner
1376                    .netdir
1377                    .as_ref()
1378                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
1379            );
1380
1381            let imm = Arc::clone(&self.imm);
1382            let ipt_upload_view = self.ipt_watcher.upload_view();
1383            let config = Arc::clone(&inner.config);
1384            let authorized_clients = authorized_clients.clone();
1385
1386            trace!(nickname=%self.imm.nickname, time_period=?time_period,
1387                "spawning upload task"
1388            );
1389
1390            let params = period_ctx.params.clone();
1391            let shutdown_rx = self.shutdown_tx.subscribe();
1392
1393            // Spawn a task to upload the descriptor to all HsDirs of this time period.
1394            //
1395            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
1396            // dropped).
1397            let _handle: () = self
1398                .imm
1399                .runtime
1400                .spawn(async move {
1401                    if let Err(e) = Self::upload_for_time_period(
1402                        hs_dirs,
1403                        &netdir,
1404                        config,
1405                        params,
1406                        Arc::clone(&imm),
1407                        ipt_upload_view.clone(),
1408                        authorized_clients.clone(),
1409                        upload_task_complete_tx,
1410                        shutdown_rx,
1411                    )
1412                    .await
1413                    {
1414                        error_report!(
1415                            e,
1416                            "descriptor upload failed for HS service {} and time period {:?}",
1417                            imm.nickname,
1418                            time_period
1419                        );
1420                    }
1421                })
1422                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
1423        }
1424
1425        Ok(())
1426    }
1427
1428    /// Upload the descriptor for the time period specified in `params`.
1429    ///
1430    /// Failed uploads are retried
1431    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1432    #[allow(clippy::too_many_arguments)] // TODO: refactor
1433    #[allow(clippy::cognitive_complexity)] // TODO: Refactor
1434    async fn upload_for_time_period(
1435        hs_dirs: Vec<RelayIds>,
1436        netdir: &Arc<NetDir>,
1437        config: Arc<OnionServiceConfigPublisherView>,
1438        params: HsDirParams,
1439        imm: Arc<Immutable<R, M>>,
1440        ipt_upload_view: IptsPublisherUploadView,
1441        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
1442        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
1443        shutdown_rx: broadcast::Receiver<Void>,
1444    ) -> Result<(), FatalError> {
1445        let time_period = params.time_period();
1446        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
1447
1448        let hsdir_count = hs_dirs.len();
1449
1450        /// An error returned from an upload future.
1451        //
1452        // Exhaustive, because this is a private type.
1453        #[derive(Clone, Debug, thiserror::Error)]
1454        enum PublishError {
1455            /// The upload was aborted because there are no IPTs.
1456            ///
1457            /// This happens because of an inevitable TOCTOU race, where after being notified by
1458            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
1459            /// we find out there actually are no IPTs, so we can't build the descriptor.
1460            ///
1461            /// This is a special kind of error that interrupts the current upload task, and is
1462            /// logged at `debug!` level rather than `warn!` or `error!`.
1463            ///
1464            /// Ideally, this shouldn't happen very often (if at all).
1465            #[error("No IPTs")]
1466            NoIpts,
1467
1468            /// The reactor has shut down
1469            #[error("The reactor has shut down")]
1470            Shutdown,
1471
1472            /// A fatal error.
1473            #[error("{0}")]
1474            Fatal(#[from] FatalError),
1475        }
1476
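        // Upload the descriptor to each dirty HsDir, with at most MAX_CONCURRENT_UPLOADS
        // uploads in flight at a time, collecting the per-HsDir results.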
1477        let upload_results = futures::stream::iter(hs_dirs)
1478            .map(|relay_ids| {
1479                let netdir = netdir.clone();
1480                let config = Arc::clone(&config);
1481                let imm = Arc::clone(&imm);
1482                let ipt_upload_view = ipt_upload_view.clone();
1483                let authorized_clients = authorized_clients.clone();
1484                let params = params.clone();
1485                let mut shutdown_rx = shutdown_rx.clone();
1486
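                // Stringify the HsDir's identities, for logging purposes.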
1487                let ed_id = relay_ids
1488                    .ed_identity()
1489                    .map(|id| id.to_string())
1490                    .unwrap_or_else(|| "unknown".into());
1491                let rsa_id = relay_ids
1492                    .rsa_identity()
1493                    .map(|id| id.to_string())
1494                    .unwrap_or_else(|| "unknown".into());
1495
1496                async move {
1497                    let run_upload = |desc| async {
1498                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
1499                            // This should never happen (all of our relay_ids are from the stored
1500                            // netdir).
1501                            let err =
1502                                "tried to upload descriptor to relay not found in consensus?!";
1503                            warn!(
1504                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1505                                "{err}"
1506                            );
1507                            return Err(internal!("{err}").into());
1508                        };
1509
1510                        Self::upload_descriptor_with_retries(
1511                            desc,
1512                            &netdir,
1513                            &hsdir,
1514                            &ed_id,
1515                            &rsa_id,
1516                            Arc::clone(&imm),
1517                        )
1518                        .await
1519                    };
1520
1521                    // How long until we're supposed to time out?
1522                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
1523                    // We generate a new descriptor before _each_ HsDir upload. This means each
1524                    // HsDir could, in theory, receive a different descriptor (not just in terms of
1525                    // revision-counters, but also with a different set of IPTs). It may seem like
1526                    // this could lead to some HsDirs being left with an outdated descriptor, but
1527                    // that's not the case: after the upload completes, the publisher will be
1528                    // notified by the ipt_watcher of the IPT change event (if there was one to
1529                    // begin with), which will trigger another upload job.
1530                    let hsdesc = {
1531                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
1532                        // needs to fall out of scope before the await point below
1533                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
1534
1535                        // If there are no IPTs, we abort the upload. At this point, we might have
1536                        // uploaded the descriptor to some, but not all, HSDirs from the specified
1537                        // time period.
1538                        //
1539                        // Returning an error here means the upload completion task is never
1540                        // notified of the outcome of any of these uploads (which means the
1541                        // descriptor is not marked clean). This is OK, because if we suddenly find
1542                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
1543                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
1544                        // and generate a fresh descriptor anyway.
1545                        //
1546                        // Ideally, this shouldn't happen very often (if at all).
1547                        let Some(ipts) = ipt_set.ipts.as_mut() else {
1548                            return Err(PublishError::NoIpts);
1549                        };
1550
1551                        let hsdesc = {
1552                            trace!(
1553                                nickname=%imm.nickname, time_period=?time_period,
1554                                "building descriptor"
1555                            );
1556                            let mut rng = imm.mockable.thread_rng();
1557                            let mut key_rng = tor_llcrypto::rng::CautiousRng;
1558
1559                            // We're about to generate a new version of the descriptor,
1560                            // so let's generate a new revision counter.
1561                            let now = imm.runtime.wallclock();
1562                            let revision_counter = imm.generate_revision_counter(&params, now)?;
1563
1564                            build_sign(
1565                                &imm.keymgr,
1566                                &imm.pow_manager,
1567                                &config,
1568                                authorized_clients.as_deref(),
1569                                ipts,
1570                                time_period,
1571                                revision_counter,
1572                                &mut rng,
1573                                &mut key_rng,
1574                                imm.runtime.wallclock(),
1575                            )?
1576                        };
1577
1578                        if let Err(e) =
1579                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
1580                        {
1581                            let wait = e.log_retry_max(&imm.nickname)?;
1582                            // TODO (#1226): retry instead of this
1583                            return Err(FatalError::Bug(internal!(
1584                                "ought to retry after {wait:?}, crashing instead"
1585                            ))
1586                            .into());
1587                        }
1588
1589                        hsdesc
1590                    };
1591
1592                    let VersionedDescriptor {
1593                        desc,
1594                        revision_counter,
1595                    } = hsdesc;
1596
1597                    trace!(
1598                        nickname=%imm.nickname, time_period=?time_period,
1599                        revision_counter=?revision_counter,
1600                        "generated new descriptor for time period",
1601                    );
1602
1603                    // (Actually launch the upload attempt. No timeout is needed
1604                    // here, since the backoff::Runner code will handle that for us.)
1605                    let upload_res: UploadResult = select_biased! {
1606                        shutdown = shutdown_rx.next().fuse() => {
1607                            // This will always be None, since Void is uninhabited.
1608                            let _: Option<Void> = shutdown;
1609
1610                            // It looks like the reactor has shut down,
1611                            // so there is no point in uploading the descriptor anymore.
1612                            //
1613                            // Let's shut down the upload task too.
1614                            trace!(
1615                                nickname=%imm.nickname, time_period=?time_period,
1616                                "upload task received shutdown signal"
1617                            );
1618
1619                            return Err(PublishError::Shutdown);
1620                        },
1621                        res = run_upload(desc.clone()).fuse() => res,
1622                    };
1623
1624                    // Note: an `Err` upload result is only produced when
1625                    // upload_descriptor_with_retries fails, i.e. if all of our retry
1626                    // attempts have failed.
1627                    Ok(HsDirUploadStatus {
1628                        relay_ids,
1629                        upload_res,
1630                        revision_counter,
1631                    })
1632                }
1633            })
1634            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1635            .boxed()
1636            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1637            .try_collect::<Vec<_>>()
1638            .await;
1639
1640        let upload_results = match upload_results {
1641            Ok(v) => v,
1642            Err(PublishError::Fatal(e)) => return Err(e),
1643            Err(PublishError::NoIpts) => {
1644                debug!(
1645                    nickname=%imm.nickname, time_period=?time_period,
1646                     "no introduction points; skipping upload"
1647                );
1648
1649                return Ok(());
1650            }
1651            Err(PublishError::Shutdown) => {
1652                debug!(
1653                    nickname=%imm.nickname, time_period=?time_period,
1654                     "the reactor has shut down; aborting upload"
1655                );
1656
1657                return Ok(());
1658            }
1659        };
1660
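        // Partition the results so we can report how many of the uploads succeeded.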
1661        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1662            .iter()
1663            .partition(|res| res.upload_res.is_ok());
1664
1665        debug!(
1666            nickname=%imm.nickname, time_period=?time_period,
1667            "descriptor uploaded successfully to {}/{} HSDirs",
1668            succeeded.len(), hsdir_count
1669        );
1670
1671        if upload_task_complete_tx
1672            .send(TimePeriodUploadResult {
1673                time_period,
1674                hsdir_result: upload_results,
1675            })
1676            .await
1677            .is_err()
1678        {
1679            return Err(internal!(
1680                "failed to notify reactor of upload completion (reactor shut down)"
1681            )
1682            .into());
1683        }
1684
1685        Ok(())
1686    }
1687
1688    /// Upload a descriptor to the specified HSDir.
1689    ///
1690    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1691    /// to the caller to retry on failure.
1692    ///
1693    /// This function does not handle timeouts.
1694    async fn upload_descriptor(
1695        hsdesc: String,
1696        netdir: &Arc<NetDir>,
1697        hsdir: &Relay<'_>,
1698        imm: Arc<Immutable<R, M>>,
1699    ) -> Result<(), UploadError> {
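        // Build the upload request from the descriptor body.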
1700        let request = HsDescUploadRequest::new(hsdesc);
1701
1702        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1703            "starting descriptor upload",
1704        );
1705
1706        let circuit = imm
1707            .mockable
1708            .get_or_launch_specific(
1709                netdir,
1710                HsCircKind::SvcHsDir,
1711                OwnedCircTarget::from_circ_target(hsdir),
1712            )
1713            .await?;
1714
1715        let mut stream = circuit
1716            .begin_dir_stream()
1717            .await
1718            .map_err(UploadError::Stream)?;
1719
1720        let _response: String = send_request(&imm.runtime, &request, &mut stream, None)
1721            .await
1722            .map_err(|dir_error| -> UploadError {
1723                match dir_error {
1724                    DirClientError::RequestFailed(e) => e.into(),
1725                    DirClientError::CircMgr(e) => into_internal!(
1726                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1727                    )(e)
1728                    .into(),
1729                    e => into_internal!("unexpected error")(e).into(),
1730                }
1731            })?
1732            .into_output_string()?; // This returns an error if we received an error response
1733
1734        Ok(())
1735    }
1736
1737    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1738    ///
1739    /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1740    /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1741    /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
1742    /// according to the [`BackoffSchedule::single_attempt_timeout`].
1743    /// This function gives up after the overall timeout elapses,
1744    /// declaring the upload a failure, and never retrying it again.
1745    ///
1746    /// See also [`BackoffSchedule`].
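    ///
    /// A rough, schematic sketch of what the backoff [`Runner`] does with this
    /// operation (the names below are illustrative, not the actual implementation;
    /// each attempt is additionally bounded by the single-attempt timeout):
    ///
    /// ```ignore
    /// loop {
    ///     match upload_attempt().await {
    ///         Ok(()) => return Ok(()),
    ///         Err(e) if e.should_retry() && !overall_timeout_elapsed() => {
    ///             sleep(schedule.next_delay(&e)).await;
    ///         }
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```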
1747    async fn upload_descriptor_with_retries(
1748        hsdesc: String,
1749        netdir: &Arc<NetDir>,
1750        hsdir: &Relay<'_>,
1751        ed_id: &str,
1752        rsa_id: &str,
1753        imm: Arc<Immutable<R, M>>,
1754    ) -> UploadResult {
1755        /// The base delay to use for the backoff schedule.
1756        const BASE_DELAY_MSEC: u32 = 1000;
1757        let schedule = PublisherBackoffSchedule {
1758            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1759            mockable: imm.mockable.clone(),
1760        };
1761
1762        let runner = Runner::new(
1763            "upload a hidden service descriptor".into(),
1764            schedule.clone(),
1765            imm.runtime.clone(),
1766        );
1767
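        // The operation to (re)try: a single upload attempt to this HsDir.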
1768        let fallible_op =
1769            || Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm));
1770
1771        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1772        match outcome {
1773            Ok(()) => {
1774                debug!(
1775                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1776                    "successfully uploaded descriptor to HSDir",
1777                );
1778
1779                Ok(())
1780            }
1781            Err(e) => {
1782                warn_report!(
1783                    e,
1784                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1785                    imm.nickname,
1786                    ed_id,
1787                    rsa_id
1788                );
1789
1790                Err(e.into())
1791            }
1792        }
1793    }
1794
1795    /// Stop publishing descriptors until the specified delay elapses.
1796    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1797        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1798            debug!(
1799                "We are rate-limited for {}; pausing descriptor publication",
1800                humantime::format_duration(delay)
1801            );
1802            let until = self.imm.runtime.now() + delay;
1803            self.update_publish_status(PublishStatus::RateLimited(until))
1804                .await?;
1805        }
1806
1807        Ok(())
1808    }
1809
1810    /// Handle the upload rate-limit being lifted.
1811    async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1812        debug!("We are no longer rate-limited; resuming descriptor publication");
1813        self.update_publish_status(PublishStatus::UploadScheduled)
1814            .await?;
1815        Ok(())
1816    }
1817
1818    /// Return the authorized clients, if restricted mode is enabled.
1819    ///
1820    /// Returns `Ok(None)` if restricted discovery mode is disabled.
1821    ///
1822    /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
1823    #[cfg_attr(
1824        not(feature = "restricted-discovery"),
1825        allow(clippy::unnecessary_wraps)
1826    )]
1827    fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1828        cfg_if::cfg_if! {
1829            if #[cfg(feature = "restricted-discovery")] {
1830                let authorized_clients = self
1831                    .inner
1832                    .lock()
1833                    .expect("poisoned lock")
1834                    .authorized_clients
1835                    .clone();
1836
1837                if authorized_clients.as_ref().as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1838                    return Err(FatalError::RestrictedDiscoveryNoClients);
1839                }
1840
1841                Ok(authorized_clients)
1842            } else {
1843                Ok(None)
1844            }
1845        }
1846    }
1847}
1848
1849/// Try to expand a path, logging a warning on failure.
1850fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1851    // map_err returns unit for clarity
1852    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1853    p.path(r)
1854        .map_err(|e| {
1855            tor_error::warn_report!(e, "invalid path");
1856            ()
1857        })
1858        .ok()
1859}
1860
1861/// Add `path` to the specified `watcher`.
1862macro_rules! watch_path {
1863    ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
1864        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args)*) {
1865            warn_report!(e, "failed to watch path {:?}", $path);
1866        } else {
1867            debug!("watching path {:?}", $path);
1868        }
1869    }}
1870}
1871
1872/// Add the specified directories to the watcher.
1873#[allow(clippy::cognitive_complexity)]
1874fn watch_dirs<R: Runtime>(
1875    watcher: &mut FileWatcherBuilder<R>,
1876    dirs: &DirectoryKeyProviderList,
1877    path_resolver: &CfgPathResolver,
1878) {
1879    for path in dirs {
1880        let path = path.path();
1881        let Some(path) = maybe_expand_path(path, path_resolver) else {
1882            warn!("failed to expand key_dir path {:?}", path);
1883            continue;
1884        };
1885
1886        // If the path doesn't exist, the notify watcher will return an error if we attempt to watch it,
1887        // so we skip over paths that don't exist at this time
1888        // (this obviously suffers from a TOCTOU race, but most of the time,
1889        // it is good enough at preventing the watcher from failing to watch.
1890        // If the race *does* happen it is not disastrous, i.e. the reactor won't crash,
1891        // but it will fail to set the watcher).
1892        if matches!(path.try_exists(), Ok(true)) {
1893            watch_path!(watcher, &path, watch_dir, "auth",);
1894        }
1895        // FileWatcher::watch_path causes the parent dir of the path to be watched.
1896        if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1897            watch_path!(watcher, &path, watch_path,);
1898        }
1899    }
1900}
1901
1902/// Try to read the blinded identity key for a given `TimePeriod`.
1903///
1904/// Returns `Ok(None)` if the service is running in "offline" mode.
1905///
1906// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1907// `Ok(None)`.
1908pub(super) fn read_blind_id_keypair(
1909    keymgr: &Arc<KeyMgr>,
1910    nickname: &HsNickname,
1911    period: TimePeriod,
1912) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1913    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1914    let hsid_kp = keymgr
1915        .get::<HsIdKeypair>(&svc_key_spec)?
1916        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1917
1918    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1919
1920    // TODO: make the keystore selector configurable
1921    let keystore_selector = Default::default();
1922    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1923        Some(kp) => Ok(Some(kp)),
1924        None => {
1925            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1926                .compute_blinded_key(period)
1927                .map_err(|_| internal!("failed to compute blinded key"))?;
1928
1929            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1930            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1931            // which assumes keys are randomly generated, rather than derived from existing keys).
1932
1933            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1934
1935            let arti_path = |spec: &dyn KeySpecifier| {
1936                spec.arti_path()
1937                    .map_err(into_internal!("invalid key specifier?!"))
1938            };
1939
1940            Ok(Some(
1941                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1942                    FatalError::KeystoreRace {
1943                        action: "read",
1944                        path: arti_path(&blind_id_key_spec)?,
1945                    },
1946                )?,
1947            ))
1948        }
1949    }
1950}
1951
1952/// Determine the [`State`] of the publisher based on the upload results
1953/// from the current `time_periods`.
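///
/// In summary, based on the upload results:
///   * fewer than two time period contexts: `DegradedUnreachable`
///   * no results yet on one or both rings, and no failures: `Bootstrapping`
///   * at least one success on each ring, and no failures at all: `Running`
///   * at least one success on each ring, but some failures: `DegradedReachable`
///   * no successes on one or both rings (with some failures): `DegradedUnreachable`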
1954fn upload_result_state(
1955    netdir: &NetDir,
1956    time_periods: &[TimePeriodContext],
1957) -> (State, Option<Problem>) {
1958    let current_period = netdir.hs_time_period();
1959    let current_period_res = time_periods
1960        .iter()
1961        .find(|ctx| ctx.params.time_period() == current_period);
1962
1963    let succeeded_current_tp = current_period_res
1964        .iter()
1965        .flat_map(|res| &res.upload_results)
1966        .filter(|res| res.upload_res.is_ok())
1967        .collect_vec();
1968
1969    let secondary_tp_res = time_periods
1970        .iter()
1971        .filter(|ctx| ctx.params.time_period() != current_period)
1972        .collect_vec();
1973
1974    let succeeded_secondary_tp = secondary_tp_res
1975        .iter()
1976        .flat_map(|res| &res.upload_results)
1977        .filter(|res| res.upload_res.is_ok())
1978        .collect_vec();
1979
1980    // All of the failed uploads (for all TPs)
1981    let failed = time_periods
1982        .iter()
1983        .flat_map(|res| &res.upload_results)
1984        .filter(|res| res.upload_res.is_err())
1985        .collect_vec();
1986    let problems: Vec<DescUploadRetryError> = failed
1987        .iter()
1988        .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
1989        .collect();
1990
1991    let err = match problems.as_slice() {
1992        [_, ..] => Some(problems.into()),
1993        [] => None,
1994    };
1995
1996    if time_periods.len() < 2 {
1997        // We need at least two TP contexts (one for the primary TP,
1998        // and another for the secondary one).
1999        //
2000        // If either is missing, we are unreachable for some or all clients.
2001        return (State::DegradedUnreachable, err);
2002    }
2003
2004    let state = match (
2005        succeeded_current_tp.as_slice(),
2006        succeeded_secondary_tp.as_slice(),
2007    ) {
2008        (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
2009            // We don't have any upload results for one or both TPs.
2010            // We are still bootstrapping.
2011            State::Bootstrapping
2012        }
2013        (&[_, ..], &[_, ..]) if failed.is_empty() => {
2014            // We have uploaded the descriptor to one or more HsDirs from both
2015            // HsDir rings (primary and secondary), and none of the uploads failed.
2016            // We are fully reachable.
2017            State::Running
2018        }
2019        (&[_, ..], &[_, ..]) => {
2020            // We have uploaded the descriptor to one or more HsDirs from both
2021            // HsDir rings (primary and secondary), but some of the uploads failed.
2022            // We are reachable, but we failed to upload the descriptor to all the HsDirs
2023            // that were supposed to have it.
2024            State::DegradedReachable
2025        }
2026        (&[..], &[]) | (&[], &[..]) => {
2027            // We have either
2028            //   * uploaded the descriptor to some of the HsDirs from one of the rings,
2029            //   but haven't managed to upload it to any of the HsDirs on the other ring, or
2030            //   * all of the uploads failed
2031            //
2032            // Either way, we are definitely not reachable by all clients.
2033            State::DegradedUnreachable
2034        }
2035    };
2036
2037    (state, err)
2038}
2039
2040/// Whether the reactor should initiate an upload.
2041#[derive(Copy, Clone, Debug, Default, PartialEq)]
2042enum PublishStatus {
2043    /// We need to call upload_all.
2044    UploadScheduled,
2045    /// We are rate-limited until the specified [`Instant`].
2046    ///
2047    /// We have tried to schedule multiple uploads in a short time span,
2048    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
2049    /// channel to unblock us.
2050    RateLimited(Instant),
2051    /// We are idle and waiting for external events.
2052    ///
2053    /// We have enough information to build the descriptor, but since we have already called
2054    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
2055    Idle,
2056    /// We are waiting for the IPT manager to establish some introduction points.
2057    ///
2058    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
2059    /// `UploadScheduled`.
2060    #[default]
2061    AwaitingIpts,
2062}
2063
2064/// The backoff schedule for the task that publishes descriptors.
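///
/// Note: there is no maximum retry count; retries stop only when the upload succeeds,
/// the error is not retriable, or [`OVERALL_UPLOAD_TIMEOUT`] elapses.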
2065#[derive(Clone, Debug)]
2066struct PublisherBackoffSchedule<M: Mockable> {
2067    /// The delays to use between retry attempts.
2068    retry_delay: RetryDelay,
2069    /// The mockable reactor state, needed for obtaining an rng.
2070    mockable: M,
2071}
2072
2073impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
2074    fn max_retries(&self) -> Option<usize> {
2075        None
2076    }
2077
2078    fn overall_timeout(&self) -> Option<Duration> {
2079        Some(OVERALL_UPLOAD_TIMEOUT)
2080    }
2081
2082    fn single_attempt_timeout(&self) -> Option<Duration> {
2083        Some(self.mockable.estimate_upload_timeout())
2084    }
2085
2086    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
2087        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
2088    }
2089}
2090
2091impl RetriableError for UploadError {
2092    fn should_retry(&self) -> bool {
2093        match self {
2094            UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2095            UploadError::Bug(_) => false,
2096        }
2097    }
2098}
2099
2100/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
2101#[derive(Debug, Clone)]
2102struct TimePeriodUploadResult {
2103    /// The time period.
2104    time_period: TimePeriod,
2105    /// The upload results.
2106    hsdir_result: Vec<HsDirUploadStatus>,
2107}
2108
2109/// The outcome of uploading a descriptor to a particular HsDir.
2110#[derive(Clone, Debug)]
2111struct HsDirUploadStatus {
2112    /// The identity of the HsDir we attempted to upload the descriptor to.
2113    relay_ids: RelayIds,
2114    /// The outcome of this attempt.
2115    upload_res: UploadResult,
2116    /// The revision counter of the descriptor we tried to upload.
2117    revision_counter: RevisionCounter,
2118}
2119
2120/// The outcome of uploading a descriptor.
2121type UploadResult = Result<(), DescUploadRetryError>;
2122
2123impl From<BackoffError<UploadError>> for DescUploadRetryError {
2124    fn from(e: BackoffError<UploadError>) -> Self {
2125        use BackoffError as BE;
2126        use DescUploadRetryError as DURE;
2127
2128        match e {
2129            BE::FatalError(e) => DURE::FatalError(e),
2130            BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2131            BE::Timeout(e) => DURE::Timeout(e),
2132            BE::ExplicitStop(_) => {
2133                DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2134            }
2135        }
2136    }
2137}
2138
2139// NOTE: the rest of the publisher tests live in publish.rs
2140#[cfg(test)]
2141mod test {
2142    // @@ begin test lint list maintained by maint/add_warning @@
2143    #![allow(clippy::bool_assert_comparison)]
2144    #![allow(clippy::clone_on_copy)]
2145    #![allow(clippy::dbg_macro)]
2146    #![allow(clippy::mixed_attributes_style)]
2147    #![allow(clippy::print_stderr)]
2148    #![allow(clippy::print_stdout)]
2149    #![allow(clippy::single_char_pattern)]
2150    #![allow(clippy::unwrap_used)]
2151    #![allow(clippy::unchecked_duration_subtraction)]
2152    #![allow(clippy::useless_vec)]
2153    #![allow(clippy::needless_pass_by_value)]
2154    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
2155    use super::*;
2156    use tor_netdir::testnet;
2157
2158    /// Create a `TimePeriodContext` from the specified upload results.
2159    fn create_time_period_ctx(
2160        params: &HsDirParams,
2161        upload_results: Vec<HsDirUploadStatus>,
2162    ) -> TimePeriodContext {
2163        TimePeriodContext {
2164            params: params.clone(),
2165            hs_dirs: vec![],
2166            last_successful: None,
2167            upload_results,
2168        }
2169    }
2170
2171    /// Create a single `HsDirUploadStatus`
2172    fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
2173        HsDirUploadStatus {
2174            relay_ids: RelayIds::empty(),
2175            upload_res,
2176            revision_counter: RevisionCounter::from(13),
2177        }
2178    }
2179
2180    /// Create a bunch of results, all with the specified `upload_res`.
2181    fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
2182        std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
2183            .take(10)
2184            .collect()
2185    }
2186
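    /// Build a test `NetDir` with a previous shared random value set,
    /// so that the tests below see both a current and a secondary HS time period.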
2187    fn construct_netdir() -> NetDir {
2188        const SRV1: [u8; 32] = *b"The door refused to open.       ";
2189        const SRV2: [u8; 32] = *b"It said, 'Five cents, please.'  ";
2190
2191        let dir = testnet::construct_custom_netdir(|_, _, bld| {
2192            bld.shared_rand_prev(7, SRV1.into(), None)
2193                .shared_rand_prev(7, SRV2.into(), None);
2194        })
2195        .unwrap();
2196
2197        dir.unwrap_if_sufficient().unwrap()
2198    }
2199
2200    #[test]
2201    fn upload_result_status_bootstrapping() {
2202        let netdir = construct_netdir();
2203        let all_params = netdir.hs_all_time_periods();
2204        let current_period = netdir.hs_time_period();
2205        let primary_params = all_params
2206            .iter()
2207            .find(|param| param.time_period() == current_period)
2208            .unwrap();
2209        let results = [
2210            (vec![], vec![]),
2211            (vec![], create_upload_results(Ok(()))),
2212            (create_upload_results(Ok(())), vec![]),
2213        ];
2214
2215        for (primary_result, secondary_result) in results {
2216            let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2217
2218            let secondary_params = all_params
2219                .iter()
2220                .find(|param| param.time_period() != current_period)
2221                .unwrap();
2222            let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2223
2224            let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2225            assert_eq!(status, State::Bootstrapping);
2226            assert!(err.is_none());
2227        }
2228    }
2229
2230    #[test]
2231    fn upload_result_status_running() {
2232        let netdir = construct_netdir();
2233        let all_params = netdir.hs_all_time_periods();
2234        let current_period = netdir.hs_time_period();
2235        let primary_params = all_params
2236            .iter()
2237            .find(|param| param.time_period() == current_period)
2238            .unwrap();
2239
2240        let secondary_result = create_upload_results(Ok(()));
2241        let secondary_params = all_params
2242            .iter()
2243            .find(|param| param.time_period() != current_period)
2244            .unwrap();
2245        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2246
2247        let primary_result = create_upload_results(Ok(()));
2248        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2249        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2250        assert_eq!(status, State::Running);
2251        assert!(err.is_none());
2252    }
2253
2254    #[test]
2255    fn upload_result_status_reachable() {
2256        let netdir = construct_netdir();
2257        let all_params = netdir.hs_all_time_periods();
2258        let current_period = netdir.hs_time_period();
2259        let primary_params = all_params
2260            .iter()
2261            .find(|param| param.time_period() == current_period)
2262            .unwrap();
2263
2264        let primary_result = create_upload_results(Ok(()));
2265        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2266        let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2267        let secondary_result = create_upload_results(Ok(()))
2268            .into_iter()
2269            .chain(failed_res.iter().cloned())
2270            .collect();
2271        let secondary_params = all_params
2272            .iter()
2273            .find(|param| param.time_period() != current_period)
2274            .unwrap();
2275        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
2276        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2277
2278        // Degraded but reachable (because some of the secondary HsDir uploads failed).
2279        assert_eq!(status, State::DegradedReachable);
2280        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2281    }
2282
2283    #[test]
2284    fn upload_result_status_unreachable() {
2285        let netdir = construct_netdir();
2286        let all_params = netdir.hs_all_time_periods();
2287        let current_period = netdir.hs_time_period();
2288        let primary_params = all_params
2289            .iter()
2290            .find(|param| param.time_period() == current_period)
2291            .unwrap();
2292        let mut primary_result =
2293            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2294        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2295        // No secondary TP (we are unreachable).
2296        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2297        assert_eq!(status, State::DegradedUnreachable);
2298        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2299
2300        // Add a successful result
2301        primary_result.push(create_upload_status(Ok(())));
2302        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2303        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2304        // Still degraded, and unreachable (because we don't have a TimePeriodContext
2305        // for the secondary TP)
2306        assert_eq!(status, State::DegradedUnreachable);
2307        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2308
2309        // If we add another time period where none of the uploads were successful,
2310        // we're *still* unreachable
2311        let secondary_result =
2312            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2313        let secondary_params = all_params
2314            .iter()
2315            .find(|param| param.time_period() != current_period)
2316            .unwrap();
2317        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2318        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2319        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2320        assert_eq!(status, State::DegradedUnreachable);
2321        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2322    }
2323}