//! Publish and maintain onion service descriptors.
//!
//! The `Publisher` spawns a reactor task that builds, signs, and uploads this
//! service's descriptors to the relevant HsDirs, and re-uploads them when the
//! network directory, the introduction points, or the configuration change.

mod backoff;
mod descriptor;
mod reactor;
mod reupload_timer;

use crate::config::restricted_discovery::RestrictedDiscoveryKeys;
use crate::internal_prelude::*;
use crate::pow::PowManager;

use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner};
use descriptor::{build_sign, DescriptorStatus, VersionedDescriptor};
use reactor::read_blind_id_keypair;
use reactor::Reactor;
use reupload_timer::ReuploadTimer;

use tor_config_path::CfgPathResolver;

pub use reactor::UploadError;
pub(crate) use reactor::{Mockable, Real, OVERALL_UPLOAD_TIMEOUT};

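/// A handle for the descriptor-publishing half of an onion service.
///
/// A `Publisher` is constructed with [`Publisher::new`] and does nothing until
/// [`Publisher::launch`] is called, which consumes it and spawns the publisher
/// reactor task on the runtime.
///
/// A minimal sketch of how a caller might wire one up (all the argument names
/// below are placeholders for values owned by the caller, not real APIs):
///
/// ```ignore
/// let publisher = Publisher::new(
///     runtime, nickname, dir_provider, circ_pool, ipt_watcher,
///     config_rx, status_tx, keymgr, path_resolver, pow_manager,
///     publisher_update_rx,
/// );
/// publisher.launch()?;
/// ```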
#[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."]
pub(crate) struct Publisher<R: Runtime, M: Mockable> {
    /// The runtime, used for spawning the reactor task.
    runtime: R,
    /// The nickname of the service whose descriptors we publish.
    nickname: HsNickname,
    /// Our source of network directories.
    dir_provider: Arc<dyn NetDirProvider>,
    /// The `Mockable` state (RNG and circuit pool), which can be swapped out in tests.
    mockable: M,
    /// The current configuration of the onion service.
    config: Arc<OnionServiceConfig>,
    /// The publisher's view of the introduction point set.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving configuration updates.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// The key manager, used for looking up our identity and signing keys.
    keymgr: Arc<KeyMgr>,
    /// A sender for reporting the status of the publisher reactor.
    status_tx: PublisherStatusSender,
    /// A path resolver for configuration paths.
    path_resolver: Arc<CfgPathResolver>,
    /// The proof-of-work manager for this service.
    pow_manager: Arc<PowManager<R>>,
    /// Receives a [`TimePeriod`] whenever the PoW manager wants the descriptor
    /// for that period to be republished.
    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
}

impl<R: Runtime, M: Mockable> Publisher<R, M> {
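    /// Create a new publisher.
    ///
    /// The publisher takes no action (and uploads nothing) until
    /// [`Publisher::launch`] is called.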
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: impl Into<M>,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
        pow_manager: Arc<PowManager<R>>,
        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
    ) -> Self {
        let config = config_rx.borrow().clone();
        Self {
            runtime,
            nickname,
            dir_provider,
            mockable: mockable.into(),
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            update_from_pow_manager_rx,
        }
    }

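    /// Launch the publisher reactor.
    ///
    /// This consumes the `Publisher` and spawns a task on the runtime that
    /// runs the reactor until it shuts down or encounters a fatal error.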
    pub(crate) fn launch(self) -> Result<(), StartupError> {
        let Publisher {
            runtime,
            nickname,
            dir_provider,
            mockable,
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            update_from_pow_manager_rx: publisher_update_rx,
        } = self;

        let reactor = Reactor::new(
            runtime.clone(),
            nickname,
            dir_provider,
            mockable,
            &config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
            pow_manager,
            publisher_update_rx,
        );

        runtime
            .spawn(async move {
                match reactor.run().await {
                    Ok(()) => debug!("the publisher reactor has shut down"),
                    Err(e) => warn_report!(e, "the publisher reactor has shut down"),
                }
            })
            .map_err(|e| StartupError::Spawn {
                spawning: "publisher reactor task",
                cause: e.into(),
            })?;

        Ok(())
    }
}

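/// Tests for the publisher, using a mock runtime, a mock circuit pool, and a
/// test network directory.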
#[cfg(all(test, not(feature = "hs-pow-full")))]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;

    use std::collections::HashMap;
    use std::io;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use async_trait::async_trait;
    use fs_mistrust::Mistrust;
    use futures::{AsyncRead, AsyncWrite};
    use tempfile::{tempdir, TempDir};
    use test_temp_dir::test_temp_dir;

    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
    use tor_circmgr::hspool::HsCircKind;
    use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair};
    use tor_key_forge::ToEncodableKey;
    use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier};
    use tor_llcrypto::pk::{ed25519, rsa};
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_netdir::{testnet, NetDir};
    use tor_netdoc::doc::hsdesc::test_data;
    use tor_rtcompat::ToplevelBlockOn;
    use tor_rtmock::MockRuntime;

    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_set::{ipts_channel, IptInSet, IptSet};
    use crate::pow::NewPowManager;
    use crate::publish::reactor::MockableClientCirc;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::create_storage_handles;
    use crate::HsNickname;
    use crate::{
        BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier,
        HsIdKeypairSpecifier, HsIdPublicKeySpecifier,
    };

    /// The nickname of the test service.
    const TEST_SVC_NICKNAME: &str = "test-svc";

    /// The HTTP response the mock HsDir returns when an upload succeeds.
    const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";

    /// The HTTP response the mock HsDir returns when an upload fails.
    const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n";

    /// The outcome of a single `poll_read` on the mock data stream: either a
    /// response string to hand back to the reactor, or an error.
    type PollReadResult<T> = Result<T, ()>;

    /// A trait alias for iterators over scripted `poll_read` results.
    trait PollReadIter:
        Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    impl<I> PollReadIter for I where
        I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

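    /// A mock implementation of [`Mockable`], standing in for the circuit pool.
    ///
    /// It hands out [`MockClientCirc`]s, and keeps a per-HsDir script of
    /// `poll_read` responses so that each "upload" can be made to succeed or
    /// fail as the test requires.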
    #[derive(Clone, Debug, Default)]
    struct MockReactorState<I: PollReadIter> {
        /// The total number of descriptor uploads performed so far.
        publish_count: Arc<AtomicUsize>,
        /// The default script of `poll_read` responses for newly seen HsDirs.
        poll_read_responses: I,
        /// The per-HsDir scripts of `poll_read` responses, keyed by RSA identity.
        responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>,
    }

    #[async_trait]
    impl<I: PollReadIter> Mockable for MockReactorState<I> {
        type Rng = TestingRng;
        type ClientCirc = MockClientCirc<I>;

        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }

        async fn get_or_launch_specific<T>(
            &self,
            _netdir: &tor_netdir::NetDir,
            kind: HsCircKind,
            target: T,
        ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
        where
            T: tor_linkspec::CircTarget + Send + Sync,
        {
            assert_eq!(kind, HsCircKind::SvcHsDir);

            let id = target.rsa_identity().unwrap();
            // Each HsDir gets its own copy of the response script, so responses
            // are consumed per-HsDir rather than shared across all uploads.
            let mut map = self.responses_for_hsdir.lock().unwrap();
            let poll_read_responses = map
                .entry(*id)
                .or_insert_with(|| self.poll_read_responses.clone());

            Ok(MockClientCirc {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: poll_read_responses.clone(),
            }
            .into())
        }

        fn estimate_upload_timeout(&self) -> Duration {
            Duration::from_secs(30)
        }
    }

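    /// A mock client circuit, as returned by
    /// [`MockReactorState::get_or_launch_specific`].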
    #[derive(Debug, Clone)]
    struct MockClientCirc<I: PollReadIter> {
        /// Shared counter of descriptor uploads, incremented on each POST.
        publish_count: Arc<AtomicUsize>,
        /// The scripted `poll_read` responses for streams opened on this circuit.
        poll_read_responses: I,
    }

    #[async_trait]
    impl<I: PollReadIter> MockableClientCirc for MockClientCirc<I> {
        type DataStream = MockDataStream<I>;

        async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
            Ok(MockDataStream {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: self.poll_read_responses.clone(),
            })
        }

        fn source_info(&self) -> tor_proto::Result<Option<tor_dirclient::SourceInfo>> {
            Ok(None)
        }
    }

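    /// A mock directory stream over which descriptors are "uploaded".
    ///
    /// Writes are checked to look like descriptor-upload requests and counted;
    /// reads return the next scripted response.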
    #[derive(Debug)]
    struct MockDataStream<I: PollReadIter> {
        /// Shared counter of descriptor uploads, incremented on each POST.
        publish_count: Arc<AtomicUsize>,
        /// The remaining scripted `poll_read` responses.
        poll_read_responses: I,
    }

    impl<I: PollReadIter> AsyncRead for MockDataStream<I> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // Return the next scripted response, an error, or EOF if the
            // script has run out.
            match self.as_mut().poll_read_responses.next() {
                Some(res) => {
                    match res {
                        Ok(res) => {
                            buf[..res.len()].copy_from_slice(res.as_bytes());

                            Poll::Ready(Ok(res.len()))
                        }
                        Err(()) => {
                            Poll::Ready(Err(io::Error::other("test error")))
                        }
                    }
                }
                None => Poll::Ready(Ok(0)),
            }
        }
    }

    impl<I: PollReadIter> AsyncWrite for MockDataStream<I> {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let request = std::str::from_utf8(buf).unwrap();

            // Every write should be the start of a descriptor upload.
            assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n"));
            let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);

            Poll::Ready(Ok(request.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

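    /// Insert the specified key into the primary keystore.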
    fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier)
    where
        K: ToEncodableKey,
    {
        keymgr
            .insert(
                key,
                svc_key_spec,
                tor_keymgr::KeystoreSelector::Primary,
                true,
            )
            .unwrap();
    }

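    /// Create a new `KeyMgr`, provisioned with the keys the test service needs:
    /// its identity keypair, the blinded keypair for the netdir's current time
    /// period, and a descriptor-signing keypair.
    ///
    /// Returns the service's `HsId`, its blinded ID, and the key manager.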
    fn init_keymgr(
        keystore_dir: &TempDir,
        nickname: &HsNickname,
        netdir: &NetDir,
    ) -> (HsId, HsBlindId, Arc<KeyMgr>) {
        let period = netdir.hs_time_period();

        let mut rng = testing_rng();
        let keypair = ed25519::Keypair::generate(&mut rng);
        let id_pub = HsIdKey::from(keypair.verifying_key());
        let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair));

        let (hs_blind_id_key, hs_blind_id_kp, _subcredential) =
            id_keypair.compute_blinded_key(period).unwrap();

        let keystore = ArtiNativeKeystore::from_path_and_mistrust(
            keystore_dir,
            &Mistrust::new_dangerously_trust_everyone(),
        )
        .unwrap();

        let keymgr = KeyMgrBuilder::default()
            .primary_store(Box::new(keystore))
            .build()
            .unwrap();

        insert_svc_key(
            id_keypair,
            &keymgr,
            &HsIdKeypairSpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            id_pub.clone(),
            &keymgr,
            &HsIdPublicKeySpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            hs_blind_id_kp,
            &keymgr,
            &BlindIdKeypairSpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            hs_blind_id_key.clone(),
            &keymgr,
            &BlindIdPublicKeySpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)),
            &keymgr,
            &DescSigningKeypairSpecifier::new(nickname.clone(), period),
        );

        let hs_id = id_pub.into();
        (hs_id, hs_blind_id_key.into(), keymgr.into())
    }

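    /// Build an `OnionServiceConfig` with the given nickname and no
    /// introduction-point rate limiting.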
    fn build_test_config(nickname: HsNickname) -> OnionServiceConfig {
        OnionServiceConfigBuilder::default()
            .nickname(nickname)
            .rate_limit_at_intro(None)
            .build()
            .unwrap()
    }

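    /// Run a single publisher test.
    ///
    /// Sets up a mock publisher backed by `poll_read_responses`, launches it,
    /// triggers `reactor_event`, and then checks the resulting upload counts
    /// and publisher status against `expected_upload_count`, `republish_count`,
    /// and `expect_errors`.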
    #[allow(clippy::too_many_arguments)]
    fn run_test<I: PollReadIter>(
        runtime: MockRuntime,
        nickname: HsNickname,
        keymgr: Arc<KeyMgr>,
        pv: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        netdir: NetDir,
        reactor_event: impl FnOnce(),
        poll_read_responses: I,
        expected_upload_count: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        runtime.clone().block_on(async move {
            let netdir_provider: Arc<dyn NetDirProvider> =
                Arc::new(TestNetDirProvider::from(netdir));
            let publish_count = Default::default();
            let circpool = MockReactorState {
                publish_count: Arc::clone(&publish_count),
                poll_read_responses,
                responses_for_hsdir: Arc::new(Mutex::new(Default::default())),
            };

            let temp_dir = test_temp_dir!();
            let state_dir = temp_dir.subdir_untracked("state_dir");
            let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
            let state_dir = StateDirectory::new(state_dir, &mistrust).unwrap();
            let state_handle = state_dir.acquire_instance(&nickname).unwrap();
            let pow_nonce_dir = state_handle.raw_subdir("pow_nonces").unwrap();
            let pow_manager_storage_handle = state_handle.storage_handle("pow_manager").unwrap();

            let NewPowManager {
                pow_manager,
                rend_req_tx: _,
                rend_req_rx: _,
                publisher_update_rx: update_from_pow_manager_rx,
            } = PowManager::new(
                runtime.clone(),
                nickname.clone(),
                pow_nonce_dir,
                keymgr.clone(),
                pow_manager_storage_handle,
            )
            .unwrap();
            let mut status_rx = status_tx.subscribe();
            let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new(
                runtime.clone(),
                nickname,
                netdir_provider,
                circpool,
                pv,
                config_rx,
                status_tx,
                keymgr,
                Arc::new(CfgPathResolver::default()),
                pow_manager,
                update_from_pow_manager_rx,
            );

            publisher.launch().unwrap();
            runtime.progress_until_stalled().await;
            // The initial status is Shutdown (the status sender was created
            // with `OnionServiceStatus::new_shutdown`), and nothing has been
            // published yet.
            let status = status_rx.next().await.unwrap().publisher_status();
            assert_eq!(State::Shutdown, status.state());
            assert!(status.current_problem().is_none());

            assert_eq!(publish_count.load(Ordering::SeqCst), 0);

            // Trigger the event (e.g. an introduction point change) that should
            // cause the publisher to upload descriptors.
            reactor_event();

            runtime.progress_until_stalled().await;

            // Let any scheduled upload tasks run to completion.
            runtime.advance_by(Duration::from_secs(1)).await;
            runtime.progress_until_stalled().await;

            let initial_publish_count = publish_count.load(Ordering::SeqCst);
            assert_eq!(initial_publish_count, expected_upload_count);

            let status = status_rx.next().await.unwrap().publisher_status();
            if expect_errors {
                // If some of the uploads failed, the publisher is still trying
                // to bootstrap.
                assert_eq!(State::Bootstrapping, status.state());
            } else {
                // Otherwise, everything was published, but the service is not
                // (yet) reachable.
                assert_eq!(State::DegradedUnreachable, status.state());
            }
            assert!(status.current_problem().is_none());

            if republish_count > 0 {
                // An upper bound on how long the reactor waits before
                // re-uploading a previously published descriptor.
                const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120);

                runtime
                    .advance_by(MAX_TIMEOUT * (republish_count as u32))
                    .await;
                runtime.progress_until_stalled().await;

                // Each re-upload round hits every HsDir again; allow up to
                // twice as many uploads in case an extra re-upload was
                // scheduled within the window.
                let min_upload_count = expected_upload_count * republish_count;
                let max_upload_count = 2 * min_upload_count;
                let publish_count_now = publish_count.load(Ordering::SeqCst);
                let actual_reupload_count = publish_count_now - initial_publish_count;

                assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count));
            }
        });
    }

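    /// Test that a change in the introduction points causes the descriptor to
    /// be published to every HsDir.
    ///
    /// `poll_read_responses` scripts the HsDir responses; `multiplier` is how
    /// many uploads we expect per HsDir (for example, 2 if the first attempt
    /// fails and is retried); `republish_count` is how many re-upload rounds to
    /// wait for; and `expect_errors` says whether some uploads are expected to
    /// fail.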
    fn publish_after_ipt_change<I: PollReadIter>(
        temp_dir: &Path,
        poll_read_responses: I,
        multiplier: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        let runtime = MockRuntime::new();
        let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap();
        let config = build_test_config(nickname.clone());
        let (_config_tx, config_rx) = watch::channel_with(Arc::new(config));

        let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap();
        let update_ipts = || {
            let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc()
                .unwrap()
                .intro_points()
                .iter()
                .enumerate()
                .map(|(i, ipt)| IptInSet {
                    ipt: ipt.clone(),
                    lid: [i.try_into().unwrap(); 32].into(),
                })
                .collect();

            mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet {
                ipts,
                lifetime: Duration::from_secs(20),
            });
        };

        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        let keystore_dir = tempdir().unwrap();

        let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir);

        let hsdir_count = netdir
            .hs_dirs_upload(blind_id, netdir.hs_time_period())
            .unwrap()
            .collect::<Vec<_>>()
            .len();

        assert!(hsdir_count > 0);

        // We expect one upload per HsDir; `multiplier` accounts for any
        // expected retries.
        let expected_upload_count = hsdir_count * multiplier;
        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();

        run_test(
            runtime.clone(),
            nickname,
            keymgr,
            pv,
            config_rx,
            status_tx,
            netdir,
            update_ipts,
            poll_read_responses,
            expected_upload_count,
            republish_count,
            expect_errors,
        );
    }

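    /// Check that a single IPT change triggers exactly one successful upload
    /// per HsDir.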
    #[test]
    fn publish_after_ipt_change_no_errors() {
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();

        test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false));
    }

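    /// Check that an upload is retried if the HsDir first responds with an
    /// error or the stream fails.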
    #[test]
    fn publish_after_ipt_change_with_errors() {
        let err_responses = vec![
            // A stream error.
            Err(()),
            // An HTTP 500 response.
            Ok(ERR_RESPONSE.to_string()),
        ];

        for error_res in err_responses.into_iter() {
            let poll_reads = vec![
                // The HsDir's first response is an error...
                error_res,
                // ...and the retry succeeds.
                Ok(OK_RESPONSE.to_string()),
            ]
            .into_iter();

            test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true));
        }
    }

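    /// Check that a successfully published descriptor is re-uploaded when the
    /// re-upload timer fires.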
    #[test]
    fn reupload_after_publishing() {
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
        // The number of re-upload rounds to wait for.
        const REUPLOAD_COUNT: usize = 4;

        test_temp_dir!()
            .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false));
    }
}