tor_hsservice/publish/reactor.rs
1//! The onion service publisher reactor.
2//!
3//! Generates and publishes hidden service descriptors in response to various events.
4//!
5//! [`Reactor::run`] is the entry-point of the reactor. It starts the reactor,
6//! and runs until [`Reactor::run_once`] returns [`ShutdownStatus::Terminate`]
7//! or a fatal error occurs. `ShutdownStatus::Terminate` is returned if
8//! any of the channels the reactor is receiving events from is closed
9//! (i.e. when the senders are dropped).
10//!
11//! ## Publisher status
12//!
13//! The publisher has an internal [`PublishStatus`], distinct from its [`State`],
14//! which is used for onion service status reporting.
15//!
16//! The main loop of the reactor reads the current `PublishStatus` from `publish_status_rx`,
17//! and responds by generating and publishing a new descriptor if needed.
18//!
19//! See [`PublishStatus`] and [`Reactor::publish_status_rx`] for more details.
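//!
//! As a rough sketch, the reaction to a status change looks like this
//! (simplified from the body of [`Reactor::run_once`]):
//!
//! ```ignore
//! // Our PublishStatus changed -- are we ready to publish?
//! if should_upload == PublishStatus::UploadScheduled {
//!     self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
//!     self.upload_all().await?;
//! }
//! ```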
20//!
21//! ## When do we publish?
22//!
23//! We generate and publish a new descriptor if
24//! * the introduction points have changed
25//! * the onion service configuration has changed in a meaningful way (for example,
26//! if the `restricted_discovery` configuration or its [`Anonymity`](crate::Anonymity)
27//! has changed. See [`OnionServiceConfigPublisherView`]).
28//! * there is a new consensus
29//! * it is time to republish the descriptor (after we upload a descriptor,
30//! we schedule it for republishing at a random time between 60 minutes and 120 minutes
31//! in the future)
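//!
//! A rough sketch of how that republish time is chosen
//! (simplified from [`Reactor::handle_upload_results`]):
//!
//! ```ignore
//! // Pick a uniformly random delay between 60 and 120 minutes...
//! let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
//! let duration = Duration::from_secs(minutes * 60);
//! // ...and schedule the reupload for that far in the future.
//! let reupload_when = runtime.now() + duration;
//! ```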
32//!
33//! ## Onion service status
34//!
35//! With respect to [`OnionServiceStatus`] reporting,
36//! the following state transitions are possible:
37//!
38//!
39//! ```ignore
40//!
41//!       update_publish_status(UploadScheduled|AwaitingIpts|RateLimited)
42//!                 +---------------------------------------+
43//!                 |                                       |
44//!                 |                                       v
45//!                 |                               +---------------+
46//!                 |                               | Bootstrapping |
47//!                 |                               +---------------+
48//!                 |                                       |
49//!                 |                                       |         uploaded to at least
50//!                 |  not enough HsDir uploads succeeded   |     some HsDirs from each ring
51//!                 |            +--------------------------+------------------------+
52//!                 |            |                          |                        |
53//!                 |            |             all HsDir uploads succeeded           |
54//!                 |            |                          |                        |
55//!                 |            v                          v                        v
56//!                 | +---------------------+          +---------+        +---------------------+
57//!                 | | DegradedUnreachable |          | Running |        |  DegradedReachable  |
58//! +----------+    | +---------------------+          +---------+        +---------------------+
59//! | Shutdown |--  |            |                          |                        |
60//! +----------+    |            |                          |                        |
61//!                 |            |                          |                        |
62//!                 |            |                          |                        |
63//!                 |            +--------------------------+------------------------+
64//!                 |                                       |  invalid authorized_clients
65//!                 |                                       |  after handling config change
66//!                 |                                       |
67//!                 |                                       v
68//!                 |    run_once() returns an error    +--------+
69//!                 +---------------------------------->| Broken |
70//!                                                     +--------+
71//! ```
72//!
73//! We can also transition from `Broken`, `DegradedReachable`, or `DegradedUnreachable`
74//! back to `Bootstrapping` (those transitions were omitted for brevity).
75
76use tor_config::file_watcher::{
77 self, Event as FileEvent, FileEventReceiver, FileEventSender, FileWatcher, FileWatcherBuilder,
78};
79use tor_config_path::{CfgPath, CfgPathResolver};
80use tor_netdir::{DirEvent, NetDir};
81
82use crate::config::restricted_discovery::{
83 DirectoryKeyProviderList, RestrictedDiscoveryConfig, RestrictedDiscoveryKeys,
84};
85use crate::config::OnionServiceConfigPublisherView;
86use crate::status::{DescUploadRetryError, Problem};
87
88use super::*;
89
90// TODO-CLIENT-AUTH: perhaps we should add a separate CONFIG_CHANGE_REPUBLISH_DEBOUNCE_INTERVAL
91// for rate-limiting the publish jobs triggered by a change in the config?
92//
93// Currently the descriptor publish tasks triggered by changes in the config
94// are rate-limited via the usual rate limiting mechanism
95// (which rate-limits the uploads for 1m).
96//
97// I think this is OK for now, but we might need to rethink this if it becomes problematic
98// (for example, we might want an even longer rate-limit, or to reset any existing rate-limits
99// each time the config is modified).
100
101/// The upload rate-limiting threshold.
102///
103/// Before initiating an upload, the reactor checks if the last upload was at least
104/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
105/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
106/// current time.
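///
/// A minimal sketch of that decision, using `std::time` in place of the reactor's
/// `Runtime` clock (the function name is illustrative, not part of the reactor API):
///
/// ```ignore
/// use std::time::{Duration, Instant};
///
/// const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
///
/// /// Returns true if we may upload now, false if the upload must be rescheduled.
/// fn may_upload_now(last_uploaded: Option<Instant>, now: Instant) -> bool {
///     match last_uploaded {
///         // We have never uploaded before, so upload immediately.
///         None => true,
///         // Otherwise, upload only if the threshold has elapsed since the last upload.
///         Some(then) => now.duration_since(then) >= UPLOAD_RATE_LIM_THRESHOLD,
///     }
/// }
/// ```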
107//
108// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
109const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
110
111/// The maximum number of concurrent upload tasks per time period.
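///
/// The limit is applied with `buffer_unordered` (see `upload_for_time_period` below).
/// The following standalone sketch shows the technique with dummy futures
/// (illustrative only; the real stream items are descriptor uploads):
///
/// ```ignore
/// use futures::stream::{self, StreamExt};
///
/// async fn run_uploads() {
///     let results: Vec<u32> = stream::iter(0..100_u32)
///         // Each item becomes a future standing in for one upload.
///         .map(|i| async move { i * 2 })
///         // At most 16 of those futures are polled concurrently.
///         .buffer_unordered(16)
///         .collect()
///         .await;
///     assert_eq!(results.len(), 100);
/// }
/// ```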
112//
113// TODO: this value was arbitrarily chosen and may not be optimal. For now, it
114// will have no effect, since the current number of replicas is far less than
115// this value.
116//
117// The uploads for all TPs happen in parallel. As a result, the actual limit for the maximum
118// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
119// (currently 2, which means the concurrency limit will, in fact, be 32).
120//
121// We should try to decouple this value from the TP parameters.
122const MAX_CONCURRENT_UPLOADS: usize = 16;
123
124/// The maximum time allowed for uploading a descriptor to a single HSDir,
125/// across all attempts.
126pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
127
128/// A reactor for the HsDir [`Publisher`]
129///
130/// The entrypoint is [`Reactor::run`].
131#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
132pub(super) struct Reactor<R: Runtime, M: Mockable> {
133 /// The immutable, shared inner state.
134 imm: Arc<Immutable<R, M>>,
135 /// A source for new network directories that we use to determine
136 /// our HsDirs.
137 dir_provider: Arc<dyn NetDirProvider>,
138 /// The mutable inner state.
139 inner: Arc<Mutex<Inner>>,
140 /// A channel for receiving IPT change notifications.
141 ipt_watcher: IptsPublisherView,
142 /// A channel for receiving onion service config change notifications.
143 config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
144 /// A channel for receiving restricted discovery key_dirs change notifications.
145 key_dirs_rx: FileEventReceiver,
146 /// A channel for sending restricted discovery key_dirs change notifications.
147 ///
148 /// A copy of this sender is handed out to every `FileWatcher` created.
149 key_dirs_tx: FileEventSender,
150 /// A channel for receiving updates regarding our [`PublishStatus`].
151 ///
152 /// The main loop of the reactor watches for updates on this channel.
153 ///
154 /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
155 /// we can start publishing descriptors.
156 ///
157 /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
158 /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
159 /// established some introduction points.
160 publish_status_rx: watch::Receiver<PublishStatus>,
161 /// A sender for updating our [`PublishStatus`].
162 ///
163 /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
164 /// we can start publishing descriptors.
165 publish_status_tx: watch::Sender<PublishStatus>,
166 /// A channel for sending upload completion notifications.
167 ///
168 /// This channel is polled in the main loop of the reactor.
169 upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
170 /// A channel for receiving upload completion notifications.
171 ///
172 /// A copy of this sender is handed to each upload task.
173 upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
174 /// A sender for notifying any pending upload tasks that the reactor is shutting down.
175 ///
176 /// Receivers can use this channel to find out when the reactor is dropped.
177 ///
178 /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
179 /// Any future background tasks can also use this channel to detect if the reactor is dropped.
180 ///
181 /// Closing this channel will cause any pending upload tasks to be dropped.
182 shutdown_tx: broadcast::Sender<Void>,
183 /// Path resolver for configuration files.
184 path_resolver: Arc<CfgPathResolver>,
185 /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
186 /// rotated and thus we need to republish the descriptor for a particular time period.
187 update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
188}
189
190/// The immutable, shared state of the descriptor publisher reactor.
191#[derive(Clone)]
192struct Immutable<R: Runtime, M: Mockable> {
193 /// The runtime.
194 runtime: R,
195 /// Mockable state.
196 ///
197 /// This is used for launching circuits and for obtaining random number generators.
198 mockable: M,
199 /// The service for which we're publishing descriptors.
200 nickname: HsNickname,
201 /// The key manager.
202 keymgr: Arc<KeyMgr>,
203 /// A sender for updating the status of the onion service.
204 status_tx: PublisherStatusSender,
205 /// Proof-of-work state.
206 pow_manager: Arc<PowManager<R>>,
207}
208
209impl<R: Runtime, M: Mockable> Immutable<R, M> {
210 /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
211 /// with the specified [`TimePeriod`].
212 ///
213 /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
214 /// the private part of the blinded identity key. Otherwise, the key is the private part of the
215 /// descriptor signing key.
216 ///
217 /// Returns an error if the service is running in offline mode and the descriptor signing
218 /// keypair of the specified `period` is not available.
219 //
220 // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
221 // built from the blinded id key
222 fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
223 let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
224 Some(key) => {
225 let key: ed25519::ExpandedKeypair = key.into();
226 key.to_secret_key_bytes()[0..32]
227 .try_into()
228 .expect("Wrong length on slice")
229 }
230 None => {
231 // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
232 // is unreachable (for now).
233 let desc_sign_key_spec =
234 DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
235 let key: ed25519::Keypair = self
236 .keymgr
237 .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
238 // TODO (#1194): internal! is not the right type for this error (we need an
239 // error type for the case where a hidden service running in offline mode has
240 // run out of its pre-provisioned keys).
241 //
242 // This will be addressed when we add support for offline hs_id mode
243 .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
244 .into();
245 key.to_bytes()
246 }
247 };
248
249 Ok(AesOpeKey::from_secret(&ope_key))
250 }
251
252 /// Generate a revision counter for a descriptor associated with the specified
253 /// [`TimePeriod`].
254 ///
255 /// Returns a revision counter generated according to the [encrypted time in period] scheme.
256 ///
257 /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
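    ///
    /// The scheme relies on the OPE encryption being order-preserving:
    /// a later offset within the SRV period always encrypts to a larger counter.
    /// A small illustrative sketch (the offsets are made up; `encrypt` is the same
    /// `AesOpeKey::encrypt` call used in the body below):
    ///
    /// ```ignore
    /// let early = ope_key.encrypt(60);    // one minute into the SRV period
    /// let later = ope_key.encrypt(3_600); // one hour into the SRV period
    /// assert!(early < later);             // so revision counters only ever increase
    /// ```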
258 fn generate_revision_counter(
259 &self,
260 params: &HsDirParams,
261 now: SystemTime,
262 ) -> Result<RevisionCounter, FatalError> {
263 // TODO: in the future, we might want to compute ope_key once per time period (as opposed
264 // to each time we generate a new descriptor), for performance reasons.
265 let ope_key = self.create_ope_key(params.time_period())?;
266
267 // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
268 let srv_start = params.start_of_shard_rand_period();
269 let offset = params.offset_within_srv_period(now).ok_or_else(|| {
270 internal!(
271 "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
272 now,
273 srv_start
274 )
275 })?;
276 let rev = ope_key.encrypt(offset);
277
278 Ok(RevisionCounter::from(rev))
279 }
280}
281
282/// Mockable state for the descriptor publisher reactor.
283///
284/// This enables us to mock parts of the [`Reactor`] for testing purposes.
285#[async_trait]
286pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
287 /// The type of random number generator.
288 type Rng: rand::Rng + rand::CryptoRng;
289
290 /// The type of client circuit.
291 type ClientCirc: MockableClientCirc;
292
293 /// Return a random number generator.
294 fn thread_rng(&self) -> Self::Rng;
295
296 /// Create a circuit of the specified `kind` to `target`.
297 async fn get_or_launch_specific<T>(
298 &self,
299 netdir: &NetDir,
300 kind: HsCircKind,
301 target: T,
302 ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
303 where
304 T: CircTarget + Send + Sync;
305
306 /// Return an estimate-based value for how long we should allow a single
307 /// directory upload operation to complete.
308 ///
309 /// Includes circuit construction, stream opening, upload, and waiting for a
310 /// response.
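    ///
    /// For example, with an estimated circuit build time of 20 seconds and an
    /// estimated round-trip time of 8 seconds, the `Real` implementation below
    /// would allow 20 + 2 * 8 = 36 seconds (but never less than its 30-second floor).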
311 fn estimate_upload_timeout(&self) -> Duration;
312}
313
314/// Mockable client circuit
315#[async_trait]
316pub(crate) trait MockableClientCirc: Send + Sync {
317 /// The data stream type.
318 type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
319
320 /// Start a new stream to the last relay in the circuit, using
321 /// a BEGIN_DIR cell.
322 async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
323}
324
325#[async_trait]
326impl MockableClientCirc for ClientCirc {
327 type DataStream = tor_proto::stream::DataStream;
328
329 async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
330 ClientCirc::begin_dir_stream(self).await
331 }
332}
333
334/// The real version of the mockable state of the reactor.
335#[derive(Clone, From, Into)]
336pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
337
338#[async_trait]
339impl<R: Runtime> Mockable for Real<R> {
340 type Rng = rand::rngs::ThreadRng;
341 type ClientCirc = ClientCirc;
342
343 fn thread_rng(&self) -> Self::Rng {
344 rand::rng()
345 }
346
347 async fn get_or_launch_specific<T>(
348 &self,
349 netdir: &NetDir,
350 kind: HsCircKind,
351 target: T,
352 ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
353 where
354 T: CircTarget + Send + Sync,
355 {
356 self.0.get_or_launch_specific(netdir, kind, target).await
357 }
358
359 fn estimate_upload_timeout(&self) -> Duration {
360 use tor_circmgr::timeouts::Action;
361 let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
362 let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
363 // We assume that in the worst case we'll have to wait for an entire
364 // circuit construction and two round-trips to the hsdir.
365 let est_total = est_build + est_roundtrip * 2;
366 // We always allow _at least_ this much time, in case our estimate is
367 // ridiculously low.
368 let min_timeout = Duration::from_secs(30);
369 max(est_total, min_timeout)
370 }
371}
372
373/// The mutable state of a [`Reactor`].
374struct Inner {
375 /// The onion service config.
376 config: Arc<OnionServiceConfigPublisherView>,
377 /// Watcher for key_dirs.
378 ///
379 /// Set to `None` if the reactor is not running, or if `watch_configuration` is false.
380 ///
381 /// The watcher is recreated whenever the `restricted_discovery.key_dirs` change.
382 file_watcher: Option<FileWatcher>,
383 /// The relevant time periods.
384 ///
385 /// This includes the current time period, as well as any other time periods we need to be
386 /// publishing descriptors for.
387 ///
388 /// This is empty until we fetch our first netdir in [`Reactor::run`].
389 time_periods: Vec<TimePeriodContext>,
390 /// Our most up to date netdir.
391 ///
392 /// This is initialized in [`Reactor::run`].
393 netdir: Option<Arc<NetDir>>,
394 /// The timestamp of our last upload.
395 ///
396 /// This is the time when the last update was _initiated_ (rather than completed), to prevent
397 /// the publisher from spawning multiple upload tasks at once in response to multiple external
398 /// events happening in quick succession, such as the IPT manager sending multiple IPT change
399 /// notifications in a short time frame (#1142), or an IPT change notification that's
400 /// immediately followed by a consensus change. Starting two upload tasks at once is not only
401 /// inefficient, but it also causes the publisher to generate two different descriptors with
402 /// the same revision counter (the revision counter is derived from the current timestamp),
403 /// which ultimately causes the slower upload task to fail (see #1142).
404 ///
405 /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
406 /// used for retrying failed uploads (these are handled internally by
407 /// [`Reactor::upload_descriptor_with_retries`]).
408 last_uploaded: Option<Instant>,
409 /// A max-heap containing the time periods for which we need to reupload the descriptor.
410 // TODO: we are currently reuploading more than necessary.
411 // Ideally, this shouldn't contain duplicate TimePeriods,
412 // because we only need to retain the latest reupload time for each time period.
413 //
414 // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
415 // we will end up with multiple ReuploadTimer entries for that TP,
416 // each of which will (eventually) result in a reupload.
417 //
418 // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
419 //
420 // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
421 reupload_timers: BinaryHeap<ReuploadTimer>,
422 /// The restricted discovery authorized clients.
423 ///
424 /// `None`, unless the service is running in restricted discovery mode.
425 authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
426}
427
428/// The part of the reactor state that changes with every time period.
429struct TimePeriodContext {
430 /// The HsDir params.
431 params: HsDirParams,
432 /// The HsDirs to use in this time period.
433 ///
434 // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
435 // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
436 // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
437 // `NetDir` and the cached relays, and to convince Rust what we're doing is sound)
438 hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
439 /// The revision counter of the last successful upload, if any.
440 last_successful: Option<RevisionCounter>,
441 /// The outcome of the last upload, if any.
442 upload_results: Vec<HsDirUploadStatus>,
443}
444
445impl TimePeriodContext {
446 /// Create a new `TimePeriodContext`.
447 ///
448 /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
449 /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
450 fn new<'r>(
451 params: HsDirParams,
452 blind_id: HsBlindId,
453 netdir: &Arc<NetDir>,
454 old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
455 old_upload_results: Vec<HsDirUploadStatus>,
456 ) -> Result<Self, FatalError> {
457 let period = params.time_period();
458 let hs_dirs = Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?;
459 let upload_results = old_upload_results
460 .into_iter()
461 .filter(|res|
462 // Check if the HsDir of this result still exists
463 hs_dirs
464 .iter()
465 .any(|(relay_ids, _status)| relay_ids == &res.relay_ids))
466 .collect();
467
468 Ok(Self {
469 params,
470 hs_dirs,
471 last_successful: None,
472 upload_results,
473 })
474 }
475
476 /// Recompute the HsDirs for this time period.
477 fn compute_hsdirs<'r>(
478 period: TimePeriod,
479 blind_id: HsBlindId,
480 netdir: &Arc<NetDir>,
481 mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
482 ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
483 let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
484
485 Ok(hs_dirs
486 .map(|hs_dir| {
487 let mut builder = RelayIds::builder();
488 if let Some(ed_id) = hs_dir.ed_identity() {
489 builder.ed_identity(*ed_id);
490 }
491
492 if let Some(rsa_id) = hs_dir.rsa_identity() {
493 builder.rsa_identity(*rsa_id);
494 }
495
496 let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
497
498 // Have we uploaded the descriptor to this relay before? If so, we don't need to
499 // reupload it unless it was already dirty and due for a reupload.
500 let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
501 Some((_, status)) => *status,
502 None => DescriptorStatus::Dirty,
503 };
504
505 (relay_id, status)
506 })
507 .collect::<Vec<_>>())
508 }
509
510 /// Mark the descriptor dirty for all HSDirs of this time period.
511 fn mark_all_dirty(&mut self) {
512 self.hs_dirs
513 .iter_mut()
514 .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
515 }
516
517 /// Update the upload result for this time period.
518 fn set_upload_results(&mut self, upload_results: Vec<HsDirUploadStatus>) {
519 self.upload_results = upload_results;
520 }
521}
522
523/// An error that occurs while trying to upload a descriptor.
524#[derive(Clone, Debug, thiserror::Error)]
525#[non_exhaustive]
526pub enum UploadError {
527 /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
528 #[error("descriptor upload request failed: {}", _0.error)]
529 Request(#[from] RequestFailedError),
530
531 /// Failed to establish circuit to hidden service directory
532 #[error("could not build circuit to HsDir")]
533 Circuit(#[from] tor_circmgr::Error),
534
535 /// Failed to establish stream to hidden service directory
536 #[error("failed to establish directory stream to HsDir")]
537 Stream(#[source] tor_proto::Error),
538
539 /// An internal error.
540 #[error("Internal error")]
541 Bug(#[from] tor_error::Bug),
542}
543define_asref_dyn_std_error!(UploadError);
544
545impl<R: Runtime, M: Mockable> Reactor<R, M> {
546 /// Create a new `Reactor`.
547 #[allow(clippy::too_many_arguments)]
548 pub(super) fn new(
549 runtime: R,
550 nickname: HsNickname,
551 dir_provider: Arc<dyn NetDirProvider>,
552 mockable: M,
553 config: &OnionServiceConfig,
554 ipt_watcher: IptsPublisherView,
555 config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
556 status_tx: PublisherStatusSender,
557 keymgr: Arc<KeyMgr>,
558 path_resolver: Arc<CfgPathResolver>,
559 pow_manager: Arc<PowManager<R>>,
560 update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
561 ) -> Self {
562 /// The maximum size of the upload completion notifier channel.
563 ///
564 /// The channel we use this for is a futures::mpsc channel, which has a capacity of
565 /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
566 /// each sender will send exactly one message.
567 const UPLOAD_CHAN_BUF_SIZE: usize = 0;
568
569 // Internally-generated instructions, no need for mq.
570 let (upload_task_complete_tx, upload_task_complete_rx) =
571 mpsc_channel_no_memquota(UPLOAD_CHAN_BUF_SIZE);
572
573 let (publish_status_tx, publish_status_rx) = watch::channel();
574 // Setting the buffer size to zero here is OK,
575 // since we never actually send anything on this channel.
576 let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
577
578 let authorized_clients =
579 Self::read_authorized_clients(&config.restricted_discovery, &path_resolver);
580
581 // Create a channel for watching for changes in the configured
582 // restricted_discovery.key_dirs.
583 let (key_dirs_tx, key_dirs_rx) = file_watcher::channel();
584
585 let imm = Immutable {
586 runtime,
587 mockable,
588 nickname,
589 keymgr,
590 status_tx,
591 pow_manager,
592 };
593
594 let inner = Inner {
595 time_periods: vec![],
596 config: Arc::new(config.into()),
597 file_watcher: None,
598 netdir: None,
599 last_uploaded: None,
600 reupload_timers: Default::default(),
601 authorized_clients,
602 };
603
604 Self {
605 imm: Arc::new(imm),
606 inner: Arc::new(Mutex::new(inner)),
607 dir_provider,
608 ipt_watcher,
609 config_rx,
610 key_dirs_rx,
611 key_dirs_tx,
612 publish_status_rx,
613 publish_status_tx,
614 upload_task_complete_rx,
615 upload_task_complete_tx,
616 shutdown_tx,
617 path_resolver,
618 update_from_pow_manager_rx,
619 }
620 }
621
622 /// Start the reactor.
623 ///
624 /// Under normal circumstances, this function runs indefinitely.
625 ///
626 /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
627 /// upload fails or is rate-limited.
628 pub(super) async fn run(mut self) -> Result<(), FatalError> {
629 debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
630
631 {
632 let netdir = self
633 .dir_provider
634 .wait_for_netdir(Timeliness::Timely)
635 .await?;
636 let time_periods = self.compute_time_periods(&netdir, &[])?;
637
638 let mut inner = self.inner.lock().expect("poisoned lock");
639
640 inner.netdir = Some(netdir);
641 inner.time_periods = time_periods;
642 }
643
644 // Create the initial key_dirs watcher.
645 self.update_file_watcher();
646
647 loop {
648 match self.run_once().await {
649 Ok(ShutdownStatus::Continue) => continue,
650 Ok(ShutdownStatus::Terminate) => {
651 debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
652
653 self.imm.status_tx.send_shutdown();
654 return Ok(());
655 }
656 Err(e) => {
657 error_report!(
658 e,
659 "HS service {}: descriptor publisher crashed!",
660 self.imm.nickname
661 );
662
663 self.imm.status_tx.send_broken(e.clone());
664
665 return Err(e);
666 }
667 }
668 }
669 }
670
671 /// Run one iteration of the reactor loop.
672 #[allow(clippy::cognitive_complexity)] // TODO: Refactor
673 async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
674 let mut netdir_events = self.dir_provider.events();
675
676 // Note: TrackingNow tracks the values it is compared with.
677 // This is equivalent to sleeping for (until - now) units of time.
678 let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
679 if let PublishStatus::RateLimited(until) = self.status() {
680 if upload_rate_lim > until {
681 // We are no longer rate-limited
682 self.expire_rate_limit().await?;
683 }
684 }
685
686 let reupload_tracking = TrackingNow::now(&self.imm.runtime);
687 let mut reupload_periods = vec![];
688 {
689 let mut inner = self.inner.lock().expect("poisoned lock");
690 let inner = &mut *inner;
691 while let Some(reupload) = inner.reupload_timers.peek().copied() {
692 // First, extract all the timeouts that already elapsed.
693 if reupload.when <= reupload_tracking {
694 inner.reupload_timers.pop();
695 reupload_periods.push(reupload.period);
696 } else {
697 // We are not ready to schedule any more reuploads.
698 //
699 // How much we need to sleep is implicitly
700 // tracked in reupload_tracking (through
701 // the TrackingNow implementation)
702 break;
703 }
704 }
705 }
706
707 // Check if it's time to schedule any reuploads.
708 for period in reupload_periods {
709 if self.mark_dirty(&period) {
710 debug!(
711 time_period=?period,
712 "descriptor reupload timer elapsed; scheduling reupload",
713 );
714 self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
715 .await?;
716 }
717 }
718
719 select_biased! {
720 res = self.upload_task_complete_rx.next().fuse() => {
721 let Some(upload_res) = res else {
722 return Ok(ShutdownStatus::Terminate);
723 };
724
725 self.handle_upload_results(upload_res);
726 self.upload_result_to_svc_status()?;
727 },
728 () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
729 self.expire_rate_limit().await?;
730 },
731 () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
732 // Run another iteration, executing run_once again. This time, we will remove the
733 // expired reupload from self.reupload_timers, mark the descriptor dirty for all
734 // relevant HsDirs, and schedule the upload by setting our status to
735 // UploadScheduled.
736 return Ok(ShutdownStatus::Continue);
737 },
738 netdir_event = netdir_events.next().fuse() => {
739 let Some(netdir_event) = netdir_event else {
740 debug!("netdir event stream ended");
741 return Ok(ShutdownStatus::Terminate);
742 };
743
744 if !matches!(netdir_event, DirEvent::NewConsensus) {
745 return Ok(ShutdownStatus::Continue);
746 };
747
748 // The consensus changed. Grab a new NetDir.
749 let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
750 Ok(y) => y,
751 Err(e) => {
752 error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
753 // Hopefully a netdir will appear in the future.
754 // In the meantime, suspend operations.
755 //
756 // TODO (#1218): there is a bug here: we stop reading on our inputs
757 // including eg publish_status_rx, but it is our job to log some of
758 // these things. While we are waiting for a netdir, all those messages
759 // are "stuck"; they'll appear later, with misleading timestamps.
760 //
761 // Probably this should be fixed by moving the logging
762 // out of the reactor, where it won't be blocked.
763 self.dir_provider.wait_for_netdir(Timeliness::Timely)
764 .await?
765 }
766 };
767 let relevant_periods = netdir.hs_all_time_periods();
768 self.handle_consensus_change(netdir).await?;
769 expire_publisher_keys(
770 &self.imm.keymgr,
771 &self.imm.nickname,
772 &relevant_periods,
773 ).unwrap_or_else(|e| {
774 error_report!(e, "failed to remove expired keys");
775 });
776 }
777 update = self.ipt_watcher.await_update().fuse() => {
778 if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
779 return Ok(ShutdownStatus::Terminate);
780 }
781 },
782 config = self.config_rx.next().fuse() => {
783 let Some(config) = config else {
784 return Ok(ShutdownStatus::Terminate);
785 };
786
787 self.handle_svc_config_change(&config).await?;
788 },
789 res = self.key_dirs_rx.next().fuse() => {
790 let Some(event) = res else {
791 return Ok(ShutdownStatus::Terminate);
792 };
793
794 while let Some(_ignore) = self.key_dirs_rx.try_recv() {
795 // Discard other events, so that we only reload once.
796 }
797
798 self.handle_key_dirs_change(event).await?;
799 }
800 should_upload = self.publish_status_rx.next().fuse() => {
801 let Some(should_upload) = should_upload else {
802 return Ok(ShutdownStatus::Terminate);
803 };
804
805 // Our PublishStatus changed -- are we ready to publish?
806 if should_upload == PublishStatus::UploadScheduled {
807 self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
808 self.upload_all().await?;
809 }
810 }
811 update_tp_pow_seed = self.update_from_pow_manager_rx.next().fuse() => {
812 debug!("Update PoW seed for TP!");
813 let Some(time_period) = update_tp_pow_seed else {
814 return Ok(ShutdownStatus::Terminate);
815 };
816 self.mark_dirty(&time_period);
817 self.upload_all().await?;
818 }
819 }
820
821 Ok(ShutdownStatus::Continue)
822 }
823
824 /// Returns the current status of the publisher
825 fn status(&self) -> PublishStatus {
826 *self.publish_status_rx.borrow()
827 }
828
829 /// Handle a batch of upload outcomes,
830 /// possibly updating the status of the descriptor for the corresponding HSDirs.
831 fn handle_upload_results(&self, results: TimePeriodUploadResult) {
832 let mut inner = self.inner.lock().expect("poisoned lock");
833 let inner = &mut *inner;
834
835 // Check which time period these uploads pertain to.
836 let period = inner
837 .time_periods
838 .iter_mut()
839 .find(|ctx| ctx.params.time_period() == results.time_period);
840
841 let Some(period) = period else {
842 // The uploads were for a time period that is no longer relevant, so we
843 // can ignore the result.
844 return;
845 };
846
847 // We will need to reupload this descriptor at some point, so we pick
848 // a random time between 60 minutes and 120 minutes in the future.
849 //
850 // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
851 let mut rng = self.imm.mockable.thread_rng();
852 // TODO SPEC: Control republish period using a consensus parameter?
853 let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
854 let duration = Duration::from_secs(minutes * 60);
855 let reupload_when = self.imm.runtime.now() + duration;
856 let time_period = period.params.time_period();
857
858 info!(
859 time_period=?time_period,
860 "reuploading descriptor in {}",
861 humantime::format_duration(duration),
862 );
863
864 inner.reupload_timers.push(ReuploadTimer {
865 period: time_period,
866 when: reupload_when,
867 });
868
869 let mut upload_results = vec![];
870 for upload_res in results.hsdir_result {
871 let relay = period
872 .hs_dirs
873 .iter_mut()
874 .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
875
876 let Some((_relay, status)): Option<&mut (RelayIds, _)> = relay else {
877 // This HSDir went away, so the result doesn't matter.
878 // Continue processing the rest of the results
879 continue;
880 };
881
882 if upload_res.upload_res.is_ok() {
883 let update_last_successful = match period.last_successful {
884 None => true,
885 Some(counter) => counter <= upload_res.revision_counter,
886 };
887
888 if update_last_successful {
889 period.last_successful = Some(upload_res.revision_counter);
890 // TODO (#1098): Is it possible that this won't update the statuses promptly
891 // enough? For example, it's possible for the reactor to see a Dirty descriptor
892 // and start an upload task for a descriptor that has already been uploaded (or is
893 // being uploaded) in another task, but whose upload results have not yet been
894 // processed.
895 //
896 // This is probably made worse by the fact that the statuses are updated in
897 // batches (grouped by time period), rather than one by one as the upload tasks
898 // complete (updating the status involves locking the inner mutex, and I wanted
899 // to minimize the locking/unlocking overheads). I'm not sure handling the
900 // updates in batches was the correct decision here.
901 *status = DescriptorStatus::Clean;
902 }
903 }
904
905 upload_results.push(upload_res);
906 }
907
908 period.set_upload_results(upload_results);
909 }
910
911 /// Maybe update our list of HsDirs.
912 async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
913 trace!("the consensus has changed; recomputing HSDirs");
914
915 let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
916
917 self.recompute_hs_dirs()?;
918 self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
919 .await?;
920
921 // If the time period has changed, some of our upload results may now be irrelevant,
922 // so we might need to update our status (for example, if our uploads are
923 // for a no-longer-relevant time period, it means we might be able to update
924 // our status from "degraded" to "running")
925 self.upload_result_to_svc_status()?;
926
927 Ok(())
928 }
929
930 /// Recompute the HsDirs for all relevant time periods.
931 fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
932 let mut inner = self.inner.lock().expect("poisoned lock");
933 let inner = &mut *inner;
934
935 let netdir = Arc::clone(
936 inner
937 .netdir
938 .as_ref()
939 .ok_or_else(|| internal!("started upload task without a netdir"))?,
940 );
941
942 // Update our list of relevant time periods.
943 let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
944 inner.time_periods = new_time_periods;
945
946 Ok(())
947 }
948
949 /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
950 ///
951 /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
952 /// HsDirs where possible.
953 fn compute_time_periods(
954 &self,
955 netdir: &Arc<NetDir>,
956 time_periods: &[TimePeriodContext],
957 ) -> Result<Vec<TimePeriodContext>, FatalError> {
958 netdir
959 .hs_all_time_periods()
960 .iter()
961 .map(|params| {
962 let period = params.time_period();
963 let blind_id_kp =
964 read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
965 // Note: for now, read_blind_id_keypair cannot return Ok(None).
966 // It's supposed to return Ok(None) if we're in offline hsid mode,
967 // but that might change when we do #1194
968 .ok_or_else(|| internal!("offline hsid mode not supported"))?;
969
970 let blind_id: HsBlindIdKey = (&blind_id_kp).into();
971
972 // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
973 // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
974 // publishing the descriptor to the HsDirs that already have it (the ones that are
975 // marked with DescriptorStatus::Clean).
976 //
977 // In other words, we only want to publish to those HsDirs that
978 // * are part of a new time period (which we have never published the descriptor
979 // for), or
980 // * have just been added to the ring of a time period we already knew about
981 if let Some(ctx) = time_periods
982 .iter()
983 .find(|ctx| ctx.params.time_period() == period)
984 {
985 TimePeriodContext::new(
986 params.clone(),
987 blind_id.into(),
988 netdir,
989 ctx.hs_dirs.iter(),
990 ctx.upload_results.clone(),
991 )
992 } else {
993 // Passing an empty iterator here means all HsDirs in this TimePeriodContext
994 // will be marked as dirty, meaning we will need to upload our descriptor to them.
995 TimePeriodContext::new(
996 params.clone(),
997 blind_id.into(),
998 netdir,
999 iter::empty(),
1000 vec![],
1001 )
1002 }
1003 })
1004 .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
1005 }
1006
1007 /// Replace the old netdir with the new, returning the old.
1008 fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
1009 self.inner
1010 .lock()
1011 .expect("poisoned lock")
1012 .netdir
1013 .replace(new_netdir)
1014 }
1015
1016 /// Replace our view of the service config with `new_config` if `new_config` contains changes
1017 /// that would cause us to generate a new descriptor.
1018 fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfigPublisherView>) -> bool {
1019 let mut inner = self.inner.lock().expect("poisoned lock");
1020 let old_config = &mut inner.config;
1021
1022 // The fields we're interested in haven't changed, so there's no need to update
1023 // `inner.config`.
1024 if *old_config == new_config {
1025 return false;
1026 }
1027
1028 let log_change = match (
1029 old_config.restricted_discovery.enabled,
1030 new_config.restricted_discovery.enabled,
1031 ) {
1032 (true, false) => Some("Disabling restricted discovery mode"),
1033 (false, true) => Some("Enabling restricted discovery mode"),
1034 _ => None,
1035 };
1036
1037 if let Some(msg) = log_change {
1038 info!(nickname=%self.imm.nickname, "{}", msg);
1039 }
1040
1041 let _old: Arc<OnionServiceConfigPublisherView> = std::mem::replace(old_config, new_config);
1042
1043 true
1044 }
1045
1046 /// Recreate the FileWatcher for watching the restricted discovery key_dirs.
1047 fn update_file_watcher(&self) {
1048 let mut inner = self.inner.lock().expect("poisoned lock");
1049 if inner.config.restricted_discovery.watch_configuration() {
1050 debug!("The restricted_discovery.key_dirs have changed, updating file watcher");
1051 let mut watcher = FileWatcher::builder(self.imm.runtime.clone());
1052
1053 let dirs = inner.config.restricted_discovery.key_dirs().clone();
1054
1055 watch_dirs(&mut watcher, &dirs, &self.path_resolver);
1056
1057 let watcher = watcher
1058 .start_watching(self.key_dirs_tx.clone())
1059 .map_err(|e| {
1060 // TODO: update the publish status (see also the module-level TODO about this).
1061 error_report!(e, "Cannot set file watcher");
1062 })
1063 .ok();
1064 inner.file_watcher = watcher;
1065 } else {
1066 if inner.file_watcher.is_some() {
1067 debug!("removing key_dirs watcher");
1068 }
1069 inner.file_watcher = None;
1070 }
1071 }
1072
1073 /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
1074 /// uploading.
1075 fn note_ipt_change(&self) -> PublishStatus {
1076 let mut ipts = self.ipt_watcher.borrow_for_publish();
1077 match ipts.ipts.as_mut() {
1078 Some(_ipts) => PublishStatus::UploadScheduled,
1079 None => PublishStatus::AwaitingIpts,
1080 }
1081 }
1082
1083 /// Update our list of introduction points.
1084 async fn handle_ipt_change(
1085 &mut self,
1086 update: Option<Result<(), crate::FatalError>>,
1087 ) -> Result<ShutdownStatus, FatalError> {
1088 trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
1089 match update {
1090 Some(Ok(())) => {
1091 let should_upload = self.note_ipt_change();
1092 debug!(nickname=%self.imm.nickname, "the introduction points have changed");
1093
1094 self.mark_all_dirty();
1095 self.update_publish_status_unless_rate_lim(should_upload)
1096 .await?;
1097 Ok(ShutdownStatus::Continue)
1098 }
1099 Some(Err(e)) => Err(e),
1100 None => {
1101 debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
1102 Ok(ShutdownStatus::Terminate)
1103 }
1104 }
1105 }
1106
1107 /// Update the `PublishStatus` of the reactor with `new_state`,
1108 /// unless the current state is `AwaitingIpts`.
1109 async fn update_publish_status_unless_waiting(
1110 &mut self,
1111 new_state: PublishStatus,
1112 ) -> Result<(), FatalError> {
1113 // Only update the state if we're not waiting for intro points.
1114 if self.status() != PublishStatus::AwaitingIpts {
1115 self.update_publish_status(new_state).await?;
1116 }
1117
1118 Ok(())
1119 }
1120
1121 /// Update the `PublishStatus` of the reactor with `new_state`,
1122 /// unless the current state is `RateLimited`.
1123 async fn update_publish_status_unless_rate_lim(
1124 &mut self,
1125 new_state: PublishStatus,
1126 ) -> Result<(), FatalError> {
1127 // We can't exit this state until the rate-limit expires.
1128 if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1129 self.update_publish_status(new_state).await?;
1130 }
1131
1132 Ok(())
1133 }
1134
1135 /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
1136 async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), Bug> {
1137 let onion_status = match new_state {
1138 PublishStatus::Idle => None,
1139 PublishStatus::UploadScheduled
1140 | PublishStatus::AwaitingIpts
1141 | PublishStatus::RateLimited(_) => Some(State::Bootstrapping),
1142 };
1143
1144 if let Some(onion_status) = onion_status {
1145 self.imm.status_tx.send(onion_status, None);
1146 }
1147
1148 trace!(
1149 "publisher reactor status change: {:?} -> {:?}",
1150 self.status(),
1151 new_state
1152 );
1153
1154 self.publish_status_tx.send(new_state).await.map_err(
1155 |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
1156 )?;
1157
1158 Ok(())
1159 }
1160
1161 /// Update the onion svc status based on the results of the last descriptor uploads.
1162 fn upload_result_to_svc_status(&self) -> Result<(), FatalError> {
1163 let inner = self.inner.lock().expect("poisoned lock");
1164 let netdir = inner
1165 .netdir
1166 .as_ref()
1167 .ok_or_else(|| internal!("handling upload results without netdir?!"))?;
1168
1169 let (state, err) = upload_result_state(netdir, &inner.time_periods);
1170 self.imm.status_tx.send(state, err);
1171
1172 Ok(())
1173 }
1174
1175 /// Update the descriptors based on the config change.
1176 async fn handle_svc_config_change(
1177 &mut self,
1178 config: &OnionServiceConfig,
1179 ) -> Result<(), FatalError> {
1180 let new_config = Arc::new(config.into());
1181 if self.replace_config_if_changed(Arc::clone(&new_config)) {
1182 self.update_file_watcher();
1183 self.update_authorized_clients_if_changed().await?;
1184
1185 info!(nickname=%self.imm.nickname, "Config has changed, generating a new descriptor");
1186 self.mark_all_dirty();
1187
1188 // Schedule an upload, unless we're still waiting for IPTs.
1189 self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1190 .await?;
1191 }
1192
1193 Ok(())
1194 }
1195
1196 /// Update the descriptors based on a restricted discovery key_dirs change.
1197 ///
1198 /// If the authorized clients from the [`RestrictedDiscoveryConfig`] have changed,
1199 /// this marks the descriptor as dirty for all time periods,
1200 /// and schedules a reupload.
1201 async fn handle_key_dirs_change(&mut self, event: FileEvent) -> Result<(), FatalError> {
1202 debug!("The configured key_dirs have changed");
1203 match event {
1204 FileEvent::Rescan | FileEvent::FileChanged => {
1205 // These events are handled in the same way, by re-reading the keys from disk
1206 // and republishing the descriptor if necessary
1207 }
1208 _ => return Err(internal!("file watcher event {event:?}").into()),
1209 };
1210
1211 // Update the file watcher, in case the change was triggered by a key_dir move.
1212 self.update_file_watcher();
1213
1214 if self.update_authorized_clients_if_changed().await? {
1215 self.mark_all_dirty();
1216
1217 // Schedule an upload, unless we're still waiting for IPTs.
1218 self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
1219 .await?;
1220 }
1221
1222 Ok(())
1223 }
1224
1225 /// Recreate the authorized_clients based on the current config.
1226 ///
1227 /// Returns `true` if the authorized clients have changed.
1228 async fn update_authorized_clients_if_changed(&mut self) -> Result<bool, FatalError> {
1229 let mut inner = self.inner.lock().expect("poisoned lock");
1230 let authorized_clients =
1231 Self::read_authorized_clients(&inner.config.restricted_discovery, &self.path_resolver);
1232
1233 let clients = &mut inner.authorized_clients;
1234 let changed = clients.as_ref() != authorized_clients.as_ref();
1235
1236 if changed {
1237 info!("The restricted discovery mode authorized clients have changed");
1238 *clients = authorized_clients;
1239 }
1240
1241 Ok(changed)
1242 }
1243
1244 /// Read the authorized `RestrictedDiscoveryKeys` from `config`.
1245 fn read_authorized_clients(
1246 config: &RestrictedDiscoveryConfig,
1247 path_resolver: &CfgPathResolver,
1248 ) -> Option<Arc<RestrictedDiscoveryKeys>> {
1249 let authorized_clients = config.read_keys(path_resolver);
1250
1251 if matches!(authorized_clients.as_ref(), Some(c) if c.is_empty()) {
1252 warn!(
1253 "Running in restricted discovery mode, but we have no authorized clients. Service will be unreachable"
1254 );
1255 }
1256
1257 authorized_clients.map(Arc::new)
1258 }
1259
1260 /// Mark the descriptor dirty for all time periods.
1261 fn mark_all_dirty(&self) {
1262 trace!("marking the descriptor dirty for all time periods");
1263
1264 self.inner
1265 .lock()
1266 .expect("poisoned lock")
1267 .time_periods
1268 .iter_mut()
1269 .for_each(|tp| tp.mark_all_dirty());
1270 }
1271
1272 /// Mark the descriptor dirty for the specified time period.
1273 ///
1274 /// Returns `true` if the specified period is still relevant, and `false` otherwise.
1275 fn mark_dirty(&self, period: &TimePeriod) -> bool {
1276 let mut inner = self.inner.lock().expect("poisoned lock");
1277 let period_ctx = inner
1278 .time_periods
1279 .iter_mut()
1280 .find(|tp| tp.params.time_period() == *period);
1281
1282 match period_ctx {
1283 Some(ctx) => {
1284 trace!(time_period=?period, "marking the descriptor dirty");
1285 ctx.mark_all_dirty();
1286 true
1287 }
1288 None => false,
1289 }
1290 }
1291
1292 /// Try to upload our descriptor to the HsDirs that need it.
1293 ///
1294 /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
1295 /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
1296 ///
1297 /// Failed uploads are retried
1298 /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1299 ///
1300 /// If restricted discovery mode is enabled and there are no authorized clients,
1301 /// we abort the upload and set our status to [`State::Broken`].
1302 //
1303 // Note: a broken restricted discovery config won't prevent future uploads from being scheduled
1304 // (for example if the IPTs change),
1305 // which can cause the publisher's status to oscillate between `Bootstrapping` and `Broken`.
1306 // TODO: we might wish to refactor the publisher to be more sophisticated about this.
1307 //
1308 /// For each current time period, we spawn a task that uploads the descriptor to
1309 /// all the HsDirs on the HsDir ring of that time period.
1310 /// Each task shuts down on completion, or when the reactor is dropped.
1311 ///
1312 /// Each task reports its upload results (`TimePeriodUploadResult`)
1313 /// via the `upload_task_complete_tx` channel.
1314 /// The results are received and processed in the main loop of the reactor.
1315 ///
1316 /// Returns an error if it fails to spawn a task, or if an internal error occurs.
1317 #[allow(clippy::cognitive_complexity)] // TODO #2010: Refactor
1318 async fn upload_all(&mut self) -> Result<(), FatalError> {
1319 trace!("starting descriptor upload task...");
1320
1321 // Abort the upload entirely if we have an empty list of authorized clients
1322 let authorized_clients = match self.authorized_clients() {
1323 Ok(authorized_clients) => authorized_clients,
1324 Err(e) => {
1325 error_report!(e, "aborting upload");
1326 self.imm.status_tx.send_broken(e.clone());
1327
1328 // Returning an error would shut down the reactor, so we have to return Ok here.
1329 return Ok(());
1330 }
1331 };
1332
1333 let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
1334 let now = self.imm.runtime.now();
1335 // Check if we should rate-limit this upload.
1336 if let Some(ts) = last_uploaded {
1337 let duration_since_upload = now.duration_since(ts);
1338
1339 if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
1340 return Ok(self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await?);
1341 }
1342 }
1343
1344 let mut inner = self.inner.lock().expect("poisoned lock");
1345 let inner = &mut *inner;
1346
1347 let _ = inner.last_uploaded.insert(now);
1348
1349 for period_ctx in inner.time_periods.iter_mut() {
1350 let upload_task_complete_tx = self.upload_task_complete_tx.clone();
1351
1352 // Figure out which HsDirs we need to upload the descriptor to (some of them might already
1353 // have our latest descriptor, so we filter them out).
1354 let hs_dirs = period_ctx
1355 .hs_dirs
1356 .iter()
1357 .filter_map(|(relay_id, status)| {
1358 if *status == DescriptorStatus::Dirty {
1359 Some(relay_id.clone())
1360 } else {
1361 None
1362 }
1363 })
1364 .collect::<Vec<_>>();
1365
1366 if hs_dirs.is_empty() {
1367 trace!("the descriptor is clean for all HSDirs. Nothing to do");
1368 return Ok(());
1369 }
1370
1371 let time_period = period_ctx.params.time_period();
1372 // This scope exists because rng is not Send, so it needs to fall out of scope before we
1373 // await anything.
1374 let netdir = Arc::clone(
1375 inner
1376 .netdir
1377 .as_ref()
1378 .ok_or_else(|| internal!("started upload task without a netdir"))?,
1379 );
1380
1381 let imm = Arc::clone(&self.imm);
1382 let ipt_upload_view = self.ipt_watcher.upload_view();
1383 let config = Arc::clone(&inner.config);
1384 let authorized_clients = authorized_clients.clone();
1385
1386 trace!(nickname=%self.imm.nickname, time_period=?time_period,
1387 "spawning upload task"
1388 );
1389
1390 let params = period_ctx.params.clone();
1391 let shutdown_rx = self.shutdown_tx.subscribe();
1392
1393 // Spawn a task to upload the descriptor to all HsDirs of this time period.
1394 //
1395 // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
1396 // dropped).
1397 let _handle: () = self
1398 .imm
1399 .runtime
1400 .spawn(async move {
1401 if let Err(e) = Self::upload_for_time_period(
1402 hs_dirs,
1403 &netdir,
1404 config,
1405 params,
1406 Arc::clone(&imm),
1407 ipt_upload_view.clone(),
1408 authorized_clients.clone(),
1409 upload_task_complete_tx,
1410 shutdown_rx,
1411 )
1412 .await
1413 {
1414 error_report!(
1415 e,
1416 "descriptor upload failed for HS service {} and time period {:?}",
1417 imm.nickname,
1418 time_period
1419 );
1420 }
1421 })
1422 .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
1423 }
1424
1425 Ok(())
1426 }
1427
1428 /// Upload the descriptor for the time period specified in `params`.
1429 ///
1430 /// Failed uploads are retried
1431 /// (see [`upload_descriptor_with_retries`](Reactor::upload_descriptor_with_retries)).
1432 #[allow(clippy::too_many_arguments)] // TODO: refactor
1433 #[allow(clippy::cognitive_complexity)] // TODO: Refactor
1434 async fn upload_for_time_period(
1435 hs_dirs: Vec<RelayIds>,
1436 netdir: &Arc<NetDir>,
1437 config: Arc<OnionServiceConfigPublisherView>,
1438 params: HsDirParams,
1439 imm: Arc<Immutable<R, M>>,
1440 ipt_upload_view: IptsPublisherUploadView,
1441 authorized_clients: Option<Arc<RestrictedDiscoveryKeys>>,
1442 mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
1443 shutdown_rx: broadcast::Receiver<Void>,
1444 ) -> Result<(), FatalError> {
1445 let time_period = params.time_period();
1446 trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
1447
1448 let hsdir_count = hs_dirs.len();
1449
1450 /// An error returned from an upload future.
1451 //
1452 // Exhaustive, because this is a private type.
1453 #[derive(Clone, Debug, thiserror::Error)]
1454 enum PublishError {
1455 /// The upload was aborted because there are no IPTs.
1456 ///
1457 /// This happens because of an inevitable TOCTOU race, where after being notified by
1458 /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
1459 /// we find out there actually are no IPTs, so we can't build the descriptor.
1460 ///
1461 /// This is a special kind of error that interrupts the current upload task, and is
1462 /// logged at `debug!` level rather than `warn!` or `error!`.
1463 ///
1464 /// Ideally, this shouldn't happen very often (if at all).
1465 #[error("No IPTs")]
1466 NoIpts,
1467
1468 /// The reactor has shut down
1469 #[error("The reactor has shut down")]
1470 Shutdown,
1471
1472 /// A fatal error.
1473 #[error("{0}")]
1474 Fatal(#[from] FatalError),
1475 }
1476
1477 let upload_results = futures::stream::iter(hs_dirs)
1478 .map(|relay_ids| {
1479 let netdir = netdir.clone();
1480 let config = Arc::clone(&config);
1481 let imm = Arc::clone(&imm);
1482 let ipt_upload_view = ipt_upload_view.clone();
1483 let authorized_clients = authorized_clients.clone();
1484 let params = params.clone();
1485 let mut shutdown_rx = shutdown_rx.clone();
1486
1487 let ed_id = relay_ids
1488 .ed_identity()
1489 .map(|id| id.to_string())
1490 .unwrap_or_else(|| "unknown".into());
1491 let rsa_id = relay_ids
1492 .rsa_identity()
1493 .map(|id| id.to_string())
1494 .unwrap_or_else(|| "unknown".into());
1495
1496 async move {
1497 let run_upload = |desc| async {
1498 let Some(hsdir) = netdir.by_ids(&relay_ids) else {
1499 // This should never happen (all of our relay_ids are from the stored
1500 // netdir).
1501 let err =
1502 "tried to upload descriptor to relay not found in consensus?!";
1503 warn!(
1504 nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1505 "{err}"
1506 );
1507 return Err(internal!("{err}").into());
1508 };
1509
1510 Self::upload_descriptor_with_retries(
1511 desc,
1512 &netdir,
1513 &hsdir,
1514 &ed_id,
1515 &rsa_id,
1516 Arc::clone(&imm),
1517 )
1518 .await
1519 };
1520
1521 // How long until we're supposed to time out?
1522 let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
1523 // We generate a new descriptor before _each_ HsDir upload. This means each
1524 // HsDir could, in theory, receive a different descriptor (not just in terms of
1525 // revision-counters, but also with a different set of IPTs). It may seem like
1526 // this could lead to some HsDirs being left with an outdated descriptor, but
1527 // that's not the case: after the upload completes, the publisher will be
1528 // notified by the ipt_watcher of the IPT change event (if there was one to
1529 // begin with), which will trigger another upload job.
1530 let hsdesc = {
1531 // This scope is needed because the ipt_set MutexGuard is not Send, so it
1532 // needs to fall out of scope before the await point below
1533 let mut ipt_set = ipt_upload_view.borrow_for_publish();
1534
1535 // If there are no IPTs, we abort the upload. At this point, we might have
1536 // uploaded the descriptor to some, but not all, HSDirs from the specified
1537 // time period.
1538 //
1539 // Returning an error here means the upload completion task is never
1540 // notified of the outcome of any of these uploads (which means the
1541 // descriptor is not marked clean). This is OK, because if we suddenly find
1542 // out we have no IPTs, it means our built `hsdesc` has an outdated set of
1543 // IPTs, so we need to go back to the main loop to wait for IPT changes,
1544 // and generate a fresh descriptor anyway.
1545 //
1546 // Ideally, this shouldn't happen very often (if at all).
1547 let Some(ipts) = ipt_set.ipts.as_mut() else {
1548 return Err(PublishError::NoIpts);
1549 };
1550
1551 let hsdesc = {
1552 trace!(
1553 nickname=%imm.nickname, time_period=?time_period,
1554 "building descriptor"
1555 );
1556 let mut rng = imm.mockable.thread_rng();
1557 let mut key_rng = tor_llcrypto::rng::CautiousRng;
1558
1559 // We're about to generate a new version of the descriptor,
1560 // so let's generate a new revision counter.
1561 let now = imm.runtime.wallclock();
1562 let revision_counter = imm.generate_revision_counter(¶ms, now)?;
1563
1564 build_sign(
1565 &imm.keymgr,
1566 &imm.pow_manager,
1567 &config,
1568 authorized_clients.as_deref(),
1569 ipts,
1570 time_period,
1571 revision_counter,
1572 &mut rng,
1573 &mut key_rng,
1574 imm.runtime.wallclock(),
1575 )?
1576 };
1577
1578 if let Err(e) =
1579 ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
1580 {
1581 let wait = e.log_retry_max(&imm.nickname)?;
1582 // TODO (#1226): retry instead of this
1583 return Err(FatalError::Bug(internal!(
1584 "ought to retry after {wait:?}, crashing instead"
1585 ))
1586 .into());
1587 }
1588
1589 hsdesc
1590 };
1591
1592 let VersionedDescriptor {
1593 desc,
1594 revision_counter,
1595 } = hsdesc;
1596
1597 trace!(
1598 nickname=%imm.nickname, time_period=?time_period,
1599 revision_counter=?revision_counter,
1600 "generated new descriptor for time period",
1601 );
1602
1603 // (Actually launch the upload attempt. No timeout is needed
1604 // here, since the backoff::Runner code will handle that for us.)
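                // `select_biased!` polls its branches in the order they are written,
                // so a pending shutdown signal takes precedence over starting the upload.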
1605 let upload_res: UploadResult = select_biased! {
1606 shutdown = shutdown_rx.next().fuse() => {
1607 // This will always be None, since Void is uninhabited.
1608 let _: Option<Void> = shutdown;
1609
1610 // It looks like the reactor has shut down,
1611 // so there is no point in uploading the descriptor anymore.
1612 //
1613 // Let's shut down the upload task too.
1614 trace!(
1615 nickname=%imm.nickname, time_period=?time_period,
1616 "upload task received shutdown signal"
1617 );
1618
1619 return Err(PublishError::Shutdown);
1620 },
1621 res = run_upload(desc.clone()).fuse() => res,
1622 };
1623
1624                    // Note: `upload_res` is only ever an `Err` if
1625                    // upload_descriptor_with_retries fails, i.e. if all of our
1626                    // retry attempts have failed.
1627 Ok(HsDirUploadStatus {
1628 relay_ids,
1629 upload_res,
1630 revision_counter,
1631 })
1632 }
1633 })
1634 // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
1635 .boxed()
1636 .buffer_unordered(MAX_CONCURRENT_UPLOADS)
1637 .try_collect::<Vec<_>>()
1638 .await;
1639
1640 let upload_results = match upload_results {
1641 Ok(v) => v,
1642 Err(PublishError::Fatal(e)) => return Err(e),
1643 Err(PublishError::NoIpts) => {
1644 debug!(
1645 nickname=%imm.nickname, time_period=?time_period,
1646 "no introduction points; skipping upload"
1647 );
1648
1649 return Ok(());
1650 }
1651 Err(PublishError::Shutdown) => {
1652 debug!(
1653 nickname=%imm.nickname, time_period=?time_period,
1654 "the reactor has shut down; aborting upload"
1655 );
1656
1657 return Ok(());
1658 }
1659 };
1660
1661 let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
1662 .iter()
1663 .partition(|res| res.upload_res.is_ok());
1664
1665 debug!(
1666 nickname=%imm.nickname, time_period=?time_period,
1667 "descriptor uploaded successfully to {}/{} HSDirs",
1668 succeeded.len(), hsdir_count
1669 );
1670
1671 if upload_task_complete_tx
1672 .send(TimePeriodUploadResult {
1673 time_period,
1674 hsdir_result: upload_results,
1675 })
1676 .await
1677 .is_err()
1678 {
1679 return Err(internal!(
1680 "failed to notify reactor of upload completion (reactor shut down)"
1681 )
1682 .into());
1683 }
1684
1685 Ok(())
1686 }
1687
1688 /// Upload a descriptor to the specified HSDir.
1689 ///
1690 /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
1691 /// to the caller to retry on failure.
1692 ///
1693 /// This function does not handle timeouts.
1694 async fn upload_descriptor(
1695 hsdesc: String,
1696 netdir: &Arc<NetDir>,
1697 hsdir: &Relay<'_>,
1698 imm: Arc<Immutable<R, M>>,
1699 ) -> Result<(), UploadError> {
1700 let request = HsDescUploadRequest::new(hsdesc);
1701
1702 trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
1703 "starting descriptor upload",
1704 );
1705
1706 let circuit = imm
1707 .mockable
1708 .get_or_launch_specific(
1709 netdir,
1710 HsCircKind::SvcHsDir,
1711 OwnedCircTarget::from_circ_target(hsdir),
1712 )
1713 .await?;
1714
1715 let mut stream = circuit
1716 .begin_dir_stream()
1717 .await
1718 .map_err(UploadError::Stream)?;
1719
1720 let _response: String = send_request(&imm.runtime, &request, &mut stream, None)
1721 .await
1722 .map_err(|dir_error| -> UploadError {
1723 match dir_error {
1724 DirClientError::RequestFailed(e) => e.into(),
1725 DirClientError::CircMgr(e) => into_internal!(
1726 "tor-dirclient complains about circmgr going wrong but we gave it a stream"
1727 )(e)
1728 .into(),
1729 e => into_internal!("unexpected error")(e).into(),
1730 }
1731 })?
1732 .into_output_string()?; // This returns an error if we received an error response
1733
1734 Ok(())
1735 }
1736
1737 /// Upload a descriptor to the specified HSDir, retrying if appropriate.
1738 ///
1739 /// Any failed uploads are retried according to a [`PublisherBackoffSchedule`].
1740 /// Each failed upload is retried until it succeeds, or until the overall timeout specified
1741 /// by [`BackoffSchedule::overall_timeout`] elapses. Individual attempts are timed out
1742 /// according to the [`BackoffSchedule::single_attempt_timeout`].
1743 /// This function gives up after the overall timeout elapses,
1744 /// declaring the upload a failure, and never retrying it again.
1745 ///
1746 /// See also [`BackoffSchedule`].
1747 async fn upload_descriptor_with_retries(
1748 hsdesc: String,
1749 netdir: &Arc<NetDir>,
1750 hsdir: &Relay<'_>,
1751 ed_id: &str,
1752 rsa_id: &str,
1753 imm: Arc<Immutable<R, M>>,
1754 ) -> UploadResult {
1755 /// The base delay to use for the backoff schedule.
1756 const BASE_DELAY_MSEC: u32 = 1000;
1757 let schedule = PublisherBackoffSchedule {
1758 retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
1759 mockable: imm.mockable.clone(),
1760 };
1761
1762 let runner = Runner::new(
1763 "upload a hidden service descriptor".into(),
1764 schedule.clone(),
1765 imm.runtime.clone(),
1766 );
1767
1768 let fallible_op =
1769 || Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm));
1770
1771 let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
1772 match outcome {
1773 Ok(()) => {
1774 debug!(
1775 nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
1776 "successfully uploaded descriptor to HSDir",
1777 );
1778
1779 Ok(())
1780 }
1781 Err(e) => {
1782 warn_report!(
1783 e,
1784 "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
1785 imm.nickname,
1786 ed_id,
1787 rsa_id
1788 );
1789
1790 Err(e.into())
1791 }
1792 }
1793 }
1794
1795 /// Stop publishing descriptors until the specified delay elapses.
1796 async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), Bug> {
1797 if !matches!(self.status(), PublishStatus::RateLimited(_)) {
1798 debug!(
1799 "We are rate-limited for {}; pausing descriptor publication",
1800 humantime::format_duration(delay)
1801 );
1802 let until = self.imm.runtime.now() + delay;
1803 self.update_publish_status(PublishStatus::RateLimited(until))
1804 .await?;
1805 }
1806
1807 Ok(())
1808 }
1809
1810 /// Handle the upload rate-limit being lifted.
1811 async fn expire_rate_limit(&mut self) -> Result<(), Bug> {
1812 debug!("We are no longer rate-limited; resuming descriptor publication");
1813 self.update_publish_status(PublishStatus::UploadScheduled)
1814 .await?;
1815 Ok(())
1816 }
1817
1818    /// Return the authorized clients, if restricted discovery mode is enabled.
1819 ///
1820 /// Returns `Ok(None)` if restricted discovery mode is disabled.
1821 ///
1822 /// Returns an error if restricted discovery mode is enabled, but the client list is empty.
1823 #[cfg_attr(
1824 not(feature = "restricted-discovery"),
1825 allow(clippy::unnecessary_wraps)
1826 )]
1827 fn authorized_clients(&self) -> Result<Option<Arc<RestrictedDiscoveryKeys>>, FatalError> {
1828 cfg_if::cfg_if! {
1829 if #[cfg(feature = "restricted-discovery")] {
1830 let authorized_clients = self
1831 .inner
1832 .lock()
1833 .expect("poisoned lock")
1834 .authorized_clients
1835 .clone();
1836
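                // With restricted discovery enabled, an empty client list would mean that
                // no client could be authorized, so we treat it as a fatal misconfiguration.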
1837 if authorized_clients.as_ref().as_ref().map(|v| v.is_empty()).unwrap_or_default() {
1838 return Err(FatalError::RestrictedDiscoveryNoClients);
1839 }
1840
1841 Ok(authorized_clients)
1842 } else {
1843 Ok(None)
1844 }
1845 }
1846 }
1847}
1848
1849/// Try to expand a path, logging a warning on failure.
1850fn maybe_expand_path(p: &CfgPath, r: &CfgPathResolver) -> Option<PathBuf> {
1851 // map_err returns unit for clarity
1852 #[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
1853 p.path(r)
1854 .map_err(|e| {
1855 tor_error::warn_report!(e, "invalid path");
1856 ()
1857 })
1858 .ok()
1859}
1860
1861/// Add `path` to the specified `watcher`.
1862macro_rules! watch_path {
1863 ($watcher:expr, $path:expr, $watch_fn:ident, $($watch_fn_args:expr,)*) => {{
1864 if let Err(e) = $watcher.$watch_fn(&$path, $($watch_fn_args)*) {
1865 warn_report!(e, "failed to watch path {:?}", $path);
1866 } else {
1867 debug!("watching path {:?}", $path);
1868 }
1869 }}
1870}
1871
1872/// Add the specified directories to the watcher.
1873#[allow(clippy::cognitive_complexity)]
1874fn watch_dirs<R: Runtime>(
1875 watcher: &mut FileWatcherBuilder<R>,
1876 dirs: &DirectoryKeyProviderList,
1877 path_resolver: &CfgPathResolver,
1878) {
1879 for path in dirs {
1880 let path = path.path();
1881 let Some(path) = maybe_expand_path(path, path_resolver) else {
1882 warn!("failed to expand key_dir path {:?}", path);
1883 continue;
1884 };
1885
1886        // If the path doesn't exist, the notify watcher will return an error if we attempt to
1887        // watch it, so we skip over paths that don't exist at this time.
1888        // (This obviously suffers from a TOCTOU race, but most of the time
1889        // it is good enough to prevent the watcher setup from failing.
1890        // If the race *does* happen, it is not disastrous: the reactor won't crash,
1891        // but it will fail to set up the watcher.)
1892 if matches!(path.try_exists(), Ok(true)) {
1893 watch_path!(watcher, &path, watch_dir, "auth",);
1894 }
1895 // FileWatcher::watch_path causes the parent dir of the path to be watched.
1896 if matches!(path.parent().map(|p| p.try_exists()), Some(Ok(true))) {
1897 watch_path!(watcher, &path, watch_path,);
1898 }
1899 }
1900}
1901
1902/// Try to read the blinded identity key for a given `TimePeriod`.
1903///
1904/// Returns `Ok(None)` if the service is running in "offline" mode.
1905///
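/// # Example
///
/// An illustrative sketch; the `keymgr`, `nickname`, and `netdir` bindings are
/// hypothetical stand-ins for the caller's state:
///
/// ```ignore
/// let period = netdir.hs_time_period();
/// // `Ok(None)` would indicate "offline" mode, which is not currently supported.
/// let blind_id_kp = read_blind_id_keypair(&keymgr, &nickname, period)?
///     .expect("offline mode is not supported");
/// ```
///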
1906// TODO (#1194): we don't currently have support for "offline" mode so this can never return
1907// `Ok(None)`.
1908pub(super) fn read_blind_id_keypair(
1909 keymgr: &Arc<KeyMgr>,
1910 nickname: &HsNickname,
1911 period: TimePeriod,
1912) -> Result<Option<HsBlindIdKeypair>, FatalError> {
1913 let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
1914 let hsid_kp = keymgr
1915 .get::<HsIdKeypair>(&svc_key_spec)?
1916 .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
1917
1918 let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
1919
1920 // TODO: make the keystore selector configurable
1921 let keystore_selector = Default::default();
1922 match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
1923 Some(kp) => Ok(Some(kp)),
1924 None => {
1925 let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
1926 .compute_blinded_key(period)
1927 .map_err(|_| internal!("failed to compute blinded key"))?;
1928
1929 // Note: we can't use KeyMgr::generate because this key is derived from the HsId
1930 // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
1931 // which assumes keys are randomly generated, rather than derived from existing keys).
1932
1933 keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector, true)?;
1934
1935 let arti_path = |spec: &dyn KeySpecifier| {
1936 spec.arti_path()
1937 .map_err(into_internal!("invalid key specifier?!"))
1938 };
1939
1940 Ok(Some(
1941 keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
1942 FatalError::KeystoreRace {
1943 action: "read",
1944 path: arti_path(&blind_id_key_spec)?,
1945 },
1946 )?,
1947 ))
1948 }
1949 }
1950}
1951
1952/// Determine the [`State`] of the publisher based on the upload results
1953/// from the current `time_periods`.
1954fn upload_result_state(
1955 netdir: &NetDir,
1956 time_periods: &[TimePeriodContext],
1957) -> (State, Option<Problem>) {
1958 let current_period = netdir.hs_time_period();
1959 let current_period_res = time_periods
1960 .iter()
1961 .find(|ctx| ctx.params.time_period() == current_period);
1962
1963 let succeeded_current_tp = current_period_res
1964 .iter()
1965 .flat_map(|res| &res.upload_results)
1966 .filter(|res| res.upload_res.is_ok())
1967 .collect_vec();
1968
1969 let secondary_tp_res = time_periods
1970 .iter()
1971 .filter(|ctx| ctx.params.time_period() != current_period)
1972 .collect_vec();
1973
1974 let succeeded_secondary_tp = secondary_tp_res
1975 .iter()
1976 .flat_map(|res| &res.upload_results)
1977 .filter(|res| res.upload_res.is_ok())
1978 .collect_vec();
1979
1980 // All of the failed uploads (for all TPs)
1981 let failed = time_periods
1982 .iter()
1983 .flat_map(|res| &res.upload_results)
1984 .filter(|res| res.upload_res.is_err())
1985 .collect_vec();
1986 let problems: Vec<DescUploadRetryError> = failed
1987 .iter()
1988 .flat_map(|e| e.upload_res.as_ref().map_err(|e| e.clone()).err())
1989 .collect();
1990
1991 let err = match problems.as_slice() {
1992 [_, ..] => Some(problems.into()),
1993 [] => None,
1994 };
1995
1996 if time_periods.len() < 2 {
1997    // We need at least two TP contexts (one for the primary TP,
1998 // and another for the secondary one).
1999 //
2000 // If either is missing, we are unreachable for some or all clients.
2001 return (State::DegradedUnreachable, err);
2002 }
2003
2004 let state = match (
2005 succeeded_current_tp.as_slice(),
2006 succeeded_secondary_tp.as_slice(),
2007 ) {
2008 (&[], &[..]) | (&[..], &[]) if failed.is_empty() => {
2009 // We don't have any upload results for one or both TPs.
2010 // We are still bootstrapping.
2011 State::Bootstrapping
2012 }
2013 (&[_, ..], &[_, ..]) if failed.is_empty() => {
2014 // We have uploaded the descriptor to one or more HsDirs from both
2015 // HsDir rings (primary and secondary), and none of the uploads failed.
2016 // We are fully reachable.
2017 State::Running
2018 }
2019 (&[_, ..], &[_, ..]) => {
2020 // We have uploaded the descriptor to one or more HsDirs from both
2021 // HsDir rings (primary and secondary), but some of the uploads failed.
2022 // We are reachable, but we failed to upload the descriptor to all the HsDirs
2023 // that were supposed to have it.
2024 State::DegradedReachable
2025 }
2026 (&[..], &[]) | (&[], &[..]) => {
2027 // We have either
2028 // * uploaded the descriptor to some of the HsDirs from one of the rings,
2029 // but haven't managed to upload it to any of the HsDirs on the other ring, or
2030 // * all of the uploads failed
2031 //
2032 // Either way, we are definitely not reachable by all clients.
2033 State::DegradedUnreachable
2034 }
2035 };
2036
2037 (state, err)
2038}
2039
2040/// Whether the reactor should initiate an upload.
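///
/// Informally, the typical lifecycle is: `AwaitingIpts` (the default) until the IPT manager
/// establishes some introduction points, then `UploadScheduled` whenever a (re)publish is
/// needed, and `Idle` once `upload_all` has run; `RateLimited` interposes when uploads are
/// scheduled too frequently.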
2041#[derive(Copy, Clone, Debug, Default, PartialEq)]
2042enum PublishStatus {
2043 /// We need to call upload_all.
2044 UploadScheduled,
2045 /// We are rate-limited until the specified [`Instant`].
2046 ///
2047 /// We have tried to schedule multiple uploads in a short time span,
2048 /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
2049 /// channel to unblock us.
2050 RateLimited(Instant),
2051 /// We are idle and waiting for external events.
2052 ///
2053 /// We have enough information to build the descriptor, but since we have already called
2054    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
2055 Idle,
2056 /// We are waiting for the IPT manager to establish some introduction points.
2057 ///
2058 /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
2059 /// `UploadScheduled`.
2060 #[default]
2061 AwaitingIpts,
2062}
2063
2064/// The backoff schedule for the task that publishes descriptors.
2065#[derive(Clone, Debug)]
2066struct PublisherBackoffSchedule<M: Mockable> {
2067    /// The delay schedule to use between retry attempts.
2068 retry_delay: RetryDelay,
2069 /// The mockable reactor state, needed for obtaining an rng.
2070 mockable: M,
2071}
2072
2073impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
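    // Note: `max_retries` returns `None`, so retries are bounded only by `overall_timeout`
    // (and each attempt is additionally limited by `single_attempt_timeout`).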
2074 fn max_retries(&self) -> Option<usize> {
2075 None
2076 }
2077
2078 fn overall_timeout(&self) -> Option<Duration> {
2079 Some(OVERALL_UPLOAD_TIMEOUT)
2080 }
2081
2082 fn single_attempt_timeout(&self) -> Option<Duration> {
2083 Some(self.mockable.estimate_upload_timeout())
2084 }
2085
2086 fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
2087 Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
2088 }
2089}
2090
2091impl RetriableError for UploadError {
2092 fn should_retry(&self) -> bool {
2093 match self {
2094 UploadError::Request(_) | UploadError::Circuit(_) | UploadError::Stream(_) => true,
2095 UploadError::Bug(_) => false,
2096 }
2097 }
2098}
2099
2100/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
2101#[derive(Debug, Clone)]
2102struct TimePeriodUploadResult {
2103 /// The time period.
2104 time_period: TimePeriod,
2105 /// The upload results.
2106 hsdir_result: Vec<HsDirUploadStatus>,
2107}
2108
2109/// The outcome of uploading a descriptor to a particular HsDir.
2110#[derive(Clone, Debug)]
2111struct HsDirUploadStatus {
2112 /// The identity of the HsDir we attempted to upload the descriptor to.
2113 relay_ids: RelayIds,
2114 /// The outcome of this attempt.
2115 upload_res: UploadResult,
2116 /// The revision counter of the descriptor we tried to upload.
2117 revision_counter: RevisionCounter,
2118}
2119
2120/// The outcome of uploading a descriptor.
2121type UploadResult = Result<(), DescUploadRetryError>;
2122
2123impl From<BackoffError<UploadError>> for DescUploadRetryError {
2124 fn from(e: BackoffError<UploadError>) -> Self {
2125 use BackoffError as BE;
2126 use DescUploadRetryError as DURE;
2127
2128 match e {
2129 BE::FatalError(e) => DURE::FatalError(e),
2130 BE::MaxRetryCountExceeded(e) => DURE::MaxRetryCountExceeded(e),
2131 BE::Timeout(e) => DURE::Timeout(e),
2132 BE::ExplicitStop(_) => {
2133 DURE::Bug(internal!("explicit stop in publisher backoff schedule?!"))
2134 }
2135 }
2136 }
2137}
2138
2139// NOTE: the rest of the publisher tests live in publish.rs
2140#[cfg(test)]
2141mod test {
2142 // @@ begin test lint list maintained by maint/add_warning @@
2143 #![allow(clippy::bool_assert_comparison)]
2144 #![allow(clippy::clone_on_copy)]
2145 #![allow(clippy::dbg_macro)]
2146 #![allow(clippy::mixed_attributes_style)]
2147 #![allow(clippy::print_stderr)]
2148 #![allow(clippy::print_stdout)]
2149 #![allow(clippy::single_char_pattern)]
2150 #![allow(clippy::unwrap_used)]
2151 #![allow(clippy::unchecked_duration_subtraction)]
2152 #![allow(clippy::useless_vec)]
2153 #![allow(clippy::needless_pass_by_value)]
2154 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
2155 use super::*;
2156 use tor_netdir::testnet;
2157
2158 /// Create a `TimePeriodContext` from the specified upload results.
2159 fn create_time_period_ctx(
2160 params: &HsDirParams,
2161 upload_results: Vec<HsDirUploadStatus>,
2162 ) -> TimePeriodContext {
2163 TimePeriodContext {
2164 params: params.clone(),
2165 hs_dirs: vec![],
2166 last_successful: None,
2167 upload_results,
2168 }
2169 }
2170
2171 /// Create a single `HsDirUploadStatus`
2172 fn create_upload_status(upload_res: UploadResult) -> HsDirUploadStatus {
2173 HsDirUploadStatus {
2174 relay_ids: RelayIds::empty(),
2175 upload_res,
2176 revision_counter: RevisionCounter::from(13),
2177 }
2178 }
2179
2180 /// Create a bunch of results, all with the specified `upload_res`.
2181 fn create_upload_results(upload_res: UploadResult) -> Vec<HsDirUploadStatus> {
2182 std::iter::repeat_with(|| create_upload_status(upload_res.clone()))
2183 .take(10)
2184 .collect()
2185 }
2186
2187 fn construct_netdir() -> NetDir {
2188 const SRV1: [u8; 32] = *b"The door refused to open. ";
2189 const SRV2: [u8; 32] = *b"It said, 'Five cents, please.' ";
2190
2191 let dir = testnet::construct_custom_netdir(|_, _, bld| {
2192 bld.shared_rand_prev(7, SRV1.into(), None)
2193 .shared_rand_prev(7, SRV2.into(), None);
2194 })
2195 .unwrap();
2196
2197 dir.unwrap_if_sufficient().unwrap()
2198 }
2199
2200 #[test]
2201 fn upload_result_status_bootstrapping() {
2202 let netdir = construct_netdir();
2203 let all_params = netdir.hs_all_time_periods();
2204 let current_period = netdir.hs_time_period();
2205 let primary_params = all_params
2206 .iter()
2207 .find(|param| param.time_period() == current_period)
2208 .unwrap();
2209 let results = [
2210 (vec![], vec![]),
2211 (vec![], create_upload_results(Ok(()))),
2212 (create_upload_results(Ok(())), vec![]),
2213 ];
2214
2215 for (primary_result, secondary_result) in results {
2216 let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2217
2218 let secondary_params = all_params
2219 .iter()
2220 .find(|param| param.time_period() != current_period)
2221 .unwrap();
2222 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2223
2224 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2225 assert_eq!(status, State::Bootstrapping);
2226 assert!(err.is_none());
2227 }
2228 }
2229
2230 #[test]
2231 fn upload_result_status_running() {
2232 let netdir = construct_netdir();
2233 let all_params = netdir.hs_all_time_periods();
2234 let current_period = netdir.hs_time_period();
2235 let primary_params = all_params
2236 .iter()
2237 .find(|param| param.time_period() == current_period)
2238 .unwrap();
2239
2240 let secondary_result = create_upload_results(Ok(()));
2241 let secondary_params = all_params
2242 .iter()
2243 .find(|param| param.time_period() != current_period)
2244 .unwrap();
2245 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2246
2247 let primary_result = create_upload_results(Ok(()));
2248 let primary_ctx = create_time_period_ctx(primary_params, primary_result);
2249 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2250 assert_eq!(status, State::Running);
2251 assert!(err.is_none());
2252 }
2253
2254 #[test]
2255 fn upload_result_status_reachable() {
2256 let netdir = construct_netdir();
2257 let all_params = netdir.hs_all_time_periods();
2258 let current_period = netdir.hs_time_period();
2259 let primary_params = all_params
2260 .iter()
2261 .find(|param| param.time_period() == current_period)
2262 .unwrap();
2263
2264 let primary_result = create_upload_results(Ok(()));
2265 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2266 let failed_res = create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2267 let secondary_result = create_upload_results(Ok(()))
2268 .into_iter()
2269 .chain(failed_res.iter().cloned())
2270 .collect();
2271 let secondary_params = all_params
2272 .iter()
2273 .find(|param| param.time_period() != current_period)
2274 .unwrap();
2275 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result);
2276 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2277
2278 // Degraded but reachable (because some of the secondary HsDir uploads failed).
2279 assert_eq!(status, State::DegradedReachable);
2280 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2281 }
2282
2283 #[test]
2284 fn upload_result_status_unreachable() {
2285 let netdir = construct_netdir();
2286 let all_params = netdir.hs_all_time_periods();
2287 let current_period = netdir.hs_time_period();
2288 let primary_params = all_params
2289 .iter()
2290 .find(|param| param.time_period() == current_period)
2291 .unwrap();
2292 let mut primary_result =
2293 create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2294 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2295 // No secondary TP (we are unreachable).
2296 let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2297 assert_eq!(status, State::DegradedUnreachable);
2298 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2299
2300 // Add a successful result
2301 primary_result.push(create_upload_status(Ok(())));
2302 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2303 let (status, err) = upload_result_state(&netdir, &[primary_ctx]);
2304 // Still degraded, and unreachable (because we don't have a TimePeriodContext
2305 // for the secondary TP)
2306 assert_eq!(status, State::DegradedUnreachable);
2307 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2308
2309 // If we add another time period where none of the uploads were successful,
2310 // we're *still* unreachable
2311 let secondary_result =
2312 create_upload_results(Err(DescUploadRetryError::Bug(internal!("test"))));
2313 let secondary_params = all_params
2314 .iter()
2315 .find(|param| param.time_period() != current_period)
2316 .unwrap();
2317 let secondary_ctx = create_time_period_ctx(secondary_params, secondary_result.clone());
2318 let primary_ctx = create_time_period_ctx(primary_params, primary_result.clone());
2319 let (status, err) = upload_result_state(&netdir, &[primary_ctx, secondary_ctx]);
2320 assert_eq!(status, State::DegradedUnreachable);
2321 assert!(matches!(err, Some(Problem::DescriptorUpload(_))));
2322 }
2323}