tor_hsservice/replay.rs
//! Facility for detecting and preventing replays on introduction requests.
//!
//! If we were to permit the introduction point to replay the same request
//! multiple times, it would cause the service to contact the rendezvous point
//! again with the same rendezvous cookie as before, which could help with
//! traffic analysis.
//!
//! (This could also be a DoS vector if the introduction point decided to
//! overload the service.)
//!
//! Because we use the same introduction point keys across restarts, we need to
//! make sure that our replay logs are also persistent. We do this by using
//! a file on disk.

mod ipt;

use crate::internal_prelude::*;

/// A probabilistic data structure to record fingerprints of observed Introduce2
/// messages.
///
/// We need to record these fingerprints to prevent replay attacks; see the
/// module documentation for an explanation of why that would be bad.
///
/// A ReplayLog should correspond to a `KP_hss_ntor` key, and should have the
/// same lifespan: dropping it sooner will enable replays, but dropping it later
/// will waste disk and memory.
///
/// False positives are allowed, to conserve on space.
pub(crate) struct ReplayLog<T> {
    /// The inner probabilistic data structure.
    seen: data::Filter,
    /// Persistent state file etc., if we're persistent
    ///
    /// If this is `None`, this ReplayLog is ephemeral.
    file: Option<PersistFile>,
    /// [`PhantomData`] so rustc doesn't complain about the unused type param.
    ///
    /// This type represents the type of data that we're storing, as well as the type of the
    /// key/name for that data.
    replay_log_type: PhantomData<T>,
}

/// A [`ReplayLog`] for [`Introduce2`](tor_cell::relaycell::msg::Introduce2) messages.
pub(crate) type IptReplayLog = ReplayLog<ipt::IptReplayLogType>;

/// The length of the [`ReplayLogType::MAGIC`] constant.
///
// TODO: If Rust's constant expressions supported generics we wouldn't need this at all.
const MAGIC_LEN: usize = 32;

/// The length of the message that we store on disk, in bytes.
///
/// If the message is longer than this, then we will need to hash or truncate it before storing it
/// to disk.
///
// TODO: Once const generics are good, this should be an associated constant for ReplayLogType.
pub(crate) const OUTPUT_LEN: usize = 16;
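
// On-disk layout, as implied by MAGIC_LEN, OUTPUT_LEN, and the code in
// `new_logged_inner` below (shown here only as an orientation sketch):
//
//     offset  0   magic header      (MAGIC_LEN = 32 bytes)
//     offset 32   fingerprint 0     (OUTPUT_LEN = 16 bytes)
//     offset 48   fingerprint 1     (OUTPUT_LEN = 16 bytes)
//     ...
//
// Any trailing bytes that do not form a whole record are assumed to be the
// result of a partial write and are discarded on load; see
// `truncate_to_multiple`.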

/// A trait to represent a set of types that ReplayLog can be used with.
pub(crate) trait ReplayLogType {
    // TODO: It would be nice to encode the directory name as an associated constant here, rather
    // than having the external code pass it in to us.

    /// The name of this item, used for the log filename.
    type Name;

    /// The type of the messages that we are ensuring the uniqueness of.
    type Message;

    /// A magic string that we put at the start of each log file, to make sure that
    /// we don't confuse this file format with others.
    const MAGIC: &'static [u8; MAGIC_LEN];

    /// Convert [`Self::Name`] to a [`String`]
    fn format_filename(name: &Self::Name) -> String;

    /// Convert [`Self::Message`] to bytes that will be stored in the log.
    fn transform_message(message: &Self::Message) -> [u8; OUTPUT_LEN];

    /// Parse a filename into [`Self::Name`].
    fn parse_log_leafname(leaf: &OsStr) -> Result<Self::Name, Cow<'static, str>>;
}
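
// A minimal sketch of an implementation of this trait, for orientation only.
// The `ExampleLogType` name is hypothetical; the real implementations are
// `ipt::IptReplayLogType` and the test module's `TestReplayLogType` below.
//
//     struct ExampleLogType;
//
//     impl ReplayLogType for ExampleLogType {
//         type Name = IptLocalId;
//         type Message = [u8; OUTPUT_LEN];
//
//         // Must be exactly MAGIC_LEN (32) bytes.
//         const MAGIC: &'static [u8; MAGIC_LEN] = b"<example replay log>\n\0\0\0\0\0\0\0\0\0\0\0";
//
//         fn format_filename(name: &IptLocalId) -> String {
//             format!("{name}{REPLAY_LOG_SUFFIX}")
//         }
//
//         fn transform_message(message: &Self::Message) -> [u8; OUTPUT_LEN] {
//             *message
//         }
//
//         fn parse_log_leafname(leaf: &OsStr) -> Result<IptLocalId, Cow<'static, str>> {
//             let leaf = leaf.to_str().ok_or("not proper unicode")?;
//             let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
//             lid.parse()
//                 .map_err(|e: crate::InvalidIptLocalId| e.to_string().into())
//         }
//     }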

/// Persistent state file, and associated data
///
/// Stored as `ReplayLog.file`.
#[derive(Debug)]
pub(crate) struct PersistFile {
    /// A file logging fingerprints of the messages we have seen.
    file: BufWriter<File>,
    /// Whether we had a possible partial write
    ///
    /// See the comment inside [`ReplayLog::check_for_replay`].
    /// `Ok` means all is well.
    /// `Err` means we may have written partial data to the actual file,
    /// and need to make sure we're back at a record boundary.
    needs_resynch: Result<(), ()>,
    /// Filesystem lock which must not be released until after we finish writing
    ///
    /// Must come last so that the drop order is correct
    #[allow(dead_code)] // Held just so we unlock on drop
    lock: Arc<LockFileGuard>,
}

/// Replay log files have a `.bin` suffix.
///
/// The name of the file is determined by [`ReplayLogType::format_filename`].
const REPLAY_LOG_SUFFIX: &str = ".bin";

impl<T: ReplayLogType> ReplayLog<T> {
    /// Create a new ReplayLog not backed by any data storage.
    #[allow(dead_code)] // TODO #1186 Remove once something uses ReplayLog.
    pub(crate) fn new_ephemeral() -> Self {
        Self {
            seen: data::Filter::new(),
            file: None,
            replay_log_type: PhantomData,
        }
    }

    /// Create a ReplayLog backed by the file at a given path.
    ///
    /// If the file already exists, load its contents and append any new
    /// contents to it; otherwise, create the file.
    ///
    /// **`lock` must already have been locked** and this
    /// *cannot be assured by the type system*.
    ///
    /// # Limitations
    ///
    /// It is the caller's responsibility to make sure that there are never two
    /// `ReplayLogs` open at once for the same path, or for two paths that
    /// resolve to the same file.
    pub(crate) fn new_logged(
        dir: &InstanceRawSubdir,
        name: &T::Name,
    ) -> Result<Self, CreateIptError> {
        let leaf = T::format_filename(name);
        let path = dir.as_path().join(leaf);
        let lock_guard = dir.raw_lock_guard();

        Self::new_logged_inner(&path, lock_guard).map_err(|error| CreateIptError::OpenReplayLog {
            file: path,
            error: error.into(),
        })
    }
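
    // A sketch of how `new_logged` (above) is typically called; the variable
    // names here are hypothetical, and `create_logged` in the test module
    // below does the same thing for the test log type:
    //
    //     let raw: InstanceRawSubdir = instance.raw_subdir("iptreplay")?;
    //     let log = IptReplayLog::new_logged(&raw, &name)?;
    //
    // where `name` is a value of the log type's `Name` (for `IptReplayLog`,
    // the local identifier of the introduction point).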

    /// Inner function for `new_logged`, with reified arguments and raw error type
    fn new_logged_inner(path: impl AsRef<Path>, lock: Arc<LockFileGuard>) -> io::Result<Self> {
        let mut file = {
            let mut options = OpenOptions::new();
            options.read(true).write(true).create(true);

            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::OpenOptionsExt as _;
                options.mode(0o600);
            }

            options.open(path)?
        };

        // If the file is new, we need to write the magic string. Else we must
        // read it.
        let file_len = file.metadata()?.len();
        if file_len == 0 {
            file.write_all(T::MAGIC)?;
        } else {
            let mut m = [0_u8; MAGIC_LEN];
            file.read_exact(&mut m)?;
            if &m != T::MAGIC {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    LogContentError::UnrecognizedFormat,
                ));
            }

            Self::truncate_to_multiple(&mut file, file_len)?;
        }

        // Now read the rest of the file.
        let mut seen = data::Filter::new();
        let mut r = BufReader::new(file);
        loop {
            let mut msg = [0_u8; OUTPUT_LEN];
            match r.read_exact(&mut msg) {
                Ok(()) => {
                    let _ = seen.test_and_add(&msg); // ignore error.
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        let mut file = r.into_inner();
        file.seek(SeekFrom::End(0))?;

        let file = PersistFile {
            file: BufWriter::new(file),
            needs_resynch: Ok(()),
            lock,
        };

        Ok(Self {
            seen,
            file: Some(file),
            replay_log_type: PhantomData,
        })
    }

    /// Truncate `file` to contain a whole number of records
    ///
    /// `current_len` should have come from `file.metadata()`.
    // If the file's length is not an even multiple of OUTPUT_LEN after the MAGIC, truncate it.
    fn truncate_to_multiple(file: &mut File, current_len: u64) -> io::Result<()> {
        let excess = (current_len - T::MAGIC.len() as u64) % (OUTPUT_LEN as u64);
        if excess != 0 {
            file.set_len(current_len - excess)?;
        }
        Ok(())
    }
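
    // Worked example (illustrative numbers only): with MAGIC_LEN = 32 and
    // OUTPUT_LEN = 16, a file of 87 bytes contains the magic, three whole
    // records, and 7 stray bytes; excess = (87 - 32) % 16 = 7, so the file
    // is truncated to 80 bytes, i.e. back to the last record boundary.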

    /// Test whether we have already seen `message`.
    ///
    /// If we have seen it, return `Err(ReplayError::AlreadySeen)`. (Since this
    /// is a probabilistic data structure, there is a chance of returning this
    /// error even if we have _not_ seen this particular message.)
    ///
    /// Otherwise, return `Ok(())`.
    pub(crate) fn check_for_replay(&mut self, message: &T::Message) -> Result<(), ReplayError> {
        let h = T::transform_message(message);
        self.seen.test_and_add(&h)?;
        if let Some(f) = self.file.as_mut() {
            (|| {
                // If write_all fails, it might have written part of the data;
                // in that case, we must truncate the file to resynchronise.
                // We set a note to truncate just before we call write_all
                // and clear it again afterwards.
                //
                // But, first, we need to deal with any previous note we left ourselves.

                // (With the current implementation of std::io::BufWriter, this is
                // unnecessary, because if the argument to write_all is smaller than
                // the buffer size, BufWriter::write_all always just copies to the buffer,
                // flushing first if necessary; and when it flushes, it uses write,
                // not write_all. So the use of write_all never causes "lost" data.
                // However, this is not a documented guarantee.)
                match f.needs_resynch {
                    Ok(()) => {}
                    Err(()) => {
                        // We're going to reach behind the BufWriter, so we need to make
                        // sure it's in synch with the underlying File.
                        f.file.flush()?;
                        let inner = f.file.get_mut();
                        let len = inner.metadata()?.len();
                        Self::truncate_to_multiple(inner, len)?;
                        // cursor is now past end, must reset (see std::fs::File::set_len)
                        inner.seek(SeekFrom::End(0))?;
                    }
                }
                f.needs_resynch = Err(());

                f.file.write_all(&h[..])?;

                f.needs_resynch = Ok(());

                Ok(())
            })()
            .map_err(|e| ReplayError::Log(Arc::new(e)))?;
        }
        Ok(())
    }
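
    // A sketch of how a caller is expected to use `check_for_replay` (the
    // variable names here are hypothetical; the real call sites live
    // elsewhere in this crate):
    //
    //     match replay_log.check_for_replay(&introduce2) {
    //         Ok(()) => { /* fresh request: go ahead and contact the rendezvous point */ }
    //         Err(ReplayError::AlreadySeen) => { /* probable replay (or false positive): drop it */ }
    //         Err(ReplayError::Log(e)) => { /* could not record the fingerprint: treat as an error */ }
    //     }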

    /// Flush any buffered data to disk.
    #[allow(dead_code)] // TODO #1208
    pub(crate) fn flush(&mut self) -> Result<(), io::Error> {
        if let Some(f) = self.file.as_mut() {
            f.file.flush()?;
        }
        Ok(())
    }

    /// Tries to parse a filename in the replay logs directory
    ///
    /// If the leafname refers to a file that would be created by
    /// [`ReplayLog::new_logged`], returns the name as a Rust type.
    ///
    /// Otherwise returns an error explaining why it isn't,
    /// as a plain string (for logging).
    pub(crate) fn parse_log_leafname(leaf: &OsStr) -> Result<T::Name, Cow<'static, str>> {
        T::parse_log_leafname(leaf)
    }
}

/// Wrapper around a fast-ish data structure for detecting replays with some
/// false positive rate. Bloom filters, cuckoo filters, and xorf filters are all
/// options here. You could even use a HashSet.
///
/// We isolate this code to make it easier to replace.
mod data {
    use super::{ReplayError, OUTPUT_LEN};
    use growable_bloom_filter::GrowableBloom;

    /// A probabilistic membership filter.
    pub(super) struct Filter(pub(crate) GrowableBloom);

    impl Filter {
        /// Create a new empty filter
        pub(super) fn new() -> Self {
            // TODO: Perhaps we should make the capacity here tunable, based on
            // the number of entries we expect. These values are more or less
            // pulled out of thin air.
            let desired_error_prob = 1.0 / 100_000.0;
            let est_insertions = 100_000;
            Filter(GrowableBloom::new(desired_error_prob, est_insertions))
        }

        /// Try to add `msg` to this filter if it isn't already there.
        ///
        /// Return Ok(()) or Err(AlreadySeen).
        pub(super) fn test_and_add(&mut self, msg: &[u8; OUTPUT_LEN]) -> Result<(), ReplayError> {
            if self.0.insert(&msg[..]) {
                Ok(())
            } else {
                Err(ReplayError::AlreadySeen)
            }
        }
    }
}

/// A problem that prevents us from reading a ReplayLog from disk.
///
/// (This only exists so we can wrap it up in an [`io::Error`])
#[derive(thiserror::Error, Clone, Debug)]
enum LogContentError {
    /// The magic number on the log file was incorrect.
    #[error("unrecognized data format")]
    UnrecognizedFormat,
}

/// An error occurred while checking whether we've seen an element before.
#[derive(thiserror::Error, Clone, Debug)]
pub(crate) enum ReplayError {
    /// We have already seen this item.
    #[error("Already seen")]
    AlreadySeen,

    /// We were unable to record this item in the log.
    #[error("Unable to log data")]
    Log(Arc<std::io::Error>),
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::test::mk_state_instance;
    use rand::Rng;
    use test_temp_dir::{test_temp_dir, TestTempDir, TestTempDirGuard};

    struct TestReplayLogType;

    type TestReplayLog = ReplayLog<TestReplayLogType>;

    impl ReplayLogType for TestReplayLogType {
        type Name = IptLocalId;
        type Message = [u8; OUTPUT_LEN];

        const MAGIC: &'static [u8; MAGIC_LEN] = b"<tor test replay>\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0";

        fn format_filename(name: &IptLocalId) -> String {
            format!("{name}{REPLAY_LOG_SUFFIX}")
        }

        fn transform_message(message: &[u8; OUTPUT_LEN]) -> [u8; OUTPUT_LEN] {
            message.clone()
        }

        fn parse_log_leafname(leaf: &OsStr) -> Result<IptLocalId, Cow<'static, str>> {
            let leaf = leaf.to_str().ok_or("not proper unicode")?;
            let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
            let lid: IptLocalId = lid
                .parse()
                .map_err(|e: crate::InvalidIptLocalId| e.to_string())?;
            Ok(lid)
        }
    }

    fn rand_msg<R: Rng>(rng: &mut R) -> [u8; OUTPUT_LEN] {
        rng.random()
    }

    /// Basic tests on an ephemeral IptReplayLog.
    #[test]
    fn simple_usage() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let mut log = TestReplayLog::new_ephemeral();
        // Add everything in group 1.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        // Make sure that everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Make sure that group 2 is detected as not-there.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
    }

    const TEST_TEMP_SUBDIR: &str = "replaylog";

    fn create_logged(dir: &TestTempDir) -> TestTempDirGuard<TestReplayLog> {
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let inst = mk_state_instance(&dir, "allium");
            let raw = inst.raw_subdir("iptreplay").unwrap();
            TestReplayLog::new_logged(&raw, &IptLocalId::dummy(1)).unwrap()
        })
    }

    /// Basic tests on a persistent IptReplayLog.
    #[test]
    fn logging_basics() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        // Add everything in group 1, then close and reload.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a log that gets truncated mid-write.
    #[test]
    fn test_truncated() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        // Truncate the file by 7 bytes.
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let path = dir.join(format!("hss/allium/iptreplay/{}.bin", IptLocalId::dummy(1)));
            let file = OpenOptions::new().write(true).open(path).unwrap();
            // Make sure that the file has the length we expect.
            let expected_len = MAGIC_LEN + OUTPUT_LEN * group_1.len();
            assert_eq!(expected_len as u64, file.metadata().unwrap().len());
            file.set_len((expected_len - 7) as u64).unwrap();
        });
        // Now, reload the log. We should be able to recover every non-truncated
        // item...
        let mut log = create_logged(&dir);
        for msg in &group_1[..group_1.len() - 1] {
            assert!(log.check_for_replay(msg).is_err());
        }
        // But not the last one, which we truncated. (Checking will add it, though.)
        assert!(
            log.check_for_replay(&group_1[group_1.len() - 1]).is_ok(),
            "False positive"
        );
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a partial write
    #[test]
    #[cfg(target_os = "linux")] // different platforms have different definitions of sigaction
    fn test_partial_write() {
        use std::env;
        use std::os::unix::process::ExitStatusExt;
        use std::process::Command;

        // TODO this contraption should perhaps be productised and put somewhere else

        const ENV_NAME: &str = "TOR_HSSERVICE_TEST_PARTIAL_WRITE_SUBPROCESS";
        // for a wait status different from any of libtest's
        const GOOD_SIGNAL: i32 = libc::SIGUSR2;

        let sigemptyset = || unsafe {
            let mut set = MaybeUninit::uninit();
            libc::sigemptyset(set.as_mut_ptr());
            set.assume_init()
        };

        // Check that SIGUSR2 starts out as SIG_DFL and unblocked
        //
        // We *reject* such situations, rather than fixing them up, because this is an
        // irregular and broken environment that can cause arbitrarily weird behaviours.
        // Programs on Unix are entitled to assume that their signal dispositions are
        // SIG_DFL on entry, with signals unblocked. (With a few exceptions.)
        //
        // So we want to detect and report any such environment, not let it slide.
        unsafe {
            let mut sa = MaybeUninit::uninit();
            let r = libc::sigaction(GOOD_SIGNAL, ptr::null(), sa.as_mut_ptr());
            assert_eq!(r, 0);
            let sa = sa.assume_init();
            assert_eq!(
                sa.sa_sigaction,
                libc::SIG_DFL,
                "tests running in broken environment (SIGUSR2 not SIG_DFL)"
            );

            let empty_set = sigemptyset();
            let mut current_set = MaybeUninit::uninit();
            let r = libc::sigprocmask(
                libc::SIG_UNBLOCK,
                (&empty_set) as _,
                current_set.as_mut_ptr(),
            );
            assert_eq!(r, 0);
            let current_set = current_set.assume_init();
            let blocked = libc::sigismember((&current_set) as _, GOOD_SIGNAL);
            assert_eq!(
                blocked, 0,
                "tests running in broken environment (SIGUSR2 blocked)"
            );
        }

        match env::var(ENV_NAME) {
            Err(env::VarError::NotPresent) => {
                eprintln!("in test runner process, forking...");
                let output = Command::new(env::current_exe().unwrap())
                    .args(["--nocapture", "replay::test::test_partial_write"])
                    .env(ENV_NAME, "1")
                    .output()
                    .unwrap();
                let print_output = |prefix, data| match std::str::from_utf8(data) {
                    Ok(s) => {
                        for l in s.split("\n") {
                            eprintln!(" {prefix} {l}");
                        }
                    }
                    Err(e) => eprintln!(" UTF-8 ERROR {prefix} {e}"),
                };
                print_output("!", &output.stdout);
                print_output(">", &output.stderr);
                let st = output.status;
                eprintln!("reaped actual test process {st:?} (expecting signal {GOOD_SIGNAL})");
                assert_eq!(st.signal(), Some(GOOD_SIGNAL));
                return;
            }
            Ok(y) if y == "1" => {}
            other => panic!("bad env var {ENV_NAME:?} {other:?}"),
        };

        // Now we are in our own process, and can mess about with ulimit etc.

        use std::fs;
        use std::mem::MaybeUninit;
        use std::ptr;

        fn set_ulimit(size: usize) {
            unsafe {
                use libc::RLIMIT_FSIZE;
                let mut rlim = libc::rlimit {
                    rlim_cur: 0,
                    rlim_max: 0,
                };
                let r = libc::getrlimit(RLIMIT_FSIZE, (&mut rlim) as _);
                assert_eq!(r, 0);
                rlim.rlim_cur = size.try_into().unwrap();
                let r = libc::setrlimit(RLIMIT_FSIZE, (&rlim) as _);
                assert_eq!(r, 0);
            }
        }

        // This test is quite complicated.
        //
        // We want to test partial writes. We could perhaps have done this by
        // parameterising IptReplayLog so it could have something other than File,
        // but that would probably leak into the public API.
        //
        // Instead, we cause *actual* partial writes. We use the Unix setrlimit
        // call to limit the size of files our process is allowed to write.
        // This causes the underlying write(2) calls to (i) generate SIGXFSZ
        // (ii) if that doesn't kill the process, return partial writes.

        test_temp_dir!().used_by(|dir| {
            let path = dir.join("test.log");
            let lock = LockFileGuard::lock(dir.join("dummy.lock")).unwrap();
            let lock = Arc::new(lock);
            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();

            const BUF: usize = 8192; // BufWriter default; if that changes, test will break

            // We let ourselves write one whole buffer plus an odd amount of extra
            const ALLOW: usize = BUF + 37;

            // Ignore SIGXFSZ (default disposition is for exceeding the rlimit to kill us)
            unsafe {
                let sa = libc::sigaction {
                    sa_sigaction: libc::SIG_IGN,
                    sa_mask: sigemptyset(),
                    sa_flags: 0,
                    sa_restorer: None,
                };
                let r = libc::sigaction(libc::SIGXFSZ, (&sa) as _, ptr::null_mut());
                assert_eq!(r, 0);
            }

            let demand_efbig = |e| match e {
                // TODO MSRV 1.83: replace with io::ErrorKind::FileTooLarge?
                ReplayError::Log(e) if e.raw_os_error() == Some(libc::EFBIG) => {}
                other => panic!("expected EFBIG, got {other:?}"),
            };

            // Generate a distinct message given a phase and a counter
            #[allow(clippy::identity_op)]
            let mk_msg = |phase: u8, i: usize| {
                let i = u32::try_from(i).unwrap();
                let mut msg = [0_u8; OUTPUT_LEN];
                msg[0] = phase;
                msg[1] = phase;
                msg[4] = (i >> 24) as _;
                msg[5] = (i >> 16) as _;
                msg[6] = (i >> 8) as _;
                msg[7] = (i >> 0) as _;
                msg
            };

            // Number of hashes we can write to the file before failure occurs
            const CAN_DO: usize = (ALLOW + BUF - MAGIC_LEN) / OUTPUT_LEN;
            dbg!(MAGIC_LEN, OUTPUT_LEN, BUF, ALLOW, CAN_DO);

            // Record of the hashes that TestReplayLog tells us were OK and not replays;
            // ie, which it therefore ought to have recorded.
            let mut gave_ok = Vec::new();

            set_ulimit(ALLOW);

            for i in 0..CAN_DO {
                let h = mk_msg(b'y', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            let md = fs::metadata(&path).unwrap();
            dbg!(md.len(), &rl.file);

            // Now we have written what we can. The next two calls will fail,
            // since the BufWriter buffer is full and can't be flushed.

            for i in 0..2 {
                eprintln!("expecting EFBIG {i}");
                demand_efbig(rl.check_for_replay(&mk_msg(b'n', i)).unwrap_err());
                let md = fs::metadata(&path).unwrap();
                assert_eq!(md.len(), u64::try_from(ALLOW).unwrap());
            }

            // Enough that we don't get any further file size exceedances
            set_ulimit(ALLOW * 10);

            // Now we should be able to recover. We write two more hashes.
            for i in 0..2 {
                eprintln!("recovering {i}");
                let h = mk_msg(b'r', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            // flush explicitly just so we catch any error
            // (drop would flush, but it can't report errors)
            rl.flush().unwrap();
            drop(rl);

            // Reopen the log - reading in the written data.
            // We can then check that everything the earlier IptReplayLog
            // claimed to have written, is indeed recorded.

            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();
            for msg in &gave_ok {
                match rl.check_for_replay(msg) {
                    Err(ReplayError::AlreadySeen) => {}
                    other => panic!("expected AlreadySeen, got {other:?}"),
                }
            }

            eprintln!("recovered file contents checked, all good");
        });

        unsafe {
            libc::raise(libc::SIGUSR2);
        }
        panic!("we survived raise SIGUSR2");
    }
}