//! The onion service publisher reactor.
//!
//! TODO (#1216): write the docs
//!
//! With respect to [`OnionServiceStatus`] reporting,
//! the following state transitions are possible:
//!
//!
//! ```ignore
//!
//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited) +---------------+
//!                +--------------------------------------------------------------->| Bootstrapping |
//!                |                                                                +---------------+
//! +----------+   | update_publish_status(Idle)        +---------+                         |
//! | Shutdown |-- +----------------------------------->| Running |----+                    |
//! +----------+   |                                    +---------+    |                    |
//!                |                                                   |                    |
//!                |                                                   |                    |
//!                | run_once() returns an error  +--------+           |                    |
//!                +----------------------------->| Broken |<----------+--------------------+
//!                                               +--------+ run_once() returns an error
//! ```
//!
//! Ideally, the publisher should also set the
//! [`OnionServiceStatus`] to `Recovering` whenever a transient
//! upload error occurs, but this is currently not possible:
//!
//!   * making the upload tasks set the status to `Recovering` (on failure) and `Running` (on
//!     success) wouldn't work, because the upload tasks run in parallel (they would race with each
//!     other, and the final status (`Recovering`/`Running`) would be the status of the last upload
//!     task, rather than the real status of the publisher)
//!   * making the upload task set the status to `Recovering` on upload failure, and letting
//!     `update_publish_status` reset it back to `Running`, also would not work:
//!     `update_publish_status` sets the status back to `Running` when the publisher enters its
//!     `Idle` state, regardless of the status of its upload tasks
//!
//! TODO: Indeed, setting the status to `Recovering` _anywhere_ would not work, because
//! `update_publish_status` will just overwrite it. We would need to introduce some new
//! `PublishStatus` variants (currently, the publisher only has 4 states: `Idle`,
//! `UploadScheduled`, `AwaitingIpts`, and `RateLimited`) for the `Recovering` (retrying a failed
//! upload) and `Broken` (the upload failed and we've given up) states. However, adding these
//! 2 new states is non-trivial:
//!
//!   * how do we define "failure"? Is it the failure to upload to a single HsDir, or the failure
//!     to upload to **any** HsDirs?
//!   * what should make the publisher transition out of the `Broken`/`Recovering` states? While
//!    `handle_upload_results` can see the upload results for a batch of HsDirs (corresponding to
//!     a time period), the publisher doesn't do any sort of bookkeeping to know if a previously
//!     failed HsDir upload succeeded in a later upload "batch"
//!
//! For the time being, the publisher never sets the status to `Recovering`, and uses the `Broken`
//! status for reporting fatal errors (crashes).
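//!
//! For illustration only, a rough sketch of what such an extended `PublishStatus` might look
//! like (the `Recovering` and `Broken` variants below are hypothetical and do not exist in the
//! current code):
//!
//! ```ignore
//! enum PublishStatus {
//!     /// The publisher is idle (nothing is scheduled for upload).
//!     Idle,
//!     /// An upload is scheduled.
//!     UploadScheduled,
//!     /// We are waiting for the IPT manager to establish some introduction points.
//!     AwaitingIpts,
//!     /// Uploads are rate-limited until the specified time.
//!     RateLimited(Instant),
//!     /// Hypothetical: a previous upload failed, and we are retrying it.
//!     Recovering,
//!     /// Hypothetical: the upload failed, and we have given up.
//!     Broken,
//! }
//! ```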

use super::*;

/// The upload rate-limiting threshold.
///
/// Before initiating an upload, the reactor checks if the last upload was at least
/// `UPLOAD_RATE_LIM_THRESHOLD` seconds ago. If so, it uploads the descriptor to all HsDirs that
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` seconds from the
/// current time.
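///
/// A rough sketch of the decision (the real logic lives in `upload_all`):
///
/// ```ignore
/// match last_uploaded {
///     Some(last) if now.duration_since(last) < UPLOAD_RATE_LIM_THRESHOLD => {
///         // Too soon: reschedule the upload for UPLOAD_RATE_LIM_THRESHOLD from now.
///     }
///     _ => {
///         // Enough time has passed (or we have never uploaded before): upload now.
///     }
/// }
/// ```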
//
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);

/// The maximum number of concurrent upload tasks per time period.
//
// TODO: this value was arbitrarily chosen and may not be optimal.  For now, it
// will have no effect, since the current number of replicas is far less than
// this value.
//
// The uploads for all TPs happen in parallel.  As a result, the actual limit for the maximum
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
// (currently 2, which means the concurrency limit will, in fact, be 32).
//
// We should try to decouple this value from the TP parameters.
const MAX_CONCURRENT_UPLOADS: usize = 16;

/// The maximum time allowed for uploading a descriptor to a single HSDir,
/// across all attempts.
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// A reactor for the HsDir [`Publisher`]
///
/// The entrypoint is [`Reactor::run`].
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
pub(super) struct Reactor<R: Runtime, M: Mockable> {
    /// The immutable, shared inner state.
    imm: Arc<Immutable<R, M>>,
    /// A source for new network directories that we use to determine
    /// our HsDirs.
    dir_provider: Arc<dyn NetDirProvider>,
    /// The mutable inner state.
    inner: Arc<Mutex<Inner>>,
    /// A channel for receiving IPT change notifications.
    ipt_watcher: IptsPublisherView,
    /// A channel for receiving onion service config change notifications.
    config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
    /// A channel for receiving updates regarding our [`PublishStatus`].
    ///
    /// The main loop of the reactor watches for updates on this channel.
    ///
    /// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    ///
    /// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
    /// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
    /// established some introduction points.
    publish_status_rx: watch::Receiver<PublishStatus>,
    /// A sender for updating our [`PublishStatus`].
    ///
    /// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
    /// we can start publishing descriptors.
    publish_status_tx: watch::Sender<PublishStatus>,
    /// A channel for receiving upload completion notifications.
    ///
    /// This channel is polled in the main loop of the reactor.
    upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
    /// A channel for sending upload completion notifications.
    ///
    /// A copy of this sender is handed to each upload task.
    upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
    /// A sender for notifying any pending upload tasks that the reactor is shutting down.
    ///
    /// Receivers can use this channel to find out when the reactor is dropped.
    ///
    /// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
    /// Any future background tasks can also use this channel to detect if the reactor is dropped.
    ///
    /// Closing this channel will cause any pending upload tasks to be dropped.
    shutdown_tx: broadcast::Sender<Void>,
}

/// The immutable, shared state of the descriptor publisher reactor.
#[derive(Clone)]
struct Immutable<R: Runtime, M: Mockable> {
    /// The runtime.
    runtime: R,
    /// Mockable state.
    ///
    /// This is used for launching circuits and for obtaining random number generators.
    mockable: M,
    /// The service for which we're publishing descriptors.
    nickname: HsNickname,
    /// The key manager.
    keymgr: Arc<KeyMgr>,
    /// A sender for updating the status of the onion service.
    status_tx: PublisherStatusSender,
}

impl<R: Runtime, M: Mockable> Immutable<R, M> {
    /// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
    /// with the specified [`TimePeriod`].
    ///
    /// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
    /// the private part of the blinded identity key. Otherwise, the key is the private part of the
    /// descriptor signing key.
    ///
    /// Returns an error if the service is running in offline mode and the descriptor signing
    /// keypair of the specified `period` is not available.
    //
    // TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
    // built from the blinded id key
    fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
        let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
            Some(key) => {
                let key: ed25519::ExpandedKeypair = key.into();
                key.to_secret_key_bytes()[0..32]
                    .try_into()
                    .expect("Wrong length on slice")
            }
            None => {
                // TODO (#1194): we don't support externally provisioned keys (yet), so this branch
                // is unreachable (for now).
                let desc_sign_key_spec =
                    DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
                let key: ed25519::Keypair = self
                    .keymgr
                    .get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
                    // TODO (#1194): internal! is not the right type for this error (we need an
                    // error type for the case where a hidden service running in offline mode has
                    // run out of its pre-provisioned keys).
                    //
                    // This will be addressed when we add support for offline hs_id mode
                    .ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
                    .into();
                key.to_bytes()
            }
        };

        Ok(AesOpeKey::from_secret(&ope_key))
    }

    /// Generate a revision counter for a descriptor associated with the specified
    /// [`TimePeriod`].
    ///
    /// Returns a revision counter generated according to the [encrypted time in period] scheme.
    ///
    /// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
    fn generate_revision_counter(
        &self,
        params: &HsDirParams,
        now: SystemTime,
    ) -> Result<RevisionCounter, FatalError> {
        // TODO: in the future, we might want to compute ope_key once per time period (as opposed
        // to each time we generate a new descriptor), for performance reasons.
        let ope_key = self.create_ope_key(params.time_period())?;

        // TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
        let srv_start = params.start_of_shard_rand_period();
        let offset = params.offset_within_srv_period(now).ok_or_else(|| {
            internal!(
                "current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
                now,
                srv_start
            )
        })?;
        let rev = ope_key.encrypt(offset);

        Ok(RevisionCounter::from(rev))
    }
}

/// Mockable state for the descriptor publisher reactor.
///
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
#[async_trait]
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
    /// The type of random number generator.
    type Rng: rand::Rng + rand::CryptoRng;

    /// The type of client circuit.
    type ClientCirc: MockableClientCirc;

    /// Return a random number generator.
    fn thread_rng(&self) -> Self::Rng;

    /// Create a circuit of the specified `kind` to `target`.
    async fn get_or_launch_specific<T>(
        &self,
        netdir: &NetDir,
        kind: HsCircKind,
        target: T,
    ) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync;

    /// Return an estimate-based value for how long we should allow a single
    /// directory upload operation to complete.
    ///
    /// Includes circuit construction, stream opening, upload, and waiting for a
    /// response.
    fn estimate_upload_timeout(&self) -> Duration;
}

/// Mockable client circuit
#[async_trait]
pub(crate) trait MockableClientCirc: Send + Sync {
    /// The data stream type.
    type DataStream: AsyncRead + AsyncWrite + Send + Unpin;

    /// Start a new stream to the last relay in the circuit, using
    /// a BEGIN_DIR cell.
    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
}

#[async_trait]
impl MockableClientCirc for ClientCirc {
    type DataStream = tor_proto::stream::DataStream;

    async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
        ClientCirc::begin_dir_stream(self).await
    }
}

/// The real version of the mockable state of the reactor.
#[derive(Clone, From, Into)]
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);

#[async_trait]
impl<R: Runtime> Mockable for Real<R> {
    type Rng = rand::rngs::ThreadRng;
    type ClientCirc = ClientCirc;

    fn thread_rng(&self) -> Self::Rng {
        rand::thread_rng()
    }

    async fn get_or_launch_specific<T>(
        &self,
        netdir: &NetDir,
        kind: HsCircKind,
        target: T,
    ) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
    where
        T: CircTarget + Send + Sync,
    {
        self.0.get_or_launch_specific(netdir, kind, target).await
    }

    fn estimate_upload_timeout(&self) -> Duration {
        use tor_circmgr::timeouts::Action;
        let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
        let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
        // We assume that in the worst case we'll have to wait for an entire
        // circuit construction and two round-trips to the hsdir.
        let est_total = est_build + est_roundtrip * 2;
        // We always allow _at least_ this much time, in case our estimate is
        // ridiculously low.
        let min_timeout = Duration::from_secs(30);
        max(est_total, min_timeout)
    }
}

/// The mutable state of a [`Reactor`].
struct Inner {
    /// The onion service config.
    config: Arc<OnionServiceConfig>,
    /// The relevant time periods.
    ///
    /// This includes the current time period, as well as any other time periods we need to be
    /// publishing descriptors for.
    ///
    /// This is empty until we fetch our first netdir in [`Reactor::run`].
    time_periods: Vec<TimePeriodContext>,
    /// Our most up to date netdir.
    ///
    /// This is initialized in [`Reactor::run`].
    netdir: Option<Arc<NetDir>>,
    /// The timestamp of our last upload.
    ///
    /// This is the time when the last upload was _initiated_ (rather than completed), to prevent
    /// the publisher from spawning multiple upload tasks at once in response to multiple external
    /// events happening in quick succession, such as the IPT manager sending multiple IPT change
    /// notifications in a short time frame (#1142), or an IPT change notification that's
    /// immediately followed by a consensus change. Starting two upload tasks at once is not only
    /// inefficient, but it also causes the publisher to generate two different descriptors with
    /// the same revision counter (the revision counter is derived from the current timestamp),
    /// which ultimately causes the slower upload task to fail (see #1142).
    ///
    /// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
    /// used for retrying failed uploads (these are handled internally by
    /// [`Reactor::upload_descriptor_with_retries`]).
    last_uploaded: Option<Instant>,
    /// A max-heap containing the time periods for which we need to reupload the descriptor.
    // TODO: we are currently reuploading more than necessary.
    // Ideally, this shouldn't contain duplicate TimePeriods,
    // because we only need to retain the latest reupload time for each time period.
    //
    // Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
    // we will end up with multiple ReuploadTimer entries for that TP,
    // each of which will (eventually) result in a reupload.
    //
    // TODO: maybe this should just be a HashMap<TimePeriod, Instant>
    //
    // See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
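    //
    // For illustration, that alternative might look roughly like this (not implemented;
    // `reupload_times` is a hypothetical field name):
    //
    //     reupload_times: HashMap<TimePeriod, Instant>,
    //
    // Scheduling a reupload would then be a plain `insert`, leaving at most one pending
    // reupload entry per time period.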
    reupload_timers: BinaryHeap<ReuploadTimer>,
}

/// The part of the reactor state that changes with every time period.
struct TimePeriodContext {
    /// The HsDir params.
    params: HsDirParams,
    /// The blinded HsId.
    blind_id: HsBlindId,
    /// The HsDirs to use in this time period.
    ///
    // We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
    // (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`). To
    // store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
    // `NetDir` and the cached relays, and to convince Rust that what we're doing is sound.
    hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
    /// The revision counter of the last successful upload, if any.
    last_successful: Option<RevisionCounter>,
}

impl TimePeriodContext {
    /// Create a new `TimePeriodContext`.
    ///
    /// Any of the specified `old_hsdirs` also present in the new list of HsDirs
    /// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
    fn new<'r>(
        params: HsDirParams,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
    ) -> Result<Self, FatalError> {
        let period = params.time_period();
        Ok(Self {
            params,
            blind_id,
            hs_dirs: Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?,
            last_successful: None,
        })
    }

    /// Recompute the HsDirs for this time period.
    fn compute_hsdirs<'r>(
        period: TimePeriod,
        blind_id: HsBlindId,
        netdir: &Arc<NetDir>,
        mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
    ) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
        let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;

        Ok(hs_dirs
            .map(|hs_dir| {
                let mut builder = RelayIds::builder();
                if let Some(ed_id) = hs_dir.ed_identity() {
                    builder.ed_identity(*ed_id);
                }

                if let Some(rsa_id) = hs_dir.rsa_identity() {
                    builder.rsa_identity(*rsa_id);
                }

                let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());

                // Have we uploaded the descriptor to this relay before? If so, we don't need to
                // reupload it unless it was already dirty and due for a reupload.
                let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
                    Some((_, status)) => *status,
                    None => DescriptorStatus::Dirty,
                };

                (relay_id, status)
            })
            .collect::<Vec<_>>())
    }

    /// Mark the descriptor dirty for all HSDirs of this time period.
    fn mark_all_dirty(&mut self) {
        self.hs_dirs
            .iter_mut()
            .for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
    }
}

/// Authorized client configuration error.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub(crate) enum AuthorizedClientConfigError {
    /// A key is malformed if it doesn't start with the "curve25519" prefix,
    /// or if its decoded content is not exactly 32 bytes long.
    #[error("Malformed authorized client key")]
    MalformedKey,

    /// Error while decoding an authorized client's key.
    #[error("Failed base64-decode an authorized client's key")]
    Base64Decode(#[from] base64ct::Error),

    /// Error while accessing the authorized_client key dir.
    #[error("Failed to {action} file {path}")]
    KeyDir {
        /// What we were doing when we encountered the error.
        action: &'static str,
        /// The file that we were trying to access.
        path: std::path::PathBuf,
        /// The underlying I/O error.
        #[source]
        error: Arc<std::io::Error>,
    },

    /// Error while accessing the authorized_client key dir.
    #[error("expected regular file, found directory: {path}")]
    MalformedFile {
        /// The file that we were trying to access.
        path: std::path::PathBuf,
    },
}

/// An error that occurs while trying to upload a descriptor.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum UploadError {
    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
    #[error("descriptor upload request failed: {}", _0.error)]
    Request(#[from] RequestFailedError),

    /// Failed to establish circuit to hidden service directory
    #[error("could not build circuit to HsDir")]
    Circuit(#[from] tor_circmgr::Error),

    /// Failed to establish stream to hidden service directory
    #[error("failed to establish directory stream to HsDir")]
    Stream(#[source] tor_proto::Error),

    /// A descriptor upload timed out before it could complete.
    #[error("descriptor publication timed out")]
    Timeout,

    /// An internal error.
    #[error("Internal error")]
    Bug(#[from] tor_error::Bug),
}
define_asref_dyn_std_error!(UploadError);

impl<R: Runtime, M: Mockable> Reactor<R, M> {
    /// Create a new `Reactor`.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        runtime: R,
        nickname: HsNickname,
        dir_provider: Arc<dyn NetDirProvider>,
        mockable: M,
        config: Arc<OnionServiceConfig>,
        ipt_watcher: IptsPublisherView,
        config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
        status_tx: PublisherStatusSender,
        keymgr: Arc<KeyMgr>,
    ) -> Self {
        /// The maximum size of the upload completion notifier channel.
        ///
        /// The channel we use for this is a futures::mpsc channel, which has a capacity of
        /// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
        /// each sender will send exactly one message.
        const UPLOAD_CHAN_BUF_SIZE: usize = 0;

        let (upload_task_complete_tx, upload_task_complete_rx) =
            mpsc::channel(UPLOAD_CHAN_BUF_SIZE);

        let (publish_status_tx, publish_status_rx) = watch::channel();
        // Setting the buffer size to zero here is OK,
        // since we never actually send anything on this channel.
        let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);

        let imm = Immutable {
            runtime,
            mockable,
            nickname,
            keymgr,
            status_tx,
        };

        let inner = Inner {
            time_periods: vec![],
            config,
            netdir: None,
            last_uploaded: None,
            reupload_timers: Default::default(),
        };

        Self {
            imm: Arc::new(imm),
            inner: Arc::new(Mutex::new(inner)),
            dir_provider,
            ipt_watcher,
            config_rx,
            publish_status_rx,
            publish_status_tx,
            upload_task_complete_rx,
            upload_task_complete_tx,
            shutdown_tx,
        }
    }

    /// Start the reactor.
    ///
    /// Under normal circumstances, this function runs indefinitely.
    ///
    /// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
    /// upload fails or is rate-limited.
    pub(super) async fn run(mut self) -> Result<(), FatalError> {
        debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");

        {
            let netdir = wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely).await?;
            let time_periods = self.compute_time_periods(&netdir, &[])?;

            let mut inner = self.inner.lock().expect("poisoned lock");

            inner.netdir = Some(netdir);
            inner.time_periods = time_periods;
        }

        let nickname = self.imm.nickname.clone();
        let rt = self.imm.runtime.clone();
        let status_tx = self.imm.status_tx.clone();

        loop {
            match self.run_once().await {
                Ok(ShutdownStatus::Continue) => continue,
                Ok(ShutdownStatus::Terminate) => {
                    debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");

                    self.imm.status_tx.send_shutdown();
                    return Ok(());
                }
                Err(e) => {
                    error_report!(
                        e,
                        "HS service {}: descriptor publisher crashed!",
                        self.imm.nickname
                    );

                    self.imm.status_tx.send_broken(e.clone());

                    return Err(e);
                }
            }
        }
    }

    /// Run one iteration of the reactor loop.
    async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
        let mut netdir_events = self.dir_provider.events();

        // Note: TrackingNow tracks the values it is compared with.
        // This is equivalent to sleeping for (until - now) units of time.
        let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
        if let PublishStatus::RateLimited(until) = self.status() {
            if upload_rate_lim > until {
                // We are no longer rate-limited
                self.expire_rate_limit().await?;
            }
        }

        let reupload_tracking = TrackingNow::now(&self.imm.runtime);
        let mut reupload_periods = vec![];
        {
            let mut inner = self.inner.lock().expect("poisoned lock");
            let inner = &mut *inner;
            while let Some(reupload) = inner.reupload_timers.peek().copied() {
                // First, extract all the timeouts that already elapsed.
                if reupload.when <= reupload_tracking {
                    inner.reupload_timers.pop();
                    reupload_periods.push(reupload.period);
                } else {
                    // We are not ready to schedule any more reuploads.
                    //
                    // How much we need to sleep is implicitly
                    // tracked in reupload_tracking (through
                    // the TrackingNow implementation)
                    break;
                }
            }
        }

        // Check if it's time to schedule any reuploads.
        for period in reupload_periods {
            if self.mark_dirty(&period) {
                debug!(
                    time_period=?period,
                    "descriptor reupload timer elapsed; scheduling reupload",
                );
                self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
                    .await?;
            }
        }

        select_biased! {
            res = self.upload_task_complete_rx.next().fuse() => {
                let Some(upload_res) = res else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_upload_results(upload_res);
            },
            () = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
                self.expire_rate_limit().await?;
            },
            () = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
                // Run another iteration, executing run_once again. This time, we will remove the
                // expired reupload from self.reupload_timers, mark the descriptor dirty for all
                // relevant HsDirs, and schedule the upload by setting our status to
                // UploadScheduled.
                return Ok(ShutdownStatus::Continue);
            },
            netdir_event = netdir_events.next().fuse() => {
                // The consensus changed. Grab a new NetDir.
                let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
                    Ok(y) => y,
                    Err(e) => {
                        error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
                        // Hopefully a netdir will appear in the future.
                        // In the meantime, suspend operations.
                        //
                        // TODO (#1218): there is a bug here: we stop reading on our inputs
                        // including eg publish_status_rx, but it is our job to log some of
                        // these things.  While we are waiting for a netdir, all those messages
                        // are "stuck"; they'll appear later, with misleading timestamps.
                        //
                        // Probably this should be fixed by moving the logging
                        // out of the reactor, where it won't be blocked.
                        wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely)
                            .await?
                    }
                };
                let relevant_periods = netdir.hs_all_time_periods();
                self.handle_consensus_change(netdir).await?;
                expire_publisher_keys(
                    &self.imm.keymgr,
                    &self.imm.nickname,
                    &relevant_periods,
                ).unwrap_or_else(|e| {
                    error_report!(e, "failed to remove expired keys");
                });
            }
            update = self.ipt_watcher.await_update().fuse() => {
                if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
                    return Ok(ShutdownStatus::Terminate);
                }
            },
            config = self.config_rx.next().fuse() => {
                let Some(config) = config else {
                    return Ok(ShutdownStatus::Terminate);
                };

                self.handle_svc_config_change(config).await?;
            },

            should_upload = self.publish_status_rx.next().fuse() => {
                let Some(should_upload) = should_upload else {
                    return Ok(ShutdownStatus::Terminate);
                };

                // Our PublishStatus changed -- are we ready to publish?
                if should_upload == PublishStatus::UploadScheduled {
                    self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
                    self.upload_all().await?;
                }
            }
        }

        Ok(ShutdownStatus::Continue)
    }

    /// Returns the current status of the publisher
    fn status(&self) -> PublishStatus {
        *self.publish_status_rx.borrow()
    }

    /// Handle a batch of upload outcomes,
    /// possibly updating the status of the descriptor for the corresponding HSDirs.
    fn handle_upload_results(&self, results: TimePeriodUploadResult) {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        // Check which time period these uploads pertain to.
        let period = inner
            .time_periods
            .iter_mut()
            .find(|ctx| ctx.params.time_period() == results.time_period);

        let Some(period) = period else {
            // The uploads were for a time period that is no longer relevant, so we
            // can ignore the result.
            return;
        };

        // We will need to reupload this descriptor at some point, so we pick
        // a random time between 60 minutes and 120 minutes in the future.
        //
        // See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
        let mut rng = self.imm.mockable.thread_rng();
        // TODO SPEC: Control republish period using a consensus parameter?
        let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
        let duration = Duration::from_secs(minutes * 60);
        let reupload_when = self.imm.runtime.now() + duration;
        let time_period = period.params.time_period();

        info!(
            time_period=?time_period,
            "reuploading descriptor in {}",
            humantime::format_duration(duration),
        );

        inner.reupload_timers.push(ReuploadTimer {
            period: time_period,
            when: reupload_when,
        });

        for upload_res in results.hsdir_result {
            let relay = period
                .hs_dirs
                .iter_mut()
                .find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);

            let Some((relay, status)) = relay else {
                // This HSDir went away, so the result doesn't matter.
                // Continue processing the rest of the results
                continue;
            };

            if upload_res.upload_res == UploadStatus::Success {
                let update_last_successful = match period.last_successful {
                    None => true,
                    Some(counter) => counter <= upload_res.revision_counter,
                };

                if update_last_successful {
                    period.last_successful = Some(upload_res.revision_counter);
                    // TODO (#1098): Is it possible that this won't update the statuses promptly
                    // enough? For example, it's possible for the reactor to see a Dirty descriptor
                    // and start an upload task for a descriptor that has already been uploaded (or is
                    // being uploaded) in another task, but whose upload results have not yet been
                    // processed.
                    //
                    // This is probably made worse by the fact that the statuses are updated in
                    // batches (grouped by time period), rather than one by one as the upload tasks
                    // complete (updating the status involves locking the inner mutex, and I wanted
                    // to minimize the locking/unlocking overheads). I'm not sure handling the
                    // updates in batches was the correct decision here.
                    *status = DescriptorStatus::Clean;
                }
            }
        }
    }

    /// Maybe update our list of HsDirs.
    async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
        trace!("the consensus has changed; recomputing HSDirs");

        let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);

        self.recompute_hs_dirs()?;
        self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
            .await?;

        Ok(())
    }

    /// Recompute the HsDirs for all relevant time periods.
    fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        let netdir = Arc::clone(
            inner
                .netdir
                .as_ref()
                .ok_or_else(|| internal!("started upload task without a netdir"))?,
        );

        // Update our list of relevant time periods.
        let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
        inner.time_periods = new_time_periods;

        Ok(())
    }

    /// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
    ///
    /// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
    /// HsDirs where possible.
    fn compute_time_periods(
        &self,
        netdir: &Arc<NetDir>,
        time_periods: &[TimePeriodContext],
    ) -> Result<Vec<TimePeriodContext>, FatalError> {
        netdir
            .hs_all_time_periods()
            .iter()
            .map(|params| {
                let period = params.time_period();
                let svc_key_spec = HsIdKeypairSpecifier::new(self.imm.nickname.clone());
                let hsid_kp = self
                    .imm
                    .keymgr
                    .get::<HsIdKeypair>(&svc_key_spec)?
                    .ok_or_else(|| FatalError::MissingHsIdKeypair(self.imm.nickname.clone()))?;
                let svc_key_spec = BlindIdKeypairSpecifier::new(self.imm.nickname.clone(), period);

                let blind_id_kp =
                    read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
                        // Note: for now, read_blind_id_keypair cannot return Ok(None).
                        // It's supposed to return Ok(None) if we're in offline hsid mode,
                        // but that might change when we do #1194
                        .ok_or_else(|| internal!("offline hsid mode not supported"))?;

                let blind_id: HsBlindIdKey = (&blind_id_kp).into();

                // If our previous `TimePeriodContext`s also had an entry for `period`, we need to
                // preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
                // publishing the descriptor to the HsDirs that already have it (the ones that are
                // marked with DescriptorStatus::Clean).
                //
                // In other words, we only want to publish to those HsDirs that
                //   * are part of a new time period (which we have never published the descriptor
                //   for), or
                //   * have just been added to the ring of a time period we already knew about
                if let Some(ctx) = time_periods
                    .iter()
                    .find(|ctx| ctx.params.time_period() == period)
                {
                    TimePeriodContext::new(
                        params.clone(),
                        blind_id.into(),
                        netdir,
                        ctx.hs_dirs.iter(),
                    )
                } else {
                    // Passing an empty iterator here means all HsDirs in this TimePeriodContext
                    // will be marked as dirty, meaning we will need to upload our descriptor to them.
                    TimePeriodContext::new(params.clone(), blind_id.into(), netdir, iter::empty())
                }
            })
            .collect::<Result<Vec<TimePeriodContext>, FatalError>>()
    }

    /// Replace the old netdir with the new, returning the old.
    fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
        self.inner
            .lock()
            .expect("poisoned lock")
            .netdir
            .replace(new_netdir)
    }

    /// Replace our view of the service config with `new_config` if `new_config` contains changes
    /// that would cause us to generate a new descriptor.
    fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfig>) -> bool {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let old_config = &mut inner.config;

        // The fields we're interested in haven't changed, so there's no need to update
        // `inner.config`.
        //
        // TODO: maybe `Inner` should only contain the fields we're interested in instead of
        // the entire config.
        //
        // Alternatively, a less error-prone solution would be to introduce a separate
        // `DescriptorConfigView` as described in
        // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1603#note_2944902

        // TODO (#1206): Temporarily disabled while we figure out how we want the client auth config to
        // work; see #1028
        /*
        if old_config.anonymity == new_config.anonymity
            && old_config.encrypt_descriptor == new_config.encrypt_descriptor
        {
            return false;
        }
        */

        let _old: Arc<OnionServiceConfig> = std::mem::replace(old_config, new_config);

        true
    }

    /// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
    /// uploading.
    fn note_ipt_change(&self) -> PublishStatus {
        let inner = self.inner.lock().expect("poisoned lock");

        let mut ipts = self.ipt_watcher.borrow_for_publish();
        match ipts.ipts.as_mut() {
            Some(ipts) => PublishStatus::UploadScheduled,
            None => PublishStatus::AwaitingIpts,
        }
    }

    /// Update our list of introduction points.
    async fn handle_ipt_change(
        &mut self,
        update: Option<Result<(), crate::FatalError>>,
    ) -> Result<ShutdownStatus, FatalError> {
        trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
        match update {
            Some(Ok(())) => {
                let should_upload = self.note_ipt_change();
                debug!(nickname=%self.imm.nickname, "the introduction points have changed");

                self.mark_all_dirty();
                self.update_publish_status_unless_rate_lim(should_upload)
                    .await?;
                Ok(ShutdownStatus::Continue)
            }
            Some(Err(e)) => Err(e),
            None => {
                debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
                Ok(ShutdownStatus::Terminate)
            }
        }
    }

    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `AwaitingIpts`.
    async fn update_publish_status_unless_waiting(
        &mut self,
        new_state: PublishStatus,
    ) -> Result<(), FatalError> {
        // Only update the state if we're not waiting for intro points.
        if self.status() != PublishStatus::AwaitingIpts {
            self.update_publish_status(new_state).await?;
        }

        Ok(())
    }

    /// Update the `PublishStatus` of the reactor with `new_state`,
    /// unless the current state is `RateLimited`.
    async fn update_publish_status_unless_rate_lim(
        &mut self,
        new_state: PublishStatus,
    ) -> Result<(), FatalError> {
        // We can't exit this state until the rate-limit expires.
        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
            self.update_publish_status(new_state).await?;
        }

        Ok(())
    }

    /// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
    async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), FatalError> {
        let onion_status = match new_state {
            PublishStatus::Idle => State::Running,
            PublishStatus::UploadScheduled
            | PublishStatus::AwaitingIpts
            | PublishStatus::RateLimited(_) => State::Bootstrapping,
        };

        self.imm.status_tx.send(onion_status, None);

        trace!(
            "publisher reactor status change: {:?} -> {:?}",
            self.status(),
            new_state
        );

        self.publish_status_tx.send(new_state).await.map_err(
            |_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
        )?;

        Ok(())
    }

    /// Use the new keys.
    async fn handle_new_keys(&self) -> Result<(), FatalError> {
        todo!()
    }

    /// Update the descriptors based on the config change.
    async fn handle_svc_config_change(
        &mut self,
        config: Arc<OnionServiceConfig>,
    ) -> Result<(), FatalError> {
        if self.replace_config_if_changed(config) {
            self.mark_all_dirty();

            // Schedule an upload, unless we're still waiting for IPTs.
            self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
                .await?;
        }

        Ok(())
    }

    /// Mark the descriptor dirty for all time periods.
    fn mark_all_dirty(&self) {
        trace!("marking the descriptor dirty for all time periods");

        self.inner
            .lock()
            .expect("poisoned lock")
            .time_periods
            .iter_mut()
            .for_each(|tp| tp.mark_all_dirty());
    }

    /// Mark the descriptor dirty for the specified time period.
    ///
    /// Returns `true` if the specified period is still relevant, and `false` otherwise.
    fn mark_dirty(&self, period: &TimePeriod) -> bool {
        let mut inner = self.inner.lock().expect("poisoned lock");
        let period_ctx = inner
            .time_periods
            .iter_mut()
            .find(|tp| tp.params.time_period() == *period);

        match period_ctx {
            Some(ctx) => {
                trace!(time_period=?period, "marking the descriptor dirty");
                ctx.mark_all_dirty();
                true
            }
            None => false,
        }
    }

    /// Try to upload our descriptor to the HsDirs that need it.
    ///
    /// If we've recently uploaded some descriptors, we return immediately and schedule the upload
    /// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
    ///
    /// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
    /// implement it, as well as in what cases this will return an error).
    async fn upload_all(&mut self) -> Result<(), FatalError> {
        trace!("starting descriptor upload task...");

        let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
        let now = self.imm.runtime.now();
        // Check if we should rate-limit this upload.
        if let Some(ts) = last_uploaded {
            let duration_since_upload = now.duration_since(ts);

            if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
                return self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await;
            }
        }

        let mut inner = self.inner.lock().expect("poisoned lock");
        let inner = &mut *inner;

        let _ = inner.last_uploaded.insert(now);

        for period_ctx in inner.time_periods.iter_mut() {
            let upload_task_complete_tx = self.upload_task_complete_tx.clone();

            // Figure out which HsDirs we need to upload the descriptor to (some of them might already
            // have our latest descriptor, so we filter them out).
            let hs_dirs = period_ctx
                .hs_dirs
                .iter()
                .filter_map(|(relay_id, status)| {
                    if *status == DescriptorStatus::Dirty {
                        Some(relay_id.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();

            if hs_dirs.is_empty() {
                trace!("the descriptor is clean for all HSDirs of this time period; nothing to upload");
                continue;
            }

            let time_period = period_ctx.params.time_period();
            // This scope exists because rng is not Send, so it needs to fall out of scope before we
            // await anything.
            let netdir = Arc::clone(
                inner
                    .netdir
                    .as_ref()
                    .ok_or_else(|| internal!("started upload task without a netdir"))?,
            );

            let imm = Arc::clone(&self.imm);
            let ipt_upload_view = self.ipt_watcher.upload_view();
            let config = Arc::clone(&inner.config);

            trace!(nickname=%self.imm.nickname, time_period=?time_period,
                "spawning upload task"
            );

            let params = period_ctx.params.clone();
            let shutdown_rx = self.shutdown_tx.subscribe();

            // Spawn a task to upload the descriptor to all HsDirs of this time period.
            //
            // This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
            // dropped).
            let _handle: () = self
                .imm
                .runtime
                .spawn(async move {
                    if let Err(e) = Self::upload_for_time_period(
                        hs_dirs,
                        &netdir,
                        config,
                        params,
                        Arc::clone(&imm),
                        ipt_upload_view.clone(),
                        upload_task_complete_tx,
                        shutdown_rx,
                    )
                    .await
                    {
                        error_report!(
                            e,
                            "descriptor upload failed for HS service {} and time period {:?}",
                            imm.nickname,
                            time_period
                        );
                    }
                })
                .map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
        }

        Ok(())
    }

    /// Upload the descriptor for the specified time period.
    ///
    /// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
    /// implement it, as well as in what cases this will return an error).
    #[allow(clippy::too_many_arguments)] // TODO: refactor
    async fn upload_for_time_period(
        hs_dirs: Vec<RelayIds>,
        netdir: &Arc<NetDir>,
        config: Arc<OnionServiceConfig>,
        params: HsDirParams,
        imm: Arc<Immutable<R, M>>,
        ipt_upload_view: IptsPublisherUploadView,
        mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
        shutdown_rx: broadcast::Receiver<Void>,
    ) -> Result<(), FatalError> {
        let time_period = params.time_period();
        trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");

        let hsdir_count = hs_dirs.len();

        /// An error returned from an upload future.
        //
        // Exhaustive, because this is a private type.
        #[derive(Clone, Debug, thiserror::Error)]
        enum PublishError {
            /// The upload was aborted because there are no IPTs.
            ///
            /// This happens because of an inevitable TOCTOU race, where after being notified by
            /// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
            /// we find out there actually are no IPTs, so we can't build the descriptor.
            ///
            /// This is a special kind of error that interrupts the current upload task, and is
            /// logged at `debug!` level rather than `warn!` or `error!`.
            ///
            /// Ideally, this shouldn't happen very often (if at all).
            #[error("No IPTs")]
            NoIpts,

            /// The reactor has shut down
            #[error("The reactor has shut down")]
            Shutdown,

            /// A fatal error.
            #[error("{0}")]
            Fatal(#[from] FatalError),
        }

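        // Upload to each HSDir concurrently, with at most MAX_CONCURRENT_UPLOADS uploads in
        // flight at a time. Because of the `try_collect` below, the first `PublishError`
        // aborts the whole batch; otherwise we collect one result per HSDir.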
        let upload_results = futures::stream::iter(hs_dirs)
            .map(|relay_ids| {
                let netdir = netdir.clone();
                let config = Arc::clone(&config);
                let imm = Arc::clone(&imm);
                let ipt_upload_view = ipt_upload_view.clone();
                let params = params.clone();
                let mut shutdown_rx = shutdown_rx.clone();

                let ed_id = relay_ids
                    .ed_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());
                let rsa_id = relay_ids
                    .rsa_identity()
                    .map(|id| id.to_string())
                    .unwrap_or_else(|| "unknown".into());

                async move {
                    let run_upload = |desc| async {
                        let Some(hsdir) = netdir.by_ids(&relay_ids) else {
                            // This should never happen (all of our relay_ids are from the stored
                            // netdir).
                            warn!(
                                nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                                "tried to upload descriptor to relay not found in consensus?!"
                            );
                            return UploadStatus::Failure;
                        };

                        Self::upload_descriptor_with_retries(
                            desc,
                            &netdir,
                            &hsdir,
                            &ed_id,
                            &rsa_id,
                            Arc::clone(&imm),
                        )
                        .await
                    };

                    // How long until we're supposed to time out?
                    let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
                    // We generate a new descriptor before _each_ HsDir upload. This means each
                    // HsDir could, in theory, receive a different descriptor (not just in terms of
                    // revision-counters, but also with a different set of IPTs). It may seem like
                    // this could lead to some HsDirs being left with an outdated descriptor, but
                    // that's not the case: after the upload completes, the publisher will be
                    // notified by the ipt_watcher of the IPT change event (if there was one to
                    // begin with), which will trigger another upload job.
                    let hsdesc = {
                        // This scope is needed because the ipt_set MutexGuard is not Send, so it
                        // needs to fall out of scope before the await point below
                        let mut ipt_set = ipt_upload_view.borrow_for_publish();

                        // If there are no IPTs, we abort the upload. At this point, we might have
                        // uploaded the descriptor to some, but not all, HSDirs from the specified
                        // time period.
                        //
                        // Returning an error here means the upload completion task is never
                        // notified of the outcome of any of these uploads (which means the
                        // descriptor is not marked clean). This is OK, because if we suddenly find
                        // out we have no IPTs, it means our built `hsdesc` has an outdated set of
                        // IPTs, so we need to go back to the main loop to wait for IPT changes,
                        // and generate a fresh descriptor anyway.
                        //
                        // Ideally, this shouldn't happen very often (if at all).
                        let Some(ipts) = ipt_set.ipts.as_mut() else {
                            return Err(PublishError::NoIpts);
                        };

                        let hsdesc = {
                            trace!(
                                nickname=%imm.nickname, time_period=?time_period,
                                "building descriptor"
                            );
                            let mut rng = imm.mockable.thread_rng();

                            // We're about to generate a new version of the descriptor,
                            // so let's generate a new revision counter.
                            let now = imm.runtime.wallclock();
                            let revision_counter = imm.generate_revision_counter(&params, now)?;

                            build_sign(
                                &imm.keymgr,
                                &config,
                                ipts,
                                time_period,
                                revision_counter,
                                &mut rng,
                                imm.runtime.wallclock(),
                            )?
                        };

                        if let Err(e) =
                            ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
                        {
                            let wait = e.log_retry_max(&imm.nickname)?;
                            // TODO (#1226): retry instead of this
                            return Err(FatalError::Bug(internal!(
                                "ought to retry after {wait:?}, crashing instead"
                            ))
                            .into());
                        }

                        hsdesc
                    };

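                    // Split the descriptor we just built into the signed document and its
                    // revision counter; the counter is reported back alongside the upload
                    // outcome.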
                    let VersionedDescriptor {
                        desc,
                        revision_counter,
                    } = hsdesc;

                    trace!(
                        nickname=%imm.nickname, time_period=?time_period,
                        revision_counter=?revision_counter,
                        "generated new descriptor for time period",
                    );

                    // (Actually launch the upload attempt. No timeout is needed
                    // here, since the backoff::Runner code will handle that for us.)
                    let upload_res = select_biased! {
                        shutdown = shutdown_rx.next().fuse() => {
                            match shutdown {
                                Some(_) => unreachable!("received Void value?!"),
                                None => {

                                    // It looks like the reactor has shut down,
                                    // so there is no point in uploading the descriptor anymore.
                                    //
                                    // Let's shut down the upload task too.
                                    trace!(
                                        nickname=%imm.nickname, time_period=?time_period,
                                        "upload task received shutdown signal"
                                    );

                                    return Err(PublishError::Shutdown);
                                }
                            }
                        },
                        res = run_upload(desc.clone()).fuse() => res,
                    };

                    // Note: UploadStatus::Failure is only returned when
                    // upload_descriptor_with_retries fails, i.e. if all our retry
                    // attempts have failed
                    Ok(HsDirUploadStatus {
                        relay_ids,
                        upload_res,
                        revision_counter,
                    })
                }
            })
            // This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
            .boxed()
            .buffer_unordered(MAX_CONCURRENT_UPLOADS)
            .try_collect::<Vec<_>>()
            .await;

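        // `NoIpts` and `Shutdown` abort this upload task without notifying the reactor of any
        // results, but they are not fatal errors; only `Fatal` errors propagate to the caller.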
        let upload_results = match upload_results {
            Ok(v) => v,
            Err(PublishError::Fatal(e)) => return Err(e),
            Err(PublishError::NoIpts) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "no introduction points; skipping upload"
                );

                return Ok(());
            }
            Err(PublishError::Shutdown) => {
                debug!(
                    nickname=%imm.nickname, time_period=?time_period,
                     "the reactor has shut down; aborting upload"
                );

                return Ok(());
            }
        };

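        // Count the successful uploads for the log message below. Note that *all* results,
        // including failures, are reported back to the reactor.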
        let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
            .iter()
            .partition(|res| res.upload_res == UploadStatus::Success);

        debug!(
            nickname=%imm.nickname, time_period=?time_period,
            "descriptor uploaded successfully to {}/{} HSDirs",
            succeeded.len(), hsdir_count
        );

        if let Err(e) = upload_task_complete_tx
            .send(TimePeriodUploadResult {
                time_period,
                hsdir_result: upload_results,
            })
            .await
        {
            return Err(internal!(
                "failed to notify reactor of upload completion (reactor shut down)"
            )
            .into());
        }

        Ok(())
    }

    /// Upload a descriptor to the specified HSDir.
    ///
    /// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
    /// to the caller to retry on failure.
    ///
    /// This function does not handle timeouts.
    async fn upload_descriptor(
        hsdesc: String,
        netdir: &Arc<NetDir>,
        hsdir: &Relay<'_>,
        imm: Arc<Immutable<R, M>>,
    ) -> Result<(), UploadError> {
        let request = HsDescUploadRequest::new(hsdesc);

        trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
            "starting descriptor upload",
        );

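        // Get (or launch) a circuit suitable for reaching this HSDir, then open a directory
        // stream over it.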
        let circuit = imm
            .mockable
            .get_or_launch_specific(
                netdir,
                HsCircKind::SvcHsDir,
                OwnedCircTarget::from_circ_target(hsdir),
            )
            .await?;

        let mut stream = circuit
            .begin_dir_stream()
            .await
            .map_err(UploadError::Stream)?;

        let _response = send_request(&imm.runtime, &request, &mut stream, None)
            .await
            .map_err(|dir_error| -> UploadError {
                match dir_error {
                    DirClientError::RequestFailed(e) => e.into(),
                    DirClientError::CircMgr(e) => into_internal!(
                        "tor-dirclient complains about circmgr going wrong but we gave it a stream"
                    )(e)
                    .into(),
                    e => into_internal!("unexpected error")(e).into(),
                }
            })?
            .into_output_string()?; // This returns an error if we received an error response

        Ok(())
    }

    /// Upload a descriptor to the specified HSDir, retrying if appropriate.
    ///
    /// TODO (#1216): document the retry logic when we implement it.
    async fn upload_descriptor_with_retries(
        hsdesc: String,
        netdir: &Arc<NetDir>,
        hsdir: &Relay<'_>,
        ed_id: &str,
        rsa_id: &str,
        imm: Arc<Immutable<R, M>>,
    ) -> UploadStatus {
        /// The base delay to use for the backoff schedule.
        const BASE_DELAY_MSEC: u32 = 1000;
        let schedule = PublisherBackoffSchedule {
            retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
            mockable: imm.mockable.clone(),
        };

        let runner = Runner::new(
            "upload a hidden service descriptor".into(),
            schedule.clone(),
            imm.runtime.clone(),
        );

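        // A single upload attempt. The backoff runner calls this repeatedly until it succeeds,
        // returns a non-retriable error, or exceeds the schedule's timeouts.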
        let fallible_op =
            || Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm));

        let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
        match outcome {
            Ok(()) => {
                debug!(
                    nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
                    "successfully uploaded descriptor to HSDir",
                );

                UploadStatus::Success
            }
            Err(e) => {
                warn_report!(
                    e,
                    "failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
                    imm.nickname,
                    ed_id,
                    rsa_id
                );

                UploadStatus::Failure
            }
        }
    }

    /// Stop publishing descriptors until the specified delay elapses.
    async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), FatalError> {
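        // If we are already rate-limited, keep the existing deadline rather than extending it.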
        if !matches!(self.status(), PublishStatus::RateLimited(_)) {
            debug!(
                "We are rate-limited for {}; pausing descriptor publication",
                humantime::format_duration(delay)
            );
            let until = self.imm.runtime.now() + delay;
            self.update_publish_status(PublishStatus::RateLimited(until))
                .await?;
        }

        Ok(())
    }

    /// Handle the upload rate-limit being lifted.
    async fn expire_rate_limit(&mut self) -> Result<(), FatalError> {
        debug!("We are no longer rate-limited; resuming descriptor publication");
        self.update_publish_status(PublishStatus::UploadScheduled)
            .await?;
        Ok(())
    }
}

/// Try to read the blinded identity key for a given `TimePeriod`.
///
/// Returns `None` if the service is running in "offline" mode.
///
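/// # Example
///
/// A minimal sketch (not compiled as a doctest) of how a caller might use this function,
/// assuming a `KeyMgr`, an `HsNickname`, and a `TimePeriod` are already in scope:
///
/// ```ignore
/// match read_blind_id_keypair(&keymgr, &nickname, period)? {
///     Some(blind_id_kp) => { /* use the blinded keypair for this time period */ }
///     None => { /* the service is running in "offline" mode */ }
/// }
/// ```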
// TODO (#1194): we don't currently have support for "offline" mode so this can never return
// `Ok(None)`.
pub(super) fn read_blind_id_keypair(
    keymgr: &Arc<KeyMgr>,
    nickname: &HsNickname,
    period: TimePeriod,
) -> Result<Option<HsBlindIdKeypair>, FatalError> {
    let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
    let hsid_kp = keymgr
        .get::<HsIdKeypair>(&svc_key_spec)?
        .ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;

    let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);

    // TODO: make the keystore selector configurable
    let keystore_selector = Default::default();
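    // Use the blinded keypair from the keystore if one already exists; otherwise derive it
    // from the HsId keypair, insert it into the keystore, and read it back (failing if it has
    // been removed in the meantime).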
    match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
        Some(kp) => Ok(Some(kp)),
        None => {
            let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
                .compute_blinded_key(period)
                .map_err(|_| internal!("failed to compute blinded key"))?;

            // Note: we can't use KeyMgr::generate because this key is derived from the HsId
            // (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
            // which assumes keys are randomly generated, rather than derived from existing keys).

            keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector)?;

            let arti_path = |spec: &dyn KeySpecifier| {
                spec.arti_path()
                    .map_err(into_internal!("invalid key specifier?!"))
            };

            Ok(Some(
                keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
                    FatalError::KeystoreRace {
                        action: "read",
                        path: arti_path(&blind_id_key_spec)?,
                    },
                )?,
            ))
        }
    }
}

/// Whether the reactor should initiate an upload.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum PublishStatus {
    /// We need to call upload_all.
    UploadScheduled,
    /// We are rate-limited until the specified [`Instant`].
    ///
    /// We have tried to schedule multiple uploads in a short time span,
    /// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
    /// channel to unblock us.
    RateLimited(Instant),
    /// We are idle and waiting for external events.
    ///
    /// We have enough information to build the descriptor, but since we have already called
    /// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
    Idle,
    /// We are waiting for the IPT manager to establish some introduction points.
    ///
    /// No descriptors will be published until the `PublishStatus` of the reactor is changed to
    /// `UploadScheduled`.
    #[default]
    AwaitingIpts,
}

/// The backoff schedule for the task that publishes descriptors.
#[derive(Clone, Debug)]
struct PublisherBackoffSchedule<M: Mockable> {
    /// The delays to use between retries.
    retry_delay: RetryDelay,
    /// The mockable reactor state, needed for obtaining an rng.
    mockable: M,
}

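// Retry policy: no explicit cap on the number of attempts (`max_retries` is `None`). Instead,
// each attempt is bounded by the mockable's estimated upload timeout, the whole series of
// attempts by OVERALL_UPLOAD_TIMEOUT, and consecutive attempts are separated by a randomized
// `RetryDelay`.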
impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
    fn max_retries(&self) -> Option<usize> {
        None
    }

    fn overall_timeout(&self) -> Option<Duration> {
        Some(OVERALL_UPLOAD_TIMEOUT)
    }

    fn single_attempt_timeout(&self) -> Option<Duration> {
        Some(self.mockable.estimate_upload_timeout())
    }

    fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
        Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
    }
}

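// Request, circuit, stream, and timeout failures are considered transient and may be retried;
// internal bugs are not.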
impl RetriableError for UploadError {
    fn should_retry(&self) -> bool {
        match self {
            UploadError::Request(_)
            | UploadError::Circuit(_)
            | UploadError::Stream(_)
            | UploadError::Timeout => true,
            UploadError::Bug(_) => false,
        }
    }
}

/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
#[derive(Debug, Clone)]
struct TimePeriodUploadResult {
    /// The time period.
    time_period: TimePeriod,
    /// The upload results.
    hsdir_result: Vec<HsDirUploadStatus>,
}

/// The outcome of uploading a descriptor to a particular HsDir.
#[derive(Clone, Debug, PartialEq)]
struct HsDirUploadStatus {
    /// The identity of the HsDir we attempted to upload the descriptor to.
    relay_ids: RelayIds,
    /// The outcome of this attempt.
    upload_res: UploadStatus,
    /// The revision counter of the descriptor we tried to upload.
    revision_counter: RevisionCounter,
}

/// The outcome of uploading a descriptor.
//
// TODO: consider making this a type alias for Result<(), ()>
#[derive(Copy, Clone, Debug, PartialEq)]
enum UploadStatus {
    /// The descriptor upload succeeded.
    Success,
    /// The descriptor upload failed.
    Failure,
}

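// Convenience conversion: any `Ok` becomes `Success` and any `Err` becomes `Failure`,
// discarding the error value.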
impl<T, E> From<Result<T, E>> for UploadStatus {
    fn from(res: Result<T, E>) -> Self {
        if res.is_ok() {
            Self::Success
        } else {
            Self::Failure
        }
    }
}