tor_hsservice/
publish.rs

1//! Publish and maintain onion service descriptors
2//!
3//! See the [`reactor`] module-level documentation for more details.
4
5mod backoff;
6mod descriptor;
7mod reactor;
8mod reupload_timer;
9
10use crate::config::restricted_discovery::RestrictedDiscoveryKeys;
11use crate::internal_prelude::*;
12use crate::pow::PowManager;
13
14use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner};
15use descriptor::{build_sign, DescriptorStatus, VersionedDescriptor};
16use reactor::read_blind_id_keypair;
17use reactor::Reactor;
18use reupload_timer::ReuploadTimer;
19
20use tor_config_path::CfgPathResolver;
21
22pub use reactor::UploadError;
23pub(crate) use reactor::{Mockable, Real, OVERALL_UPLOAD_TIMEOUT};
24
25/// A handle for the Hsdir Publisher for an onion service.
26///
27/// This handle represents a set of tasks that identify the hsdirs for each
28/// relevant time period, construct descriptors, publish them, and keep them
29/// up-to-date.
30#[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."]
31pub(crate) struct Publisher<R: Runtime, M: Mockable> {
32    /// The runtime.
33    runtime: R,
34    /// The service for which we're publishing descriptors.
35    nickname: HsNickname,
36    /// A source for new network directories that we use to determine
37    /// our HsDirs.
38    dir_provider: Arc<dyn NetDirProvider>,
39    /// Mockable state.
40    ///
41    /// This is used for launching circuits and for obtaining random number generators.
42    mockable: M,
43    /// The onion service config.
44    config: Arc<OnionServiceConfig>,
45    /// A channel for receiving IPT change notifications.
46    ipt_watcher: IptsPublisherView,
47    /// A channel for receiving onion service config change notifications.
48    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
49    /// The key manager.
50    keymgr: Arc<KeyMgr>,
51    /// A sender for updating the status of the onion service.
52    status_tx: PublisherStatusSender,
53    /// Path resolver for configuration files.
54    path_resolver: Arc<CfgPathResolver>,
55    /// Proof-of-work state
56    pow_manager: Arc<PowManager<R>>,
57    /// Queue on which we receive messages from the [`PowManager`] telling us that a seed has
58    /// rotated and thus we need to republish the descriptor for a particular time period.
59    update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
60}
61
62impl<R: Runtime, M: Mockable> Publisher<R, M> {
63    /// Create a new publisher.
64    ///
65    /// When it launches, it will know no keys or introduction points,
66    /// and will therefore not upload any descriptors.
67    ///
68    /// The publisher won't start publishing until you call [`Publisher::launch`].
69    #[allow(clippy::too_many_arguments)]
70    pub(crate) fn new(
71        runtime: R,
72        nickname: HsNickname,
73        dir_provider: Arc<dyn NetDirProvider>,
74        mockable: impl Into<M>,
75        ipt_watcher: IptsPublisherView,
76        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
77        status_tx: PublisherStatusSender,
78        keymgr: Arc<KeyMgr>,
79        path_resolver: Arc<CfgPathResolver>,
80        pow_manager: Arc<PowManager<R>>,
81        update_from_pow_manager_rx: mpsc::Receiver<TimePeriod>,
82    ) -> Self {
83        let config = config_rx.borrow().clone();
84        Self {
85            runtime,
86            nickname,
87            dir_provider,
88            mockable: mockable.into(),
89            config,
90            ipt_watcher,
91            config_rx,
92            status_tx,
93            keymgr,
94            path_resolver,
95            pow_manager,
96            update_from_pow_manager_rx,
97        }
98    }
99
100    /// Launch the publisher reactor.
101    pub(crate) fn launch(self) -> Result<(), StartupError> {
102        let Publisher {
103            runtime,
104            nickname,
105            dir_provider,
106            mockable,
107            config,
108            ipt_watcher,
109            config_rx,
110            status_tx,
111            keymgr,
112            path_resolver,
113            pow_manager,
114            update_from_pow_manager_rx: publisher_update_rx,
115        } = self;
116
117        let reactor = Reactor::new(
118            runtime.clone(),
119            nickname,
120            dir_provider,
121            mockable,
122            &config,
123            ipt_watcher,
124            config_rx,
125            status_tx,
126            keymgr,
127            path_resolver,
128            pow_manager,
129            publisher_update_rx,
130        );
131
132        runtime
133            .spawn(async move {
134                match reactor.run().await {
135                    Ok(()) => debug!("the publisher reactor has shut down"),
136                    Err(e) => warn_report!(e, "the publisher reactor has shut down"),
137                }
138            })
139            .map_err(|e| StartupError::Spawn {
140                spawning: "publisher reactor task",
141                cause: e.into(),
142            })?;
143
144        Ok(())
145    }
146}
147
148// TODO POW: Enable this test for hs-pow-full once the MockExecutor supports this
149#[cfg(all(test, not(feature = "hs-pow-full")))]
150mod test {
151    // @@ begin test lint list maintained by maint/add_warning @@
152    #![allow(clippy::bool_assert_comparison)]
153    #![allow(clippy::clone_on_copy)]
154    #![allow(clippy::dbg_macro)]
155    #![allow(clippy::mixed_attributes_style)]
156    #![allow(clippy::print_stderr)]
157    #![allow(clippy::print_stdout)]
158    #![allow(clippy::single_char_pattern)]
159    #![allow(clippy::unwrap_used)]
160    #![allow(clippy::unchecked_duration_subtraction)]
161    #![allow(clippy::useless_vec)]
162    #![allow(clippy::needless_pass_by_value)]
163    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
164    use super::*;
165
166    use std::collections::HashMap;
167    use std::io;
168    use std::path::Path;
169    use std::pin::Pin;
170    use std::sync::atomic::{AtomicUsize, Ordering};
171    use std::sync::Mutex;
172    use std::task::{Context, Poll};
173    use std::time::Duration;
174
175    use async_trait::async_trait;
176    use fs_mistrust::Mistrust;
177    use futures::{AsyncRead, AsyncWrite};
178    use tempfile::{tempdir, TempDir};
179    use test_temp_dir::test_temp_dir;
180
181    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
182    use tor_circmgr::hspool::HsCircKind;
183    use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair};
184    use tor_key_forge::ToEncodableKey;
185    use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier};
186    use tor_llcrypto::pk::{ed25519, rsa};
187    use tor_netdir::testprovider::TestNetDirProvider;
188    use tor_netdir::{testnet, NetDir};
189    use tor_netdoc::doc::hsdesc::test_data;
190    use tor_rtcompat::ToplevelBlockOn;
191    use tor_rtmock::MockRuntime;
192
193    use crate::config::OnionServiceConfigBuilder;
194    use crate::ipt_set::{ipts_channel, IptInSet, IptSet};
195    use crate::pow::NewPowManager;
196    use crate::publish::reactor::MockableClientCirc;
197    use crate::status::{OnionServiceStatus, StatusSender};
198    use crate::test::create_storage_handles;
199    use crate::HsNickname;
200    use crate::{
201        BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier,
202        HsIdKeypairSpecifier, HsIdPublicKeySpecifier,
203    };
204
205    /// The nickname of the test service.
206    const TEST_SVC_NICKNAME: &str = "test-svc";
207
208    /// The HTTP response the HSDir returns if everything went well.
209    const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";
210
211    /// The HTTP response the HSDir returns if something went wrong
212    const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n";
213
214    /// The error doesn't matter (we return a dummy io::Error from poll_read).
215    ///
216    /// NOTE: ideally, this would be an io::Result, but io::Error isn't Clone (the tests need to
217    /// clone the iterator over these Results for each HSDir).
218    type PollReadResult<T> = Result<T, ()>;
219
220    /// A trait for our poll_read response iterator.
221    trait PollReadIter:
222        Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
223    {
224    }
225
226    impl<I> PollReadIter for I where
227        I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
228    {
229    }
230
231    #[derive(Clone, Debug, Default)]
232    struct MockReactorState<I: PollReadIter> {
233        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
234        publish_count: Arc<AtomicUsize>,
235        /// The values returned by `DataStream::poll_read` when uploading to an HSDir.
236        ///
237        /// The values represent the HTTP response (or lack thereof) each HSDir sends upon
238        /// receiving a POST request for uploading a descriptor.
239        ///
240        /// Note: this field is only used for populating responses_for_hsdir. Each time
241        /// get_or_launch_specific is called for a new CircTarget, this iterator is cloned and
242        /// added to the responses_for_hsdir entry corresponding to the new CircTarget (HSDir).
243        poll_read_responses: I,
244        /// The responses that will be returned by each test HSDir (identified by its RsaIdentity).
245        ///
246        /// Used for testing whether the reactor correctly retries on failure.
247        responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>,
248    }
249
250    #[async_trait]
251    impl<I: PollReadIter> Mockable for MockReactorState<I> {
252        type Rng = TestingRng;
253        type ClientCirc = MockClientCirc<I>;
254
255        fn thread_rng(&self) -> Self::Rng {
256            testing_rng()
257        }
258
259        async fn get_or_launch_specific<T>(
260            &self,
261            _netdir: &tor_netdir::NetDir,
262            kind: HsCircKind,
263            target: T,
264        ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
265        where
266            T: tor_linkspec::CircTarget + Send + Sync,
267        {
268            assert_eq!(kind, HsCircKind::SvcHsDir);
269
270            // Look up the next poll_read value to return for this relay.
271            let id = target.rsa_identity().unwrap();
272            let mut map = self.responses_for_hsdir.lock().unwrap();
273            let poll_read_responses = map
274                .entry(*id)
275                .or_insert_with(|| self.poll_read_responses.clone());
276
277            Ok(MockClientCirc {
278                publish_count: Arc::clone(&self.publish_count),
279                poll_read_responses: poll_read_responses.clone(),
280            }
281            .into())
282        }
283
284        fn estimate_upload_timeout(&self) -> Duration {
285            // chosen arbitrarily for testing.
286            Duration::from_secs(30)
287        }
288    }
289
290    #[derive(Debug, Clone)]
291    struct MockClientCirc<I: PollReadIter> {
292        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
293        publish_count: Arc<AtomicUsize>,
294        /// The values to return from `poll_read`.
295        ///
296        /// Used for testing whether the reactor correctly retries on failure.
297        poll_read_responses: I,
298    }
299
300    #[async_trait]
301    impl<I: PollReadIter> MockableClientCirc for MockClientCirc<I> {
302        type DataStream = MockDataStream<I>;
303
304        async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
305            Ok(MockDataStream {
306                publish_count: Arc::clone(&self.publish_count),
307                // TODO: this will need to change when we start reusing circuits (currently,
308                // we only ever create one data stream per circuit).
309                poll_read_responses: self.poll_read_responses.clone(),
310            })
311        }
312
313        fn source_info(&self) -> tor_proto::Result<Option<tor_dirclient::SourceInfo>> {
314            Ok(None)
315        }
316    }
317
318    #[derive(Debug)]
319    struct MockDataStream<I: PollReadIter> {
320        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
321        publish_count: Arc<AtomicUsize>,
322        /// The values to return from `poll_read`.
323        ///
324        /// Used for testing whether the reactor correctly retries on failure.
325        poll_read_responses: I,
326    }
327
328    impl<I: PollReadIter> AsyncRead for MockDataStream<I> {
329        fn poll_read(
330            mut self: Pin<&mut Self>,
331            _cx: &mut Context<'_>,
332            buf: &mut [u8],
333        ) -> Poll<io::Result<usize>> {
334            match self.as_mut().poll_read_responses.next() {
335                Some(res) => {
336                    match res {
337                        Ok(res) => {
338                            buf[..res.len()].copy_from_slice(res.as_bytes());
339
340                            Poll::Ready(Ok(res.len()))
341                        }
342                        Err(()) => {
343                            // Return an error. This should cause the reactor to reattempt the
344                            // upload.
345                            Poll::Ready(Err(io::Error::other("test error")))
346                        }
347                    }
348                }
349                None => Poll::Ready(Ok(0)),
350            }
351        }
352    }
353
354    impl<I: PollReadIter> AsyncWrite for MockDataStream<I> {
355        fn poll_write(
356            self: Pin<&mut Self>,
357            _cx: &mut Context<'_>,
358            buf: &[u8],
359        ) -> Poll<io::Result<usize>> {
360            let request = std::str::from_utf8(buf).unwrap();
361
362            assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n"));
363            let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);
364
365            Poll::Ready(Ok(request.len()))
366        }
367
368        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
369            Poll::Ready(Ok(()))
370        }
371
372        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
373            Poll::Ready(Ok(()))
374        }
375    }
376
377    /// Insert the specified key into the keystore.
378    fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier)
379    where
380        K: ToEncodableKey,
381    {
382        keymgr
383            .insert(
384                key,
385                svc_key_spec,
386                tor_keymgr::KeystoreSelector::Primary,
387                true,
388            )
389            .unwrap();
390    }
391
392    /// Create a new `KeyMgr`, provisioning its keystore with the necessary keys.
393    fn init_keymgr(
394        keystore_dir: &TempDir,
395        nickname: &HsNickname,
396        netdir: &NetDir,
397    ) -> (HsId, HsBlindId, Arc<KeyMgr>) {
398        let period = netdir.hs_time_period();
399
400        let mut rng = testing_rng();
401        let keypair = ed25519::Keypair::generate(&mut rng);
402        let id_pub = HsIdKey::from(keypair.verifying_key());
403        let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair));
404
405        let (hs_blind_id_key, hs_blind_id_kp, _subcredential) =
406            id_keypair.compute_blinded_key(period).unwrap();
407
408        let keystore = ArtiNativeKeystore::from_path_and_mistrust(
409            keystore_dir,
410            &Mistrust::new_dangerously_trust_everyone(),
411        )
412        .unwrap();
413
414        // Provision the keystore with the necessary keys:
415        let keymgr = KeyMgrBuilder::default()
416            .primary_store(Box::new(keystore))
417            .build()
418            .unwrap();
419
420        insert_svc_key(
421            id_keypair,
422            &keymgr,
423            &HsIdKeypairSpecifier::new(nickname.clone()),
424        );
425
426        insert_svc_key(
427            id_pub.clone(),
428            &keymgr,
429            &HsIdPublicKeySpecifier::new(nickname.clone()),
430        );
431
432        insert_svc_key(
433            hs_blind_id_kp,
434            &keymgr,
435            &BlindIdKeypairSpecifier::new(nickname.clone(), period),
436        );
437
438        insert_svc_key(
439            hs_blind_id_key.clone(),
440            &keymgr,
441            &BlindIdPublicKeySpecifier::new(nickname.clone(), period),
442        );
443
444        insert_svc_key(
445            HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)),
446            &keymgr,
447            &DescSigningKeypairSpecifier::new(nickname.clone(), period),
448        );
449
450        let hs_id = id_pub.into();
451        (hs_id, hs_blind_id_key.into(), keymgr.into())
452    }
453
454    fn build_test_config(nickname: HsNickname) -> OnionServiceConfig {
455        OnionServiceConfigBuilder::default()
456            .nickname(nickname)
457            .rate_limit_at_intro(None)
458            .build()
459            .unwrap()
460    }
461
462    #[allow(clippy::too_many_arguments)]
463    fn run_test<I: PollReadIter>(
464        runtime: MockRuntime,
465        nickname: HsNickname,
466        keymgr: Arc<KeyMgr>,
467        pv: IptsPublisherView,
468        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
469        status_tx: PublisherStatusSender,
470        netdir: NetDir,
471        reactor_event: impl FnOnce(),
472        poll_read_responses: I,
473        expected_upload_count: usize,
474        republish_count: usize,
475        expect_errors: bool,
476    ) {
477        runtime.clone().block_on(async move {
478            let netdir_provider: Arc<dyn NetDirProvider> =
479                Arc::new(TestNetDirProvider::from(netdir));
480            let publish_count = Default::default();
481            let circpool = MockReactorState {
482                publish_count: Arc::clone(&publish_count),
483                poll_read_responses,
484                responses_for_hsdir: Arc::new(Mutex::new(Default::default())),
485            };
486
487            let temp_dir = test_temp_dir!();
488            let state_dir = temp_dir.subdir_untracked("state_dir");
489            let mistrust = fs_mistrust::Mistrust::new_dangerously_trust_everyone();
490            let state_dir = StateDirectory::new(state_dir, &mistrust).unwrap();
491            let state_handle = state_dir.acquire_instance(&nickname).unwrap();
492            let pow_nonce_dir = state_handle.raw_subdir("pow_nonces").unwrap();
493            let pow_manager_storage_handle = state_handle.storage_handle("pow_manager").unwrap();
494
495            let NewPowManager {
496                pow_manager,
497                rend_req_tx: _,
498                rend_req_rx: _,
499                publisher_update_rx: update_from_pow_manager_rx,
500            } = PowManager::new(
501                runtime.clone(),
502                nickname.clone(),
503                pow_nonce_dir,
504                keymgr.clone(),
505                pow_manager_storage_handle,
506            )
507            .unwrap();
508            let mut status_rx = status_tx.subscribe();
509            let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new(
510                runtime.clone(),
511                nickname,
512                netdir_provider,
513                circpool,
514                pv,
515                config_rx,
516                status_tx,
517                keymgr,
518                Arc::new(CfgPathResolver::default()),
519                pow_manager,
520                update_from_pow_manager_rx,
521            );
522
523            publisher.launch().unwrap();
524            runtime.progress_until_stalled().await;
525            let status = status_rx.next().await.unwrap().publisher_status();
526            assert_eq!(State::Shutdown, status.state());
527            assert!(status.current_problem().is_none());
528
529            // Check that we haven't published anything yet
530            assert_eq!(publish_count.load(Ordering::SeqCst), 0);
531
532            reactor_event();
533
534            runtime.progress_until_stalled().await;
535
536            // We need to manually advance the time, because some of our tests check that the
537            // failed uploads are retried, and there's a sleep() between the retries
538            // (see BackoffSchedule::next_delay).
539            runtime.advance_by(Duration::from_secs(1)).await;
540            runtime.progress_until_stalled().await;
541
542            let initial_publish_count = publish_count.load(Ordering::SeqCst);
543            assert_eq!(initial_publish_count, expected_upload_count);
544
545            let status = status_rx.next().await.unwrap().publisher_status();
546            if expect_errors {
547                // The upload results aren't ready yet.
548                assert_eq!(State::Bootstrapping, status.state());
549            } else {
550                // The test network doesn't have an SRV for the previous TP,
551                // so we are "unreachable".
552                assert_eq!(State::DegradedUnreachable, status.state());
553            }
554            assert!(status.current_problem().is_none());
555
556            if republish_count > 0 {
557                /// The latest time the descriptor can be republished.
558                const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120);
559
560                // Wait until the reactor triggers the necessary number of reuploads.
561                runtime
562                    .advance_by(MAX_TIMEOUT * (republish_count as u32))
563                    .await;
564                runtime.progress_until_stalled().await;
565
566                let min_upload_count = expected_upload_count * republish_count;
567                // There will be twice as many reuploads if the publisher happens
568                // to reupload every hour (as opposed to every 2h).
569                let max_upload_count = 2 * min_upload_count;
570                let publish_count_now = publish_count.load(Ordering::SeqCst);
571                // This is the total number of reuploads (i.e. the number of times
572                // we published the descriptor to an HsDir).
573                let actual_reupload_count = publish_count_now - initial_publish_count;
574
575                assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count));
576            }
577        });
578    }
579
580    /// Test that the publisher publishes the descriptor when the IPTs change.
581    ///
582    /// The `poll_read_responses` are returned by each HSDir, in order, in response to each POST
583    /// request received from the publisher.
584    ///
585    /// The `multiplier` represents the multiplier by which to multiply the number of HSDirs to
586    /// obtain the total expected number of uploads (this works because the test "HSDirs" all
587    /// behave the same, so the number of uploads is the number of HSDirs multiplied by the number
588    /// of retries).
589    fn publish_after_ipt_change<I: PollReadIter>(
590        temp_dir: &Path,
591        poll_read_responses: I,
592        multiplier: usize,
593        republish_count: usize,
594        expect_errors: bool,
595    ) {
596        let runtime = MockRuntime::new();
597        let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap();
598        let config = build_test_config(nickname.clone());
599        let (_config_tx, config_rx) = watch::channel_with(Arc::new(config));
600
601        let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap();
602        let update_ipts = || {
603            let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc()
604                .unwrap()
605                .intro_points()
606                .iter()
607                .enumerate()
608                .map(|(i, ipt)| IptInSet {
609                    ipt: ipt.clone(),
610                    lid: [i.try_into().unwrap(); 32].into(),
611                })
612                .collect();
613
614            mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet {
615                ipts,
616                lifetime: Duration::from_secs(20),
617            });
618        };
619
620        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
621        let keystore_dir = tempdir().unwrap();
622
623        let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir);
624
625        let hsdir_count = netdir
626            .hs_dirs_upload(blind_id, netdir.hs_time_period())
627            .unwrap()
628            .collect::<Vec<_>>()
629            .len();
630
631        assert!(hsdir_count > 0);
632
633        // If any of the uploads fail, they will be retried. Note that the upload failure will
634        // affect _each_ hsdir, so the expected number of uploads is a multiple of hsdir_count.
635        let expected_upload_count = hsdir_count * multiplier;
636        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();
637
638        run_test(
639            runtime.clone(),
640            nickname,
641            keymgr,
642            pv,
643            config_rx,
644            status_tx,
645            netdir,
646            update_ipts,
647            poll_read_responses,
648            expected_upload_count,
649            republish_count,
650            expect_errors,
651        );
652    }
653
654    #[test]
655    fn publish_after_ipt_change_no_errors() {
656        // The HSDirs always respond with 200 OK, so we expect to publish hsdir_count times.
657        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
658
659        test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false));
660    }
661
662    #[test]
663    fn publish_after_ipt_change_with_errors() {
664        let err_responses = vec![
665            // The HSDir closed the connection without sending a response.
666            Err(()),
667            // The HSDir responded with an internal server error,
668            Ok(ERR_RESPONSE.to_string()),
669        ];
670
671        for error_res in err_responses.into_iter() {
672            let poll_reads = vec![
673                // Each HSDir first responds with an error, which causes the publisher to retry the
674                // upload. The HSDir then responds with "200 OK".
675                //
676                // We expect to publish hsdir_count * 2 times (for each HSDir, the first upload
677                // attempt fails, but the second succeeds).
678                error_res,
679                Ok(OK_RESPONSE.to_string()),
680            ]
681            .into_iter();
682
683            test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true));
684        }
685    }
686
687    #[test]
688    fn reupload_after_publishing() {
689        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
690        // Test that 4 reuploads happen after the initial upload
691        const REUPLOAD_COUNT: usize = 4;
692
693        test_temp_dir!()
694            .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false));
695    }
696
697    // TODO (#1120): test that the descriptor is republished when the config changes
698
699    // TODO (#1120): test that the descriptor is reuploaded only to the HSDirs that need it (i.e. the
700    // ones for which it's dirty)
701
702    // TODO (#1120): test that rate-limiting works correctly
703
704    // TODO (#1120): test that the uploaded descriptor contains the expected values
705
706    // TODO (#1120): test that the publisher stops publishing if the IPT manager sets the IPTs to
707    // `None`.
708}