tor_hsservice/publish/reactor.rs

1//! The onion service publisher reactor.
2//!
3//! Generates and publishes hidden service descriptors in response to various events.
4//!
5//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
6//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
7//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
8//! any of the channels the reactor is receiving events from is closed
9//! (i.e. when the senders are dropped).
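//!
//! A rough usage sketch (the argument list is elided here; the real call site lives in
//! the [`Publisher`] implementation):
//!
//! ```ignore
//! let reactor = Reactor::new(/* runtime, nickname, dir_provider, ... */);
//! runtime.spawn(async move {
//!     // Runs until every event source is closed, or until a fatal error occurs.
//!     let _ = reactor.run().await;
//! })?;
//! ```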
10//!
11//! ## Publisher status
12//!
13//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
14//! which is used for onion service status reporting.
15//!
16//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
17//! and responds by generating and publishing a new descriptor if needed.
18//!
19//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
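//!
//! In simplified form, the corresponding part of the main loop looks roughly like this
//! (the real handling lives in [`Reactor::run_once`]):
//!
//! ```ignore
//! while let Some(status) = publish_status_rx.next().await {
//!     if status == PublishStatus::UploadScheduled {
//!         // Reset our status, then upload to every HsDir that still needs the descriptor.
//!         update_publish_status(PublishStatus::Idle).await?;
//!         upload_all().await?;
//!     }
//! }
//! ```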
20//!
21//! ## When do we publish?
22//!
23//! We generate and publish a new descriptor if
24//!   * the introduction points have changed
25//!   * the onion service configuration has changed in a meaningful way (for example,
26//!     if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
27//!     has changed; see [`OnionServiceConfigPublisherView`])
28//!   * there is a new consensus
29//!   * it is time to republish the descriptor (after we upload a descriptor,
30//!     we schedule it for republishing at a random time between 60 minutes and 120 minutes
31//!     in the future)
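//!
//! As a concrete example of the last trigger, the republish timer is armed each time an
//! upload completes; a simplified sketch of the scheduling done in `handle_upload_results`:
//!
//! ```ignore
//! let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
//! let reupload_when = runtime.now() + Duration::from_secs(minutes * 60);
//! reupload_timers.push(ReuploadTimer { period, when: reupload_when });
//! ```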
32//!
33//! ## Onion service status
34//!
35//! With respect to [`OnionServiceStatus`] reporting,
36//! the following state transitions are possible:
37//!
38//!
39//! ```ignore
40//!
41//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
42//!                +---------------------------------------+
43//!                |                                       |
44//!                |                                       v
45//!                |                               +---------------+
46//!                |                               | Bootstrapping |
47//!                |                               +---------------+
48//!                |                                       |
49//!                |                                       |           uploaded to at least
50//!                |  not enough HsDir uploads succeeded   |        some HsDirs from each ring
51//!                |         +-----------------------------+-----------------------+
52//!                |         |                             |                       |
53//!                |         |              all HsDir uploads succeeded            |
54//!                |         |                             |                       |
55//!                |         v                             v                       v
56//!                |  +---------------------+         +---------+        +---------------------+
57//!                |  | DegradedUnreachable |         | Running |        |  DegradedReachable  |
58//! +----------+   |  +---------------------+         +---------+        +---------------------+
59//! | Shutdown |-- |         |                           |                        |
60//! +----------+   |         |                           |                        |
61//!                |         |                           |                        |
62//!                |         |                           |                        |
63//!                |         +---------------------------+------------------------+
64//!                |                                     |   invalid authorized_clients
65//!                |                                     |      after handling config change
66//!                |                                     |
67//!                |                                     v
68//!                |     run_once() returns an error +--------+
69//!                +-------------------------------->| Broken |
70//!                                                  +--------+
71//! ```
72//!
73//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
74//! back to `Bootstrapping` (those transitions were omitted for brevity).
75
76use tor_config::file_watcher::{
77    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
78};
79use tor_config_path::{CfgPath, CfgPathResolver};
80use tor_netdir::{DirEvent, NetDir};
81
82use crate::config::restricted_discovery::{
83    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
84};
85use crate::config::OnionServiceConfigPublisherView;
86use crate::status::{DescUploadRetryError, Problem};
87
88use super::*;
89
90// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
91// for rate-limiting the publish jobs triggered by a change in the config?
92//
93// Currently the descriptor publish tasks triggered by changes in the config
94// are rate-limited via the usual rate limiting mechanism
95// (which rate-limits the uploads for 1m).
96//
97// I think this is OK for now, but we might need to rethink this if it becomes problematic
98// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
99// each time the config is modified).
100
101/// The upload rate-limiting threshold.
102///
103/// Before initiating an upload, the reactor checks whether the last upload was initiated
104/// at least `UPLOAD_RATE_LIM_THRESHOLD` ago. If so, it uploads the descriptor to all HsDirs
105/// that need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` after
106/// the current time.
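///
/// A simplified sketch of that check (the real logic lives in [`Reactor::upload_all`];
/// the helper names below are illustrative only):
///
/// ```ignore
/// if now.duration_since(last_uploaded) < UPLOAD_RATE_LIM_THRESHOLD {
///     // Too soon: reschedule the upload instead of starting it now.
///     schedule_upload_after(UPLOAD_RATE_LIM_THRESHOLD);
/// } else {
///     upload_to_all_dirty_hsdirs();
/// }
/// ```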
107//
108// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
109const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
110
111/// The maximum number of concurrent upload tasks per time period.
112//
113// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
114// will have no effect, since the current number of replicas is far less than
115// this value.
116//
117// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
118// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
119// (currently 2, which means the concurrency limit will, in fact, be 32).
120//
121// We should try to decouple this value from the TP parameters.
122const MAX_CONCURRENT_UPLOADS: usize = 16;
123
124/// The maximum time allowed for uploading a descriptor to a single HSDir,
125/// across all attempts.
126pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
127
128/// A reactor for the HsDir [`Publisher`]
129///
130/// The entrypoint is [`Reactor::run`].
131#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
132pub(super) struct Reactor<R: Runtime, M: Mockable> {
133    /// The immutable, shared inner state.
134    imm: Arc<Immutable<R, M>>,
135    /// A source for new network directories that we use to determine
136    /// our HsDirs.
137    dir_provider: Arc<dyn NetDirProvider>,
138    /// The mutable inner state.
139    inner: Arc<Mutex<Inner>>,
140    /// A channel for receiving IPT change notifications.
141    ipt_watcher: IptsPublisherView,
142    /// A channel for receiving onion service config change notifications.
143    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
144    /// A channel for receiving restricted discovery key_dirs change notifications.
145    key_dirs_rx: FileEventReceiver,
146    /// A channel for sending restricted discovery key_dirs change notifications.
147    ///
148    /// A copy of this sender is handed out to every `FileWatcher` created.
149    key_dirs_tx: FileEventSender,
150    /// A channel for receiving updates regarding our [`PublishStatus`].
151    ///
152    /// The main loop of the reactor watches for updates on this channel.
153    ///
154    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
155    /// we can start publishing descriptors.
156    ///
157    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
158    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
159    /// established some introduction points.
160    publish_status_rx: watch::Receiver<PublishStatus>,
161    /// A sender for updating our [`PublishStatus`].
162    ///
163    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
164    /// we can start publishing descriptors.
165    publish_status_tx: watch::Sender<PublishStatus>,
166    /// A channel for receiving upload completion notifications.
167    ///
168    /// This channel is polled in the main loop of the reactor.
169    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
170    /// A channel for sending upload completion notifications.
171    ///
172    /// A copy of this sender is handed to each upload task.
173    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
174    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
175    ///
176    /// Receivers can use this channel to find out when the reactor is dropped.
177    ///
178    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
179    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
180    ///
181    /// Closing this channel will cause any pending upload tasks to be dropped.
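    ///
    /// A sketch of how a background task can notice the shutdown (the receiver and the
    /// upload future are illustrative names, not fields of the reactor):
    ///
    /// ```ignore
    /// select_biased! {
    ///     // `Void` is uninhabited, so the only possible event here is `None`,
    ///     // which means the sender (and thus the reactor) was dropped.
    ///     _ = shutdown_rx.next().fuse() => return,
    ///     res = upload_fut.fuse() => { /* process the upload result */ },
    /// }
    /// ```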
182    shutdown_tx: broadcast::Sender<Void>,
183    /// Path resolver for configuration files.
184    path_resolver: Arc<CfgPathResolver>,
185}
186
187/// The immutable, shared state of the descriptor publisher reactor.
188#[derive(Clone)]
189struct Immutable<R: Runtime, M: Mockable> {
190    /// The runtime.
191    runtime: R,
192    /// Mockable state.
193    ///
194    /// This is used for launching circuits and for obtaining random number generators.
195    mockable: M,
196    /// The service for which we're publishing descriptors.
197    nickname: HsNickname,
198    /// The key manager.
199    keymgr: Arc<KeyMgr>,
200    /// A sender for updating the status of the onion service.
201    status_tx: PublisherStatusSender,
202}
203
204impl<R: Runtime, M: Mockable> Immutable<R, M> {
205    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
206    /// with the specified [`TimePeriod`].
207    ///
208    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
209    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
210    /// descriptor signing key.
211    ///
212    /// Returns an error if the service is running in offline mode and the descriptor signing
213    /// keypair of the specified `period` is not available.
214    //
215    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
216    // built from the blinded id key
217    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
218        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
219            Some(key) => {
220                let key: ed25519::ExpandedKeypair = key.into();
221                key.to_secret_key_bytes()[0..32]
222                    .try_into()
223                    .expect("Wrong length on slice")
224            }
225            None => {
226                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
227                // is unreachable (for now).
228                let desc_sign_key_spec =
229                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
230                let key: ed25519::Keypair = self
231                    .keymgr
232                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
233                    // TODO (#1194): internal! is not the right type for this error (we need an
234                    // error type for the case where a hidden service running in offline mode has
235                    // run out of its pre-provisioned keys).
236                    //
237                    // This will be addressed when we add support for offline hs_id mode
238                    .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
239                    .into();
240                key.to_bytes()
241            }
242        };
243
244        Ok(AesOpeKey::from_secret(&ope_key))
245    }
246
247    /// Generate a revision counter for a descriptor associated with the specified
248    /// [`TimePeriod`].
249    ///
250    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
251    ///
252    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
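    ///
    /// In short, the revision counter is the OPE encryption of how far into the current
    /// shared-random-value period the descriptor is being generated; a condensed sketch of
    /// the computation below:
    ///
    /// ```ignore
    /// let ope_key = self.create_ope_key(params.time_period())?;
    /// // `offset` measures how far `now` is into the current SRV protocol run.
    /// let offset = params
    ///     .offset_within_srv_period(now)
    ///     .ok_or_else(|| internal!("now is outside the SRV period"))?;
    /// Ok(RevisionCounter::from(ope_key.encrypt(offset)))
    /// ```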
253    fn generate_revision_counter(
254        &self,
255        params: &HsDirParams,
256        now: SystemTime,
257    ) -> Result<RevisionCounter, FatalError> {
258        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
259        // to each time we generate a new descriptor), for performance reasons.
260        let ope_key = self.create_ope_key(params.time_period())?;
261
262        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
263        let srv_start = params.start_of_shard_rand_period();
264        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
265            internal!(
266                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
267                now,
268                srv_start
269            )
270        })?;
271        let rev = ope_key.encrypt(offset);
272
273        Ok(RevisionCounter::from(rev))
274    }
275}
276
277/// Mockable state for the descriptor publisher reactor.
278///
279/// This enables us to mock parts of the [`Reactor`] for testing purposes.
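///
/// A minimal sketch of a test double (the type names are illustrative and not the mocks
/// actually used by this crate's tests; `FakeCirc` is assumed to implement
/// [`MockableClientCirc`]):
///
/// ```ignore
/// #[derive(Clone)]
/// struct FakeCircPool;
///
/// #[async_trait]
/// impl Mockable for FakeCircPool {
///     type Rng = rand::rngs::ThreadRng;
///     type ClientCirc = FakeCirc;
///
///     fn thread_rng(&self) -> Self::Rng {
///         rand::rng()
///     }
///
///     async fn get_or_launch_specific<T>(
///         &self,
///         _netdir: &NetDir,
///         _kind: HsCircKind,
///         _target: T,
///     ) -> Result<Arc<FakeCirc>, tor_circmgr::Error>
///     where
///         T: CircTarget + Send + Sync,
///     {
///         // Never builds a real circuit; always hands back the same fake.
///         Ok(Arc::new(FakeCirc))
///     }
///
///     fn estimate_upload_timeout(&self) -> Duration {
///         Duration::from_secs(30)
///     }
/// }
/// ```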
280#[async_trait]
281pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
282    /// The type of random number generator.
283    type Rng: rand::Rng + rand::CryptoRng;
284
285    /// The type of client circuit.
286    type ClientCirc: MockableClientCirc;
287
288    /// Return a random number generator.
289    fn thread_rng(&self) -> Self::Rng;
290
291    /// Create a circuit of the specified `kind` to `target`.
292    async fn get_or_launch_specific<T>(
293        &self,
294        netdir: &NetDir,
295        kind: HsCircKind,
296        target: T,
297    ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
298    where
299        T: CircTarget + Send + Sync;
300
301    /// Return an estimate-based value for how long we should allow a single
302    /// directory upload operation to complete.
303    ///
304    /// Includes circuit construction, stream opening, upload, and waiting for a
305    /// response.
306    fn estimate_upload_timeout(&self) -> Duration;
307}
308
309/// Mockable client circuit
310#[async_trait]
311pub(crate) trait MockableClientCirc: Send + Sync {
312    /// The data stream type.
313    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
314
315    /// Start a new stream to the last relay in the circuit, using
316    /// a BEGIN_DIR cell.
317    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
318}
319
320#[async_trait]
321impl MockableClientCirc for ClientCirc {
322    type DataStream = tor_proto::stream::DataStream;
323
324    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
325        ClientCirc::begin_dir_stream(self).await
326    }
327}
328
329/// The real version of the mockable state of the reactor.
330#[derive(Clone, From, Into)]
331pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
332
333#[async_trait]
334impl<R: Runtime> Mockable for Real<R> {
335    type Rng = rand::rngs::ThreadRng;
336    type ClientCirc = ClientCirc;
337
338    fn thread_rng(&self) -> Self::Rng {
339        rand::rng()
340    }
341
342    async fn get_or_launch_specific<T>(
343        &self,
344        netdir: &NetDir,
345        kind: HsCircKind,
346        target: T,
347    ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
348    where
349        T: CircTarget + Send + Sync,
350    {
351        self.0.get_or_launch_specific(netdir, kind, target).await
352    }
353
354    fn estimate_upload_timeout(&self) -> Duration {
355        use tor_circmgr::timeouts::Action;
356        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
357        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
358        // We assume that in the worst case we'll have to wait for an entire
359        // circuit construction and two round-trips to the hsdir.
360        let est_total = est_build + est_roundtrip * 2;
361        // We always allow _at least_ this much time, in case our estimate is
362        // ridiculously low.
363        let min_timeout = Duration::from_secs(30);
364        max(est_total, min_timeout)
365    }
366}
367
368/// The mutable state of a [`Reactor`].
369struct Inner {
370    /// The onion service config.
371    config: Arc<OnionServiceConfigPublisherView>,
372    /// Watcher for key_dirs.
373    ///
374    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
375    ///
376    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
377    file_watcher: Option<FileWatcher>,
378    /// The relevant time periods.
379    ///
380    /// This includes the current time period, as well as any other time periods we need to be
381    /// publishing descriptors for.
382    ///
383    /// This is empty until we fetch our first netdir in [`Reactor::run`].
384    time_periods: Vec<TimePeriodContext>,
385    /// Our most up to date netdir.
386    ///
387    /// This is initialized in [`Reactor::run`].
388    netdir: Option<Arc<NetDir>>,
389    /// The timestamp of our last upload.
390    ///
391    /// This is the time when the last upload was _initiated_ (rather than completed), to prevent
392    /// the publisher from spawning multiple upload tasks at once in response to multiple external
393    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
394    /// notifications in a short time frame (#1142), or an IPT change notification that's
395    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
396    /// inefficient, but it also causes the publisher to generate two different descriptors with
397    /// the same revision counter (the revision counter is derived from the current timestamp),
398    /// which ultimately causes the slower upload task to fail (see #1142).
399    ///
400    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
401    /// used for retrying failed uploads (these are handled internally by
402    /// [`Reactor::upload_descriptor_with_retries`]).
403    last_uploaded: Option<Instant>,
404    /// A max-heap containing the time periods for which we need to reupload the descriptor.
405    // TODO: we are currently reuploading more than necessary.
406    // Ideally, this shouldn't contain duplicate TimePeriods,
407    // because we only need to retain the latest reupload time for each time period.
408    //
409    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
410    // we will end up with multiple ReuploadTimer entries for that TP,
411    // each of which will (eventually) result in a reupload.
412    //
413    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
414    //
415    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
416    reupload_timers: BinaryHeap<ReuploadTimer>,
417    /// The restricted discovery authorized clients.
418    ///
419    /// `None`, unless the service is running in restricted discovery mode.
420    authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
421}
422
423/// The part of the reactor state that changes with every time period.
424struct TimePeriodContext {
425    /// The HsDir params.
426    params: HsDirParams,
427    /// The HsDirs to use in this time period.
428    ///
429    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
430    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
431    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
432    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
433    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
434    /// The revision counter of the last successful upload, if any.
435    last_successful: Option<RevisionCounter>,
436    /// The outcome of the last upload, if any.
437    upload_results: Vec<HsDirUploadStatus>,
438}
439
440impl TimePeriodContext {
441    /// Create a new `TimePeriodContext`.
442    ///
443    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
444    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
445    fn new<'r>(
446        params: HsDirParams,
447        blind_id: HsBlindId,
448        netdir: &Arc<NetDir>,
449        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
450        old_upload_results: Vec<HsDirUploadStatus>,
451    ) -> Result<Self, FatalError> {
452        let period = params.time_period();
453        let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
454        let upload_results = old_upload_results
455            .into_iter()
456            .filter(|res|
457                // Check if the HsDir of this result still exists
458                hs_dirs
459                    .iter()
460                    .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
461            .collect();
462
463        Ok(Self {
464            params,
465            hs_dirs,
466            last_successful: None,
467            upload_results,
468        })
469    }
470
471    /// Recompute the HsDirs for this time period.
472    fn compute_hsdirs<'r>(
473        period: TimePeriod,
474        blind_id: HsBlindId,
475        netdir: &Arc<NetDir>,
476        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
477    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
478        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
479
480        Ok(hs_dirs
481            .map(|hs_dir| {
482                let mut builder = RelayIds::builder();
483                if let Some(ed_id) = hs_dir.ed_identity() {
484                    builder.ed_identity(*ed_id);
485                }
486
487                if let Some(rsa_id) = hs_dir.rsa_identity() {
488                    builder.rsa_identity(*rsa_id);
489                }
490
491                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
492
493                // Have we uploaded the descriptor to this relay before? If so, we don't need to
494                // reupload it unless it was already dirty and due for a reupload.
495                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
496                    Some((_, status)) => *status,
497                    None => DescriptorStatus::Dirty,
498                };
499
500                (relay_id, status)
501            })
502            .collect::<Vec<_>>())
503    }
504
505    /// Mark the descriptor dirty for all HSDirs of this time period.
506    fn mark_all_dirty(&mut self) {
507        self.hs_dirs
508            .iter_mut()
509            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
510    }
511
512    /// Update the upload result for this time period.
513    fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
514        self.upload_results = upload_results;
515    }
516}
517
518/// An error that occurs while trying to upload a descriptor.
519#[derive(Clone, Debug, thiserror::Error)]
520#[non_exhaustive]
521pub enum UploadError {
522    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
523    #[error("descriptor upload request failed: {}", _0.error)]
524    Request(#[from] RequestFailedError),
525
526    /// Failed to establish circuit to hidden service directory
527    #[error("could not build circuit to HsDir")]
528    Circuit(#[from] tor_circmgr::Error),
529
530    /// Failed to establish stream to hidden service directory
531    #[error("failed to establish directory stream to HsDir")]
532    Stream(#[source] tor_proto::Error),
533
534    /// An internal error.
535    #[error("Internal error")]
536    Bug(#[from] tor_error::Bug),
537}
538define_asref_dyn_std_error!(UploadError);
539
540impl<R: Runtime, M: Mockable> Reactor<R, M> {
541    /// Create a new `Reactor`.
542    #[allow(clippy::too_many_arguments)]
543    pub(super) fn new(
544        runtime: R,
545        nickname: HsNickname,
546        dir_provider: Arc<dyn NetDirProvider>,
547        mockable: M,
548        config: &OnionServiceConfig,
549        ipt_watcher: IptsPublisherView,
550        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
551        status_tx: PublisherStatusSender,
552        keymgr: Arc<KeyMgr>,
553        path_resolver: Arc<CfgPathResolver>,
554    ) -> Self {
555        /// The maximum size of the upload completion notifier channel.
556        ///
557        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
558        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
559        /// each sender will send exactly one message.
560        const UPLOAD_CHAN_BUF_SIZE: usize = 0;
561
562        // Internally-generated instructions, no need for mq.
563        let (upload_task_complete_tx, upload_task_complete_rx) =
564            mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);
565
566        let (publish_status_tx, publish_status_rx) = watch::channel();
567        // Setting the buffer size to zero here is OK,
568        // since we never actually send anything on this channel.
569        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
570
571        let authorized_clients =
572            Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);
573
574        // Create a channel for watching for changes in the configured
575        // restricted_discovery.key_dirs.
576        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
577
578        let imm = Immutable {
579            runtime,
580            mockable,
581            nickname,
582            keymgr,
583            status_tx,
584        };
585
586        let inner = Inner {
587            time_periods: vec![],
588            config: Arc::new(config.into()),
589            file_watcher: None,
590            netdir: None,
591            last_uploaded: None,
592            reupload_timers: Default::default(),
593            authorized_clients,
594        };
595
596        Self {
597            imm: Arc::new(imm),
598            inner: Arc::new(Mutex::new(inner)),
599            dir_provider,
600            ipt_watcher,
601            config_rx,
602            key_dirs_rx,
603            key_dirs_tx,
604            publish_status_rx,
605            publish_status_tx,
606            upload_task_complete_rx,
607            upload_task_complete_tx,
608            shutdown_tx,
609            path_resolver,
610        }
611    }
612
613    /// Start the reactor.
614    ///
615    /// Under normal circumstances, this function runs indefinitely.
616    ///
617    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
618    /// upload fails or is rate-limited.
619    pub(super) async fn run(mut self) -> Result<(), FatalError> {
620        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
621
622        {
623            let netdir = self
624                .dir_provider
625                .wait_for_netdir(Timeliness::Timely)
626                .await?;
627            let time_periods = self.compute_time_periods(&netdir, &[])?;
628
629            let mut inner = self.inner.lock().expect("poisoned lock");
630
631            inner.netdir = Some(netdir);
632            inner.time_periods = time_periods;
633        }
634
635        // Create the initial key_dirs watcher.
636        self.update_file_watcher();
637
638        loop {
639            match self.run_once().await {
640                Ok(ShutdownStatus::Continue) => continue,
641                Ok(ShutdownStatus::Terminate) => {
642                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
643
644                    self.imm.status_tx.send_shutdown();
645                    return Ok(());
646                }
647                Err(e) => {
648                    error_report!(
649                        e,
650                        "HS service {}: descriptor publisher crashed!",
651                        self.imm.nickname
652                    );
653
654                    self.imm.status_tx.send_broken(e.clone());
655
656                    return Err(e);
657                }
658            }
659        }
660    }
661
662    /// Run one iteration of the reactor loop.
663    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
664        let mut netdir_events = self.dir_provider.events();
665
666        // Note: TrackingNow tracks the values it is compared with.
667        // This is equivalent to sleeping for (until - now) units of time.
668        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
669        if let PublishStatus::RateLimited(until) = self.status() {
670            if upload_rate_lim > until {
671                // We are no longer rate-limited
672                self.expire_rate_limit().await?;
673            }
674        }
675
676        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
677        let mut reupload_periods = vec![];
678        {
679            let mut inner = self.inner.lock().expect("poisoned lock");
680            let inner = &mut *inner;
681            while let Some(reupload) = inner.reupload_timers.peek().copied() {
682                // First, extract all the timeouts that already elapsed.
683                if reupload.when <= reupload_tracking {
684                    inner.reupload_timers.pop();
685                    reupload_periods.push(reupload.period);
686                } else {
687                    // We are not ready to schedule any more reuploads.
688                    //
689                    // How much we need to sleep is implicitly
690                    // tracked in reupload_tracking (through
691                    // the TrackingNow implementation)
692                    break;
693                }
694            }
695        }
696
697        // Check if it's time to schedule any reuploads.
698        for period in reupload_periods {
699            if self.mark_dirty(&period) {
700                debug!(
701                    time_period=?period,
702                    "descriptor reupload timer elapsed; scheduling reupload",
703                );
704                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
705                    .await?;
706            }
707        }
708
709        select_biased! {
710            res = self.upload_task_complete_rx.next().fuse() => {
711                let Some(upload_res) = res else {
712                    return Ok(ShutdownStatus::Terminate);
713                };
714
715                self.handle_upload_results(upload_res);
716                self.upload_result_to_svc_status()?;
717            },
718            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
719                self.expire_rate_limit().await?;
720            },
721            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
722                // Run another iteration, executing run_once again. This time, we will remove the
723                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
724                // relevant HsDirs, and schedule the upload by setting our status to
725                // UploadScheduled.
726                return Ok(ShutdownStatus::Continue);
727            },
728            netdir_event = netdir_events.next().fuse() => {
729                let Some(netdir_event) = netdir_event else {
730                    debug!("netdir event stream ended");
731                    return Ok(ShutdownStatus::Terminate);
732                };
733
734                if !matches!(netdir_event, DirEvent::NewConsensus) {
735                    return Ok(ShutdownStatus::Continue);
736                };
737
738                // The consensus changed. Grab a new NetDir.
739                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
740                    Ok(y) => y,
741                    Err(e) => {
742                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
743                        // Hopefully a netdir will appear in the future.
744                        // In the meantime, suspend operations.
745                        //
746                        // TODO (#1218): there is a bug here: we stop reading on our inputs
747                        // including eg publish_status_rx, but it is our job to log some of
748                        // these things.  While we are waiting for a netdir, all those messages
749                        // are "stuck"; they'll appear later, with misleading timestamps.
750                        //
751                        // Probably this should be fixed by moving the logging
752                        // out of the reactor, where it won't be blocked.
753                        self.dir_provider.wait_for_netdir(Timeliness::Timely)
754                            .await?
755                    }
756                };
757                let relevant_periods = netdir.hs_all_time_periods();
758                self.handle_consensus_change(netdir).await?;
759                expire_publisher_keys(
760                    &self.imm.keymgr,
761                    &self.imm.nickname,
762                    &relevant_periods,
763                ).unwrap_or_else(|e| {
764                    error_report!(e, "failed to remove expired keys");
765                });
766            }
767            update = self.ipt_watcher.await_update().fuse() => {
768                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
769                    return Ok(ShutdownStatus::Terminate);
770                }
771            },
772            config = self.config_rx.next().fuse() => {
773                let Some(config) = config else {
774                    return Ok(ShutdownStatus::Terminate);
775                };
776
777                self.handle_svc_config_change(&config).await?;
778            },
779            res = self.key_dirs_rx.next().fuse() => {
780                let Some(event) = res else {
781                    return Ok(ShutdownStatus::Terminate);
782                };
783
784                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
785                    // Discard other events, so that we only reload once.
786                }
787
788                self.handle_key_dirs_change(event).await?;
789            }
790            should_upload = self.publish_status_rx.next().fuse() => {
791                let Some(should_upload) = should_upload else {
792                    return Ok(ShutdownStatus::Terminate);
793                };
794
795                // Our PublishStatus changed -- are we ready to publish?
796                if should_upload == PublishStatus::UploadScheduled {
797                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
798                    self.upload_all().await?;
799                }
800            }
801        }
802
803        Ok(ShutdownStatus::Continue)
804    }
805
806    /// Returns the current status of the publisher
807    fn status(&self) -> PublishStatus {
808        *self.publish_status_rx.borrow()
809    }
810
811    /// Handle a batch of upload outcomes,
812    /// possibly updating the status of the descriptor for the corresponding HSDirs.
813    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
814        let mut inner = self.inner.lock().expect("poisoned lock");
815        let inner = &mut *inner;
816
817        // Check which time period these uploads pertain to.
818        let period = inner
819            .time_periods
820            .iter_mut()
821            .find(|ctx| ctx.params.time_period() == results.time_period);
822
823        let Some(period) = period else {
824            // The uploads were for a time period that is no longer relevant, so we
825            // can ignore the result.
826            return;
827        };
828
829        // We will need to reupload this descriptor at some point, so we pick
830        // a random time between 60 minutes and 120 minutes in the future.
831        //
832        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
833        let mut rng = self.imm.mockable.thread_rng();
834        // TODO SPEC: Control republish period using a consensus parameter?
835        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
836        let duration = Duration::from_secs(minutes * 60);
837        let reupload_when = self.imm.runtime.now() + duration;
838        let time_period = period.params.time_period();
839
840        info!(
841            time_period=?time_period,
842            "reuploading descriptor in {}",
843            humantime::format_duration(duration),
844        );
845
846        inner.reupload_timers.push(ReuploadTimer {
847            period: time_period,
848            when: reupload_when,
849        });
850
851        let mut upload_results = vec![];
852        for upload_res in results.hsdir_result {
853            let relay = period
854                .hs_dirs
855                .iter_mut()
856                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
857
858            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
859                // This HSDir went away, so the result doesn't matter.
860                // Continue processing the rest of the results
861                continue;
862            };
863
864            if upload_res.upload_res.is_ok() {
865                let update_last_successful = match period.last_successful {
866                    None => true,
867                    Some(counter) => counter <= upload_res.revision_counter,
868                };
869
870                if update_last_successful {
871                    period.last_successful = Some(upload_res.revision_counter);
872                    // TODO (#1098): Is it possible that this won't update the statuses promptly
873                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
874                    // and start an upload task for a descriptor that has already been uploaded (or is
875                    // being uploaded) in another task, but whose upload results have not yet been
876                    // processed.
877                    //
878                    // This is probably made worse by the fact that the statuses are updated in
879                    // batches (grouped by time period), rather than one by one as the upload tasks
880                    // complete (updating the status involves locking the inner mutex, and I wanted
881                    // to minimize the locking/unlocking overheads). I'm not sure handling the
882                    // updates in batches was the correct decision here.
883                    *status = DescriptorStatus::Clean;
884                }
885            }
886
887            upload_results.push(upload_res);
888        }
889
890        period.set_upload_results(upload_results);
891    }
892
893    /// Maybe update our list of HsDirs.
894    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
895        trace!("the consensus has changed; recomputing HSDirs");
896
897        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
898
899        self.recompute_hs_dirs()?;
900        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
901            .await?;
902
903        // If the time period has changed, some of our upload results may now be irrelevant,
904        // so we might need to update our status (for example, if our uploads are
905        // for a no-longer-relevant time period, it means we might be able to update
906        // our status from "degraded" to "running")
907        self.upload_result_to_svc_status()?;
908
909        Ok(())
910    }
911
912    /// Recompute the HsDirs for all relevant time periods.
913    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
914        let mut inner = self.inner.lock().expect("poisoned lock");
915        let inner = &mut *inner;
916
917        let netdir = Arc::clone(
918            inner
919                .netdir
920                .as_ref()
921                .ok_or_else(|| internal!("started upload task without a netdir"))?,
922        );
923
924        // Update our list of relevant time periods.
925        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
926        inner.time_periods = new_time_periods;
927
928        Ok(())
929    }
930
931    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
932    ///
933    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
934    /// HsDirs where possible.
935    fn compute_time_periods(
936        &self,
937        netdir: &Arc<NetDir>,
938        time_periods: &[TimePeriodContext],
939    ) -> Result<Vec<TimePeriodContext>, FatalError> {
940        netdir
941            .hs_all_time_periods()
942            .iter()
943            .map(|params| {
944                let period = params.time_period();
945                let blind_id_kp =
946                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
947                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
948                        // It's supposed to return Ok(None) if we're in offline hsid mode,
949                        // but that might change when we do #1194
950                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;
951
952                let blind_id: HsBlindIdKey = (&blind_id_kp).into();
953
954                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
955                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
956                // publishing the descriptor to the HsDirs that already have it (the ones that are
957                // marked with DescriptorStatus::Clean).
958                //
959                // In other words, we only want to publish to those HsDirs that
960                //   * are part of a new time period (which we have never published the descriptor
961                //   for), or
962                //   * have just been added to the ring of a time period we already knew about
963                if let Some(ctx) = time_periods
964                    .iter()
965                    .find(|ctx| ctx.params.time_period() == period)
966                {
967                    TimePeriodContext::new(
968                        params.clone(),
969                        blind_id.into(),
970                        netdir,
971                        ctx.hs_dirs.iter(),
972                        ctx.upload_results.clone(),
973                    )
974                } else {
975                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
976                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
977                    TimePeriodContext::new(
978                        params.clone(),
979                        blind_id.into(),
980                        netdir,
981                        iter::empty(),
982                        vec![],
983                    )
984                }
985            })
986            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
987    }
988
989    /// Replace the old netdir with the new, returning the old.
990    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
991        self.inner
992            .lock()
993            .expect("poisoned lock")
994            .netdir
995            .replace(new_netdir)
996    }
997
998    /// Replace our view of the service config with `new_config` if `new_config` contains changes
999    /// that would cause us to generate a new descriptor.
1000    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
1001        let mut inner = self.inner.lock().expect("poisoned lock");
1002        let old_config = &mut inner.config;
1003
1004        // The fields we're interested in haven't changed, so there's no need to update
1005        // `inner.config`.
1006        if *old_config == new_config {
1007            return false;
1008        }
1009
1010        let log_change = match (
1011            old_config.restricted_discovery.enabled,
1012            new_config.restricted_discovery.enabled,
1013        ) {
1014            (true, false) => Some("Disabling restricted discovery mode"),
1015            (false, true) => Some("Enabling restricted discovery mode"),
1016            _ => None,
1017        };
1018
1019        if let Some(msg) = log_change {
1020            info!(nickname=%self.imm.nickname, "{}", msg);
1021        }
1022
1023        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
1024
1025        true
1026    }
1027
1028    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
1029    fn update_file_watcher(&self) {
1030        let mut inner = self.inner.lock().expect("poisoned lock");
1031        if inner.config.restricted_discovery.watch_configuration() {
1032            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
1033            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
1034
1035            let dirs = inner.config.restricted_discovery.key_dirs().clone();
1036
1037            watch_dirs(&mut watcher, &dirs, &self.path_resolver);
1038
1039            let watcher = watcher
1040                .start_watching(self.key_dirs_tx.clone())
1041                .map_err(|e| {
1042                    // TODO: update the publish status (see also the module-level TODO about this).
1043                    error_report!(e, "Cannot set file watcher");
1044                })
1045                .ok();
1046            inner.file_watcher = watcher;
1047        } else {
1048            if inner.file_watcher.is_some() {
1049                debug!("removing key_dirs watcher");
1050            }
1051            inner.file_watcher = None;
1052        }
1053    }
1054
1055    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
1056    /// uploading.
1057    fn note_ipt_change(&self) -> PublishStatus {
1058        let mut ipts = self.ipt_watcher.borrow_for_publish();
1059        match ipts.ipts.as_mut() {
1060            Some(_ipts) => PublishStatus::UploadScheduled,
1061            None => PublishStatus::AwaitingIpts,
1062        }
1063    }
1064
1065    /// Update our list of introduction points.
1066    async fn handle_ipt_change(
1067        &mut self,
1068        update: Option<Result<(), crate::FatalError>>,
1069    ) -> Result<ShutdownStatus, FatalError> {
1070        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
1071        match update {
1072            Some(Ok(())) => {
1073                let should_upload = self.note_ipt_change();
1074                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
1075
1076                self.mark_all_dirty();
1077                self.update_publish_status_unless_rate_lim(should_upload)
1078                    .await?;
1079                Ok(ShutdownStatus::Continue)
1080            }
1081            Some(Err(e)) => Err(e),
1082            None => {
1083                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
1084                Ok(ShutdownStatus::Terminate)
1085            }
1086        }
1087    }
1088
1089    /// Update the `PublishStatus` of the reactor with `new_state`,
1090    /// unless the current state is `AwaitingIpts`.
1091    async fn update_publish_status_unless_waiting(
1092        &mut self,
1093        new_state: PublishStatus,
1094    ) -> Result<(), FatalError> {
1095        // Only update the state if we're not waiting for intro points.
1096        if self.status() != PublishStatus::AwaitingIpts {
1097            self.update_publish_status(new_state).await?;
1098        }
1099
1100        Ok(())
1101    }
1102
1103    /// Update the `PublishStatus` of the reactor with `new_state`,
1104    /// unless the current state is `RateLimited`.
1105    async fn update_publish_status_unless_rate_lim(
1106        &mut self,
1107        new_state: PublishStatus,
1108    ) -> Result<(), FatalError> {
1109        // We can't exit this state until the rate-limit expires.
1110        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1111            self.update_publish_status(new_state).await?;
1112        }
1113
1114        Ok(())
1115    }
1116
1117    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
1118    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
1119        let onion_status = match new_state {
1120            PublishStatus::Idle => None,
1121            PublishStatus::UploadScheduled
1122            | PublishStatus::AwaitingIpts
1123            | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
1124        };
1125
1126        if let Some(onion_status) = onion_status {
1127            self.imm.status_tx.send(onion_status, None);
1128        }
1129
1130        trace!(
1131            "publisher reactor status change: {:?} -> {:?}",
1132            self.status(),
1133            new_state
1134        );
1135
1136        self.publish_status_tx.send(new_state).await.map_err(
1137            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
1138        )?;
1139
1140        Ok(())
1141    }
1142
1143    /// Update the onion svc status based on the results of the last descriptor uploads.
1144    fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
1145        let inner = self.inner.lock().expect("poisoned lock");
1146        let netdir = inner
1147            .netdir
1148            .as_ref()
1149            .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
1150
1151        let (state, err) = upload_result_state(netdir, &inner.time_periods);
1152        self.imm.status_tx.send(state, err);
1153
1154        Ok(())
1155    }
1156
1157    /// Update the descriptors based on the config change.
1158    async fn handle_svc_config_change(
1159        &mut self,
1160        config: &OnionServiceConfig,
1161    ) -> Result<(), FatalError> {
1162        let new_config = Arc::new(config.into());
1163        if self.replace_config_if_changed(Arc::clone(&new_config)) {
1164            self.update_file_watcher();
1165            self.update_authorized_clients_if_changed().await?;
1166
1167            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
1168            self.mark_all_dirty();
1169
1170            // Schedule an upload, unless we're still waiting for IPTs.
1171            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1172                .await?;
1173        }
1174
1175        Ok(())
1176    }
1177
1178    /// Update the descriptors based on a restricted discovery key_dirs change.
1179    ///
1180    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
1181    /// this marks the descriptor as dirty for all time periods,
1182    /// and schedules a reupload.
1183    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
1184        debug!("The configured key_dirs have changed");
1185        match event {
1186            FileEvent::Rescan | FileEvent::FileChanged => {
1187                // These events are handled in the same way, by re-reading the keys from disk
1188                // and republishing the descriptor if necessary
1189            }
1190            _ => return Err(internal!("file watcher event {event:?}").into()),
1191        };
1192
1193        // Update the file watcher, in case the change was triggered by a key_dir move.
1194        self.update_file_watcher();
1195
1196        if self.update_authorized_clients_if_changed().await? {
1197            self.mark_all_dirty();
1198
1199            // Schedule an upload, unless we're still waiting for IPTs.
1200            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1201                .await?;
1202        }
1203
1204        Ok(())
1205    }
1206
1207    /// Recreate the authorized_clients based on the current config.
1208    ///
1209    /// Returns `true` if the authorized clients have changed.
1210    async fn update_authorized_clients_if_changed(&mut self) -> Result<bool, FatalError> {
1211        let mut inner = self.inner.lock().expect("poisoned lock");
1212        let authorized_clients =
1213            Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
1214
1215        let clients = &mut inner.authorized_clients;
1216        let changed = clients.as_ref() != authorized_clients.as_ref();
1217
1218        if changed {
1219            info!("The restricted discovery mode authorized clients have changed");
1220            *clients = authorized_clients;
1221        }
1222
1223        Ok(changed)
1224    }
1225
1226    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
1227    fn read_authorized_clients(
1228        config: &RestrictedDiscoveryConfig,
1229        path_resolver: &CfgPathResolver,
1230    ) -> Option<Arc<RestrictedDiscoveryKeys>> {
1231        let authorized_clients = config.read_keys(path_resolver);
1232
1233        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
1234            warn!(
1235                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
1236            );
1237        }
1238
1239        authorized_clients.map(Arc::new)
1240    }
1241
1242    /// Mark the descriptor dirty for all time periods.
1243    fn mark_all_dirty(&self) {
1244        trace!("marking the descriptor dirty for all time periods");
1245
1246        self.inner
1247            .lock()
1248            .expect("poisoned lock")
1249            .time_periods
1250            .iter_mut()
1251            .for_each(|tp| tp.mark_all_dirty());
1252    }
1253
1254    /// Mark the descriptor dirty for the specified time period.
1255    ///
1256    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
1257    fn mark_dirty(&self, period: &TimePeriod) -> bool {
1258        let mut inner = self.inner.lock().expect("poisoned lock");
1259        let period_ctx = inner
1260            .time_periods
1261            .iter_mut()
1262            .find(|tp| tp.params.time_period() == *period);
1263
1264        match period_ctx {
1265            Some(ctx) => {
1266                trace!(time_period=?period, "marking the descriptor dirty");
1267                ctx.mark_all_dirty();
1268                true
1269            }
1270            None => false,
1271        }
1272    }
1273
1274    /// Try to upload our descriptor to the HsDirs that need it.
1275    ///
1276    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
1277    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
1278    ///
1279    /// Failed uploads are retried
1280    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1281    ///
1282    /// If restricted discovery mode is enabled and there are no authorized clients,
1283    /// we abort the upload and set our status to [`State::Broken`].
1284    //
1285    // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
1286    // (for example if the IPTs change),
1287    // which can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
1288    // TODO: we might wish to refactor the publisher to be more sophisticated about this.
1289    //
1290    /// For each current time period, we spawn a task that uploads the descriptor to
1291    /// all the HsDirs on the HsDir ring of that time period.
1292    /// Each task shuts down on completion, or when the reactor is dropped.
1293    ///
1294    /// Each task reports its upload results (`TimePeriodUploadResult`)
1295    /// via the `upload_task_complete_tx` channel.
1296    /// The results are received and processed in the main loop of the reactor.
1297    ///
1298    /// Returns an error if it fails to spawn a task, or if an internal error occurs.
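    ///
    /// Roughly, the per-time-period part of this flow looks like the sketch below
    /// (illustrative only; `hs_dirs_needing_upload` is a made-up name for the
    /// "filter out the clean HsDirs" step that is done inline in the real code):
    ///
    /// ```ignore
    /// for period in time_periods {
    ///     let hs_dirs = period.hs_dirs_needing_upload();
    ///     // One task per time period; each task uploads to all the dirty HsDirs
    ///     // of that period and reports a TimePeriodUploadResult when done.
    ///     runtime.spawn(upload_for_time_period(hs_dirs, /* ... */ upload_task_complete_tx.clone()));
    /// }
    /// ```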
1299    async fn upload_all(&mut self) -> Result<(), FatalError> {
1300        trace!("starting descriptor upload task...");
1301
1302        // Abort the upload entirely if we have an empty list of authorized clients
1303        let authorized_clients = match self.authorized_clients() {
1304            Ok(authorized_clients) => authorized_clients,
1305            Err(e) => {
1306                error_report!(e, "aborting upload");
1307                self.imm.status_tx.send_broken(e.clone());
1308
1309                // Returning an error would shut down the reactor, so we have to return Ok here.
1310                return Ok(());
1311            }
1312        };
1313
1314        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
1315        let now = self.imm.runtime.now();
1316        // Check if we should rate-limit this upload.
1317        if let Some(ts) = last_uploaded {
1318            let duration_since_upload = now.duration_since(ts);
1319
1320            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
1321                return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
1322            }
1323        }
1324
1325        let mut inner = self.inner.lock().expect("poisoned lock");
1326        let inner = &mut *inner;
1327
1328        let _ = inner.last_uploaded.insert(now);
1329
1330        for period_ctx in inner.time_periods.iter_mut() {
1331            let upload_task_complete_tx = self.upload_task_complete_tx.clone();
1332
1333            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
1334            // have our latest descriptor, so we filter them out).
1335            let hs_dirs = period_ctx
1336                .hs_dirs
1337                .iter()
1338                .filter_map(|(relay_id, status)| {
1339                    if *status == DescriptorStatus::Dirty {
1340                        Some(relay_id.clone())
1341                    } else {
1342                        None
1343                    }
1344                })
1345                .collect::<Vec<_>>();
1346
1347            if hs_dirs.is_empty() {
1348                trace!("the descriptor is clean for all HSDirs. Nothing to do");
1349                return Ok(());
1350            }
1351
1352            let time_period = period_ctx.params.time_period();
1353            // This scope exists because rng is not Send, so it needs to fall out of scope before we
1354            // await anything.
1355            let netdir = Arc::clone(
1356                inner
1357                    .netdir
1358                    .as_ref()
1359                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
1360            );
1361
1362            let imm = Arc::clone(&self.imm);
1363            let ipt_upload_view = self.ipt_watcher.upload_view();
1364            let config = Arc::clone(&inner.config);
1365            let authorized_clients = authorized_clients.clone();
1366
1367            trace!(nickname=%self.imm.nickname, time_period=?time_period,
1368                "spawning upload task"
1369            );
1370
1371            let params = period_ctx.params.clone();
1372            let shutdown_rx = self.shutdown_tx.subscribe();
1373
1374            // Spawn a task to upload the descriptor to all HsDirs of this time period.
1375            //
1376            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
1377            // dropped).
1378            let _handle: () = self
1379                .imm
1380                .runtime
1381                .spawn(async move {
1382                    if let Err(e) = Self::upload_for_time_period(
1383                        hs_dirs,
1384                        &netdir,
1385                        config,
1386                        params,
1387                        Arc::clone(&imm),
1388                        ipt_upload_view.clone(),
1389                        authorized_clients.clone(),
1390                        upload_task_complete_tx,
1391                        shutdown_rx,
1392                    )
1393                    .await
1394                    {
1395                        error_report!(
1396                            e,
1397                            "descriptor upload failed for HS service {} and time period {:?}",
1398                            imm.nickname,
1399                            time_period
1400                        );
1401                    }
1402                })
1403                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
1404        }
1405
1406        Ok(())
1407    }
1408
1409    /// Upload the descriptor for the time period specified in `params`.
1410    ///
1411    /// Failed uploads are retried
1412    /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
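    ///
    /// Within a single time period, the uploads to the individual HsDirs run
    /// concurrently, bounded by [`MAX_CONCURRENT_UPLOADS`]. Conceptually, the stream
    /// pipeline used below looks like this (a sketch, with the per-HsDir work
    /// abbreviated as a hypothetical `upload_to_one_hsdir`):
    ///
    /// ```ignore
    /// let upload_results = futures::stream::iter(hs_dirs)
    ///     .map(|relay_ids| upload_to_one_hsdir(relay_ids, /* ... */))
    ///     .buffer_unordered(MAX_CONCURRENT_UPLOADS)
    ///     .try_collect::<Vec<_>>()
    ///     .await;
    /// ```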
1413    #[allow(clippy::too_many_arguments)] // TODO: refactor
1414    async fn upload_for_time_period(
1415        hs_dirs: Vec<RelayIds>,
1416        netdir: &Arc<NetDir>,
1417        config: Arc<OnionServiceConfigPublisherView>,
1418        params: HsDirParams,
1419        imm: Arc<Immutable<R, M>>,
1420        ipt_upload_view: IptsPublisherUploadView,
1421        authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
1422        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
1423        shutdown_rx: broadcast::Receiver<Void>,
1424    ) -> Result<(), FatalError> {
1425        let time_period = params.time_period();
1426        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
1427
1428        let hsdir_count = hs_dirs.len();
1429
1430        /// An error returned from an upload future.
1431        //
1432        // Exhaustive, because this is a private type.
1433        #[derive(Clone, Debug, thiserror::Error)]
1434        enum PublishError {
1435            /// The upload was aborted because there are no IPTs.
1436            ///
1437            /// This happens because of an inevitable TOCTOU race, where after being notified by
1438            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
1439            /// we find out there actually are no IPTs, so we can't build the descriptor.
1440            ///
1441            /// This is a special kind of error that interrupts the current upload task, and is
1442            /// logged at `debug!` level rather than `warn!` or `error!`.
1443            ///
1444            /// Ideally, this shouldn't happen very often (if at all).
1445            #[error("No IPTs")]
1446            NoIpts,
1447
1448            /// The reactor has shut down.
1449            #[error("The reactor has shut down")]
1450            Shutdown,
1451
1452            /// A fatal error.
1453            #[error("{0}")]
1454            Fatal(#[from] FatalError),
1455        }
1456
1457        let upload_results = futures::stream::iter(hs_dirs)
1458            .map(|relay_ids| {
1459                let netdir = netdir.clone();
1460                let config = Arc::clone(&config);
1461                let imm = Arc::clone(&imm);
1462                let ipt_upload_view = ipt_upload_view.clone();
1463                let authorized_clients = authorized_clients.clone();
1464                let params = params.clone();
1465                let mut shutdown_rx = shutdown_rx.clone();
1466
1467                let ed_id = relay_ids
1468                    .ed_identity()
1469                    .map(|id| id.to_string())
1470                    .unwrap_or_else(|| "unknown".into());
1471                let rsa_id = relay_ids
1472                    .rsa_identity()
1473                    .map(|id| id.to_string())
1474                    .unwrap_or_else(|| "unknown".into());
1475
1476                async move {
1477                    let run_upload = |desc| async {
1478                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
1479                            // This should never happen (all of our relay_ids are from the stored
1480                            // netdir).
1481                            let err =
1482                                "tried to upload descriptor to relay not found in consensus?!";
1483                            warn!(
1484                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1485                                "{err}"
1486                            );
1487                            return Err(internal!("{err}").into());
1488                        };
1489
1490                        Self::upload_descriptor_with_retries(
1491                            desc,
1492                            &netdir,
1493                            &hsdir,
1494                            &ed_id,
1495                            &rsa_id,
1496                            Arc::clone(&imm),
1497                        )
1498                        .await
1499                    };
1500
1501                    // How long until we're supposed to time out?
1502                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
1503                    // We generate a new descriptor before _each_ HsDir upload. This means each
1504                    // HsDir could, in theory, receive a different descriptor (not just in terms of
1505                    // revision-counters, but also with a different set of IPTs). It may seem like
1506                    // this could lead to some HsDirs being left with an outdated descriptor, but
1507                    // that's not the case: after the upload completes, the publisher will be
1508                    // notified by the ipt_watcher of the IPT change event (if there was one to
1509                    // begin with), which will trigger another upload job.
1510                    let hsdesc = {
1511                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
1512                        // needs to fall out of scope before the await point below
1513                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
1514
1515                        // If there are no IPTs, we abort the upload. At this point, we might have
1516                        // uploaded the descriptor to some, but not all, HSDirs from the specified
1517                        // time period.
1518                        //
1519                        // Returning an error here means the upload completion task is never
1520                        // notified of the outcome of any of these uploads (which means the
1521                        // descriptor is not marked clean). This is OK, because if we suddenly find
1522                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
1523                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
1524                        // and generate a fresh descriptor anyway.
1525                        //
1526                        // Ideally, this shouldn't happen very often (if at all).
1527                        let Some(ipts) = ipt_set.ipts.as_mut() else {
1528                            return Err(PublishError::NoIpts);
1529                        };
1530
1531                        let hsdesc = {
1532                            trace!(
1533                                nickname=%imm.nickname, time_period=?time_period,
1534                                "building descriptor"
1535                            );
1536                            let mut rng = imm.mockable.thread_rng();
1537                            let mut key_rng = tor_llcrypto::rng::CautiousRng;
1538
1539                            // We're about to generate a new version of the descriptor,
1540                            // so let's generate a new revision counter.
1541                            let now = imm.runtime.wallclock();
1542                            let revision_counter = imm.generate_revision_counter(&params, now)?;
1543
1544                            build_sign(
1545                                &imm.keymgr,
1546                                &config,
1547                                authorized_clients.as_deref(),
1548                                ipts,
1549                                time_period,
1550                                revision_counter,
1551                                &mut rng,
1552                                &mut key_rng,
1553                                imm.runtime.wallclock(),
1554                            )?
1555                        };
1556
1557                        if let Err(e) =
1558                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
1559                        {
1560                            let wait = e.log_retry_max(&imm.nickname)?;
1561                            // TODO (#1226): retry instead of this
1562                            return Err(FatalError::Bug(internal!(
1563                                "ought to retry after {wait:?}, crashing instead"
1564                            ))
1565                            .into());
1566                        }
1567
1568                        hsdesc
1569                    };
1570
1571                    let VersionedDescriptor {
1572                        desc,
1573                        revision_counter,
1574                    } = hsdesc;
1575
1576                    trace!(
1577                        nickname=%imm.nickname, time_period=?time_period,
1578                        revision_counter=?revision_counter,
1579                        "generated new descriptor for time period",
1580                    );
1581
1582                    // (Actually launch the upload attempt. No timeout is needed
1583                    // here, since the backoff::Runner code will handle that for us.)
1584                    let upload_res: UploadResult = select_biased! {
1585                        shutdown = shutdown_rx.next().fuse() => {
1586                            // This will always be None, since Void is uninhabited.
1587                            let _: Option<Void> = shutdown;
1588
1589                            // It looks like the reactor has shut down,
1590                            // so there is no point in uploading the descriptor anymore.
1591                            //
1592                            // Let's shut down the upload task too.
1593                            trace!(
1594                                nickname=%imm.nickname, time_period=?time_period,
1595                                "upload task received shutdown signal"
1596                            );
1597
1598                            return Err(PublishError::Shutdown);
1599                        },
1600                        res = run_upload(desc.clone()).fuse() => res,
1601                    };
1602
1603                    // Note: UploadResult::Failure is only returned when
1604                    // upload_descriptor_with_retries fails, i.e. if all our retry
1605                    // attempts have failed
1606                    Ok(HsDirUploadStatus {
1607                        relay_ids,
1608                        upload_res,
1609                        revision_counter,
1610                    })
1611                }
1612            })
1613            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1614            .boxed()
1615            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1616            .try_collect::<Vec<_>>()
1617            .await;
1618
1619        let upload_results = match upload_results {
1620            Ok(v) => v,
1621            Err(PublishError::Fatal(e)) => return Err(e),
1622            Err(PublishError::NoIpts) => {
1623                debug!(
1624                    nickname=%imm.nickname, time_period=?time_period,
1625                    "no introduction points; skipping upload"
1626                );
1627
1628                return Ok(());
1629            }
1630            Err(PublishError::Shutdown) => {
1631                debug!(
1632                    nickname=%imm.nickname, time_period=?time_period,
1633                    "the reactor has shut down; aborting upload"
1634                );
1635
1636                return Ok(());
1637            }
1638        };
1639
1640        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1641            .iter()
1642            .partition(|res| res.upload_res.is_ok());
1643
1644        debug!(
1645            nickname=%imm.nickname, time_period=?time_period,
1646            "descriptor uploaded successfully to {}/{} HSDirs",
1647            succeeded.len(), hsdir_count
1648        );
1649
1650        if upload_task_complete_tx
1651            .send(TimePeriodUploadResult {
1652                time_period,
1653                hsdir_result: upload_results,
1654            })
1655            .await
1656            .is_err()
1657        {
1658            return Err(internal!(
1659                "failed to notify reactor of upload completion (reactor shut down)"
1660            )
1661            .into());
1662        }
1663
1664        Ok(())
1665    }
1666
1667    /// Upload a descriptor to the specified HSDir.
1668    ///
1669    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1670    /// to the caller to retry on failure.
1671    ///
1672    /// This function does not handle timeouts.
1673    async fn upload_descriptor(
1674        hsdesc: String,
1675        netdir: &Arc<NetDir>,
1676        hsdir: &Relay<'_>,
1677        imm: Arc<Immutable<R, M>>,
1678    ) -> Result<(), UploadError> {
1679        let request = HsDescUploadRequest::new(hsdesc);
1680
1681        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1682            "starting descriptor upload",
1683        );
1684
1685        let circuit = imm
1686            .mockable
1687            .get_or_launch_specific(
1688                netdir,
1689                HsCircKind::SvcHsDir,
1690                OwnedCircTarget::from_circ_target(hsdir),
1691            )
1692            .await?;
1693
1694        let mut stream = circuit
1695            .begin_dir_stream()
1696            .await
1697            .map_err(UploadError::Stream)?;
1698
1699        let _response: String = send_request(&imm.runtime, &request, &mut stream, None)
1700            .await
1701            .map_err(|dir_error| -> UploadError {
1702                match dir_error {
1703                    DirClientError::RequestFailed(e) => e.into(),
1704                    DirClientError::CircMgr(e) => into_internal!(
1705                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1706                    )(e)
1707                    .into(),
1708                    e => into_internal!("unexpected error")(e).into(),
1709                }
1710            })?
1711            .into_output_string()?; // This returns an error if we received an error response
1712
1713        Ok(())
1714    }
1715
1716    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1717    ///
1718    /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1719    /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1720    /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
1721    /// according to the [`BackoffSchedule::single_attempt_timeout`].
1722    /// This function gives up after the overall timeout elapses,
1723    /// declaring the upload a failure, and never retrying it again.
1724    ///
1725    /// See also [`BackoffSchedule`].
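    ///
    /// As a rough sketch of how the retry pieces fit together (the 1000ms base delay
    /// comes from the `BASE_DELAY_MSEC` constant below; the rest follows the body of
    /// this function, with some details elided):
    ///
    /// ```ignore
    /// let schedule = PublisherBackoffSchedule {
    ///     retry_delay: RetryDelay::from_msec(1000),
    ///     mockable: imm.mockable.clone(),
    /// };
    /// let runner = Runner::new("upload a hidden service descriptor".into(), schedule, runtime);
    /// // Retries until the upload succeeds, or until overall_timeout() elapses;
    /// // each individual attempt is bounded by single_attempt_timeout().
    /// let outcome = runner
    ///     .run(|| Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm)))
    ///     .await;
    /// ```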
1726    async fn upload_descriptor_with_retries(
1727        hsdesc: String,
1728        netdir: &Arc<NetDir>,
1729        hsdir: &Relay<'_>,
1730        ed_id: &str,
1731        rsa_id: &str,
1732        imm: Arc<Immutable<R, M>>,
1733    ) -> UploadResult {
1734        /// The base delay to use for the backoff schedule.
1735        const BASE_DELAY_MSEC: u32 = 1000;
1736        let schedule = PublisherBackoffSchedule {
1737            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1738            mockable: imm.mockable.clone(),
1739        };
1740
1741        let runner = Runner::new(
1742            "upload a hidden service descriptor".into(),
1743            schedule.clone(),
1744            imm.runtime.clone(),
1745        );
1746
1747        let fallible_op =
1748            || Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm));
1749
1750        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1751        match outcome {
1752            Ok(()) => {
1753                debug!(
1754                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1755                    "successfully uploaded descriptor to HSDir",
1756                );
1757
1758                Ok(())
1759            }
1760            Err(e) => {
1761                warn_report!(
1762                    e,
1763                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1764                    imm.nickname,
1765                    ed_id,
1766                    rsa_id
1767                );
1768
1769                Err(e.into())
1770            }
1771        }
1772    }
1773
1774    /// Stop publishing descriptors until the specified delay elapses.
1775    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1776        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1777            debug!(
1778                "We are rate-limited for {}; pausing descriptor publication",
1779                humantime::format_duration(delay)
1780            );
1781            let until = self.imm.runtime.now() + delay;
1782            self.update_publish_status(PublishStatus::RateLimited(until))
1783                .await?;
1784        }
1785
1786        Ok(())
1787    }
1788
1789    /// Handle the upload rate-limit being lifted.
1790    async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1791        debug!("We are no longer rate-limited; resuming descriptor publication");
1792        self.update_publish_status(PublishStatus::UploadScheduled)
1793            .await?;
1794        Ok(())
1795    }
1796
1797    /// Return the authorized clients, if restricted discovery mode is enabled.
1798    ///
1799    /// Returns `Ok(None)` if restricted discovery mode is disabled.
1800    ///
1801    /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
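    ///
    /// In other words (an illustrative sketch, not a doctest; the error variant shown
    /// is only produced when the `restricted-discovery` feature is enabled):
    ///
    /// ```ignore
    /// match self.authorized_clients() {
    ///     Ok(None) => { /* restricted discovery disabled; publish an unrestricted descriptor */ }
    ///     Ok(Some(clients)) => { /* build the descriptor for these authorized clients */ }
    ///     Err(FatalError::RestrictedDiscoveryNoClients) => { /* abort the upload; report Broken */ }
    ///     Err(_) => { /* other fatal errors */ }
    /// }
    /// ```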
1802    #[cfg_attr(
1803        not(feature = "restricted-discovery"),
1804        allow(clippy::unnecessary_wraps)
1805    )]
1806    fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1807        cfg_if::cfg_if! {
1808            if #[cfg(feature = "restricted-discovery")] {
1809                let authorized_clients = self
1810                    .inner
1811                    .lock()
1812                    .expect("poisoned lock")
1813                    .authorized_clients
1814                    .clone();
1815
1816                if authorized_clients.as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1817                    return Err(FatalError::RestrictedDiscoveryNoClients);
1818                }
1819
1820                Ok(authorized_clients)
1821            } else {
1822                Ok(None)
1823            }
1824        }
1825    }
1826}
1827
1828/// Try to expand a path, logging a warning on failure.
1829fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1830    // map_err returns unit for clarity
1831    #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1832    p.path(r)
1833        .map_err(|e| {
1834            tor_error::warn_report!(e, "invalid path");
1835            ()
1836        })
1837        .ok()
1838}
1839
1840/// Add `path` to the specified `watcher`.
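///
/// For example (these mirror the real call sites in `watch_dirs` below; note the
/// trailing comma the macro expects after each extra argument):
///
/// ```ignore
/// watch_path!(watcher, &path, watch_dir, "auth",);
/// watch_path!(watcher, &path, watch_path,);
/// ```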
1841macro_rules! watch_path {
1842    ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
1843        if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args)*) {
1844            warn_report!(e, "failed to watch path {:?}", $path);
1845        } else {
1846            debug!("watching path {:?}", $path);
1847        }
1848    }}
1849}
1850
1851/// Add the specified directories to the watcher.
1852#[allow(clippy::cognitive_complexity)]
1853fn watch_dirs<R: Runtime>(
1854    watcher: &mut FileWatcherBuilder<R>,
1855    dirs: &DirectoryKeyProviderList,
1856    path_resolver: &CfgPathResolver,
1857) {
1858    for path in dirs {
1859        let path = path.path();
1860        let Some(path) = maybe_expand_path(path, path_resolver) else {
1861            warn!("failed to expand key_dir path {:?}", path);
1862            continue;
1863        };
1864
1865        // If the path doesn't exist, the notify watcher will return an error if we attempt to watch it,
1866        // so we skip over paths that don't exist at this time
1867        // (this obviously suffers from a TOCTOU race, but most of the time,
1868        // it is good enough at preventing the watcher from failing to watch.
1869        // If the race *does* happen it is not disastrous, i.e. the reactor won't crash,
1870        // but it will fail to set the watcher).
1871        if matches!(path.try_exists(), Ok(true)) {
1872            watch_path!(watcher, &path, watch_dir, "auth",);
1873        }
1874        // FileWatcher::watch_path causes the parent dir of the path to be watched.
1875        if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1876            watch_path!(watcher, &path, watch_path,);
1877        }
1878    }
1879}
1880
1881/// Try to read the blinded identity key for a given `TimePeriod`.
1882///
1883/// Returns `Ok(None)` if the service is running in "offline" mode.
1884///
1885// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1886// `Ok(None)`.
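//
// A typical call, from elsewhere in the publisher, might look like this
// (a sketch only; `netdir`, `imm`, and the error handling are assumed context):
//
// ```ignore
// let period = netdir.hs_time_period();
// let blind_id_kp = read_blind_id_keypair(&imm.keymgr, &imm.nickname, period)?
//     .ok_or_else(|| internal!("offline mode is not yet supported"))?;
// ```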
1887pub(super) fn read_blind_id_keypair(
1888    keymgr: &Arc<KeyMgr>,
1889    nickname: &HsNickname,
1890    period: TimePeriod,
1891) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1892    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1893    let hsid_kp = keymgr
1894        .get::<HsIdKeypair>(&svc_key_spec)?
1895        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1896
1897    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1898
1899    // TODO: make the keystore selector configurable
1900    let keystore_selector = Default::default();
1901    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1902        Some(kp) => Ok(Some(kp)),
1903        None => {
1904            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1905                .compute_blinded_key(period)
1906                .map_err(|_| internal!("failed to compute blinded key"))?;
1907
1908            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1909            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1910            // which assumes keys are randomly generated, rather than derived from existing keys).
1911
1912            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1913
1914            let arti_path = |spec: &dyn KeySpecifier| {
1915                spec.arti_path()
1916                    .map_err(into_internal!("invalid key specifier?!"))
1917            };
1918
1919            Ok(Some(
1920                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1921                    FatalError::KeystoreRace {
1922                        action: "read",
1923                        path: arti_path(&blind_id_key_spec)?,
1924                    },
1925                )?,
1926            ))
1927        }
1928    }
1929}
1930
1931/// Determine the [`State`] of the publisher based on the upload results
1932/// from the current `time_periods`.
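///
/// As a summary of the mapping implemented by the `match` below (assuming we have
/// contexts for both the primary and the secondary time period; with fewer than two
/// contexts we always report `DegradedUnreachable`):
///
/// ```ignore
/// // successful uploads on both rings?   any failed uploads?   resulting State
/// // no (one or both rings empty)        no                    Bootstrapping
/// // yes                                 no                    Running
/// // yes                                 yes                   DegradedReachable
/// // no (one or both rings empty)        yes                   DegradedUnreachable
/// ```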
1933fn upload_result_state(
1934    netdir: &NetDir,
1935    time_periods: &[TimePeriodContext],
1936) -> (State, Option<Problem>) {
1937    let current_period = netdir.hs_time_period();
1938    let current_period_res = time_periods
1939        .iter()
1940        .find(|ctx| ctx.params.time_period() == current_period);
1941
1942    let succeeded_current_tp = current_period_res
1943        .iter()
1944        .flat_map(|res| &res.upload_results)
1945        .filter(|res| res.upload_res.is_ok())
1946        .collect_vec();
1947
1948    let secondary_tp_res = time_periods
1949        .iter()
1950        .filter(|ctx| ctx.params.time_period() != current_period)
1951        .collect_vec();
1952
1953    let succeeded_secondary_tp = secondary_tp_res
1954        .iter()
1955        .flat_map(|res| &res.upload_results)
1956        .filter(|res| res.upload_res.is_ok())
1957        .collect_vec();
1958
1959    // All of the failed uploads (for all TPs)
1960    let failed = time_periods
1961        .iter()
1962        .flat_map(|res| &res.upload_results)
1963        .filter(|res| res.upload_res.is_err())
1964        .collect_vec();
1965    let problems: Vec<DescUploadRetryError> = failed
1966        .iter()
1967        .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
1968        .collect();
1969
1970    let err = match problems.as_slice() {
1971        [_, ..] => Some(problems.into()),
1972        [] => None,
1973    };
1974
1975    if time_periods.len() < 2 {
1976        // We need at least two TP contexts (one for the primary TP,
1977        // and another for the secondary one).
1978        //
1979        // If either is missing, we are unreachable for some or all clients.
1980        return (State::DegradedUnreachable, err);
1981    }
1982
1983    let state = match (
1984        succeeded_current_tp.as_slice(),
1985        succeeded_secondary_tp.as_slice(),
1986    ) {
1987        (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
1988            // We don't have any upload results for one or both TPs.
1989            // We are still bootstrapping.
1990            State::Bootstrapping
1991        }
1992        (&[_, ..], &[_, ..]) if failed.is_empty() => {
1993            // We have uploaded the descriptor to one or more HsDirs from both
1994            // HsDir rings (primary and secondary), and none of the uploads failed.
1995            // We are fully reachable.
1996            State::Running
1997        }
1998        (&[_, ..], &[_, ..]) => {
1999            // We have uploaded the descriptor to one or more HsDirs from both
2000            // HsDir rings (primary and secondary), but some of the uploads failed.
2001            // We are reachable, but we failed to upload the descriptor to all the HsDirs
2002            // that were supposed to have it.
2003            State::DegradedReachable
2004        }
2005        (&[..], &[]) | (&[], &[..]) => {
2006            // We have either
2007            //   * uploaded the descriptor to some of the HsDirs from one of the rings,
2008            //   but haven't managed to upload it to any of the HsDirs on the other ring, or
2009            //   * all of the uploads failed
2010            //
2011            // Either way, we are definitely not reachable by all clients.
2012            State::DegradedUnreachable
2013        }
2014    };
2015
2016    (state, err)
2017}
2018
2019/// Whether the reactor should initiate an upload.
2020#[derive(Copy, Clone, Debug, Default, PartialEq)]
2021enum PublishStatus {
2022    /// We need to call upload_all.
2023    UploadScheduled,
2024    /// We are rate-limited until the specified [`Instant`].
2025    ///
2026    /// We have tried to schedule multiple uploads in a short time span,
2027    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
2028    /// channel to unblock us.
2029    RateLimited(Instant),
2030    /// We are idle and waiting for external events.
2031    ///
2032    /// We have enough information to build the descriptor, but since we have already called
2033    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
2034    Idle,
2035    /// We are waiting for the IPT manager to establish some introduction points.
2036    ///
2037    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
2038    /// `UploadScheduled`.
2039    #[default]
2040    AwaitingIpts,
2041}
2042
2043/// The backoff schedule for the task that publishes descriptors.
2044#[derive(Clone, Debug)]
2045struct PublisherBackoffSchedule<M: Mockable> {
2046    /// The delays to use between retries.
2047    retry_delay: RetryDelay,
2048    /// The mockable reactor state, needed for obtaining an rng.
2049    mockable: M,
2050}
2051
2052impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
2053    fn max_retries(&self) -> Option<usize> {
2054        None
2055    }
2056
2057    fn overall_timeout(&self) -> Option<Duration> {
2058        Some(OVERALL_UPLOAD_TIMEOUT)
2059    }
2060
2061    fn single_attempt_timeout(&self) -> Option<Duration> {
2062        Some(self.mockable.estimate_upload_timeout())
2063    }
2064
2065    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
2066        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
2067    }
2068}
2069
2070impl RetriableError for UploadError {
2071    fn should_retry(&self) -> bool {
2072        match self {
2073            UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2074            UploadError::Bug(_) => false,
2075        }
2076    }
2077}
2078
2079/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
2080#[derive(Debug, Clone)]
2081struct TimePeriodUploadResult {
2082    /// The time period.
2083    time_period: TimePeriod,
2084    /// The upload results.
2085    hsdir_result: Vec<HsDirUploadStatus>,
2086}
2087
2088/// The outcome of uploading a descriptor to a particular HsDir.
2089#[derive(Clone, Debug)]
2090struct HsDirUploadStatus {
2091    /// The identity of the HsDir we attempted to upload the descriptor to.
2092    relay_ids: RelayIds,
2093    /// The outcome of this attempt.
2094    upload_res: UploadResult,
2095    /// The revision counter of the descriptor we tried to upload.
2096    revision_counter: RevisionCounter,
2097}
2098
2099/// The outcome of uploading a descriptor.
2100type UploadResult = Result<(), DescUploadRetryError>;
2101
2102impl From<BackoffError<UploadError>> for DescUploadRetryError {
2103    fn from(e: BackoffError<UploadError>) -> Self {
2104        use BackoffError as BE;
2105        use DescUploadRetryError as DURE;
2106
2107        match e {
2108            BE::FatalError(e) => DURE::FatalError(e),
2109            BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2110            BE::Timeout(e) => DURE::Timeout(e),
2111            BE::ExplicitStop(_) => {
2112                DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2113            }
2114        }
2115    }
2116}
2117
2118// NOTE: the rest of the publisher tests live in publish.rs
2119#[cfg(test)]
2120mod test {
2121    // @@ begin test lint list maintained by maint/add_warning @@
2122    #![allow(clippy::bool_assert_comparison)]
2123    #![allow(clippy::clone_on_copy)]
2124    #![allow(clippy::dbg_macro)]
2125    #![allow(clippy::mixed_attributes_style)]
2126    #![allow(clippy::print_stderr)]
2127    #![allow(clippy::print_stdout)]
2128    #![allow(clippy::single_char_pattern)]
2129    #![allow(clippy::unwrap_used)]
2130    #![allow(clippy::unchecked_duration_subtraction)]
2131    #![allow(clippy::useless_vec)]
2132    #![allow(clippy::needless_pass_by_value)]
2133    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
2134    use super::*;
2135    use tor_netdir::testnet;
2136
2137    /// Create a `TimePeriodContext` from the specified upload results.
2138    fn create_time_period_ctx(
2139        params: &HsDirParams,
2140        upload_results: Vec<HsDirUploadStatus>,
2141    ) -> TimePeriodContext {
2142        TimePeriodContext {
2143            params: params.clone(),
2144            hs_dirs: vec![],
2145            last_successful: None,
2146            upload_results,
2147        }
2148    }
2149
2150    /// Create a single `HsDirUploadStatus`
2151    fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
2152        HsDirUploadStatus {
2153            relay_ids: RelayIds::empty(),
2154            upload_res,
2155            revision_counter: RevisionCounter::from(13),
2156        }
2157    }
2158
2159    /// Create a bunch of results, all with the specified `upload_res`.
2160    fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
2161        std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
2162            .take(10)
2163            .collect()
2164    }
2165
2166    fn construct_netdir() -> NetDir {
2167        const SRV1: [u8; 32] = *b"The door refused to open.       ";
2168        const SRV2: [u8; 32] = *b"It said, 'Five cents, please.'  ";
2169
2170        let dir = testnet::construct_custom_netdir(|_, _, bld| {
2171            bld.shared_rand_prev(7, SRV1.into(), None)
2172                .shared_rand_prev(7, SRV2.into(), None);
2173        })
2174        .unwrap();
2175
2176        dir.unwrap_if_sufficient().unwrap()
2177    }
2178
2179    #[test]
2180    fn upload_result_status_bootstrapping() {
2181        let netdir = construct_netdir();
2182        let all_params = netdir.hs_all_time_periods();
2183        let current_period = netdir.hs_time_period();
2184        let primary_params = all_params
2185            .iter()
2186            .find(|param| param.time_period() == current_period)
2187            .unwrap();
2188        let results = [
2189            (vec![], vec![]),
2190            (vec![], create_upload_results(Ok(()))),
2191            (create_upload_results(Ok(())), vec![]),
2192        ];
2193
2194        for (primary_result, secondary_result) in results {
2195            let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2196
2197            let secondary_params = all_params
2198                .iter()
2199                .find(|param| param.time_period() != current_period)
2200                .unwrap();
2201            let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2202
2203            let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2204            assert_eq!(status, State::Bootstrapping);
2205            assert!(err.is_none());
2206        }
2207    }
2208
2209    #[test]
2210    fn upload_result_status_running() {
2211        let netdir = construct_netdir();
2212        let all_params = netdir.hs_all_time_periods();
2213        let current_period = netdir.hs_time_period();
2214        let primary_params = all_params
2215            .iter()
2216            .find(|param| param.time_period() == current_period)
2217            .unwrap();
2218
2219        let secondary_result = create_upload_results(Ok(()));
2220        let secondary_params = all_params
2221            .iter()
2222            .find(|param| param.time_period() != current_period)
2223            .unwrap();
2224        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2225
2226        let primary_result = create_upload_results(Ok(()));
2227        let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2228        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2229        assert_eq!(status, State::Running);
2230        assert!(err.is_none());
2231    }
2232
2233    #[test]
2234    fn upload_result_status_reachable() {
2235        let netdir = construct_netdir();
2236        let all_params = netdir.hs_all_time_periods();
2237        let current_period = netdir.hs_time_period();
2238        let primary_params = all_params
2239            .iter()
2240            .find(|param| param.time_period() == current_period)
2241            .unwrap();
2242
2243        let primary_result = create_upload_results(Ok(()));
2244        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2245        let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2246        let secondary_result = create_upload_results(Ok(()))
2247            .into_iter()
2248            .chain(failed_res.iter().cloned())
2249            .collect();
2250        let secondary_params = all_params
2251            .iter()
2252            .find(|param| param.time_period() != current_period)
2253            .unwrap();
2254        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
2255        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2256
2257        // Degraded but reachable (because some of the secondary HsDir uploads failed).
2258        assert_eq!(status, State::DegradedReachable);
2259        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2260    }
2261
2262    #[test]
2263    fn upload_result_status_unreachable() {
2264        let netdir = construct_netdir();
2265        let all_params = netdir.hs_all_time_periods();
2266        let current_period = netdir.hs_time_period();
2267        let primary_params = all_params
2268            .iter()
2269            .find(|param| param.time_period() == current_period)
2270            .unwrap();
2271        let mut primary_result =
2272            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2273        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2274        // No secondary TP (we are unreachable).
2275        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2276        assert_eq!(status, State::DegradedUnreachable);
2277        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2278
2279        // Add a successful result
2280        primary_result.push(create_upload_status(Ok(())));
2281        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2282        let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2283        // Still degraded, and unreachable (because we don't have a TimePeriodContext
2284        // for the secondary TP)
2285        assert_eq!(status, State::DegradedUnreachable);
2286        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2287
2288        // If we add another time period where none of the uploads were successful,
2289        // we're *still* unreachable
2290        let secondary_result =
2291            create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2292        let secondary_params = all_params
2293            .iter()
2294            .find(|param| param.time_period() != current_period)
2295            .unwrap();
2296        let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2297        let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2298        let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2299        assert_eq!(status, State::DegradedUnreachable);
2300        assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2301    }
2302}