//! The onion service publisher reactor.
//!
//! TODO (#1216): write the docs
//!
//! With respect to [`OnionServiceStatus`] reporting,
//! the following state transitions are possible:
//!
//!
//! ```ignore
//!
//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited) +---------------+
//!                +--------------------------------------------------------------->| Bootstrapping |
//!                |                                                                +---------------+
//! +----------+   | update_publish_status(Idle)        +---------+                         |
//! | Shutdown |-- +----------------------------------->| Running |----+                    |
//! +----------+   |                                    +---------+    |                    |
//!                |                                                   |                    |
//!                |                                                   |                    |
//!                | run_once() returns an error  +--------+           |                    |
//!                +----------------------------->| Broken |<----------+--------------------+
//!                                               +--------+ run_once() returns an error
//! ```
//!
//! Ideally, the publisher should also set the
//! [`OnionServiceStatus`] to `Recovering` whenever a transient
//! upload error occurs, but this is currently not possible:
//!
//!   * making the upload tasks set the status to `Recovering` (on failure) and `Running` (on
//!     success) wouldn't work, because the upload tasks run in parallel (they would race with each
//!     other, and the final status (`Recovering`/`Running`) would be the status of the last upload
//!     task to finish, rather than the real status of the publisher)
//!   * making the upload task set the status to `Recovering` on upload failure, and letting
//!     `update_publish_status` reset it back to `Running`, also would not work:
//!     `update_publish_status` sets the status back to `Running` when the publisher enters its
//!     `Idle` state, regardless of the status of its upload tasks
//!
//! TODO: Indeed, setting the status to `Recovering` _anywhere_ would not work, because
//! `update_publish_status` will just overwrite it. We would need to introduce new
//! `PublishStatus` variants (currently, the publisher only has the `Idle`, `UploadScheduled`,
//! `AwaitingIpts`, and `RateLimited` states) for the `Recovering` (retrying a failed upload) and
//! `Broken` (the upload failed and we've given up) states. However, adding these 2 new states is
//! non-trivial:
//!
//!   * how do we define "failure"? Is it the failure to upload to a single HsDir, or the failure
//!     to upload to **any** of the HsDirs?
//!   * what should make the publisher transition out of the `Broken`/`Recovering` states? While
//!     `handle_upload_results` can see the upload results for a batch of HsDirs (corresponding to
//!     a time period), the publisher doesn't do any sort of bookkeeping to know whether a previously
//!     failed HsDir upload succeeded in a later upload "batch"
//!
//! For the time being, the publisher never sets the status to `Recovering`, and uses the `Broken`
//! status for reporting fatal errors (crashes).

            
53
use tor_config::file_watcher::{
54
    self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
55
};
56
use tor_netdir::DirEvent;
57

            
58
use crate::config::restricted_discovery::{
59
    DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
60
};
61
use crate::config::OnionServiceConfigPublisherView;
62

            
63
use super::*;
64

            
65
// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
66
// for rate-limiting the publish jobs triggered by a change in the config?
67
//
68
// Currently the descriptor publish tasks triggered by changes in the config
69
// are rate-limited via the usual rate limiting mechanism
70
// (which rate-limits the uploads for 1m).
71
//
72
// I think this is OK for now, but we might need to rethink this if it becomes problematic
73
// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
74
// each time the config is modified).
75

            
76
/// The upload rate-limiting threshold.
77
///
78
/// Before initiating an upload, the reactor checks if the last upload was at least
79
/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
80
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
81
/// current time.
82
//
83
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
84
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
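//
// A minimal sketch of the rate-limiting check described above (illustrative only;
// the real logic lives in `Reactor::upload_all` and `Reactor::start_rate_limit`, and
// `last_uploaded`, `now`, and `schedule_upload_at` are hypothetical stand-ins for the
// reactor's actual state and helpers):
//
//     if let Some(last) = last_uploaded {
//         if now.duration_since(last) < UPLOAD_RATE_LIM_THRESHOLD {
//             // Too soon since the previous upload: don't upload now, and instead
//             // reschedule the upload for when the threshold expires.
//             return schedule_upload_at(now + UPLOAD_RATE_LIM_THRESHOLD);
//         }
//     }
//     // Otherwise, upload the descriptor to every HsDir that still needs it.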
85

            
86
/// The maximum number of concurrent upload tasks per time period.
87
//
88
// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
89
// will have no effect, since the current number of replicas is far less than
90
// this value.
91
//
92
// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
93
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
94
// (currently 2, which means the concurrency limit will, in fact, be 32).
95
//
96
// We should try to decouple this value from the TP parameters.
97
const MAX_CONCURRENT_UPLOADS: usize = 16;
98

            
99
/// The maximum time allowed for uploading a descriptor to a single HSDir,
100
/// across all attempts.
101
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
102

            
103
/// A reactor for the HsDir [`Publisher`]
104
///
105
/// The entrypoint is [`Reactor::run`].
106
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
107
pub(super) struct Reactor<R: Runtime, M: Mockable> {
108
    /// The immutable, shared inner state.
109
    imm: Arc<Immutable<R, M>>,
110
    /// A source for new network directories that we use to determine
111
    /// our HsDirs.
112
    dir_provider: Arc<dyn NetDirProvider>,
113
    /// The mutable inner state.
114
    inner: Arc<Mutex<Inner>>,
115
    /// A channel for receiving IPT change notifications.
116
    ipt_watcher: IptsPublisherView,
117
    /// A channel for receiving onion service config change notifications.
118
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
119
    /// A channel for receiving restricted discovery key_dirs change notifications.
120
    key_dirs_rx: FileEventReceiver,
121
    /// A channel for sending restricted discovery key_dirs change notifications.
122
    ///
123
    /// A copy of this sender is handed out to every `FileWatcher` created.
124
    key_dirs_tx: FileEventSender,
125
    /// A channel for receiving updates regarding our [`PublishStatus`].
126
    ///
127
    /// The main loop of the reactor watches for updates on this channel.
128
    ///
129
    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
130
    /// we can start publishing descriptors.
131
    ///
132
    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
133
    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
134
    /// established some introduction points.
135
    publish_status_rx: watch::Receiver<PublishStatus>,
136
    /// A sender for updating our [`PublishStatus`].
137
    ///
138
    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
139
    /// we can start publishing descriptors.
140
    publish_status_tx: watch::Sender<PublishStatus>,
141
    /// A channel for sending upload completion notifications.
142
    ///
143
    /// This channel is polled in the main loop of the reactor.
144
    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
145
    /// A channel for receiving upload completion notifications.
146
    ///
147
    /// A copy of this sender is handed to each upload task.
148
    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
149
    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
150
    ///
151
    /// Receivers can use this channel to find out when the reactor is dropped.
152
    ///
153
    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
154
    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
155
    ///
156
    /// Closing this channel will cause any pending upload tasks to be dropped.
157
    shutdown_tx: broadcast::Sender<Void>,
158
}
159

            
160
/// The immutable, shared state of the descriptor publisher reactor.
161
#[derive(Clone)]
162
struct Immutable<R: Runtime, M: Mockable> {
163
    /// The runtime.
164
    runtime: R,
165
    /// Mockable state.
166
    ///
167
    /// This is used for launching circuits and for obtaining random number generators.
168
    mockable: M,
169
    /// The service for which we're publishing descriptors.
170
    nickname: HsNickname,
171
    /// The key manager.
172
    keymgr: Arc<KeyMgr>,
173
    /// The restricted discovery authorized clients.
174
    ///
175
    /// `None`, unless the service is running in restricted discovery mode.
176
    authorized_clients: Arc<Mutex<Option<RestrictedDiscoveryKeys>>>,
177
    /// A sender for updating the status of the onion service.
178
    status_tx: PublisherStatusSender,
179
}
180

            
181
impl<R: Runtime, M: Mockable> Immutable<R, M> {
182
    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
183
    /// with the specified [`TimePeriod`].
184
    ///
185
    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
186
    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
187
    /// descriptor signing key.
188
    ///
189
    /// Returns an error if the service is running in offline mode and the descriptor signing
190
    /// keypair of the specified `period` is not available.
191
    //
192
    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
193
    // built from the blinded id key
194
128
    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
195
128
        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
196
128
            Some(key) => {
197
128
                let key: ed25519::ExpandedKeypair = key.into();
198
128
                key.to_secret_key_bytes()[0..32]
199
128
                    .try_into()
200
128
                    .expect("Wrong length on slice")
201
            }
202
            None => {
203
                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
204
                // is unreachable (for now).
205
                let desc_sign_key_spec =
206
                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
207
                let key: ed25519::Keypair = self
208
                    .keymgr
209
                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
210
                    // TODO (#1194): internal! is not the right type for this error (we need an
211
                    // error type for the case where a hidden service running in offline mode has
212
                    // run out of its pre-provisioned keys).
213
                    //
214
                    // This will be addressed when we add support for offline hs_id mode
215
                    .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
216
                    .into();
217
                key.to_bytes()
218
            }
219
        };
220

            
221
128
        Ok(AesOpeKey::from_secret(&ope_key))
222
128
    }
223

            
224
    /// Generate a revision counter for a descriptor associated with the specified
225
    /// [`TimePeriod`].
226
    ///
227
    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
228
    ///
229
    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
230
128
    fn generate_revision_counter(
231
128
        &self,
232
128
        params: &HsDirParams,
233
128
        now: SystemTime,
234
128
    ) -> Result<RevisionCounter, FatalError> {
235
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
236
        // to each time we generate a new descriptor), for performance reasons.
237
128
        let ope_key = self.create_ope_key(params.time_period())?;
238

            
239
        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
240
128
        let srv_start = params.start_of_shard_rand_period();
241
128
        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
242
            internal!(
243
                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
244
                now,
245
                srv_start
246
            )
247
128
        })?;
248
128
        let rev = ope_key.encrypt(offset);
249
128

            
250
128
        Ok(RevisionCounter::from(rev))
251
128
    }
252
}
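
// A minimal sketch of the "encrypted time in period" scheme implemented by
// `generate_revision_counter` above. Key derivation is elided; `ope_key`, `params`,
// and `now` stand for the values computed in that method:
//
//     // The revision counter is the OPE encryption of how far into the current
//     // shared-random-value (SRV) period the descriptor is being generated:
//     let offset = params.offset_within_srv_period(now).expect("not in SRV period");
//     let rev = RevisionCounter::from(ope_key.encrypt(offset));
//
//     // Because the encryption is order-preserving, descriptors generated later in
//     // the same SRV period get larger revision counters, without the publisher
//     // having to persist the previously used counter.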
253

            
254
/// Mockable state for the descriptor publisher reactor.
255
///
256
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
257
#[async_trait]
258
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
259
    /// The type of random number generator.
260
    type Rng: rand::Rng + rand::CryptoRng;
261

            
262
    /// The type of client circuit.
263
    type ClientCirc: MockableClientCirc;
264

            
265
    /// Return a random number generator.
266
    fn thread_rng(&self) -> Self::Rng;
267

            
268
    /// Create a circuit of the specified `kind` to `target`.
269
    async fn get_or_launch_specific<T>(
270
        &self,
271
        netdir: &NetDir,
272
        kind: HsCircKind,
273
        target: T,
274
    ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
275
    where
276
        T: CircTarget + Send + Sync;
277

            
278
    /// Return an estimate-based value for how long we should allow a single
279
    /// directory upload operation to complete.
280
    ///
281
    /// Includes circuit construction, stream opening, upload, and waiting for a
282
    /// response.
283
    fn estimate_upload_timeout(&self) -> Duration;
284
}
285

            
286
/// Mockable client circuit
287
#[async_trait]
288
pub(crate) trait MockableClientCirc: Send + Sync {
289
    /// The data stream type.
290
    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
291

            
292
    /// Start a new stream to the last relay in the circuit, using
293
    /// a BEGIN_DIR cell.
294
    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
295
}
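
// A minimal sketch of what a test double for `Mockable`/`MockableClientCirc` might
// look like (hypothetical types, for illustration; the mocks actually used by this
// crate's tests may differ):
//
//     #[derive(Clone)]
//     struct FakeCircPool;
//
//     struct FakeCirc;
//
//     #[async_trait]
//     impl MockableClientCirc for FakeCirc {
//         type DataStream = futures::io::Cursor<Vec<u8>>;
//
//         async fn begin_dir_stream(
//             self: Arc<Self>,
//         ) -> Result<Self::DataStream, tor_proto::Error> {
//             Ok(futures::io::Cursor::new(Vec::new()))
//         }
//     }
//
//     #[async_trait]
//     impl Mockable for FakeCircPool {
//         type Rng = rand::rngs::ThreadRng;
//         type ClientCirc = FakeCirc;
//
//         fn thread_rng(&self) -> Self::Rng {
//             rand::thread_rng()
//         }
//
//         async fn get_or_launch_specific<T>(
//             &self,
//             _netdir: &NetDir,
//             _kind: HsCircKind,
//             _target: T,
//         ) -> Result<Arc<FakeCirc>, tor_circmgr::Error>
//         where
//             T: CircTarget + Send + Sync,
//         {
//             Ok(Arc::new(FakeCirc))
//         }
//
//         fn estimate_upload_timeout(&self) -> Duration {
//             Duration::from_secs(30)
//         }
//     }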
296

            
297
#[async_trait]
298
impl MockableClientCirc for ClientCirc {
299
    type DataStream = tor_proto::stream::DataStream;
300

            
301
    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
302
        ClientCirc::begin_dir_stream(self).await
303
    }
304
}
305

            
306
/// The real version of the mockable state of the reactor.
307
#[derive(Clone, From, Into)]
308
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
309

            
310
#[async_trait]
311
impl<R: Runtime> Mockable for Real<R> {
312
    type Rng = rand::rngs::ThreadRng;
313
    type ClientCirc = ClientCirc;
314

            
315
    fn thread_rng(&self) -> Self::Rng {
316
        rand::thread_rng()
317
    }
318

            
319
    async fn get_or_launch_specific<T>(
320
        &self,
321
        netdir: &NetDir,
322
        kind: HsCircKind,
323
        target: T,
324
    ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
325
    where
326
        T: CircTarget + Send + Sync,
327
    {
328
        self.0.get_or_launch_specific(netdir, kind, target).await
329
    }
330

            
331
    fn estimate_upload_timeout(&self) -> Duration {
332
        use tor_circmgr::timeouts::Action;
333
        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
334
        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
335
        // We assume that in the worst case we'll have to wait for an entire
336
        // circuit construction and two round-trips to the hsdir.
337
        let est_total = est_build + est_roundtrip * 2;
338
        // We always allow _at least_ this much time, in case our estimate is
339
        // ridiculously low.
340
        let min_timeout = Duration::from_secs(30);
341
        max(est_total, min_timeout)
342
    }
343
}
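
// Worked example of the estimate above (with made-up numbers, for illustration):
//
//     est_build = 20s, est_roundtrip = 8s  =>  est_total = 20s + 2 * 8s = 36s
//                                              timeout   = max(36s, 30s) = 36s
//     est_build = 10s, est_roundtrip = 5s  =>  est_total = 10s + 2 * 5s = 20s
//                                              timeout   = max(20s, 30s) = 30s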
344

            
345
/// The mutable state of a [`Reactor`].
346
struct Inner {
347
    /// The onion service config.
348
    config: Arc<OnionServiceConfigPublisherView>,
349
    /// Watcher for key_dirs.
350
    ///
351
    /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
352
    ///
353
    /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
354
    file_watcher: Option<FileWatcher>,
355
    /// The relevant time periods.
356
    ///
357
    /// This includes the current time period, as well as any other time periods we need to be
358
    /// publishing descriptors for.
359
    ///
360
    /// This is empty until we fetch our first netdir in [`Reactor::run`].
361
    time_periods: Vec<TimePeriodContext>,
362
    /// Our most up to date netdir.
363
    ///
364
    /// This is initialized in [`Reactor::run`].
365
    netdir: Option<Arc<NetDir>>,
366
    /// The timestamp of our last upload.
367
    ///
368
    /// This is the time when the last update was _initiated_ (rather than completed), to prevent
369
    /// the publisher from spawning multiple upload tasks at once in response to multiple external
370
    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
371
    /// notifications in a short time frame (#1142), or an IPT change notification that's
372
    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
373
    /// inefficient, but it also causes the publisher to generate two different descriptors with
374
    /// the same revision counter (the revision counter is derived from the current timestamp),
375
    /// which ultimately causes the slower upload task to fail (see #1142).
376
    ///
377
    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
378
    /// used for retrying failed uploads (these are handled internally by
379
    /// [`Reactor::upload_descriptor_with_retries`]).
380
    last_uploaded: Option<Instant>,
381
    /// A max-heap containing the time periods for which we need to reupload the descriptor.
382
    // TODO: we are currently reuploading more than necessary.
383
    // Ideally, this shouldn't contain duplicate TimePeriods,
384
    // because we only need to retain the latest reupload time for each time period.
385
    //
386
    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
387
    // we will end up with multiple ReuploadTimer entries for that TP,
388
    // each of which will (eventually) result in a reupload.
389
    //
390
    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
391
    //
392
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
393
    reupload_timers: BinaryHeap<ReuploadTimer>,
394
}
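
// One possible shape for the alternative bookkeeping suggested by the TODO on
// `reupload_timers` above (a sketch only, not a committed design): keeping at most one
// pending reupload per time period means that rescheduling a period simply replaces
// its deadline instead of queueing a duplicate entry.
//
//     reupload_times: HashMap<TimePeriod, Instant>,
//
//     // Scheduling (or rescheduling) a reupload becomes an insert-or-update:
//     reupload_times.insert(period, when);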
395

            
396
/// The part of the reactor state that changes with every time period.
397
struct TimePeriodContext {
398
    /// The HsDir params.
399
    params: HsDirParams,
400
    /// The HsDirs to use in this time period.
401
    ///
402
    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
403
    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
404
    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
405
    // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
406
    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
407
    /// The revision counter of the last successful upload, if any.
408
    last_successful: Option<RevisionCounter>,
409
}
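
// Illustrative sketch of why `hs_dirs` stores owned `RelayIds` rather than relays:
// the definition below is roughly what we *cannot* write, because a `Relay<'_>`
// borrows from the `NetDir` it came from.
//
//     struct TimePeriodContext<'d> {
//         params: HsDirParams,
//         hs_dirs: Vec<(tor_netdir::Relay<'d>, DescriptorStatus)>, // borrows the NetDir
//         last_successful: Option<RevisionCounter>,
//     }
//
// Replacing the `NetDir` would invalidate every stored `Relay<'d>`, so we keep owned
// `RelayIds` instead, which can be resolved against whatever `NetDir` is current.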
410

            
411
impl TimePeriodContext {
412
    /// Create a new `TimePeriodContext`.
413
    ///
414
    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
415
    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
416
8
    fn new<'r>(
417
8
        params: HsDirParams,
418
8
        blind_id: HsBlindId,
419
8
        netdir: &Arc<NetDir>,
420
8
        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
421
8
    ) -> Result<Self, FatalError> {
422
8
        let period = params.time_period();
423
8
        Ok(Self {
424
8
            params,
425
8
            hs_dirs: Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?,
426
8
            last_successful: None,
427
        })
428
8
    }
429

            
430
    /// Recompute the HsDirs for this time period.
431
8
    fn compute_hsdirs<'r>(
432
8
        period: TimePeriod,
433
8
        blind_id: HsBlindId,
434
8
        netdir: &Arc<NetDir>,
435
8
        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
436
8
    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
437
8
        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
438

            
439
8
        Ok(hs_dirs
440
64
            .map(|hs_dir| {
441
64
                let mut builder = RelayIds::builder();
442
64
                if let Some(ed_id) = hs_dir.ed_identity() {
443
64
                    builder.ed_identity(*ed_id);
444
64
                }
445

            
446
64
                if let Some(rsa_id) = hs_dir.rsa_identity() {
447
64
                    builder.rsa_identity(*rsa_id);
448
64
                }
449

            
450
64
                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
451

            
452
                // Have we uploaded the descriptor to this relay before? If so, we don't need to
453
                // reupload it unless it was already dirty and due for a reupload.
454
64
                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
455
                    Some((_, status)) => *status,
456
64
                    None => DescriptorStatus::Dirty,
457
                };
458

            
459
64
                (relay_id, status)
460
64
            })
461
8
            .collect::<Vec<_>>())
462
8
    }
463

            
464
    /// Mark the descriptor dirty for all HSDirs of this time period.
465
16
    fn mark_all_dirty(&mut self) {
466
16
        self.hs_dirs
467
16
            .iter_mut()
468
136
            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
469
16
    }
470
}
471

            
472
/// An error that occurs while trying to upload a descriptor.
473
64
#[derive(Clone, Debug, thiserror::Error)]
474
#[non_exhaustive]
475
pub enum UploadError {
476
    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
477
    #[error("descriptor upload request failed: {}", _0.error)]
478
    Request(#[from] RequestFailedError),
479

            
480
    /// Failed to establish circuit to hidden service directory
481
    #[error("could not build circuit to HsDir")]
482
    Circuit(#[from] tor_circmgr::Error),
483

            
484
    /// Failed to establish stream to hidden service directory
485
    #[error("failed to establish directory stream to HsDir")]
486
    Stream(#[source] tor_proto::Error),
487

            
488
    /// A descriptor upload timed out before it could complete.
489
    #[error("descriptor publication timed out")]
490
    Timeout,
491

            
492
    /// An internal error.
493
    #[error("Internal error")]
494
    Bug(#[from] tor_error::Bug),
495
}
496
define_asref_dyn_std_error!(UploadError);
497

            
498
impl<R: Runtime, M: Mockable> Reactor<R, M> {
499
    /// Create a new `Reactor`.
500
    #[allow(clippy::too_many_arguments)]
501
8
    pub(super) fn new(
502
8
        runtime: R,
503
8
        nickname: HsNickname,
504
8
        dir_provider: Arc<dyn NetDirProvider>,
505
8
        mockable: M,
506
8
        config: &OnionServiceConfig,
507
8
        ipt_watcher: IptsPublisherView,
508
8
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
509
8
        status_tx: PublisherStatusSender,
510
8
        keymgr: Arc<KeyMgr>,
511
8
    ) -> Self {
512
8
        /// The maximum size of the upload completion notifier channel.
513
8
        ///
514
8
        /// The channel we use this for is a futures::mpsc channel, which has a capacity of
515
8
        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
516
8
        /// each sender will send exactly one message.
517
8
        const UPLOAD_CHAN_BUF_SIZE: usize = 0;
518
8

            
519
8
        let (upload_task_complete_tx, upload_task_complete_rx) =
520
8
            mpsc::channel(UPLOAD_CHAN_BUF_SIZE);
521
8

            
522
8
        let (publish_status_tx, publish_status_rx) = watch::channel();
523
8
        // Setting the buffer size to zero here is OK,
524
8
        // since we never actually send anything on this channel.
525
8
        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
526
8

            
527
8
        let authorized_clients = Self::read_authorized_clients(&config.restricted_discovery);
528
8

            
529
8
        // Create a channel for watching for changes in the configured
530
8
        // restricted_discovery.key_dirs.
531
8
        let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
532
8

            
533
8
        let imm = Immutable {
534
8
            runtime,
535
8
            mockable,
536
8
            nickname,
537
8
            keymgr,
538
8
            status_tx,
539
8
            authorized_clients: Arc::new(Mutex::new(authorized_clients)),
540
8
        };
541
8

            
542
8
        let inner = Inner {
543
8
            time_periods: vec![],
544
8
            config: Arc::new(config.into()),
545
8
            file_watcher: None,
546
8
            netdir: None,
547
8
            last_uploaded: None,
548
8
            reupload_timers: Default::default(),
549
8
        };
550
8

            
551
8
        Self {
552
8
            imm: Arc::new(imm),
553
8
            inner: Arc::new(Mutex::new(inner)),
554
8
            dir_provider,
555
8
            ipt_watcher,
556
8
            config_rx,
557
8
            key_dirs_rx,
558
8
            key_dirs_tx,
559
8
            publish_status_rx,
560
8
            publish_status_tx,
561
8
            upload_task_complete_rx,
562
8
            upload_task_complete_tx,
563
8
            shutdown_tx,
564
8
        }
565
8
    }
566

            
567
    /// Start the reactor.
568
    ///
569
    /// Under normal circumstances, this function runs indefinitely.
570
    ///
571
    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
572
    /// upload fails or is rate-limited.
573
8
    pub(super) async fn run(mut self) -> Result<(), FatalError> {
574
8
        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
575

            
576
        {
577
8
            let netdir = wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely).await?;
578
8
            let time_periods = self.compute_time_periods(&netdir, &[])?;
579

            
580
8
            let mut inner = self.inner.lock().expect("poisoned lock");
581
8

            
582
8
            inner.netdir = Some(netdir);
583
8
            inner.time_periods = time_periods;
584
8
        }
585
8

            
586
8
        // Create the initial key_dirs watcher.
587
8
        self.update_file_watcher();
588

            
589
        loop {
590
92
            match self.run_once().await {
591
84
                Ok(ShutdownStatus::Continue) => continue,
592
                Ok(ShutdownStatus::Terminate) => {
593
                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
594

            
595
                    self.imm.status_tx.send_shutdown();
596
                    return Ok(());
597
                }
598
                Err(e) => {
599
                    // TODO: update the publish status (see also the module-level TODO about this).
600
                    error_report!(
601
                        e,
602
                        "HS service {}: descriptor publisher crashed!",
603
                        self.imm.nickname
604
                    );
605

            
606
                    self.imm.status_tx.send_broken(e.clone());
607

            
608
                    return Err(e);
609
                }
610
            }
611
        }
612
    }
613

            
614
    /// Run one iteration of the reactor loop.
615
92
    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
616
92
        let mut netdir_events = self.dir_provider.events();
617
92

            
618
92
        // Note: TrackingNow tracks the values it is compared with.
619
92
        // This is equivalent to sleeping for (until - now) units of time.
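        //
        // An illustrative sketch of that pattern (not additional logic; `some_deadline`
        // and `runtime` are placeholders):
        //
        //     let now = TrackingNow::now(&runtime);
        //     if now > some_deadline {
        //         // `some_deadline` is already in the past; handle it immediately.
        //     }
        //     // The comparison recorded `some_deadline`, so this sleeps until the
        //     // earliest recorded deadline that is still in the future:
        //     now.wait_for_earliest(&runtime).await;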
620
92
        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
621
92
        if let PublishStatus::RateLimited(until) = self.status() {
622
            if upload_rate_lim > until {
623
                // We are no longer rate-limited
624
                self.expire_rate_limit().await?;
625
            }
626
92
        }
627

            
628
92
        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
629
92
        let mut reupload_periods = vec![];
630
92
        {
631
92
            let mut inner = self.inner.lock().expect("poisoned lock");
632
92
            let inner = &mut *inner;
633
100
            while let Some(reupload) = inner.reupload_timers.peek().copied() {
634
                // First, extract all the timeouts that already elapsed.
635
20
                if reupload.when <= reupload_tracking {
636
8
                    inner.reupload_timers.pop();
637
8
                    reupload_periods.push(reupload.period);
638
8
                } else {
639
                    // We are not ready to schedule any more reuploads.
640
                    //
641
                    // How much we need to sleep is implicitly
642
                    // tracked in reupload_tracking (through
643
                    // the TrackingNow implementation)
644
12
                    break;
645
                }
646
            }
647
        }
648

            
649
        // Check if it's time to schedule any reuploads.
650
100
        for period in reupload_periods {
651
8
            if self.mark_dirty(&period) {
652
8
                debug!(
653
                    time_period=?period,
654
                    "descriptor reupload timer elapsed; scheduling reupload",
655
                );
656
8
                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
657
                    .await?;
658
            }
659
        }
660

            
661
        select_biased! {
662
            res = self.upload_task_complete_rx.next().fuse() => {
663
                let Some(upload_res) = res else {
664
                    return Ok(ShutdownStatus::Terminate);
665
                };
666

            
667
                self.handle_upload_results(upload_res);
668
            },
669
            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
670
                self.expire_rate_limit().await?;
671
            },
672
            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
673
                // Run another iteration, executing run_once again. This time, we will remove the
674
                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
675
                // relevant HsDirs, and schedule the upload by setting our status to
676
                // UploadScheduled.
677
                return Ok(ShutdownStatus::Continue);
678
            },
679
            netdir_event = netdir_events.next().fuse() => {
680
                let Some(netdir_event) = netdir_event else {
681
                    debug!("netdir event stream ended");
682
                    return Ok(ShutdownStatus::Terminate);
683
                };
684

            
685
                if !matches!(netdir_event, DirEvent::NewConsensus) {
686
                    return Ok(ShutdownStatus::Continue);
687
                };
688

            
689
                // The consensus changed. Grab a new NetDir.
690
                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
691
                    Ok(y) => y,
692
                    Err(e) => {
693
                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
694
                        // Hopefully a netdir will appear in the future.
695
                        // In the meantime, suspend operations.
696
                        //
697
                        // TODO (#1218): there is a bug here: we stop reading on our inputs
698
                        // including eg publish_status_rx, but it is our job to log some of
699
                        // these things.  While we are waiting for a netdir, all those messages
700
                        // are "stuck"; they'll appear later, with misleading timestamps.
701
                        //
702
                        // Probably this should be fixed by moving the logging
703
                        // out of the reactor, where it won't be blocked.
704
                        wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely)
705
                            .await?
706
                    }
707
                };
708
                let relevant_periods = netdir.hs_all_time_periods();
709
                self.handle_consensus_change(netdir).await?;
710
                expire_publisher_keys(
711
                    &self.imm.keymgr,
712
                    &self.imm.nickname,
713
                    &relevant_periods,
714
                ).unwrap_or_else(|e| {
715
                    error_report!(e, "failed to remove expired keys");
716
                });
717
            }
718
            update = self.ipt_watcher.await_update().fuse() => {
719
                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
720
                    return Ok(ShutdownStatus::Terminate);
721
                }
722
            },
723
            config = self.config_rx.next().fuse() => {
724
                let Some(config) = config else {
725
                    return Ok(ShutdownStatus::Terminate);
726
                };
727

            
728
                self.handle_svc_config_change(&config).await?;
729
            },
730
            res = self.key_dirs_rx.next().fuse() => {
731
                let Some(event) = res else {
732
                    return Ok(ShutdownStatus::Terminate);
733
                };
734

            
735
                while let Some(_ignore) = self.key_dirs_rx.try_recv() {
736
                    // Discard other events, so that we only reload once.
737
                }
738

            
739
                self.handle_key_dirs_change(event).await?;
740
            }
741
            should_upload = self.publish_status_rx.next().fuse() => {
742
                let Some(should_upload) = should_upload else {
743
                    return Ok(ShutdownStatus::Terminate);
744
                };
745

            
746
                // Our PublishStatus changed -- are we ready to publish?
747
                if should_upload == PublishStatus::UploadScheduled {
748
                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
749
                    self.upload_all().await?;
750
                }
751
            }
752
        }
753

            
754
76
        Ok(ShutdownStatus::Continue)
755
84
    }
756

            
757
    /// Returns the current status of the publisher
758
156
    fn status(&self) -> PublishStatus {
759
156
        *self.publish_status_rx.borrow()
760
156
    }
761

            
762
    /// Handle a batch of upload outcomes,
763
    /// possibly updating the status of the descriptor for the corresponding HSDirs.
764
12
    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
765
12
        let mut inner = self.inner.lock().expect("poisoned lock");
766
12
        let inner = &mut *inner;
767
12

            
768
12
        // Check which time period these uploads pertain to.
769
12
        let period = inner
770
12
            .time_periods
771
12
            .iter_mut()
772
12
            .find(|ctx| ctx.params.time_period() == results.time_period);
773

            
774
12
        let Some(period) = period else {
775
            // The uploads were for a time period that is no longer relevant, so we
776
            // can ignore the result.
777
            return;
778
        };
779

            
780
        // We will need to reupload this descriptor at some point, so we pick
781
        // a random time between 60 minutes and 120 minutes in the future.
782
        //
783
        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
784
12
        let mut rng = self.imm.mockable.thread_rng();
785
12
        // TODO SPEC: Control republish period using a consensus parameter?
786
12
        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
787
12
        let duration = Duration::from_secs(minutes * 60);
788
12
        let reupload_when = self.imm.runtime.now() + duration;
789
12
        let time_period = period.params.time_period();
790
12

            
791
12
        info!(
792
            time_period=?time_period,
793
            "reuploading descriptor in {}",
794
            humantime::format_duration(duration),
795
        );
796

            
797
12
        inner.reupload_timers.push(ReuploadTimer {
798
12
            period: time_period,
799
12
            when: reupload_when,
800
12
        });
801

            
802
108
        for upload_res in results.hsdir_result {
803
96
            let relay = period
804
96
                .hs_dirs
805
96
                .iter_mut()
806
432
                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
807

            
808
96
            let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
809
                // This HSDir went away, so the result doesn't matter.
810
                // Continue processing the rest of the results
811
                continue;
812
            };
813

            
814
96
            if upload_res.upload_res == UploadStatus::Success {
815
96
                let update_last_successful = match period.last_successful {
816
4
                    None => true,
817
92
                    Some(counter) => counter <= upload_res.revision_counter,
818
                };
819

            
820
96
                if update_last_successful {
821
96
                    period.last_successful = Some(upload_res.revision_counter);
822
96
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
823
96
                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
824
96
                    // and start an upload task for a descriptor that has already been uploaded (or is
825
96
                    // being uploaded) in another task, but whose upload results have not yet been
826
96
                    // processed.
827
96
                    //
828
96
                    // This is probably made worse by the fact that the statuses are updated in
829
96
                    // batches (grouped by time period), rather than one by one as the upload tasks
830
96
                    // complete (updating the status involves locking the inner mutex, and I wanted
831
96
                    // to minimize the locking/unlocking overheads). I'm not sure handling the
832
96
                    // updates in batches was the correct decision here.
833
96
                    *status = DescriptorStatus::Clean;
834
96
                }
835
            }
836
        }
837
12
    }
838

            
839
    /// Maybe update our list of HsDirs.
840
    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
841
        trace!("the consensus has changed; recomputing HSDirs");
842

            
843
        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
844

            
845
        self.recompute_hs_dirs()?;
846
        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
847
            .await?;
848

            
849
        Ok(())
850
    }
851

            
852
    /// Recompute the HsDirs for all relevant time periods.
853
    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
854
        let mut inner = self.inner.lock().expect("poisoned lock");
855
        let inner = &mut *inner;
856

            
857
        let netdir = Arc::clone(
858
            inner
859
                .netdir
860
                .as_ref()
861
                .ok_or_else(|| internal!("started upload task without a netdir"))?,
862
        );
863

            
864
        // Update our list of relevant time periods.
865
        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
866
        inner.time_periods = new_time_periods;
867

            
868
        Ok(())
869
    }
870

            
871
    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
872
    ///
873
    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
874
    /// HsDirs where possible.
875
8
    fn compute_time_periods(
876
8
        &self,
877
8
        netdir: &Arc<NetDir>,
878
8
        time_periods: &[TimePeriodContext],
879
8
    ) -> Result<Vec<TimePeriodContext>, FatalError> {
880
8
        netdir
881
8
            .hs_all_time_periods()
882
8
            .iter()
883
8
            .map(|params| {
884
8
                let period = params.time_period();
885
8
                let blind_id_kp =
886
8
                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
887
                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
888
                        // It's supposed to return Ok(None) if we're in offline hsid mode,
889
                        // but that might change when we do #1194
890
8
                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;
891

            
892
8
                let blind_id: HsBlindIdKey = (&blind_id_kp).into();
893

            
894
                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
895
                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
896
                // publishing the descriptor to the HsDirs that already have it (the ones that are
897
                // marked with DescriptorStatus::Clean).
898
                //
899
                // In other words, we only want to publish to those HsDirs that
900
                //   * are part of a new time period (which we have never published the descriptor
901
                //   for), or
902
                //   * have just been added to the ring of a time period we already knew about
903
8
                if let Some(ctx) = time_periods
904
8
                    .iter()
905
8
                    .find(|ctx| ctx.params.time_period() == period)
906
                {
907
                    TimePeriodContext::new(
908
                        params.clone(),
909
                        blind_id.into(),
910
                        netdir,
911
                        ctx.hs_dirs.iter(),
912
                    )
913
                } else {
914
                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
915
                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
916
8
                    TimePeriodContext::new(params.clone(), blind_id.into(), netdir, iter::empty())
917
                }
918
8
            })
919
8
            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
920
8
    }
921

            
922
    /// Replace the old netdir with the new, returning the old.
923
    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
924
        self.inner
925
            .lock()
926
            .expect("poisoned lock")
927
            .netdir
928
            .replace(new_netdir)
929
    }
930

            
931
    /// Replace our view of the service config with `new_config` if `new_config` contains changes
932
    /// that would cause us to generate a new descriptor.
933
8
    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
934
8
        let mut inner = self.inner.lock().expect("poisoned lock");
935
8
        let old_config = &mut inner.config;
936
8

            
937
8
        // The fields we're interested in haven't changed, so there's no need to update
938
8
        // `inner.config`.
939
8
        if *old_config == new_config {
940
8
            return false;
941
        }
942

            
943
        let log_change = match (
944
            old_config.restricted_discovery.enabled,
945
            new_config.restricted_discovery.enabled,
946
        ) {
947
            (true, false) => Some("Disabling restricted discovery mode"),
948
            (false, true) => Some("Enabling restricted discovery mode"),
949
            _ => None,
950
        };
951

            
952
        if let Some(msg) = log_change {
953
            info!(nickname=%self.imm.nickname, "{}", msg);
954
        }
955

            
956
        let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
957

            
958
        true
959
8
    }
960

            
961
    /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
962
16
    fn update_file_watcher(&self) {
963
16
        let mut inner = self.inner.lock().expect("poisoned lock");
964
16
        if inner.config.restricted_discovery.watch_configuration() {
965
            debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
966
            let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
967

            
968
            let dirs = inner.config.restricted_discovery.key_dirs().clone();
969

            
970
            watch_dirs(&mut watcher, &dirs);
971

            
972
            let watcher = watcher
973
                .start_watching(self.key_dirs_tx.clone())
974
                .map_err(|e| {
975
                    error_report!(e, "Cannot set file watcher");
976
                })
977
                .ok();
978
            inner.file_watcher = watcher;
979
        } else {
980
16
            if inner.file_watcher.is_some() {
981
                debug!("removing key_dirs watcher");
982
16
            }
983
16
            inner.file_watcher = None;
984
        }
985
16
    }
986

            
987
    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
988
    /// uploading.
989
8
    fn note_ipt_change(&self) -> PublishStatus {
990
8
        let mut ipts = self.ipt_watcher.borrow_for_publish();
991
8
        match ipts.ipts.as_mut() {
992
8
            Some(_ipts) => PublishStatus::UploadScheduled,
993
            None => PublishStatus::AwaitingIpts,
994
        }
995
8
    }
996

            
997
    /// Update our list of introduction points.
998
8
    async fn handle_ipt_change(
999
8
        &mut self,
8
        update: Option<Result<(), crate::FatalError>>,
8
    ) -> Result<ShutdownStatus, FatalError> {
8
        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
8
        match update {
            Some(Ok(())) => {
8
                let should_upload = self.note_ipt_change();
8
                debug!(nickname=%self.imm.nickname, "the introduction points have changed");
8
                self.mark_all_dirty();
8
                self.update_publish_status_unless_rate_lim(should_upload)
                    .await?;
8
                Ok(ShutdownStatus::Continue)
            }
            Some(Err(e)) => Err(e),
            None => {
                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
                Ok(ShutdownStatus::Terminate)
            }
        }
8
    }
    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `AwaitingIpts`.
16
    async fn update_publish_status_unless_waiting(
16
        &mut self,
16
        new_state: PublishStatus,
16
    ) -> Result<(), FatalError> {
16
        // Only update the state if we're not waiting for intro points.
16
        if self.status() != PublishStatus::AwaitingIpts {
16
            self.update_publish_status(new_state).await?;
        }
16
        Ok(())
16
    }
    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `RateLimited`.
16
    async fn update_publish_status_unless_rate_lim(
16
        &mut self,
16
        new_state: PublishStatus,
16
    ) -> Result<(), FatalError> {
        // We can't exit this state until the rate-limit expires.
16
        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
16
            self.update_publish_status(new_state).await?;
        }
16
        Ok(())
16
    }
    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
32
    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), FatalError> {
32
        let onion_status = match new_state {
16
            PublishStatus::Idle => State::Running,
            PublishStatus::UploadScheduled
            | PublishStatus::AwaitingIpts
16
            | PublishStatus::RateLimited(_) => State::Bootstrapping,
        };
32
        self.imm.status_tx.send(onion_status, None);
32

            
32
        trace!(
            "publisher reactor status change: {:?} -> {:?}",
            self.status(),
            new_state
        );
32
        self.publish_status_tx.send(new_state).await.map_err(
32
            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
32
        )?;
32
        Ok(())
32
    }
    /// Update the descriptors based on the config change.
8
    async fn handle_svc_config_change(
8
        &mut self,
8
        config: &OnionServiceConfig,
8
    ) -> Result<(), FatalError> {
8
        let new_config = Arc::new(config.into());
8
        if self.replace_config_if_changed(Arc::clone(&new_config)) {
            self.update_file_watcher();
            self.update_authorized_clients_if_changed().await?;
            info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
            self.mark_all_dirty();
            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
8
        }
8
        Ok(())
8
    }
    /// Update the descriptors based on a restricted discovery key_dirs change.
    ///
    /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
    /// this marks the descriptor as dirty for all time periods,
    /// and schedules a reupload.
8
    async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
8
        debug!("The configured key_dirs have changed");
8
        match event {
8
            FileEvent::Rescan | FileEvent::FileChanged => {
8
                // These events are handled in the same way, by re-reading the keys from disk
8
                // and republishing the descriptor if necessary
8
            }
            _ => return Err(internal!("file watcher event {event:?}").into()),
        };
        // Update the file watcher, in case the change was triggered by a key_dir move.
8
        self.update_file_watcher();
8

            
8
        if self.update_authorized_clients_if_changed().await? {
            self.mark_all_dirty();
            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
8
        }
8
        Ok(())
8
    }
    /// Recreate the authorized_clients based on the current config.
    ///
    /// Returns `true` if the authorized clients have changed.
8
    async fn update_authorized_clients_if_changed(&mut self) -> Result<bool, FatalError> {
8
        let authorized_clients = {
8
            let inner = self.inner.lock().expect("poisoned lock");
8
            Self::read_authorized_clients(&inner.config.restricted_discovery)
8
        };
8

            
8
        let mut clients_lock = self.imm.authorized_clients.lock().expect("poisoned lock");
8
        let changed = clients_lock.as_ref() != authorized_clients.as_ref();
8

            
8
        if changed {
            info!("The restricted discovery mode authorized clients have changed");
            *clients_lock = authorized_clients;
8
        }
8
        Ok(changed)
8
    }
    /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
16
    fn read_authorized_clients(
16
        config: &RestrictedDiscoveryConfig,
16
    ) -> Option<RestrictedDiscoveryKeys> {
16
        let authorized_clients = config.read_keys();
16
        if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
            warn!(
                "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
            );
16
        }
16
        authorized_clients
16
    }
    /// Mark the descriptor dirty for all time periods.
8
    fn mark_all_dirty(&self) {
8
        trace!("marking the descriptor dirty for all time periods");
8
        self.inner
8
            .lock()
8
            .expect("poisoned lock")
8
            .time_periods
8
            .iter_mut()
8
            .for_each(|tp| tp.mark_all_dirty());
8
    }
    /// Mark the descriptor dirty for the specified time period.
    ///
    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
8
    fn mark_dirty(&self, period: &TimePeriod) -> bool {
8
        let mut inner = self.inner.lock().expect("poisoned lock");
8
        let period_ctx = inner
8
            .time_periods
8
            .iter_mut()
8
            .find(|tp| tp.params.time_period() == *period);
8

            
8
        match period_ctx {
8
            Some(ctx) => {
8
                trace!(time_period=?period, "marking the descriptor dirty");
8
                ctx.mark_all_dirty();
8
                true
            }
            None => false,
        }
8
    }
    /// Try to upload our descriptor to the HsDirs that need it.
    ///
    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
    ///
    /// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
    /// implement it, as well as in what cases this will return an error).
16
    async fn upload_all(&mut self) -> Result<(), FatalError> {
16
        trace!("starting descriptor upload task...");
16
        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
16
        let now = self.imm.runtime.now();
        // Check if we should rate-limit this upload.
        if let Some(ts) = last_uploaded {
            let duration_since_upload = now.duration_since(ts);

            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
                return self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await;
            }
        }
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        let _ = inner.last_uploaded.insert(now);
        for period_ctx in inner.time_periods.iter_mut() {
            let upload_task_complete_tx = self.upload_task_complete_tx.clone();

            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
            // have our latest descriptor, so we filter them out).
            let hs_dirs = period_ctx
                .hs_dirs
                .iter()
                .filter_map(|(relay_id, status)| {
                    if *status == DescriptorStatus::Dirty {
                        Some(relay_id.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();

            if hs_dirs.is_empty() {
                trace!("the descriptor is clean for all HSDirs. Nothing to do");
                return Ok(());
            }

            let time_period = period_ctx.params.time_period();
            // This scope exists because rng is not Send, so it needs to fall out of scope before we
            // await anything.
            let netdir = Arc::clone(
                inner
                    .netdir
                    .as_ref()
                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
            );
            let imm = Arc::clone(&self.imm);
            let ipt_upload_view = self.ipt_watcher.upload_view();
            let config = Arc::clone(&inner.config);

            trace!(nickname=%self.imm.nickname, time_period=?time_period,
                "spawning upload task"
            );
            let params = period_ctx.params.clone();
            let shutdown_rx = self.shutdown_tx.subscribe();
            // Spawn a task to upload the descriptor to all HsDirs of this time period.
            //
            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
            // dropped).
            let _handle: () = self
                .imm
                .runtime
                .spawn(async move {
                    if let Err(e) = Self::upload_for_time_period(
                        hs_dirs,
                        &netdir,
                        config,
                        params,
                        Arc::clone(&imm),
                        ipt_upload_view.clone(),
                        upload_task_complete_tx,
                        shutdown_rx,
                    )
                    .await
                    {
                        error_report!(
                            e,
                            "descriptor upload failed for HS service {} and time period {:?}",
                            imm.nickname,
                            time_period
                        );
                    }
                })
                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
        }
        Ok(())
    }
    /// Upload the descriptor for the specified time period.
    ///
    /// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
    /// implement it, as well as in what cases this will return an error).
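    ///
    /// Roughly speaking, the uploads for this time period run as one future per HsDir,
    /// with at most [`MAX_CONCURRENT_UPLOADS`] of them in flight at any one time
    /// (a simplified sketch of the body below, not the exact code):
    ///
    /// ```ignore
    /// let upload_results = futures::stream::iter(hs_dirs)
    ///     .map(|relay_ids| async move {
    ///         // Build a fresh descriptor and upload it to this HsDir (with retries).
    ///     })
    ///     .buffer_unordered(MAX_CONCURRENT_UPLOADS)
    ///     .try_collect::<Vec<_>>()
    ///     .await;
    /// ```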
    #[allow(clippy::too_many_arguments)] // TODO: refactor
    async fn upload_for_time_period(
        hs_dirs: Vec<RelayIds>,
        netdir: &Arc<NetDir>,
        config: Arc<OnionServiceConfigPublisherView>,
        params: HsDirParams,
        imm: Arc<Immutable<R, M>>,
        ipt_upload_view: IptsPublisherUploadView,
        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
        shutdown_rx: broadcast::Receiver<Void>,
    ) -> Result<(), FatalError> {
        let time_period = params.time_period();
        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
        let hsdir_count = hs_dirs.len();
        /// An error returned from an upload future.
        //
        // Exhaustive, because this is a private type.
        #[derive(Clone, Debug, thiserror::Error)]
        enum PublishError {
            /// The upload was aborted because there are no IPTs.
            ///
            /// This happens because of an inevitable TOCTOU race, where after being notified by
            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
            /// we find out there actually are no IPTs, so we can't build the descriptor.
            ///
            /// This is a special kind of error that interrupts the current upload task, and is
            /// logged at `debug!` level rather than `warn!` or `error!`.
            ///
            /// Ideally, this shouldn't happen very often (if at all).
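            ///
            /// In other words (a simplified excerpt of the check performed further down,
            /// not the exact code):
            ///
            /// ```ignore
            /// // The IPT manager told us the IPTs changed, but by the time we look...
            /// let Some(ipts) = ipt_set.ipts.as_mut() else {
            ///     return Err(PublishError::NoIpts); // ...there are none left to publish.
            /// };
            /// ```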
            #[error("No IPTs")]
            NoIpts,
            /// The reactor has shut down
            #[error("The reactor has shut down")]
            Shutdown,
            /// A fatal error.
            #[error("{0}")]
            Fatal(#[from] FatalError),
        }
        let upload_results = futures::stream::iter(hs_dirs)
            .map(|relay_ids| {
                let netdir = netdir.clone();
                let config = Arc::clone(&config);
                let imm = Arc::clone(&imm);
                let ipt_upload_view = ipt_upload_view.clone();
                let params = params.clone();
                let mut shutdown_rx = shutdown_rx.clone();

                let ed_id = relay_ids
                    .ed_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());
                let rsa_id = relay_ids
                    .rsa_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());
                async move {
                    let run_upload = |desc| async {
                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
                            // This should never happen (all of our relay_ids are from the stored
                            // netdir).
                            warn!(
                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                                "tried to upload descriptor to relay not found in consensus?!"
                            );
                            return UploadStatus::Failure;
                        };

                        Self::upload_descriptor_with_retries(
                            desc,
                            &netdir,
                            &hsdir,
                            &ed_id,
                            &rsa_id,
                            Arc::clone(&imm),
                        )
                        .await
                    };
                    // How long until we're supposed to time out?
                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
                    // We generate a new descriptor before _each_ HsDir upload. This means each
                    // HsDir could, in theory, receive a different descriptor (not just in terms of
                    // revision-counters, but also with a different set of IPTs). It may seem like
                    // this could lead to some HsDirs being left with an outdated descriptor, but
                    // that's not the case: after the upload completes, the publisher will be
                    // notified by the ipt_watcher of the IPT change event (if there was one to
                    // begin with), which will trigger another upload job.
                    let hsdesc = {
                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
                        // needs to fall out of scope before the await point below
                        let mut ipt_set = ipt_upload_view.borrow_for_publish();
                        // If there are no IPTs, we abort the upload. At this point, we might have
                        // uploaded the descriptor to some, but not all, HSDirs from the specified
                        // time period.
                        //
                        // Returning an error here means the upload completion task is never
                        // notified of the outcome of any of these uploads (which means the
                        // descriptor is not marked clean). This is OK, because if we suddenly find
                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
                        // and generate a fresh descriptor anyway.
                        //
                        // Ideally, this shouldn't happen very often (if at all).
                        let Some(ipts) = ipt_set.ipts.as_mut() else {
                            return Err(PublishError::NoIpts);
                        };
                        let hsdesc = {
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "building descriptor"
                            );
                            let mut rng = imm.mockable.thread_rng();

                            // We're about to generate a new version of the descriptor,
                            // so let's generate a new revision counter.
                            let now = imm.runtime.wallclock();
                            let revision_counter = imm.generate_revision_counter(&params, now)?;
                            build_sign(
                                &imm.keymgr,
                                &config,
                                &imm.authorized_clients,
                                ipts,
                                time_period,
                                revision_counter,
                                &mut rng,
                                imm.runtime.wallclock(),
                            )?
                        };
                        if let Err(e) =
                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
                        {
                            let wait = e.log_retry_max(&imm.nickname)?;
                            // TODO (#1226): retry instead of this
                            return Err(FatalError::Bug(internal!(
                                "ought to retry after {wait:?}, crashing instead"
                            ))
                            .into());
                        }

                        hsdesc
                    };

                    let VersionedDescriptor {
                        desc,
                        revision_counter,
                    } = hsdesc;

                    trace!(
                        nickname=%imm.nickname, time_period=?time_period,
                        revision_counter=?revision_counter,
                        "generated new descriptor for time period",
                    );
                    // (Actually launch the upload attempt. No timeout is needed
                    // here, since the backoff::Runner code will handle that for us.)
                    let upload_res = select_biased! {
                        shutdown = shutdown_rx.next().fuse() => {
                            // This will always be None, since Void is uninhabited.
                            let _: Option<Void> = shutdown;
                            // It looks like the reactor has shut down,
                            // so there is no point in uploading the descriptor anymore.
                            //
                            // Let's shut down the upload task too.
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "upload task received shutdown signal"
                            );
                            return Err(PublishError::Shutdown);
                        },
                        res = run_upload(desc.clone()).fuse() => res,
                    };
                    // Note: UploadStatus::Failure is only returned when
                    // upload_descriptor_with_retries fails, i.e. if all our retry
                    // attempts have failed
                    Ok(HsDirUploadStatus {
                        relay_ids,
                        upload_res,
                        revision_counter,
                    })
                }
            })
            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
            .boxed()
            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
            .try_collect::<Vec<_>>()
            .await;
        let upload_results = match upload_results {
            Ok(v) => v,
            Err(PublishError::Fatal(e)) => return Err(e),
            Err(PublishError::NoIpts) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "no introduction points; skipping upload"
                );
                return Ok(());
            }
            Err(PublishError::Shutdown) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "the reactor has shut down; aborting upload"
                );
                return Ok(());