//! Publish and maintain onion service descriptors.

mod backoff;
mod descriptor;
mod reactor;
mod reupload_timer;

use crate::config::restricted_discovery::RestrictedDiscoveryKeys;
use crate::internal_prelude::*;

use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner};
use descriptor::{build_sign, DescriptorStatus, VersionedDescriptor};
use reactor::read_blind_id_keypair;
use reactor::Reactor;
use reupload_timer::ReuploadTimer;

use tor_config_path::CfgPathResolver;

pub use reactor::UploadError;
pub(crate) use reactor::{Mockable, Real, OVERALL_UPLOAD_TIMEOUT};

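/// A publisher, responsible for uploading onion service descriptors to the
/// relevant HsDirs and for re-uploading them when they need to be refreshed.
///
/// # Example
///
/// A minimal usage sketch (illustrative only: the bindings below, such as
/// `circ_pool` and `ipt_watcher`, stand in for values obtained elsewhere in
/// the service startup code):
///
/// ```ignore
/// let publisher = Publisher::new(
///     runtime,
///     nickname,
///     dir_provider,
///     circ_pool,      // anything convertible into the `Mockable` circuit provider
///     ipt_watcher,
///     config_rx,
///     status_tx,
///     keymgr,
///     path_resolver,
/// );
///
/// // Nothing is published until the reactor task is actually launched.
/// publisher.launch()?;
/// ```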
#[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."]
pub(crate) struct Publisher<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// The nickname of the service.
    nickname: HsNickname,
    /// A source of new network directories.
    dir_provider: Arc<dyn NetDirProvider>,
    /// Mockable state.
    ///
    /// This is used for launching circuits to the HsDirs.
    mockable: M,
    /// The current onion service configuration.
    config: Arc<OnionServiceConfig>,
    /// A channel for observing changes to the set of introduction points.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service configuration updates.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// The key manager, used for retrieving the service's keys.
    keymgr: Arc<KeyMgr>,
    /// A sender for reporting the status of the publisher.
    status_tx: PublisherStatusSender,
    /// Path resolver for configuration paths.
    path_resolver: Arc<CfgPathResolver>,
}

impl<R: Runtime, M: Mockable> Publisher<R, M> {
    /// Create a new publisher.
    ///
    /// This does not spawn any tasks: call [`Publisher::launch`] to start
    /// publishing descriptors.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: impl Into<M>,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
    ) -> Self {
        let config = config_rx.borrow().clone();
        Self {
            runtime,
            nickname,
            dir_provider,
            mockable: mockable.into(),
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        }
    }

    /// Launch the publisher reactor, which runs in a task spawned on the runtime.
    pub(crate) fn launch(self) -> Result<(), StartupError> {
        let Publisher {
            runtime,
            nickname,
            dir_provider,
            mockable,
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        } = self;

        let reactor = Reactor::new(
            runtime.clone(),
            nickname,
            dir_provider,
            mockable,
            &config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        );

        runtime
            .spawn(async move {
                match reactor.run().await {
                    Ok(()) => debug!("the publisher reactor has shut down"),
                    Err(e) => warn_report!(e, "the publisher reactor has shut down"),
                }
            })
            .map_err(|e| StartupError::Spawn {
                spawning: "publisher reactor task",
                cause: e.into(),
            })?;

        Ok(())
    }
}

#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;

    use std::collections::HashMap;
    use std::io;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use async_trait::async_trait;
    use fs_mistrust::Mistrust;
    use futures::{AsyncRead, AsyncWrite};
    use tempfile::{tempdir, TempDir};
    use test_temp_dir::test_temp_dir;

    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
    use tor_circmgr::hspool::HsCircKind;
    use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair};
    use tor_key_forge::ToEncodableKey;
    use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier};
    use tor_llcrypto::pk::{ed25519, rsa};
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_netdir::{testnet, NetDir};
    use tor_netdoc::doc::hsdesc::test_data;
    use tor_rtcompat::ToplevelBlockOn;
    use tor_rtmock::MockRuntime;

    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_set::{ipts_channel, IptInSet, IptSet};
    use crate::publish::reactor::MockableClientCirc;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::create_storage_handles;
    use crate::HsNickname;
    use crate::{
        BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier,
        HsIdKeypairSpecifier, HsIdPublicKeySpecifier,
    };

    /// The nickname of the test service.
    const TEST_SVC_NICKNAME: &str = "test-svc";

    /// The HTTP response a mock HsDir returns for a successful upload.
    const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";

    /// The HTTP response a mock HsDir returns for a failed upload.
    const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n";

    /// The result of a single mocked `poll_read`: either a response string to
    /// feed to the reactor, or an error.
    type PollReadResult<T> = Result<T, ()>;

    /// A trait alias for iterators over the responses a mock data stream
    /// should return from `poll_read`.
    trait PollReadIter:
        Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    impl<I> PollReadIter for I where
        I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }
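
    // For instance, the following iterator satisfies `PollReadIter` and makes
    // every mocked upload succeed on the first try (this is the shape the
    // success-path tests below use):
    //
    //     [Ok(OK_RESPONSE.to_string())].into_iter()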

    /// Mock state for the publisher reactor, standing in for the circuit pool.
    #[derive(Clone, Debug, Default)]
    struct MockReactorState<I: PollReadIter> {
        /// The number of descriptor uploads attempted so far.
        publish_count: Arc<AtomicUsize>,
        /// The template sequence of responses a mock HsDir returns from `poll_read`.
        poll_read_responses: I,
        /// The response iterator of each HsDir, keyed by the HsDir's RSA identity.
        ///
        /// Each HsDir gets its own clone of `poll_read_responses`, so the canned
        /// responses play out independently for every directory.
        responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>,
    }

    #[async_trait]
    impl<I: PollReadIter> Mockable for MockReactorState<I> {
        type Rng = TestingRng;
        type ClientCirc = MockClientCirc<I>;

        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }

        async fn get_or_launch_specific<T>(
            &self,
            _netdir: &tor_netdir::NetDir,
            kind: HsCircKind,
            target: T,
        ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
        where
            T: tor_linkspec::CircTarget + Send + Sync,
        {
            assert_eq!(kind, HsCircKind::SvcHsDir);

            // Look up (or create) the response iterator for this particular HsDir,
            // identified by its RSA identity.
            let id = target.rsa_identity().unwrap();
            let mut map = self.responses_for_hsdir.lock().unwrap();
            let poll_read_responses = map
                .entry(*id)
                .or_insert_with(|| self.poll_read_responses.clone());

            Ok(MockClientCirc {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: poll_read_responses.clone(),
            }
            .into())
        }

        fn estimate_upload_timeout(&self) -> Duration {
            // A fixed timeout for the mock.
            Duration::from_secs(30)
        }
    }

    /// A mock client circuit, handing out mock data streams.
    #[derive(Debug, Clone)]
    struct MockClientCirc<I: PollReadIter> {
        /// The number of descriptor uploads attempted so far.
        publish_count: Arc<AtomicUsize>,
        /// The responses this circuit's streams will return from `poll_read`.
        poll_read_responses: I,
    }

    #[async_trait]
    impl<I: PollReadIter> MockableClientCirc for MockClientCirc<I> {
        type DataStream = MockDataStream<I>;

        async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
            Ok(MockDataStream {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: self.poll_read_responses.clone(),
            })
        }
    }

    /// A mock directory data stream over which descriptors are "uploaded".
    #[derive(Debug)]
    struct MockDataStream<I: PollReadIter> {
        /// The number of descriptor uploads attempted so far.
        publish_count: Arc<AtomicUsize>,
        /// The responses to return from `poll_read`.
        poll_read_responses: I,
    }

    impl<I: PollReadIter> AsyncRead for MockDataStream<I> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            match self.as_mut().poll_read_responses.next() {
                Some(res) => {
                    match res {
                        Ok(res) => {
                            // Return the next canned HsDir response.
                            buf[..res.len()].copy_from_slice(res.as_bytes());

                            Poll::Ready(Ok(res.len()))
                        }
                        Err(()) => {
                            // Simulate a stream error.
                            Poll::Ready(Err(io::Error::other("test error")))
                        }
                    }
                }
                None => Poll::Ready(Ok(0)),
            }
        }
    }

    impl<I: PollReadIter> AsyncWrite for MockDataStream<I> {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let request = std::str::from_utf8(buf).unwrap();

            // Every write must be a descriptor upload request; count it.
            assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n"));
            let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);

            Poll::Ready(Ok(request.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }
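
    // Taken together, the mocks above emulate a minimal descriptor-upload
    // exchange with an HsDir. Roughly (illustrative, not a full transcript):
    //
    //     publisher -> HsDir:  POST /tor/hs/3/publish HTTP/1.0
    //                          <descriptor body>
    //     HsDir -> publisher:  HTTP/1.1 200 OK      (OK_RESPONSE)
    //                     or:  HTTP/1.1 500 UH_OH   (ERR_RESPONSE)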

    /// Insert the given key into the primary keystore.
    fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier)
    where
        K: ToEncodableKey,
    {
        keymgr
            .insert(
                key,
                svc_key_spec,
                tor_keymgr::KeystoreSelector::Primary,
                true,
            )
            .unwrap();
    }

    /// Create a keystore for the test service, populated with freshly generated
    /// identity, blinded-identity, and descriptor-signing keys for the current
    /// time period.
    ///
    /// Returns the service's identity, its blinded identity, and the key manager.
    fn init_keymgr(
        keystore_dir: &TempDir,
        nickname: &HsNickname,
        netdir: &NetDir,
    ) -> (HsId, HsBlindId, Arc<KeyMgr>) {
        let period = netdir.hs_time_period();

        let mut rng = testing_rng();
        let keypair = ed25519::Keypair::generate(&mut rng);
        let id_pub = HsIdKey::from(keypair.verifying_key());
        let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair));

        let (hs_blind_id_key, hs_blind_id_kp, _subcredential) =
            id_keypair.compute_blinded_key(period).unwrap();

        let keystore = ArtiNativeKeystore::from_path_and_mistrust(
            keystore_dir,
            &Mistrust::new_dangerously_trust_everyone(),
        )
        .unwrap();

        let keymgr = KeyMgrBuilder::default()
            .primary_store(Box::new(keystore))
            .build()
            .unwrap();

        insert_svc_key(
            id_keypair,
            &keymgr,
            &HsIdKeypairSpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            id_pub.clone(),
            &keymgr,
            &HsIdPublicKeySpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            hs_blind_id_kp,
            &keymgr,
            &BlindIdKeypairSpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            hs_blind_id_key.clone(),
            &keymgr,
            &BlindIdPublicKeySpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)),
            &keymgr,
            &DescSigningKeypairSpecifier::new(nickname.clone(), period),
        );

        let hs_id = id_pub.into();
        (hs_id, hs_blind_id_key.into(), keymgr.into())
    }

    /// Build an `OnionServiceConfig` suitable for the tests.
    fn build_test_config(nickname: HsNickname) -> OnionServiceConfig {
        OnionServiceConfigBuilder::default()
            .nickname(nickname)
            .rate_limit_at_intro(None)
            .build()
            .unwrap()
    }

    /// Run a single publisher test.
    ///
    /// Launches the publisher, triggers `reactor_event`, and then checks that the
    /// observed number of descriptor uploads and the reported publisher status
    /// match what the caller expects.
    #[allow(clippy::too_many_arguments)]
    fn run_test<I: PollReadIter>(
        runtime: MockRuntime,
        nickname: HsNickname,
        keymgr: Arc<KeyMgr>,
        pv: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        netdir: NetDir,
        reactor_event: impl FnOnce(),
        poll_read_responses: I,
        expected_upload_count: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        runtime.clone().block_on(async move {
            let netdir_provider: Arc<dyn NetDirProvider> =
                Arc::new(TestNetDirProvider::from(netdir));
            let publish_count = Default::default();
            let circpool = MockReactorState {
                publish_count: Arc::clone(&publish_count),
                poll_read_responses,
                responses_for_hsdir: Arc::new(Mutex::new(Default::default())),
            };

            let mut status_rx = status_tx.subscribe();
            let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new(
                runtime.clone(),
                nickname,
                netdir_provider,
                circpool,
                pv,
                config_rx,
                status_tx,
                keymgr,
                Arc::new(CfgPathResolver::default()),
            );

            publisher.launch().unwrap();
            runtime.progress_until_stalled().await;
            let status = status_rx.next().await.unwrap().publisher_status();
            assert_eq!(State::Shutdown, status.state());
            assert!(status.current_problem().is_none());

            // Nothing should have been published before the reactor event fires.
            assert_eq!(publish_count.load(Ordering::SeqCst), 0);

            reactor_event();

            runtime.progress_until_stalled().await;

            // Advance the mock clock so any pending publisher timers can fire.
            runtime.advance_by(Duration::from_secs(1)).await;
            runtime.progress_until_stalled().await;

            let initial_publish_count = publish_count.load(Ordering::SeqCst);
            assert_eq!(initial_publish_count, expected_upload_count);

            // The expected publisher state depends on whether the mocked uploads
            // returned any errors.
            let status = status_rx.next().await.unwrap().publisher_status();
            if expect_errors {
                assert_eq!(State::Bootstrapping, status.state());
            } else {
                assert_eq!(State::DegradedUnreachable, status.state());
            }
            assert!(status.current_problem().is_none());

            if republish_count > 0 {
                // An upper bound on the time until the publisher re-uploads the descriptor.
                const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120);

                runtime
                    .advance_by(MAX_TIMEOUT * (republish_count as u32))
                    .await;
                runtime.progress_until_stalled().await;

                // Advancing by republish_count * MAX_TIMEOUT should trigger at least
                // republish_count rounds of re-uploads, and (depending on how the
                // timers line up) at most twice that.
                let min_upload_count = expected_upload_count * republish_count;
                let max_upload_count = 2 * min_upload_count;
                let publish_count_now = publish_count.load(Ordering::SeqCst);
                let actual_reupload_count = publish_count_now - initial_publish_count;

                assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count));
            }
        });
    }

    /// Test that the publisher uploads the descriptor when the IPTs change.
    ///
    /// `multiplier` is the expected number of uploads per HsDir (more than 1 if
    /// some uploads are expected to be retried after an error).
    /// `republish_count` is the number of re-upload rounds to wait for after the
    /// initial publication.
    fn publish_after_ipt_change<I: PollReadIter>(
        temp_dir: &Path,
        poll_read_responses: I,
        multiplier: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        let runtime = MockRuntime::new();
        let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap();
        let config = build_test_config(nickname.clone());
        let (_config_tx, config_rx) = watch::channel_with(Arc::new(config));

        let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap();
        let update_ipts = || {
            // Use the introduction points from the test descriptor as the "new" IPT set.
            let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc()
                .unwrap()
                .intro_points()
                .iter()
                .enumerate()
                .map(|(i, ipt)| IptInSet {
                    ipt: ipt.clone(),
                    lid: [i.try_into().unwrap(); 32].into(),
                })
                .collect();

            mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet {
                ipts,
                lifetime: Duration::from_secs(20),
            });
        };

        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        let keystore_dir = tempdir().unwrap();

        let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir);

        // Count the HsDirs the descriptor will be uploaded to.
        let hsdir_count = netdir
            .hs_dirs_upload(blind_id, netdir.hs_time_period())
            .unwrap()
            .collect::<Vec<_>>()
            .len();

        assert!(hsdir_count > 0);

        let expected_upload_count = hsdir_count * multiplier;
        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();

        run_test(
            runtime.clone(),
            nickname,
            keymgr,
            pv,
            config_rx,
            status_tx,
            netdir,
            update_ipts,
            poll_read_responses,
            expected_upload_count,
            republish_count,
            expect_errors,
        );
    }

    #[test]
    fn publish_after_ipt_change_no_errors() {
        // Every upload succeeds on the first try.
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();

        test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false));
    }

    #[test]
    fn publish_after_ipt_change_with_errors() {
        // Two failure modes: a stream error, and an HTTP-level error response.
        let err_responses = vec![
            // The HsDir closes the stream with an error...
            Err(()),
            // ...or returns an HTTP 500.
            Ok(ERR_RESPONSE.to_string()),
        ];

        for error_res in err_responses.into_iter() {
            // The first attempt fails and the second succeeds, so each HsDir
            // sees two uploads (hence the multiplier of 2).
            let poll_reads = vec![
                error_res,
                Ok(OK_RESPONSE.to_string()),
            ]
            .into_iter();

            test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true));
        }
    }

    #[test]
    fn reupload_after_publishing() {
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
        // The number of re-upload rounds to wait for.
        const REUPLOAD_COUNT: usize = 4;

        test_temp_dir!()
            .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false));
    }
}