tor_hsservice/publish.rs

//! Publish and maintain onion service descriptors
//!
//! See the [`reactor`] module-level documentation for more details.

mod backoff;
mod descriptor;
mod reactor;
mod reupload_timer;

use crate::config::restricted_discovery::RestrictedDiscoveryKeys;
use crate::internal_prelude::*;

use backoff::{BackoffError, BackoffSchedule, RetriableError, Runner};
use descriptor::{build_sign, DescriptorStatus, VersionedDescriptor};
use reactor::read_blind_id_keypair;
use reactor::Reactor;
use reupload_timer::ReuploadTimer;

use tor_config_path::CfgPathResolver;

pub use reactor::UploadError;
pub(crate) use reactor::{Mockable, Real, OVERALL_UPLOAD_TIMEOUT};

/// A handle for the HsDir publisher for an onion service.
///
/// This handle represents a set of tasks that identify the hsdirs for each
/// relevant time period, construct descriptors, publish them, and keep them
/// up-to-date.
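///
/// # Example
///
/// A minimal sketch of how a publisher might be constructed and launched
/// (the runtime, channels, key manager, and circuit pool are assumed to have
/// been set up by the caller; illustrative only, not a tested doctest):
///
/// ```ignore
/// let publisher: Publisher<R, M> = Publisher::new(
///     runtime,        // R: Runtime
///     nickname,       // the HsNickname of the service
///     dir_provider,   // Arc<dyn NetDirProvider>
///     circ_pool,      // anything that converts Into<M> (hypothetical name)
///     ipt_watcher,    // IptsPublisherView
///     config_rx,      // watch::Receiver<Arc<OnionServiceConfig>>
///     status_tx,      // PublisherStatusSender
///     keymgr,         // Arc<KeyMgr>
///     path_resolver,  // Arc<CfgPathResolver>
/// );
/// publisher.launch()?; // spawns the reactor task on the runtime
/// ```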
#[must_use = "If you don't call launch() on the publisher, it won't publish any descriptors."]
pub(crate) struct Publisher<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The onion service config.
    config: Arc<OnionServiceConfig>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
    /// Path resolver for configuration files.
    path_resolver: Arc<CfgPathResolver>,
}

impl<R: Runtime, M: Mockable> Publisher<R, M> {
    /// Create a new publisher.
    ///
    /// When it launches, it will know no keys or introduction points,
    /// and will therefore not upload any descriptors.
    ///
    /// The publisher won't start publishing until you call [`Publisher::launch`].
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: impl Into<M>,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
        path_resolver: Arc<CfgPathResolver>,
    ) -> Self {
        let config = config_rx.borrow().clone();
        Self {
            runtime,
            nickname,
            dir_provider,
            mockable: mockable.into(),
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        }
    }

    /// Launch the publisher reactor.
    pub(crate) fn launch(self) -> Result<(), StartupError> {
        let Publisher {
            runtime,
            nickname,
            dir_provider,
            mockable,
            config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        } = self;

        let reactor = Reactor::new(
            runtime.clone(),
            nickname,
            dir_provider,
            mockable,
            &config,
            ipt_watcher,
            config_rx,
            status_tx,
            keymgr,
            path_resolver,
        );

        runtime
            .spawn(async move {
                match reactor.run().await {
                    Ok(()) => debug!("the publisher reactor has shut down"),
                    Err(e) => warn_report!(e, "the publisher reactor has shut down"),
                }
            })
            .map_err(|e| StartupError::Spawn {
                spawning: "publisher reactor task",
                cause: e.into(),
            })?;

        Ok(())
    }
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;

    use std::collections::HashMap;
    use std::io;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::task::{Context, Poll};
    use std::time::Duration;

    use async_trait::async_trait;
    use fs_mistrust::Mistrust;
    use futures::{AsyncRead, AsyncWrite};
    use tempfile::{tempdir, TempDir};
    use test_temp_dir::test_temp_dir;

    use tor_basic_utils::test_rng::{testing_rng, TestingRng};
    use tor_circmgr::hspool::HsCircKind;
    use tor_hscrypto::pk::{HsBlindId, HsDescSigningKeypair, HsId, HsIdKey, HsIdKeypair};
    use tor_key_forge::ToEncodableKey;
    use tor_keymgr::{ArtiNativeKeystore, KeyMgrBuilder, KeySpecifier};
    use tor_llcrypto::pk::{ed25519, rsa};
    use tor_netdir::testprovider::TestNetDirProvider;
    use tor_netdir::{testnet, NetDir};
    use tor_netdoc::doc::hsdesc::test_data;
    use tor_rtcompat::ToplevelBlockOn;
    use tor_rtmock::MockRuntime;

    use crate::config::OnionServiceConfigBuilder;
    use crate::ipt_set::{ipts_channel, IptInSet, IptSet};
    use crate::publish::reactor::MockableClientCirc;
    use crate::status::{OnionServiceStatus, StatusSender};
    use crate::test::create_storage_handles;
    use crate::HsNickname;
    use crate::{
        BlindIdKeypairSpecifier, BlindIdPublicKeySpecifier, DescSigningKeypairSpecifier,
        HsIdKeypairSpecifier, HsIdPublicKeySpecifier,
    };

    /// The nickname of the test service.
    const TEST_SVC_NICKNAME: &str = "test-svc";

    /// The HTTP response the HSDir returns if everything went well.
    const OK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n\r\n";

    /// The HTTP response the HSDir returns if something went wrong.
    const ERR_RESPONSE: &str = "HTTP/1.1 500 UH_OH\r\n\r\n";

    /// The error doesn't matter (we return a dummy io::Error from poll_read).
    ///
    /// NOTE: ideally, this would be an io::Result, but io::Error isn't Clone (the tests need to
    /// clone the iterator over these Results for each HSDir).
    type PollReadResult<T> = Result<T, ()>;

    /// A trait for our poll_read response iterator.
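    ///
    /// For example, `vec![Err(()), Ok(OK_RESPONSE.to_string())].into_iter()`
    /// satisfies this trait: it makes a mock HSDir fail the first read and
    /// answer with `OK_RESPONSE` on the second (the retry tests below do
    /// exactly this).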
    trait PollReadIter:
        Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    impl<I> PollReadIter for I where
        I: Iterator<Item = PollReadResult<String>> + Send + Sync + Clone + Unpin + 'static
    {
    }

    #[derive(Clone, Debug, Default)]
    struct MockReactorState<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values returned by `DataStream::poll_read` when uploading to an HSDir.
        ///
        /// The values represent the HTTP response (or lack thereof) each HSDir sends upon
        /// receiving a POST request for uploading a descriptor.
        ///
        /// Note: this field is only used for populating responses_for_hsdir. Each time
        /// get_or_launch_specific is called for a new CircTarget, this iterator is cloned and
        /// added to the responses_for_hsdir entry corresponding to the new CircTarget (HSDir).
        poll_read_responses: I,
        /// The responses that will be returned by each test HSDir (identified by its RsaIdentity).
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        responses_for_hsdir: Arc<Mutex<HashMap<rsa::RsaIdentity, I>>>,
    }

    #[async_trait]
    impl<I: PollReadIter> Mockable for MockReactorState<I> {
        type Rng = TestingRng;
        type ClientCirc = MockClientCirc<I>;

        fn thread_rng(&self) -> Self::Rng {
            testing_rng()
        }

        async fn get_or_launch_specific<T>(
            &self,
            _netdir: &tor_netdir::NetDir,
            kind: HsCircKind,
            target: T,
        ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
        where
            T: tor_linkspec::CircTarget + Send + Sync,
        {
            assert_eq!(kind, HsCircKind::SvcHsDir);

            // Look up the next poll_read value to return for this relay.
            let id = target.rsa_identity().unwrap();
            let mut map = self.responses_for_hsdir.lock().unwrap();
            let poll_read_responses = map
                .entry(*id)
                .or_insert_with(|| self.poll_read_responses.clone());

            Ok(MockClientCirc {
                publish_count: Arc::clone(&self.publish_count),
                poll_read_responses: poll_read_responses.clone(),
            }
            .into())
        }

        fn estimate_upload_timeout(&self) -> Duration {
            // chosen arbitrarily for testing.
            Duration::from_secs(30)
        }
    }

    #[derive(Debug, Clone)]
    struct MockClientCirc<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values to return from `poll_read`.
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        poll_read_responses: I,
    }

    #[async_trait]
    impl<I: PollReadIter> MockableClientCirc for MockClientCirc<I> {
        type DataStream = MockDataStream<I>;

        async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
            Ok(MockDataStream {
                publish_count: Arc::clone(&self.publish_count),
                // TODO: this will need to change when we start reusing circuits (currently,
                // we only ever create one data stream per circuit).
                poll_read_responses: self.poll_read_responses.clone(),
            })
        }
    }

    #[derive(Debug)]
    struct MockDataStream<I: PollReadIter> {
        /// The number of `POST /tor/hs/3/publish` requests sent by the reactor.
        publish_count: Arc<AtomicUsize>,
        /// The values to return from `poll_read`.
        ///
        /// Used for testing whether the reactor correctly retries on failure.
        poll_read_responses: I,
    }

    impl<I: PollReadIter> AsyncRead for MockDataStream<I> {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            match self.as_mut().poll_read_responses.next() {
                Some(res) => {
                    match res {
                        Ok(res) => {
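                            // Note: this mock assumes `buf` is large enough to
                            // hold the entire canned response in a single read;
                            // `copy_from_slice` would panic otherwise.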
                            buf[..res.len()].copy_from_slice(res.as_bytes());

                            Poll::Ready(Ok(res.len()))
                        }
                        Err(()) => {
                            // Return an error. This should cause the reactor to reattempt the
                            // upload.
                            Poll::Ready(Err(io::Error::other("test error")))
                        }
                    }
                }
                None => Poll::Ready(Ok(0)),
            }
        }
    }

    impl<I: PollReadIter> AsyncWrite for MockDataStream<I> {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let request = std::str::from_utf8(buf).unwrap();

            assert!(request.starts_with("POST /tor/hs/3/publish HTTP/1.0\r\n"));
            let _prev = self.publish_count.fetch_add(1, Ordering::SeqCst);

            Poll::Ready(Ok(request.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Insert the specified key into the keystore.
    fn insert_svc_key<K>(key: K, keymgr: &KeyMgr, svc_key_spec: &dyn KeySpecifier)
    where
        K: ToEncodableKey,
    {
        keymgr
            .insert(
                key,
                svc_key_spec,
                tor_keymgr::KeystoreSelector::Primary,
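                // `true`: overwrite any existing key with the same specifier.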
                true,
            )
            .unwrap();
    }

    /// Create a new `KeyMgr`, provisioning its keystore with the necessary keys.
    fn init_keymgr(
        keystore_dir: &TempDir,
        nickname: &HsNickname,
        netdir: &NetDir,
    ) -> (HsId, HsBlindId, Arc<KeyMgr>) {
        let period = netdir.hs_time_period();

        let mut rng = testing_rng();
        let keypair = ed25519::Keypair::generate(&mut rng);
        let id_pub = HsIdKey::from(keypair.verifying_key());
        let id_keypair = HsIdKeypair::from(ed25519::ExpandedKeypair::from(&keypair));

        let (hs_blind_id_key, hs_blind_id_kp, _subcredential) =
            id_keypair.compute_blinded_key(period).unwrap();

        let keystore = ArtiNativeKeystore::from_path_and_mistrust(
            keystore_dir,
            &Mistrust::new_dangerously_trust_everyone(),
        )
        .unwrap();

        // Provision the keystore with the necessary keys:
        let keymgr = KeyMgrBuilder::default()
            .primary_store(Box::new(keystore))
            .build()
            .unwrap();

        insert_svc_key(
            id_keypair,
            &keymgr,
            &HsIdKeypairSpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            id_pub.clone(),
            &keymgr,
            &HsIdPublicKeySpecifier::new(nickname.clone()),
        );

        insert_svc_key(
            hs_blind_id_kp,
            &keymgr,
            &BlindIdKeypairSpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            hs_blind_id_key.clone(),
            &keymgr,
            &BlindIdPublicKeySpecifier::new(nickname.clone(), period),
        );

        insert_svc_key(
            HsDescSigningKeypair::from(ed25519::Keypair::generate(&mut rng)),
            &keymgr,
            &DescSigningKeypairSpecifier::new(nickname.clone(), period),
        );

        let hs_id = id_pub.into();
        (hs_id, hs_blind_id_key.into(), keymgr.into())
    }

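    /// Build the [`OnionServiceConfig`] of the test service.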
    fn build_test_config(nickname: HsNickname) -> OnionServiceConfig {
        OnionServiceConfigBuilder::default()
            .nickname(nickname)
            .rate_limit_at_intro(None)
            .build()
            .unwrap()
    }

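    /// Launch a `Publisher` on the given mock runtime, trigger `reactor_event`,
    /// and assert that the reactor performs `expected_upload_count` uploads
    /// (plus any reuploads implied by `republish_count`), reporting a status
    /// consistent with `expect_errors`.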
    #[allow(clippy::too_many_arguments)]
    fn run_test<I: PollReadIter>(
        runtime: MockRuntime,
        nickname: HsNickname,
        keymgr: Arc<KeyMgr>,
        pv: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        netdir: NetDir,
        reactor_event: impl FnOnce(),
        poll_read_responses: I,
        expected_upload_count: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        runtime.clone().block_on(async move {
            let netdir_provider: Arc<dyn NetDirProvider> =
                Arc::new(TestNetDirProvider::from(netdir));
            let publish_count = Default::default();
            let circpool = MockReactorState {
                publish_count: Arc::clone(&publish_count),
                poll_read_responses,
                responses_for_hsdir: Arc::new(Mutex::new(Default::default())),
            };

            let mut status_rx = status_tx.subscribe();
            let publisher: Publisher<MockRuntime, MockReactorState<_>> = Publisher::new(
                runtime.clone(),
                nickname,
                netdir_provider,
                circpool,
                pv,
                config_rx,
                status_tx,
                keymgr,
                Arc::new(CfgPathResolver::default()),
            );

            publisher.launch().unwrap();
            runtime.progress_until_stalled().await;
            let status = status_rx.next().await.unwrap().publisher_status();
            assert_eq!(State::Shutdown, status.state());
            assert!(status.current_problem().is_none());

            // Check that we haven't published anything yet
            assert_eq!(publish_count.load(Ordering::SeqCst), 0);

            reactor_event();

            runtime.progress_until_stalled().await;

            // We need to manually advance the time, because some of our tests check that the
            // failed uploads are retried, and there's a sleep() between the retries
            // (see BackoffSchedule::next_delay).
            runtime.advance_by(Duration::from_secs(1)).await;
            runtime.progress_until_stalled().await;

            let initial_publish_count = publish_count.load(Ordering::SeqCst);
            assert_eq!(initial_publish_count, expected_upload_count);

            let status = status_rx.next().await.unwrap().publisher_status();
            if expect_errors {
                // The upload results aren't ready yet.
                assert_eq!(State::Bootstrapping, status.state());
            } else {
                // The test network doesn't have an SRV for the previous TP,
                // so we are "unreachable".
                assert_eq!(State::DegradedUnreachable, status.state());
            }
            assert!(status.current_problem().is_none());

            if republish_count > 0 {
                /// The latest time the descriptor can be republished.
                const MAX_TIMEOUT: Duration = Duration::from_secs(60 * 120);

                // Wait until the reactor triggers the necessary number of reuploads.
                runtime
                    .advance_by(MAX_TIMEOUT * (republish_count as u32))
                    .await;
                runtime.progress_until_stalled().await;

                let min_upload_count = expected_upload_count * republish_count;
                // There will be twice as many reuploads if the publisher happens
                // to reupload every hour (as opposed to every 2h).
                let max_upload_count = 2 * min_upload_count;
                let publish_count_now = publish_count.load(Ordering::SeqCst);
                // This is the total number of reuploads (i.e. the number of times
                // we published the descriptor to an HsDir).
                let actual_reupload_count = publish_count_now - initial_publish_count;

                assert!((min_upload_count..=max_upload_count).contains(&actual_reupload_count));
            }
        });
    }

    /// Test that the publisher publishes the descriptor when the IPTs change.
    ///
    /// The `poll_read_responses` are returned by each HSDir, in order, in response to each POST
    /// request received from the publisher.
    ///
    /// The `multiplier` is the factor by which the number of HSDirs is multiplied to obtain the
    /// total expected number of uploads (this works because the test "HSDirs" all behave the
    /// same, so the number of uploads is the number of HSDirs multiplied by the number of
    /// upload attempts per HSDir).
    fn publish_after_ipt_change<I: PollReadIter>(
        temp_dir: &Path,
        poll_read_responses: I,
        multiplier: usize,
        republish_count: usize,
        expect_errors: bool,
    ) {
        let runtime = MockRuntime::new();
        let nickname = HsNickname::try_from(TEST_SVC_NICKNAME.to_string()).unwrap();
        let config = build_test_config(nickname.clone());
        let (_config_tx, config_rx) = watch::channel_with(Arc::new(config));

        let (mut mv, pv) = ipts_channel(&runtime, create_storage_handles(temp_dir).1).unwrap();
        let update_ipts = || {
            let ipts: Vec<IptInSet> = test_data::test_parsed_hsdesc()
                .unwrap()
                .intro_points()
                .iter()
                .enumerate()
                .map(|(i, ipt)| IptInSet {
                    ipt: ipt.clone(),
                    lid: [i.try_into().unwrap(); 32].into(),
                })
                .collect();

            mv.borrow_for_update(runtime.clone()).ipts = Some(IptSet {
                ipts,
                lifetime: Duration::from_secs(20),
            });
        };

        let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
        let keystore_dir = tempdir().unwrap();

        let (_hsid, blind_id, keymgr) = init_keymgr(&keystore_dir, &nickname, &netdir);

        let hsdir_count = netdir
            .hs_dirs_upload(blind_id, netdir.hs_time_period())
            .unwrap()
            .collect::<Vec<_>>()
            .len();

        assert!(hsdir_count > 0);

        // If any of the uploads fail, they will be retried. Note that the upload failure will
        // affect _each_ hsdir, so the expected number of uploads is a multiple of hsdir_count.
        let expected_upload_count = hsdir_count * multiplier;
        let status_tx = StatusSender::new(OnionServiceStatus::new_shutdown()).into();

        run_test(
            runtime.clone(),
            nickname,
            keymgr,
            pv,
            config_rx,
            status_tx,
            netdir,
            update_ipts,
            poll_read_responses,
            expected_upload_count,
            republish_count,
            expect_errors,
        );
    }

    #[test]
    fn publish_after_ipt_change_no_errors() {
        // The HSDirs always respond with 200 OK, so we expect to publish hsdir_count times.
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();

        test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, 0, false));
    }

    #[test]
    fn publish_after_ipt_change_with_errors() {
        let err_responses = vec![
            // The HSDir closed the connection without sending a response.
            Err(()),
            // The HSDir responded with an internal server error.
            Ok(ERR_RESPONSE.to_string()),
        ];

        for error_res in err_responses.into_iter() {
            let poll_reads = vec![
                // Each HSDir first responds with an error, which causes the publisher to retry the
                // upload. The HSDir then responds with "200 OK".
                //
                // We expect to publish hsdir_count * 2 times (for each HSDir, the first upload
                // attempt fails, but the second succeeds).
                error_res,
                Ok(OK_RESPONSE.to_string()),
            ]
            .into_iter();

            test_temp_dir!().used_by(|dir| publish_after_ipt_change(dir, poll_reads, 2, 0, true));
        }
    }

    #[test]
    fn reupload_after_publishing() {
        let poll_reads = [Ok(OK_RESPONSE.into())].into_iter();
        // Test that 4 reuploads happen after the initial upload
        const REUPLOAD_COUNT: usize = 4;

        test_temp_dir!()
            .used_by(|dir| publish_after_ipt_change(dir, poll_reads, 1, REUPLOAD_COUNT, false));
    }

    // TODO (#1120): test that the descriptor is republished when the config changes

    // TODO (#1120): test that the descriptor is reuploaded only to the HSDirs that need it (i.e. the
    // ones for which it's dirty)

    // TODO (#1120): test that rate-limiting works correctly

    // TODO (#1120): test that the uploaded descriptor contains the expected values

    // TODO (#1120): test that the publisher stops publishing if the IPT manager sets the IPTs to
    // `None`.
}