//! The onion service publisher reactor.
//!
//! TODO (#1216): write the docs
//!
//! With respect to [`OnionServiceStatus`] reporting,
//! the following state transitions are possible:
//!
//!
//! ```ignore
//!
//!                 update_publish_status(UploadScheduled|AwaitingIpts|RateLimited) +---------------+
//!                 +-------------------------------------------------------------->| Bootstrapping |
//!                 |                                                               +---------------+
//!  +----------+   |       update_publish_status(Idle)       +---------+                   |
//!  | Shutdown |---+---------------------------------------->| Running |----+              |
//!  +----------+   |                                         +---------+    |              |
//!                 |                                                        |              |
//!                 |                                                        |              |
//!                 |    run_once() returns an error      +--------+         |              |
//!                 +------------------------------------>| Broken |<--------+--------------+
//!                                                       +--------+  run_once() returns an error
//! ```
//!
//! Ideally, the publisher should also set the
//! [`OnionServiceStatus`] to `Recovering` whenever a transient
//! upload error occurs, but this is currently not possible:
//!
//! * making the upload tasks set the status to `Recovering` (on failure) and `Running` (on
//! success) wouldn't work, because the upload tasks run in parallel (they would race with each
//! other, and the final status (`Recovering`/`Running`) would be the status of the last upload
//! task, rather than the real status of the publisher)
//! * making the upload task set the status to `Recovering` on upload failure, and letting
//! `update_publish_status` reset it back to `Running` also would not work:
//! `update_publish_status` sets the status back to `Running` when the publisher enters its
//! `Idle` state, regardless of the status of its upload tasks
//!
//! TODO: Indeed, setting the status to `Recovering` _anywhere_ would not work, because
//! `update_publish_status` will just overwrite it. We would need to introduce some new
//! `PublishStatus` variants (currently, the publisher only has 4 states: `Idle`, `UploadScheduled`,
//! `AwaitingIpts`, and `RateLimited`), for the `Recovering` (retrying a failed upload) and `Broken` (the upload
//! failed and we've given up) states. However, adding these 2 new states is non-trivial:
//!
//! * how do we define "failure"? Is it the failure to upload to a single HsDir, or the failure
//! to upload to **any** HsDirs?
//! * what should make the publisher transition out of the `Broken`/`Recovering` states? While
//! `handle_upload_results` can see the upload results for a batch of HsDirs (corresponding to
//! a time period), the publisher doesn't do any sort of bookkeeping to know if a previously
//! failed HsDir upload succeeded in a later upload "batch"
//!
//! For the time being, the publisher never sets the status to `Recovering`, and uses the `Broken`
//! status for reporting fatal errors (crashes).
use super::*;
/// The upload rate-limiting threshold.
///
/// Before initiating an upload, the reactor checks whether the last upload was at least
/// `UPLOAD_RATE_LIM_THRESHOLD` ago. If so, it uploads the descriptor to all HsDirs that
/// need it. If not, it schedules the upload to happen `UPLOAD_RATE_LIM_THRESHOLD` from the
/// current time.
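///
/// A rough sketch of the decision (illustrative only; the real check lives in
/// [`Reactor::upload_all`] and [`Reactor::start_rate_limit`]):
///
/// ```ignore
/// if now.duration_since(last_uploaded) < UPLOAD_RATE_LIM_THRESHOLD {
///     // Too soon since the previous upload: reschedule instead of uploading now.
///     return self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await;
/// }
/// ```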
//
// TODO: We may someday need to tune this value; it was chosen more or less arbitrarily.
const UPLOAD_RATE_LIM_THRESHOLD: Duration = Duration::from_secs(60);
/// The maximum number of concurrent upload tasks per time period.
//
// TODO: this value was arbitrarily chosen and may not be optimal. For now, it
// will have no effect, since the current number of replicas is far less than
// this value.
//
// The uploads for all TPs happen in parallel. As a result, the actual limit for the maximum
// number of concurrent upload tasks is multiplied by a number which depends on the TP parameters
// (currently 2, which means the concurrency limit will, in fact, be 32).
//
// We should try to decouple this value from the TP parameters.
const MAX_CONCURRENT_UPLOADS: usize = 16;
/// The maximum time allowed for uploading a descriptor to a single HSDir,
/// across all attempts.
pub(crate) const OVERALL_UPLOAD_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// A reactor for the HsDir [`Publisher`]
///
/// The entrypoint is [`Reactor::run`].
#[must_use = "If you don't call run() on the reactor, it won't publish any descriptors."]
pub(super) struct Reactor<R: Runtime, M: Mockable> {
/// The immutable, shared inner state.
imm: Arc<Immutable<R, M>>,
/// A source for new network directories that we use to determine
/// our HsDirs.
dir_provider: Arc<dyn NetDirProvider>,
/// The mutable inner state.
inner: Arc<Mutex<Inner>>,
/// A channel for receiving IPT change notifications.
ipt_watcher: IptsPublisherView,
/// A channel for receiving onion service config change notifications.
config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
/// A channel for receiving updates regarding our [`PublishStatus`].
///
/// The main loop of the reactor watches for updates on this channel.
///
/// When the [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
/// we can start publishing descriptors.
///
/// If the [`PublishStatus`] is [`AwaitingIpts`](PublishStatus::AwaitingIpts), publishing is
/// paused until we receive a notification on `ipt_watcher` telling us the IPT manager has
/// established some introduction points.
publish_status_rx: watch::Receiver<PublishStatus>,
/// A sender for updating our [`PublishStatus`].
///
/// When our [`PublishStatus`] changes to [`UploadScheduled`](PublishStatus::UploadScheduled),
/// we can start publishing descriptors.
publish_status_tx: watch::Sender<PublishStatus>,
/// A channel for sending upload completion notifications.
///
/// This channel is polled in the main loop of the reactor.
upload_task_complete_rx: mpsc::Receiver<TimePeriodUploadResult>,
/// A channel for receiving upload completion notifications.
///
/// A copy of this sender is handed to each upload task.
upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
/// A sender for notifying any pending upload tasks that the reactor is shutting down.
///
/// Receivers can use this channel to find out when the reactor is dropped.
///
/// This is currently only used in [`upload_for_time_period`](Reactor::upload_for_time_period).
/// Any future background tasks can also use this channel to detect if the reactor is dropped.
///
/// Closing this channel will cause any pending upload tasks to be dropped.
shutdown_tx: broadcast::Sender<Void>,
}
/// The immutable, shared state of the descriptor publisher reactor.
#[derive(Clone)]
struct Immutable<R: Runtime, M: Mockable> {
/// The runtime.
runtime: R,
/// Mockable state.
///
/// This is used for launching circuits and for obtaining random number generators.
mockable: M,
/// The service for which we're publishing descriptors.
nickname: HsNickname,
/// The key manager.
keymgr: Arc<KeyMgr>,
/// A sender for updating the status of the onion service.
status_tx: PublisherStatusSender,
}
impl<R: Runtime, M: Mockable> Immutable<R, M> {
/// Create an [`AesOpeKey`] for generating revision counters for the descriptors associated
/// with the specified [`TimePeriod`].
///
/// If the onion service is not running in offline mode, the key of the returned `AesOpeKey` is
/// the private part of the blinded identity key. Otherwise, the key is the private part of the
/// descriptor signing key.
///
/// Returns an error if the service is running in offline mode and the descriptor signing
/// keypair of the specified `period` is not available.
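///
/// A usage sketch (illustrative only; this mirrors how `generate_revision_counter` below
/// uses the returned key):
///
/// ```ignore
/// let ope_key = self.create_ope_key(params.time_period())?;
/// let rev = ope_key.encrypt(offset);
/// ```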
//
// TODO (#1194): we don't support "offline" mode (yet), so this always returns an AesOpeKey
// built from the blinded id key
fn create_ope_key(&self, period: TimePeriod) -> Result<AesOpeKey, FatalError> {
let ope_key = match read_blind_id_keypair(&self.keymgr, &self.nickname, period)? {
Some(key) => {
let key: ed25519::ExpandedKeypair = key.into();
key.to_secret_key_bytes()[0..32]
.try_into()
.expect("Wrong length on slice")
}
None => {
// TODO (#1194): we don't support externally provisioned keys (yet), so this branch
// is unreachable (for now).
let desc_sign_key_spec =
DescSigningKeypairSpecifier::new(self.nickname.clone(), period);
let key: ed25519::Keypair = self
.keymgr
.get::<HsDescSigningKeypair>(&desc_sign_key_spec)?
// TODO (#1194): internal! is not the right type for this error (we need an
// error type for the case where a hidden service running in offline mode has
// run out of its pre-provisioned keys).
//
// This will be addressed when we add support for offline hs_id mode
.ok_or_else(|| internal!("identity keys are offline, but descriptor signing key is unavailable?!"))?
.into();
key.to_bytes()
}
};
Ok(AesOpeKey::from_secret(&ope_key))
}
/// Generate a revision counter for a descriptor associated with the specified
/// [`TimePeriod`].
///
/// Returns a revision counter generated according to the [encrypted time in period] scheme.
///
/// [encrypted time in period]: https://spec.torproject.org/rend-spec/revision-counter-mgt.html#encrypted-time
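///
/// A usage sketch (illustrative only; this mirrors how the upload task calls it, assuming
/// an `imm: Arc<Immutable<R, M>>` and the `params` of the current time period):
///
/// ```ignore
/// let now = imm.runtime.wallclock();
/// let revision_counter = imm.generate_revision_counter(&params, now)?;
/// ```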
fn generate_revision_counter(
&self,
params: &HsDirParams,
now: SystemTime,
) -> Result<RevisionCounter, FatalError> {
// TODO: in the future, we might want to compute ope_key once per time period (as opposed
// to each time we generate a new descriptor), for performance reasons.
let ope_key = self.create_ope_key(params.time_period())?;
// TODO: perhaps this should be moved to a new HsDirParams::offset_within_sr() function
let srv_start = params.start_of_shard_rand_period();
let offset = params.offset_within_srv_period(now).ok_or_else(|| {
internal!(
"current wallclock time not within SRV range?! (now={:?}, SRV_start={:?})",
now,
srv_start
)
})?;
let rev = ope_key.encrypt(offset);
Ok(RevisionCounter::from(rev))
}
}
/// Mockable state for the descriptor publisher reactor.
///
/// This enables us to mock parts of the [`Reactor`] for testing purposes.
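///
/// A test double might look roughly like this (a sketch only; `TestMock` and `TestCirc`
/// are hypothetical names, not part of this crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct TestMock { /* stand-in circuit pool, deterministic timeouts, ... */ }
///
/// #[async_trait]
/// impl Mockable for TestMock {
///     type Rng = rand::rngs::ThreadRng;
///     type ClientCirc = TestCirc;
///
///     fn thread_rng(&self) -> Self::Rng { rand::thread_rng() }
///     fn estimate_upload_timeout(&self) -> Duration { Duration::from_secs(30) }
///     // get_or_launch_specific() would return a `TestCirc` instead of building
///     // a real circuit.
/// }
/// ```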
#[async_trait]
pub(crate) trait Mockable: Clone + Send + Sync + Sized + 'static {
/// The type of random number generator.
type Rng: rand::Rng + rand::CryptoRng;
/// The type of client circuit.
type ClientCirc: MockableClientCirc;
/// Return a random number generator.
fn thread_rng(&self) -> Self::Rng;
/// Create a circuit of the specified `kind` to `target`.
async fn get_or_launch_specific<T>(
&self,
netdir: &NetDir,
kind: HsCircKind,
target: T,
) -> Result<Arc<Self::ClientCirc>, tor_circmgr::Error>
where
T: CircTarget + Send + Sync;
/// Return an estimate-based value for how long we should allow a single
/// directory upload operation to complete.
///
/// Includes circuit construction, stream opening, upload, and waiting for a
/// response.
fn estimate_upload_timeout(&self) -> Duration;
}
/// Mockable client circuit
#[async_trait]
pub(crate) trait MockableClientCirc: Send + Sync {
/// The data stream type.
type DataStream: AsyncRead + AsyncWrite + Send + Unpin;
/// Start a new stream to the last relay in the circuit, using
/// a BEGIN_DIR cell.
async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error>;
}
#[async_trait]
impl MockableClientCirc for ClientCirc {
type DataStream = tor_proto::stream::DataStream;
async fn begin_dir_stream(self: Arc<Self>) -> Result<Self::DataStream, tor_proto::Error> {
ClientCirc::begin_dir_stream(self).await
}
}
/// The real version of the mockable state of the reactor.
#[derive(Clone, From, Into)]
pub(crate) struct Real<R: Runtime>(Arc<HsCircPool<R>>);
#[async_trait]
impl<R: Runtime> Mockable for Real<R> {
type Rng = rand::rngs::ThreadRng;
type ClientCirc = ClientCirc;
fn thread_rng(&self) -> Self::Rng {
rand::thread_rng()
}
async fn get_or_launch_specific<T>(
&self,
netdir: &NetDir,
kind: HsCircKind,
target: T,
) -> Result<Arc<ClientCirc>, tor_circmgr::Error>
where
T: CircTarget + Send + Sync,
{
self.0.get_or_launch_specific(netdir, kind, target).await
}
fn estimate_upload_timeout(&self) -> Duration {
use tor_circmgr::timeouts::Action;
let est_build = self.0.estimate_timeout(&Action::BuildCircuit { length: 4 });
let est_roundtrip = self.0.estimate_timeout(&Action::RoundTrip { length: 4 });
// We assume that in the worst case we'll have to wait for an entire
// circuit construction and two round-trips to the hsdir.
let est_total = est_build + est_roundtrip * 2;
// We always allow _at least_ this much time, in case our estimate is
// ridiculously low.
let min_timeout = Duration::from_secs(30);
max(est_total, min_timeout)
}
}
/// The mutable state of a [`Reactor`].
struct Inner {
/// The onion service config.
config: Arc<OnionServiceConfig>,
/// The relevant time periods.
///
/// This includes the current time period, as well as any other time periods we need to be
/// publishing descriptors for.
///
/// This is empty until we fetch our first netdir in [`Reactor::run`].
time_periods: Vec<TimePeriodContext>,
/// Our most up to date netdir.
///
/// This is initialized in [`Reactor::run`].
netdir: Option<Arc<NetDir>>,
/// The timestamp of our last upload.
///
/// This is the time when the last update was _initiated_ (rather than completed), to prevent
/// the publisher from spawning multiple upload tasks at once in response to multiple external
/// events happening in quick succession, such as the IPT manager sending multiple IPT change
/// notifications in a short time frame (#1142), or an IPT change notification that's
/// immediately followed by a consensus change. Starting two upload tasks at once is not only
/// inefficient, but it also causes the publisher to generate two different descriptors with
/// the same revision counter (the revision counter is derived from the current timestamp),
/// which ultimately causes the slower upload task to fail (see #1142).
///
/// Note: This is only used for deciding when to reschedule a rate-limited upload. It is _not_
/// used for retrying failed uploads (these are handled internally by
/// [`Reactor::upload_descriptor_with_retries`]).
last_uploaded: Option<Instant>,
/// A max-heap containing the time periods for which we need to reupload the descriptor.
// TODO: we are currently reuploading more than necessary.
// Ideally, this shouldn't contain duplicate TimePeriods,
// because we only need to retain the latest reupload time for each time period.
//
// Currently, if, for some reason, we upload the descriptor multiple times for the same TP,
// we will end up with multiple ReuploadTimer entries for that TP,
// each of which will (eventually) result in a reupload.
//
// TODO: maybe this should just be a HashMap<TimePeriod, Instant>
//
// See https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1971#note_2994950
reupload_timers: BinaryHeap<ReuploadTimer>,
}
/// The part of the reactor state that changes with every time period.
struct TimePeriodContext {
/// The HsDir params.
params: HsDirParams,
/// The blinded HsId.
blind_id: HsBlindId,
/// The HsDirs to use in this time period.
///
// We keep a list of `RelayIds` because we can't store a `Relay<'_>` inside the reactor
// (the lifetime of a relay is tied to the lifetime of its corresponding `NetDir`. To
// store `Relay<'_>`s in the reactor, we'd need a way of atomically swapping out both the
// `NetDir` and the cached relays, and to convince Rust that what we're doing is sound)
hs_dirs: Vec<(RelayIds, DescriptorStatus)>,
/// The revision counter of the last successful upload, if any.
last_successful: Option<RevisionCounter>,
}
impl TimePeriodContext {
/// Create a new `TimePeriodContext`.
///
/// Any of the specified `old_hsdirs` also present in the new list of HsDirs
/// (returned by `NetDir::hs_dirs_upload`) will have their `DescriptorStatus` preserved.
fn new<'r>(
params: HsDirParams,
blind_id: HsBlindId,
netdir: &Arc<NetDir>,
old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
) -> Result<Self, FatalError> {
let period = params.time_period();
Ok(Self {
params,
blind_id,
hs_dirs: Self::compute_hsdirs(period, blind_id, netdir, old_hsdirs)?,
last_successful: None,
})
}
/// Recompute the HsDirs for this time period.
fn compute_hsdirs<'r>(
period: TimePeriod,
blind_id: HsBlindId,
netdir: &Arc<NetDir>,
mut old_hsdirs: impl Iterator<Item = &'r (RelayIds, DescriptorStatus)>,
) -> Result<Vec<(RelayIds, DescriptorStatus)>, FatalError> {
let hs_dirs = netdir.hs_dirs_upload(blind_id, period)?;
Ok(hs_dirs
.map(|hs_dir| {
let mut builder = RelayIds::builder();
if let Some(ed_id) = hs_dir.ed_identity() {
builder.ed_identity(*ed_id);
}
if let Some(rsa_id) = hs_dir.rsa_identity() {
builder.rsa_identity(*rsa_id);
}
let relay_id = builder.build().unwrap_or_else(|_| RelayIds::empty());
// Have we uploaded the descriptor to this relay before? If so, we don't need to
// reupload it unless it was already dirty and due for a reupload.
let status = match old_hsdirs.find(|(id, _)| *id == relay_id) {
Some((_, status)) => *status,
None => DescriptorStatus::Dirty,
};
(relay_id, status)
})
.collect::<Vec<_>>())
}
/// Mark the descriptor dirty for all HSDirs of this time period.
fn mark_all_dirty(&mut self) {
self.hs_dirs
.iter_mut()
.for_each(|(_relay_id, status)| *status = DescriptorStatus::Dirty);
}
}
/// Authorized client configuration error.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub(crate) enum AuthorizedClientConfigError {
/// A key is malformed if it doesn't start with the "curve25519" prefix,
/// or if its decoded content is not exactly 32 bytes long.
#[error("Malformed authorized client key")]
MalformedKey,
/// Error while decoding an authorized client's key.
#[error("Failed base64-decode an authorized client's key")]
Base64Decode(#[from] base64ct::Error),
/// Error while accessing the authorized_client key dir.
#[error("Failed to {action} file {path}")]
KeyDir {
/// What we were doing when we encountered the error.
action: &'static str,
/// The file that we were trying to access.
path: std::path::PathBuf,
/// The underlying I/O error.
#[source]
error: Arc<std::io::Error>,
},
/// Error while accessing the authorized_client key dir.
#[error("expected regular file, found directory: {path}")]
MalformedFile {
/// The file that we were trying to access.
path: std::path::PathBuf,
},
}
/// An error that occurs while trying to upload a descriptor.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum UploadError {
/// An error that has occurred after we have contacted a directory cache and made a circuit to it.
#[error("descriptor upload request failed: {}", _0.error)]
Request(#[from] RequestFailedError),
/// Failed to establish circuit to hidden service directory
#[error("could not build circuit to HsDir")]
Circuit(#[from] tor_circmgr::Error),
/// Failed to establish stream to hidden service directory
#[error("failed to establish directory stream to HsDir")]
Stream(#[source] tor_proto::Error),
/// A descriptor upload timed out before it could complete.
#[error("descriptor publication timed out")]
Timeout,
/// An internal error.
#[error("Internal error")]
Bug(#[from] tor_error::Bug),
}
define_asref_dyn_std_error!(UploadError);
impl<R: Runtime, M: Mockable> Reactor<R, M> {
/// Create a new `Reactor`.
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
runtime: R,
nickname: HsNickname,
dir_provider: Arc<dyn NetDirProvider>,
mockable: M,
config: Arc<OnionServiceConfig>,
ipt_watcher: IptsPublisherView,
config_rx: watch::Receiver<Arc<OnionServiceConfig>>,
status_tx: PublisherStatusSender,
keymgr: Arc<KeyMgr>,
) -> Self {
/// The maximum size of the upload completion notifier channel.
///
/// The channel we use this for is a futures::mpsc channel, which has a capacity of
/// `UPLOAD_CHAN_BUF_SIZE + num-senders`. We don't need the buffer size to be non-zero, as
/// each sender will send exactly one message.
const UPLOAD_CHAN_BUF_SIZE: usize = 0;
let (upload_task_complete_tx, upload_task_complete_rx) =
mpsc::channel(UPLOAD_CHAN_BUF_SIZE);
let (publish_status_tx, publish_status_rx) = watch::channel();
// Setting the buffer size to zero here is OK,
// since we never actually send anything on this channel.
let (shutdown_tx, _shutdown_rx) = broadcast::channel(0);
let imm = Immutable {
runtime,
mockable,
nickname,
keymgr,
status_tx,
};
let inner = Inner {
time_periods: vec![],
config,
netdir: None,
last_uploaded: None,
reupload_timers: Default::default(),
};
Self {
imm: Arc::new(imm),
inner: Arc::new(Mutex::new(inner)),
dir_provider,
ipt_watcher,
config_rx,
publish_status_rx,
publish_status_tx,
upload_task_complete_rx,
upload_task_complete_tx,
shutdown_tx,
}
}
/// Start the reactor.
///
/// Under normal circumstances, this function runs indefinitely.
///
/// Note: this also spawns the "reminder task" that we use to reschedule uploads whenever an
/// upload fails or is rate-limited.
pub(super) async fn run(mut self) -> Result<(), FatalError> {
debug!(nickname=%self.imm.nickname, "starting descriptor publisher reactor");
{
let netdir = wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely).await?;
let time_periods = self.compute_time_periods(&netdir, &[])?;
let mut inner = self.inner.lock().expect("poisoned lock");
inner.netdir = Some(netdir);
inner.time_periods = time_periods;
}
let nickname = self.imm.nickname.clone();
let rt = self.imm.runtime.clone();
let status_tx = self.imm.status_tx.clone();
loop {
match self.run_once().await {
Ok(ShutdownStatus::Continue) => continue,
Ok(ShutdownStatus::Terminate) => {
debug!(nickname=%self.imm.nickname, "descriptor publisher is shutting down!");
self.imm.status_tx.send_shutdown();
return Ok(());
}
Err(e) => {
error_report!(
e,
"HS service {}: descriptor publisher crashed!",
self.imm.nickname
);
self.imm.status_tx.send_broken(e.clone());
return Err(e);
}
}
}
}
/// Run one iteration of the reactor loop.
async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
let mut netdir_events = self.dir_provider.events();
// Note: TrackingNow tracks the values it is compared with.
// This is equivalent to sleeping for (until - now) units of time.
let upload_rate_lim: TrackingNow = TrackingNow::now(&self.imm.runtime);
if let PublishStatus::RateLimited(until) = self.status() {
if upload_rate_lim > until {
// We are no longer rate-limited
self.expire_rate_limit().await?;
}
}
let reupload_tracking = TrackingNow::now(&self.imm.runtime);
let mut reupload_periods = vec![];
{
let mut inner = self.inner.lock().expect("poisoned lock");
let inner = &mut *inner;
while let Some(reupload) = inner.reupload_timers.peek().copied() {
// First, extract all the timeouts that already elapsed.
if reupload.when <= reupload_tracking {
inner.reupload_timers.pop();
reupload_periods.push(reupload.period);
} else {
// We are not ready to schedule any more reuploads.
//
// How much we need to sleep is implicitly
// tracked in reupload_tracking (through
// the TrackingNow implementation)
break;
}
}
}
// Check if it's time to schedule any reuploads.
for period in reupload_periods {
if self.mark_dirty(&period) {
debug!(
time_period=?period,
"descriptor reupload timer elapsed; scheduling reupload",
);
self.update_publish_status_unless_rate_lim(PublishStatus::UploadScheduled)
.await?;
}
}
select_biased! {
res = self.upload_task_complete_rx.next().fuse() => {
let Some(upload_res) = res else {
return Ok(ShutdownStatus::Terminate);
};
self.handle_upload_results(upload_res);
},
() = upload_rate_lim.wait_for_earliest(&self.imm.runtime).fuse() => {
self.expire_rate_limit().await?;
},
() = reupload_tracking.wait_for_earliest(&self.imm.runtime).fuse() => {
// Run another iteration, executing run_once again. This time, we will remove the
// expired reupload from self.reupload_timers, mark the descriptor dirty for all
// relevant HsDirs, and schedule the upload by setting our status to
// UploadScheduled.
return Ok(ShutdownStatus::Continue);
},
netdir_event = netdir_events.next().fuse() => {
// The consensus changed. Grab a new NetDir.
let netdir = match self.dir_provider.netdir(Timeliness::Timely) {
Ok(y) => y,
Err(e) => {
error_report!(e, "HS service {}: netdir unavailable. Retrying...", self.imm.nickname);
// Hopefully a netdir will appear in the future.
// In the meantime, suspend operations.
//
// TODO (#1218): there is a bug here: we stop reading on our inputs
// including eg publish_status_rx, but it is our job to log some of
// these things. While we are waiting for a netdir, all those messages
// are "stuck"; they'll appear later, with misleading timestamps.
//
// Probably this should be fixed by moving the logging
// out of the reactor, where it won't be blocked.
wait_for_netdir(self.dir_provider.as_ref(), Timeliness::Timely)
.await?
}
};
let relevant_periods = netdir.hs_all_time_periods();
self.handle_consensus_change(netdir).await?;
expire_publisher_keys(
&self.imm.keymgr,
&self.imm.nickname,
&relevant_periods,
).unwrap_or_else(|e| {
error_report!(e, "failed to remove expired keys");
});
}
update = self.ipt_watcher.await_update().fuse() => {
if self.handle_ipt_change(update).await? == ShutdownStatus::Terminate {
return Ok(ShutdownStatus::Terminate);
}
},
config = self.config_rx.next().fuse() => {
let Some(config) = config else {
return Ok(ShutdownStatus::Terminate);
};
self.handle_svc_config_change(config).await?;
},
should_upload = self.publish_status_rx.next().fuse() => {
let Some(should_upload) = should_upload else {
return Ok(ShutdownStatus::Terminate);
};
// Our PublishStatus changed -- are we ready to publish?
if should_upload == PublishStatus::UploadScheduled {
self.update_publish_status_unless_waiting(PublishStatus::Idle).await?;
self.upload_all().await?;
}
}
}
Ok(ShutdownStatus::Continue)
}
/// Returns the current status of the publisher
fn status(&self) -> PublishStatus {
*self.publish_status_rx.borrow()
}
/// Handle a batch of upload outcomes,
/// possibly updating the status of the descriptor for the corresponding HSDirs.
fn handle_upload_results(&self, results: TimePeriodUploadResult) {
let mut inner = self.inner.lock().expect("poisoned lock");
let inner = &mut *inner;
// Check which time period these uploads pertain to.
let period = inner
.time_periods
.iter_mut()
.find(|ctx| ctx.params.time_period() == results.time_period);
let Some(period) = period else {
// The uploads were for a time period that is no longer relevant, so we
// can ignore the result.
return;
};
// We will need to reupload this descriptor at some point, so we pick
// a random time between 60 minutes and 120 minutes in the future.
//
// See https://spec.torproject.org/rend-spec/deriving-keys.html#WHEN-HSDESC
let mut rng = self.imm.mockable.thread_rng();
// TODO SPEC: Control republish period using a consensus parameter?
let minutes = rng.gen_range_checked(60..=120).expect("low > high?!");
let duration = Duration::from_secs(minutes * 60);
let reupload_when = self.imm.runtime.now() + duration;
let time_period = period.params.time_period();
info!(
time_period=?time_period,
"reuploading descriptor in {}",
humantime::format_duration(duration),
);
inner.reupload_timers.push(ReuploadTimer {
period: time_period,
when: reupload_when,
});
for upload_res in results.hsdir_result {
let relay = period
.hs_dirs
.iter_mut()
.find(|(relay_ids, _status)| relay_ids == &upload_res.relay_ids);
let Some((_relay, status)) = relay else {
// This HSDir went away, so the result doesn't matter.
// Continue processing the rest of the results
continue;
};
if upload_res.upload_res == UploadStatus::Success {
let update_last_successful = match period.last_successful {
None => true,
Some(counter) => counter <= upload_res.revision_counter,
};
if update_last_successful {
period.last_successful = Some(upload_res.revision_counter);
// TODO (#1098): Is it possible that this won't update the statuses promptly
// enough? For example, it's possible for the reactor to see a Dirty descriptor
// and start an upload task for a descriptor that has already been uploaded (or is
// being uploaded) in another task, but whose upload results have not yet been
// processed.
//
// This is probably made worse by the fact that the statuses are updated in
// batches (grouped by time period), rather than one by one as the upload tasks
// complete (updating the status involves locking the inner mutex, and I wanted
// to minimize the locking/unlocking overheads). I'm not sure handling the
// updates in batches was the correct decision here.
*status = DescriptorStatus::Clean;
}
}
}
}
/// Maybe update our list of HsDirs.
async fn handle_consensus_change(&mut self, netdir: Arc<NetDir>) -> Result<(), FatalError> {
trace!("the consensus has changed; recomputing HSDirs");
let _old: Option<Arc<NetDir>> = self.replace_netdir(netdir);
self.recompute_hs_dirs()?;
self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
.await?;
Ok(())
}
/// Recompute the HsDirs for all relevant time periods.
fn recompute_hs_dirs(&self) -> Result<(), FatalError> {
let mut inner = self.inner.lock().expect("poisoned lock");
let inner = &mut *inner;
let netdir = Arc::clone(
inner
.netdir
.as_ref()
.ok_or_else(|| internal!("started upload task without a netdir"))?,
);
// Update our list of relevant time periods.
let new_time_periods = self.compute_time_periods(&netdir, &inner.time_periods)?;
inner.time_periods = new_time_periods;
Ok(())
}
/// Compute the [`TimePeriodContext`]s for the time periods from the specified [`NetDir`].
///
/// The specified `time_periods` are used to preserve the `DescriptorStatus` of the
/// HsDirs where possible.
fn compute_time_periods(
&self,
netdir: &Arc<NetDir>,
time_periods: &[TimePeriodContext],
) -> Result<Vec<TimePeriodContext>, FatalError> {
netdir
.hs_all_time_periods()
.iter()
.map(|params| {
let period = params.time_period();
let svc_key_spec = HsIdKeypairSpecifier::new(self.imm.nickname.clone());
let hsid_kp = self
.imm
.keymgr
.get::<HsIdKeypair>(&svc_key_spec)?
.ok_or_else(|| FatalError::MissingHsIdKeypair(self.imm.nickname.clone()))?;
let svc_key_spec = BlindIdKeypairSpecifier::new(self.imm.nickname.clone(), period);
let blind_id_kp =
read_blind_id_keypair(&self.imm.keymgr, &self.imm.nickname, period)?
// Note: for now, read_blind_id_keypair cannot return Ok(None).
// It's supposed to return Ok(None) if we're in offline hsid mode,
// but that might change when we do #1194
.ok_or_else(|| internal!("offline hsid mode not supported"))?;
let blind_id: HsBlindIdKey = (&blind_id_kp).into();
// If our previous `TimePeriodContext`s also had an entry for `period`, we need to
// preserve the `DescriptorStatus` of its HsDirs. This helps prevent unnecessarily
// publishing the descriptor to the HsDirs that already have it (the ones that are
// marked with DescriptorStatus::Clean).
//
// In other words, we only want to publish to those HsDirs that
// * are part of a new time period (which we have never published the descriptor
// for), or
// * have just been added to the ring of a time period we already knew about
if let Some(ctx) = time_periods
.iter()
.find(|ctx| ctx.params.time_period() == period)
{
TimePeriodContext::new(
params.clone(),
blind_id.into(),
netdir,
ctx.hs_dirs.iter(),
)
} else {
// Passing an empty iterator here means all HsDirs in this TimePeriodContext
// will be marked as dirty, meaning we will need to upload our descriptor to them.
TimePeriodContext::new(params.clone(), blind_id.into(), netdir, iter::empty())
}
})
.collect::<Result<Vec<TimePeriodContext>, FatalError>>()
}
/// Replace the old netdir with the new, returning the old.
fn replace_netdir(&self, new_netdir: Arc<NetDir>) -> Option<Arc<NetDir>> {
self.inner
.lock()
.expect("poisoned lock")
.netdir
.replace(new_netdir)
}
/// Replace our view of the service config with `new_config` if `new_config` contains changes
/// that would cause us to generate a new descriptor.
fn replace_config_if_changed(&self, new_config: Arc<OnionServiceConfig>) -> bool {
let mut inner = self.inner.lock().expect("poisoned lock");
let old_config = &mut inner.config;
// The fields we're interested in haven't changed, so there's no need to update
// `inner.config`.
//
// TODO: maybe `Inner` should only contain the fields we're interested in instead of
// the entire config.
//
// Alternatively, a less error-prone solution would be to introduce a separate
// `DescriptorConfigView` as described in
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/1603#note_2944902
// TODO (#1206): Temporarily disabled while we figure out how we want the client auth config to
// work; see #1028
/*
if old_config.anonymity == new_config.anonymity
&& old_config.encrypt_descriptor == new_config.encrypt_descriptor
{
return false;
}
*/
let _old: Arc<OnionServiceConfig> = std::mem::replace(old_config, new_config);
true
}
/// Read the intro points from `ipt_watcher`, and decide whether we're ready to start
/// uploading.
fn note_ipt_change(&self) -> PublishStatus {
let inner = self.inner.lock().expect("poisoned lock");
let mut ipts = self.ipt_watcher.borrow_for_publish();
match ipts.ipts.as_mut() {
Some(ipts) => PublishStatus::UploadScheduled,
None => PublishStatus::AwaitingIpts,
}
}
/// Update our list of introduction points.
async fn handle_ipt_change(
&mut self,
update: Option<Result<(), crate::FatalError>>,
) -> Result<ShutdownStatus, FatalError> {
trace!(nickname=%self.imm.nickname, "received IPT change notification from IPT manager");
match update {
Some(Ok(())) => {
let should_upload = self.note_ipt_change();
debug!(nickname=%self.imm.nickname, "the introduction points have changed");
self.mark_all_dirty();
self.update_publish_status_unless_rate_lim(should_upload)
.await?;
Ok(ShutdownStatus::Continue)
}
Some(Err(e)) => Err(e),
None => {
debug!(nickname=%self.imm.nickname, "received shut down signal from IPT manager");
Ok(ShutdownStatus::Terminate)
}
}
}
/// Update the `PublishStatus` of the reactor with `new_state`,
/// unless the current state is `AwaitingIpts`.
async fn update_publish_status_unless_waiting(
&mut self,
new_state: PublishStatus,
) -> Result<(), FatalError> {
// Only update the state if we're not waiting for intro points.
if self.status() != PublishStatus::AwaitingIpts {
self.update_publish_status(new_state).await?;
}
Ok(())
}
/// Update the `PublishStatus` of the reactor with `new_state`,
/// unless the current state is `RateLimited`.
async fn update_publish_status_unless_rate_lim(
&mut self,
new_state: PublishStatus,
) -> Result<(), FatalError> {
// We can't exit this state until the rate-limit expires.
if !matches!(self.status(), PublishStatus::RateLimited(_)) {
self.update_publish_status(new_state).await?;
}
Ok(())
}
/// Unconditionally update the `PublishStatus` of the reactor with `new_state`.
async fn update_publish_status(&mut self, new_state: PublishStatus) -> Result<(), FatalError> {
let onion_status = match new_state {
PublishStatus::Idle => State::Running,
PublishStatus::UploadScheduled
| PublishStatus::AwaitingIpts
| PublishStatus::RateLimited(_) => State::Bootstrapping,
};
self.imm.status_tx.send(onion_status, None);
trace!(
"publisher reactor status change: {:?} -> {:?}",
self.status(),
new_state
);
self.publish_status_tx.send(new_state).await.map_err(
|_: postage::sink::SendError<_>| internal!("failed to send upload notification?!"),
)?;
Ok(())
}
/// Use the new keys.
async fn handle_new_keys(&self) -> Result<(), FatalError> {
todo!()
}
/// Update the descriptors based on the config change.
async fn handle_svc_config_change(
&mut self,
config: Arc<OnionServiceConfig>,
) -> Result<(), FatalError> {
if self.replace_config_if_changed(config) {
self.mark_all_dirty();
// Schedule an upload, unless we're still waiting for IPTs.
self.update_publish_status_unless_waiting(PublishStatus::UploadScheduled)
.await?;
}
Ok(())
}
/// Mark the descriptor dirty for all time periods.
fn mark_all_dirty(&self) {
trace!("marking the descriptor dirty for all time periods");
self.inner
.lock()
.expect("poisoned lock")
.time_periods
.iter_mut()
.for_each(|tp| tp.mark_all_dirty());
}
/// Mark the descriptor dirty for the specified time period.
///
/// Returns `true` if the specified period is still relevant, and `false` otherwise.
fn mark_dirty(&self, period: &TimePeriod) -> bool {
let mut inner = self.inner.lock().expect("poisoned lock");
let period_ctx = inner
.time_periods
.iter_mut()
.find(|tp| tp.params.time_period() == *period);
match period_ctx {
Some(ctx) => {
trace!(time_period=?period, "marking the descriptor dirty");
ctx.mark_all_dirty();
true
}
None => false,
}
}
/// Try to upload our descriptor to the HsDirs that need it.
///
/// If we've recently uploaded some descriptors, we return immediately and schedule the upload
/// to happen after [`UPLOAD_RATE_LIM_THRESHOLD`].
///
/// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
/// implement it, as well as in what cases this will return an error).
async fn upload_all(&mut self) -> Result<(), FatalError> {
trace!("starting descriptor upload task...");
let last_uploaded = self.inner.lock().expect("poisoned lock").last_uploaded;
let now = self.imm.runtime.now();
// Check if we should rate-limit this upload.
if let Some(ts) = last_uploaded {
let duration_since_upload = now.duration_since(ts);
if duration_since_upload < UPLOAD_RATE_LIM_THRESHOLD {
return self.start_rate_limit(UPLOAD_RATE_LIM_THRESHOLD).await;
}
}
let mut inner = self.inner.lock().expect("poisoned lock");
let inner = &mut *inner;
let _ = inner.last_uploaded.insert(now);
for period_ctx in inner.time_periods.iter_mut() {
let upload_task_complete_tx = self.upload_task_complete_tx.clone();
// Figure out which HsDirs we need to upload the descriptor to (some of them might already
// have our latest descriptor, so we filter them out).
let hs_dirs = period_ctx
.hs_dirs
.iter()
.filter_map(|(relay_id, status)| {
if *status == DescriptorStatus::Dirty {
Some(relay_id.clone())
} else {
None
}
})
.collect::<Vec<_>>();
if hs_dirs.is_empty() {
trace!("the descriptor is clean for all HSDirs. Nothing to do");
return Ok(());
}
let time_period = period_ctx.params.time_period();
// This scope exists because rng is not Send, so it needs to fall out of scope before we
// await anything.
let netdir = Arc::clone(
inner
.netdir
.as_ref()
.ok_or_else(|| internal!("started upload task without a netdir"))?,
);
let imm = Arc::clone(&self.imm);
let ipt_upload_view = self.ipt_watcher.upload_view();
let config = Arc::clone(&inner.config);
trace!(nickname=%self.imm.nickname, time_period=?time_period,
"spawning upload task"
);
let params = period_ctx.params.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
// Spawn a task to upload the descriptor to all HsDirs of this time period.
//
// This task will shut down when the reactor is dropped (i.e. when shutdown_rx is
// dropped).
let _handle: () = self
.imm
.runtime
.spawn(async move {
if let Err(e) = Self::upload_for_time_period(
hs_dirs,
&netdir,
config,
params,
Arc::clone(&imm),
ipt_upload_view.clone(),
upload_task_complete_tx,
shutdown_rx,
)
.await
{
error_report!(
e,
"descriptor upload failed for HS service {} and time period {:?}",
imm.nickname,
time_period
);
}
})
.map_err(|e| FatalError::from_spawn("upload_for_time_period task", e))?;
}
Ok(())
}
/// Upload the descriptor for the specified time period.
///
/// Any failed uploads are retried (TODO (#1216, #1098): document the retry logic when we
/// implement it, as well as in what cases this will return an error).
#[allow(clippy::too_many_arguments)] // TODO: refactor
async fn upload_for_time_period(
hs_dirs: Vec<RelayIds>,
netdir: &Arc<NetDir>,
config: Arc<OnionServiceConfig>,
params: HsDirParams,
imm: Arc<Immutable<R, M>>,
ipt_upload_view: IptsPublisherUploadView,
mut upload_task_complete_tx: mpsc::Sender<TimePeriodUploadResult>,
shutdown_rx: broadcast::Receiver<Void>,
) -> Result<(), FatalError> {
let time_period = params.time_period();
trace!(time_period=?time_period, "uploading descriptor to all HSDirs for this time period");
let hsdir_count = hs_dirs.len();
/// An error returned from an upload future.
//
// Exhaustive, because this is a private type.
#[derive(Clone, Debug, thiserror::Error)]
enum PublishError {
/// The upload was aborted because there are no IPTs.
///
/// This happens because of an inevitable TOCTOU race, where after being notified by
/// the IPT manager that the IPTs have changed (via `self.ipt_watcher.await_update`),
/// we find out there actually are no IPTs, so we can't build the descriptor.
///
/// This is a special kind of error that interrupts the current upload task, and is
/// logged at `debug!` level rather than `warn!` or `error!`.
///
/// Ideally, this shouldn't happen very often (if at all).
#[error("No IPTs")]
NoIpts,
/// The reactor has shut down
#[error("The reactor has shut down")]
Shutdown,
/// A fatal error.
#[error("{0}")]
Fatal(#[from] FatalError),
}
let upload_results = futures::stream::iter(hs_dirs)
.map(|relay_ids| {
let netdir = netdir.clone();
let config = Arc::clone(&config);
let imm = Arc::clone(&imm);
let ipt_upload_view = ipt_upload_view.clone();
let params = params.clone();
let mut shutdown_rx = shutdown_rx.clone();
let ed_id = relay_ids
.ed_identity()
.map(|id| id.to_string())
.unwrap_or_else(|| "unknown".into());
let rsa_id = relay_ids
.rsa_identity()
.map(|id| id.to_string())
.unwrap_or_else(|| "unknown".into());
async move {
let run_upload = |desc| async {
let Some(hsdir) = netdir.by_ids(&relay_ids) else {
// This should never happen (all of our relay_ids are from the stored
// netdir).
warn!(
nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
"tried to upload descriptor to relay not found in consensus?!"
);
return UploadStatus::Failure;
};
Self::upload_descriptor_with_retries(
desc,
&netdir,
&hsdir,
&ed_id,
&rsa_id,
Arc::clone(&imm),
)
.await
};
// How long until we're supposed to time out?
let worst_case_end = imm.runtime.now() + OVERALL_UPLOAD_TIMEOUT;
// We generate a new descriptor before _each_ HsDir upload. This means each
// HsDir could, in theory, receive a different descriptor (not just in terms of
// revision-counters, but also with a different set of IPTs). It may seem like
// this could lead to some HsDirs being left with an outdated descriptor, but
// that's not the case: after the upload completes, the publisher will be
// notified by the ipt_watcher of the IPT change event (if there was one to
// begin with), which will trigger another upload job.
let hsdesc = {
// This scope is needed because the ipt_set MutexGuard is not Send, so it
// needs to fall out of scope before the await point below
let mut ipt_set = ipt_upload_view.borrow_for_publish();
// If there are no IPTs, we abort the upload. At this point, we might have
// uploaded the descriptor to some, but not all, HSDirs from the specified
// time period.
//
// Returning an error here means the upload completion task is never
// notified of the outcome of any of these uploads (which means the
// descriptor is not marked clean). This is OK, because if we suddenly find
// out we have no IPTs, it means our built `hsdesc` has an outdated set of
// IPTs, so we need to go back to the main loop to wait for IPT changes,
// and generate a fresh descriptor anyway.
//
// Ideally, this shouldn't happen very often (if at all).
let Some(ipts) = ipt_set.ipts.as_mut() else {
return Err(PublishError::NoIpts);
};
let hsdesc = {
trace!(
nickname=%imm.nickname, time_period=?time_period,
"building descriptor"
);
let mut rng = imm.mockable.thread_rng();
// We're about to generate a new version of the descriptor,
// so let's generate a new revision counter.
let now = imm.runtime.wallclock();
let revision_counter = imm.generate_revision_counter(¶ms, now)?;
build_sign(
&imm.keymgr,
&config,
ipts,
time_period,
revision_counter,
&mut rng,
imm.runtime.wallclock(),
)?
};
if let Err(e) =
ipt_set.note_publication_attempt(&imm.runtime, worst_case_end)
{
let wait = e.log_retry_max(&imm.nickname)?;
// TODO (#1226): retry instead of this
return Err(FatalError::Bug(internal!(
"ought to retry after {wait:?}, crashing instead"
))
.into());
}
hsdesc
};
let VersionedDescriptor {
desc,
revision_counter,
} = hsdesc;
trace!(
nickname=%imm.nickname, time_period=?time_period,
revision_counter=?revision_counter,
"generated new descriptor for time period",
);
// (Actually launch the upload attempt. No timeout is needed
// here, since the backoff::Runner code will handle that for us.)
let upload_res = select_biased! {
shutdown = shutdown_rx.next().fuse() => {
match shutdown {
Some(_) => unreachable!("received Void value?!"),
None => {
// It looks like the reactor has shut down,
// so there is no point in uploading the descriptor anymore.
//
// Let's shut down the upload task too.
trace!(
nickname=%imm.nickname, time_period=?time_period,
"upload task received shutdown signal"
);
return Err(PublishError::Shutdown);
}
}
},
res = run_upload(desc.clone()).fuse() => res,
};
// Note: UploadStatus::Failure is only returned when
// upload_descriptor_with_retries fails, i.e. if all our retry
// attempts have failed
Ok(HsDirUploadStatus {
relay_ids,
upload_res,
revision_counter,
})
}
})
// This fails to compile unless the stream is boxed. See https://github.com/rust-lang/rust/issues/104382
.boxed()
.buffer_unordered(MAX_CONCURRENT_UPLOADS)
.try_collect::<Vec<_>>()
.await;
let upload_results = match upload_results {
Ok(v) => v,
Err(PublishError::Fatal(e)) => return Err(e),
Err(PublishError::NoIpts) => {
debug!(
nickname=%imm.nickname, time_period=?time_period,
"no introduction points; skipping upload"
);
return Ok(());
}
Err(PublishError::Shutdown) => {
debug!(
nickname=%imm.nickname, time_period=?time_period,
"the reactor has shut down; aborting upload"
);
return Ok(());
}
};
let (succeeded, _failed): (Vec<_>, Vec<_>) = upload_results
.iter()
.partition(|res| res.upload_res == UploadStatus::Success);
debug!(
nickname=%imm.nickname, time_period=?time_period,
"descriptor uploaded successfully to {}/{} HSDirs",
succeeded.len(), hsdir_count
);
if let Err(e) = upload_task_complete_tx
.send(TimePeriodUploadResult {
time_period,
hsdir_result: upload_results,
})
.await
{
return Err(internal!(
"failed to notify reactor of upload completion (reactor shut down)"
)
.into());
}
Ok(())
}
/// Upload a descriptor to the specified HSDir.
///
/// If an upload fails, this returns an `Err`. This function does not handle retries. It is up
/// to the caller to retry on failure.
///
/// This function does not handle timeouts.
async fn upload_descriptor(
hsdesc: String,
netdir: &Arc<NetDir>,
hsdir: &Relay<'_>,
imm: Arc<Immutable<R, M>>,
) -> Result<(), UploadError> {
let request = HsDescUploadRequest::new(hsdesc);
trace!(nickname=%imm.nickname, hsdir_id=%hsdir.id(), hsdir_rsa_id=%hsdir.rsa_id(),
"starting descriptor upload",
);
let circuit = imm
.mockable
.get_or_launch_specific(
netdir,
HsCircKind::SvcHsDir,
OwnedCircTarget::from_circ_target(hsdir),
)
.await?;
let mut stream = circuit
.begin_dir_stream()
.await
.map_err(UploadError::Stream)?;
let _response = send_request(&imm.runtime, &request, &mut stream, None)
.await
.map_err(|dir_error| -> UploadError {
match dir_error {
DirClientError::RequestFailed(e) => e.into(),
DirClientError::CircMgr(e) => into_internal!(
"tor-dirclient complains about circmgr going wrong but we gave it a stream"
)(e)
.into(),
e => into_internal!("unexpected error")(e).into(),
}
})?
.into_output_string()?; // This returns an error if we received an error response
Ok(())
}
/// Upload a descriptor to the specified HSDir, retrying if appropriate.
///
/// TODO (#1216): document the retry logic when we implement it.
async fn upload_descriptor_with_retries(
hsdesc: String,
netdir: &Arc<NetDir>,
hsdir: &Relay<'_>,
ed_id: &str,
rsa_id: &str,
imm: Arc<Immutable<R, M>>,
) -> UploadStatus {
/// The base delay to use for the backoff schedule.
const BASE_DELAY_MSEC: u32 = 1000;
let schedule = PublisherBackoffSchedule {
retry_delay: RetryDelay::from_msec(BASE_DELAY_MSEC),
mockable: imm.mockable.clone(),
};
let runner = Runner::new(
"upload a hidden service descriptor".into(),
schedule.clone(),
imm.runtime.clone(),
);
let fallible_op =
|| Self::upload_descriptor(hsdesc.clone(), netdir, hsdir, Arc::clone(&imm));
let outcome: Result<(), BackoffError<UploadError>> = runner.run(fallible_op).await;
match outcome {
Ok(()) => {
debug!(
nickname=%imm.nickname, hsdir_id=%ed_id, hsdir_rsa_id=%rsa_id,
"successfully uploaded descriptor to HSDir",
);
UploadStatus::Success
}
Err(e) => {
warn_report!(
e,
"failed to upload descriptor for service {} (hsdir_id={}, hsdir_rsa_id={})",
imm.nickname,
ed_id,
rsa_id
);
UploadStatus::Failure
}
}
}
/// Stop publishing descriptors until the specified delay elapses.
async fn start_rate_limit(&mut self, delay: Duration) -> Result<(), FatalError> {
if !matches!(self.status(), PublishStatus::RateLimited(_)) {
debug!(
"We are rate-limited for {}; pausing descriptor publication",
humantime::format_duration(delay)
);
let until = self.imm.runtime.now() + delay;
self.update_publish_status(PublishStatus::RateLimited(until))
.await?;
}
Ok(())
}
/// Handle the upload rate-limit being lifted.
async fn expire_rate_limit(&mut self) -> Result<(), FatalError> {
debug!("We are no longer rate-limited; resuming descriptor publication");
self.update_publish_status(PublishStatus::UploadScheduled)
.await?;
Ok(())
}
}
/// Try to read the blinded identity key for a given `TimePeriod`.
///
/// Returns `None` if the service is running in "offline" mode.
///
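/// A usage sketch (illustrative only; this mirrors how the reactor reads the key when
/// computing its per-time-period state):
///
/// ```ignore
/// let blind_id_kp = read_blind_id_keypair(&keymgr, &nickname, period)?
///     .ok_or_else(|| internal!("offline hsid mode not supported"))?;
/// let blind_id: HsBlindIdKey = (&blind_id_kp).into();
/// ```
///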
// TODO (#1194): we don't currently have support for "offline" mode so this can never return
// `Ok(None)`.
pub(super) fn read_blind_id_keypair(
keymgr: &Arc<KeyMgr>,
nickname: &HsNickname,
period: TimePeriod,
) -> Result<Option<HsBlindIdKeypair>, FatalError> {
let svc_key_spec = HsIdKeypairSpecifier::new(nickname.clone());
let hsid_kp = keymgr
.get::<HsIdKeypair>(&svc_key_spec)?
.ok_or_else(|| FatalError::MissingHsIdKeypair(nickname.clone()))?;
let blind_id_key_spec = BlindIdKeypairSpecifier::new(nickname.clone(), period);
// TODO: make the keystore selector configurable
let keystore_selector = Default::default();
match keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)? {
Some(kp) => Ok(Some(kp)),
None => {
let (_hs_blind_id_key, hs_blind_id_kp, _subcredential) = hsid_kp
.compute_blinded_key(period)
.map_err(|_| internal!("failed to compute blinded key"))?;
// Note: we can't use KeyMgr::generate because this key is derived from the HsId
// (KeyMgr::generate uses the tor_keymgr::Keygen trait under the hood,
// which assumes keys are randomly generated, rather than derived from existing keys).
keymgr.insert(hs_blind_id_kp, &blind_id_key_spec, keystore_selector)?;
let arti_path = |spec: &dyn KeySpecifier| {
spec.arti_path()
.map_err(into_internal!("invalid key specifier?!"))
};
Ok(Some(
keymgr.get::<HsBlindIdKeypair>(&blind_id_key_spec)?.ok_or(
FatalError::KeystoreRace {
action: "read",
path: arti_path(&blind_id_key_spec)?,
},
)?,
))
}
}
}
/// Whether the reactor should initiate an upload.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum PublishStatus {
/// We need to call upload_all.
UploadScheduled,
/// We are rate-limited until the specified [`Instant`].
///
/// We have tried to schedule multiple uploads in a short time span,
/// and we are rate-limited. We are waiting for a signal from the schedule_upload_tx
/// channel to unblock us.
RateLimited(Instant),
/// We are idle and waiting for external events.
///
/// We have enough information to build the descriptor, but since we have already called
/// upload_all to upload it to all relevant HSDirs, there is nothing for us to do right now.
Idle,
/// We are waiting for the IPT manager to establish some introduction points.
///
/// No descriptors will be published until the `PublishStatus` of the reactor is changed to
/// `UploadScheduled`.
#[default]
AwaitingIpts,
}
/// The backoff schedule for the task that publishes descriptors.
#[derive(Clone, Debug)]
struct PublisherBackoffSchedule<M: Mockable> {
/// The delays to use between retry attempts.
retry_delay: RetryDelay,
/// The mockable reactor state, needed for obtaining an rng.
mockable: M,
}
impl<M: Mockable> BackoffSchedule for PublisherBackoffSchedule<M> {
fn max_retries(&self) -> Option<usize> {
None
}
fn overall_timeout(&self) -> Option<Duration> {
Some(OVERALL_UPLOAD_TIMEOUT)
}
fn single_attempt_timeout(&self) -> Option<Duration> {
Some(self.mockable.estimate_upload_timeout())
}
fn next_delay<E: RetriableError>(&mut self, _error: &E) -> Option<Duration> {
Some(self.retry_delay.next_delay(&mut self.mockable.thread_rng()))
}
}
impl RetriableError for UploadError {
fn should_retry(&self) -> bool {
match self {
UploadError::Request(_)
| UploadError::Circuit(_)
| UploadError::Stream(_)
| UploadError::Timeout => true,
UploadError::Bug(_) => false,
}
}
}
/// The outcome of uploading a descriptor to the HSDirs from a particular time period.
#[derive(Debug, Clone)]
struct TimePeriodUploadResult {
/// The time period.
time_period: TimePeriod,
/// The upload results.
hsdir_result: Vec<HsDirUploadStatus>,
}
/// The outcome of uploading a descriptor to a particular HsDir.
#[derive(Clone, Debug, PartialEq)]
struct HsDirUploadStatus {
/// The identity of the HsDir we attempted to upload the descriptor to.
relay_ids: RelayIds,
/// The outcome of this attempt.
upload_res: UploadStatus,
/// The revision counter of the descriptor we tried to upload.
revision_counter: RevisionCounter,
}
/// The outcome of uploading a descriptor.
//
// TODO: consider making this a type alias for Result<(), ()>
#[derive(Copy, Clone, Debug, PartialEq)]
enum UploadStatus {
/// The descriptor upload succeeded.
Success,
/// The descriptor upload failed.
Failure,
}
impl<T, E> From<Result<T, E>> for UploadStatus {
fn from(res: Result<T, E>) -> Self {
if res.is_ok() {
Self::Success
} else {
Self::Failure
}
}
}