1
//! Facility for detecting and preventing replays on introduction requests.
2
//!
3
//! If we were to permit the introduction point to replay the same request
4
//! multiple times, it would cause the service to contact the rendezvous point
5
//! again with the same rendezvous cookie as before, which could help with
6
//! traffic analysis.
7
//!
8
//! (This could also be a DoS vector if the introduction point decided to
9
//! overload the service.)
10
//!
11
//! Because we use the same introduction point keys across restarts, we need to
12
//! make sure that our replay logs are also persistent.  We do this by using
13
//! a file on disk.
14

            
15
mod ipt;
16

            
17
use crate::internal_prelude::*;
18

            
19
/// A probabilistic data structure to record fingerprints of observed Introduce2
/// messages.
///
/// We need to record these fingerprints to prevent replay attacks; see the
/// module documentation for an explanation of why that would be bad.
///
/// A ReplayLog should correspond to a `KP_hss_ntor` key, and should have the
/// same lifespan: dropping it sooner will enable replays, but dropping it later
/// will waste disk and memory.
///
/// False positives are allowed, to conserve on space.
pub(crate) struct ReplayLog<T> {
    /// The inner probabilistic data structure.
    seen: data::Filter,
    /// Persistent state file etc., if we're persistent
    ///
    /// If this is `None`, this `ReplayLog` is ephemeral.
    file: Option<PersistFile>,
    /// [`PhantomData`] so rustc doesn't complain about the unused type param.
    ///
    /// This type represents the type of data that we're storing, as well as the type of the
    /// key/name for that data.
    replay_log_type: PhantomData<T>,
}
43

            
44
/// A [`ReplayLog`] for [`Introduce2`](tor_cell::relaycell::msg::Introduce2) messages.
pub(crate) type IptReplayLog = ReplayLog<ipt::IptReplayLogType>;
46

            
47
/// The length of the [`ReplayLogType::MAGIC`] constant, in bytes.
///
// TODO: If Rust's constant expressions supported generics we wouldn't need this at all.
const MAGIC_LEN: usize = 32;
51

            
52
/// The length of the message that we store on disk, in bytes.
///
/// If the message is longer than this, then we will need to hash or truncate it before storing it
/// to disk.
///
// TODO: Once const generics are good, this should be an associated constant for ReplayLogType.
pub(crate) const OUTPUT_LEN: usize = 16;
59

            
60
/// A trait to represent a set of types that ReplayLog can be used with.
pub(crate) trait ReplayLogType {
    // TODO: It would be nice to encode the directory name as an associated constant here, rather
    // than having the external code pass it in to us.

    /// The name of this item, used for the log filename.
    type Name;

    /// The type of the messages that we are ensuring the uniqueness of.
    type Message;

    /// A magic string that we put at the start of each log file, to make sure that
    /// we don't confuse this file format with others.
    const MAGIC: &'static [u8; MAGIC_LEN];

    /// Convert [`Self::Name`] to a [`String`]
    ///
    /// The result is used as the leafname of the on-disk log file.
    fn format_filename(name: &Self::Name) -> String;

    /// Convert [`Self::Message`] to bytes that will be stored in the log.
    ///
    /// The output is the fingerprint recorded by [`ReplayLog::check_for_replay`],
    /// so the same message must always map to the same bytes.
    fn transform_message(message: &Self::Message) -> [u8; OUTPUT_LEN];

    /// Parse a filename into [`Self::Name`].
    ///
    /// Inverse of [`format_filename`](Self::format_filename); the `Err` string
    /// is a human-readable reason suitable for logging.
    fn parse_log_leafname(leaf: &OsStr) -> Result<Self::Name, Cow<'static, str>>;
}
84

            
85
/// Persistent state file, and associated data
///
/// Stored as `ReplayLog.file`.
#[derive(Debug)]
pub(crate) struct PersistFile {
    /// A file logging fingerprints of the messages we have seen.
    file: BufWriter<File>,
    /// Whether we had a possible partial write
    ///
    /// See the comment inside [`ReplayLog::check_for_replay`].
    /// `Ok` means all is well.
    /// `Err` means we may have written partial data to the actual file,
    /// and need to make sure we're back at a record boundary.
    needs_resynch: Result<(), ()>,
    /// Filesystem lock which must not be released until after we finish writing
    ///
    /// Must come last so that the drop order is correct
    /// (the file is flushed/closed before the lock is released).
    #[allow(dead_code)] // Held just so we unlock on drop
    lock: Arc<LockFileGuard>,
}
105

            
106
/// Replay log files have a `.bin` suffix.
///
/// The name of the file is determined by [`ReplayLogType::format_filename`].
const REPLAY_LOG_SUFFIX: &str = ".bin";
110

            
111
impl<T: ReplayLogType> ReplayLog<T> {
    /// Create a new ReplayLog not backed by any data storage.
    ///
    /// Fingerprints are kept only in memory, so all records are lost on drop.
    #[allow(dead_code)] // TODO #1186 Remove once something uses ReplayLog.
    pub(crate) fn new_ephemeral() -> Self {
        Self {
            seen: data::Filter::new(),
            file: None,
            replay_log_type: PhantomData,
        }
    }

    /// Create a ReplayLog backed by the file at a given path.
    ///
    /// If the file already exists, load its contents and append any new
    /// contents to it; otherwise, create the file.
    ///
    /// **The instance lock obtained from `dir` must already have been locked** and this
    /// *cannot be assured by the type system*.
    ///
    /// # Limitations
    ///
    /// It is the caller's responsibility to make sure that there are never two
    /// `ReplayLogs` open at once for the same path, or for two paths that
    /// resolve to the same file.
    pub(crate) fn new_logged(
        dir: &InstanceRawSubdir,
        name: &T::Name,
    ) -> Result<Self, CreateIptError> {
        let leaf = T::format_filename(name);
        let path = dir.as_path().join(leaf);
        let lock_guard = dir.raw_lock_guard();

        Self::new_logged_inner(&path, lock_guard).map_err(|error| CreateIptError::OpenReplayLog {
            file: path,
            error: error.into(),
        })
    }

    /// Inner function for `new_logged`, with reified arguments and raw error type
    fn new_logged_inner(path: impl AsRef<Path>, lock: Arc<LockFileGuard>) -> io::Result<Self> {
        let mut file = {
            let mut options = OpenOptions::new();
            options.read(true).write(true).create(true);

            // 0600: only the owning user may read or write the log.
            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::OpenOptionsExt as _;
                options.mode(0o600);
            }

            options.open(path)?
        };

        // If the file is new, we need to write the magic string. Else we must
        // read it (and reject the file if it doesn't match).
        let file_len = file.metadata()?.len();
        if file_len == 0 {
            file.write_all(T::MAGIC)?;
        } else {
            let mut m = [0_u8; MAGIC_LEN];
            file.read_exact(&mut m)?;
            if &m != T::MAGIC {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    LogContentError::UnrecognizedFormat,
                ));
            }

            // Discard any trailing partial record left by an interrupted write.
            Self::truncate_to_multiple(&mut file, file_len)?;
        }

        // Now read the rest of the file: a sequence of OUTPUT_LEN-byte records,
        // each of which is loaded into the in-memory filter.
        let mut seen = data::Filter::new();
        let mut r = BufReader::new(file);
        loop {
            let mut msg = [0_u8; OUTPUT_LEN];
            match r.read_exact(&mut msg) {
                Ok(()) => {
                    let _ = seen.test_and_add(&msg); // ignore error.
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        let mut file = r.into_inner();
        // Position the cursor so that subsequent writes append new records.
        file.seek(SeekFrom::End(0))?;

        let file = PersistFile {
            file: BufWriter::new(file),
            needs_resynch: Ok(()),
            lock,
        };

        Ok(Self {
            seen,
            file: Some(file),
            replay_log_type: PhantomData,
        })
    }

    /// Truncate `file` to contain a whole number of records
    ///
    /// `current_len` should have come from `file.metadata()`.
    // If the file's length is not an even multiple of MESSAGE_LEN after the MAGIC, truncate it.
    fn truncate_to_multiple(file: &mut File, current_len: u64) -> io::Result<()> {
        let excess = (current_len - T::MAGIC.len() as u64) % (OUTPUT_LEN as u64);
        if excess != 0 {
            file.set_len(current_len - excess)?;
        }
        Ok(())
    }

    /// Test whether we have already seen `message`.
    ///
    /// If we have seen it, return `Err(ReplayError::AlreadySeen)`.  (Since this
    /// is a probabilistic data structure, there is a chance of returning this
    /// error even if we have _not_ seen this particular message)
    ///
    /// Otherwise, return `Ok(())`.
    ///
    /// When the log is persistent, a newly-seen fingerprint is also appended
    /// to the on-disk file; a failure there is reported as [`ReplayError::Log`].
    pub(crate) fn check_for_replay(&mut self, message: &T::Message) -> Result<(), ReplayError> {
        let h = T::transform_message(message);
        self.seen.test_and_add(&h)?;
        if let Some(f) = self.file.as_mut() {
            (|| {
                // If write_all fails, it might have written part of the data;
                // in that case, we must truncate the file to resynchronise.
                // We set a note to truncate just before we call write_all
                // and clear it again afterwards.
                //
                // But, first, we need to deal with any previous note we left ourselves.

                // (With the current implementation of std::io::BufWriter, this is
                // unnecessary, because if the argument to write_all is smaller than
                // the buffer size, BufWriter::write_all always just copies to the buffer,
                // flushing first if necessary; and when it flushes, it uses write,
                // not write_all.  So the use of write_all never causes "lost" data.
                // However, this is not a documented guarantee.)
                match f.needs_resynch {
                    Ok(()) => {}
                    Err(()) => {
                        // We're going to reach behind the BufWriter, so we need to make
                        // sure it's in synch with the underlying File.
                        f.file.flush()?;
                        let inner = f.file.get_mut();
                        let len = inner.metadata()?.len();
                        Self::truncate_to_multiple(inner, len)?;
                        // cursor is now past end, must reset (see std::fs::File::set_len)
                        inner.seek(SeekFrom::End(0))?;
                    }
                }
                f.needs_resynch = Err(());

                f.file.write_all(&h[..])?;

                f.needs_resynch = Ok(());

                Ok(())
            })()
            .map_err(|e| ReplayError::Log(Arc::new(e)))?;
        }
        Ok(())
    }

    /// Flush any buffered data to disk.
    #[allow(dead_code)] // TODO #1208
    pub(crate) fn flush(&mut self) -> Result<(), io::Error> {
        if let Some(f) = self.file.as_mut() {
            f.file.flush()?;
        }
        Ok(())
    }

    /// Tries to parse a filename in the replay logs directory
    ///
    /// If the leafname refers to a file that would be created by
    /// [`ReplayLog::new_logged`], returns the name as a Rust type.
    ///
    /// Otherwise returns an error explaining why it isn't,
    /// as a plain string (for logging).
    pub(crate) fn parse_log_leafname(leaf: &OsStr) -> Result<T::Name, Cow<'static, str>> {
        T::parse_log_leafname(leaf)
    }
}
294

            
295
/// Wrapper around a fast-ish data structure for detecting replays with some
296
/// false positive rate.  Bloom filters, cuckoo filters, and xorf filters are all
297
/// an option here.  You could even use a HashSet.
298
///
299
/// We isolate this code to make it easier to replace.
300
mod data {
301
    use super::{ReplayError, OUTPUT_LEN};
302
    use growable_bloom_filter::GrowableBloom;
303

            
304
    /// A probabilistic membership filter.
305
    pub(super) struct Filter(pub(crate) GrowableBloom);
306

            
307
    impl Filter {
308
        /// Create a new empty filter
309
54
        pub(super) fn new() -> Self {
310
54
            // TODO: Perhaps we should make the capacity here tunable, based on
311
54
            // the number of entries we expect.  These values are more or less
312
54
            // pulled out of thin air.
313
54
            let desired_error_prob = 1.0 / 100_000.0;
314
54
            let est_insertions = 100_000;
315
54
            Filter(GrowableBloom::new(desired_error_prob, est_insertions))
316
54
        }
317

            
318
        /// Try to add `msg` to this filter if it isn't already there.
319
        ///
320
        /// Return Ok(()) or Err(AlreadySeen).
321
3836
        pub(super) fn test_and_add(&mut self, msg: &[u8; OUTPUT_LEN]) -> Result<(), ReplayError> {
322
3836
            if self.0.insert(&msg[..]) {
323
2424
                Ok(())
324
            } else {
325
1412
                Err(ReplayError::AlreadySeen)
326
            }
327
3836
        }
328
    }
329
}
330

            
331
/// A problem that prevents us from reading a ReplayLog from disk.
///
/// (This only exists so we can wrap it up in an [`io::Error`])
#[derive(thiserror::Error, Clone, Debug)]
enum LogContentError {
    /// The magic number on the log file was incorrect.
    #[error("unrecognized data format")]
    UnrecognizedFormat,
}
340

            
341
/// An error occurred while checking whether we've seen an element before.
342
#[derive(thiserror::Error, Clone, Debug)]
343
pub(crate) enum ReplayError {
344
    /// We have already seen this item.
345
    #[error("Already seen")]
346
    AlreadySeen,
347

            
348
    /// We were unable to record this item in the log.
349
    #[error("Unable to log data")]
350
    Log(Arc<std::io::Error>),
351
}
352

            
353
#[cfg(test)]
354
mod test {
355
    // @@ begin test lint list maintained by maint/add_warning @@
356
    #![allow(clippy::bool_assert_comparison)]
357
    #![allow(clippy::clone_on_copy)]
358
    #![allow(clippy::dbg_macro)]
359
    #![allow(clippy::mixed_attributes_style)]
360
    #![allow(clippy::print_stderr)]
361
    #![allow(clippy::print_stdout)]
362
    #![allow(clippy::single_char_pattern)]
363
    #![allow(clippy::unwrap_used)]
364
    #![allow(clippy::unchecked_duration_subtraction)]
365
    #![allow(clippy::useless_vec)]
366
    #![allow(clippy::needless_pass_by_value)]
367
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
368

            
369
    use super::*;
370
    use crate::test::mk_state_instance;
371
    use rand::Rng;
372
    use test_temp_dir::{test_temp_dir, TestTempDir, TestTempDirGuard};
373

            
374
    /// Marker type giving [`ReplayLog`] the test-specific name/message types.
    struct TestReplayLogType;
375

            
376
    /// A [`ReplayLog`] over the test types; see [`TestReplayLogType`].
    type TestReplayLog = ReplayLog<TestReplayLogType>;
377

            
378
    impl ReplayLogType for TestReplayLogType {
        type Name = IptLocalId;
        type Message = [u8; OUTPUT_LEN];

        // 18 bytes of text plus 14 NULs = MAGIC_LEN (32) bytes.
        const MAGIC: &'static [u8; MAGIC_LEN] = b"<tor test replay>\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0";

        fn format_filename(name: &IptLocalId) -> String {
            format!("{name}{REPLAY_LOG_SUFFIX}")
        }

        // Test messages are already OUTPUT_LEN bytes, so this is the identity.
        fn transform_message(message: &[u8; OUTPUT_LEN]) -> [u8; OUTPUT_LEN] {
            message.clone()
        }

        fn parse_log_leafname(leaf: &OsStr) -> Result<IptLocalId, Cow<'static, str>> {
            let leaf = leaf.to_str().ok_or("not proper unicode")?;
            let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
            let lid: IptLocalId = lid
                .parse()
                .map_err(|e: crate::InvalidIptLocalId| e.to_string())?;
            Ok(lid)
        }
    }
401

            
402
    /// Generate a random OUTPUT_LEN-byte test message from `rng`.
    fn rand_msg<R: Rng>(rng: &mut R) -> [u8; OUTPUT_LEN] {
        rng.random()
    }
405

            
406
    /// Basic tests on an ephemeral IptReplayLog: fresh messages are accepted
    /// once, and every subsequent check reports a replay.
    #[test]
    fn simple_usage() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let batch_a: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let batch_b: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let mut log = TestReplayLog::new_ephemeral();

        // Every message in batch A should be new to the log.
        for msg in batch_a.iter() {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        // Re-checking batch A must now report replays.
        assert!(batch_a.iter().all(|msg| log.check_for_replay(msg).is_err()));
        // Batch B was never added, so it must be accepted.
        for msg in batch_b.iter() {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
    }
427

            
428
    /// Subdirectory of the test temp dir that holds the on-disk state.
    const TEST_TEMP_SUBDIR: &str = "replaylog";
429

            
430
    /// Open (creating if necessary) the persistent test replay log
    /// inside `dir`'s [`TEST_TEMP_SUBDIR`] subdirectory.
    fn create_logged(dir: &TestTempDir) -> TestTempDirGuard<TestReplayLog> {
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let inst = mk_state_instance(&dir, "allium");
            let raw = inst.raw_subdir("iptreplay").unwrap();
            TestReplayLog::new_logged(&raw, &IptLocalId::dummy(1)).unwrap()
        })
    }
437

            
438
    /// Basic tests on a persistent IptReplayLog: recorded fingerprints
    /// survive dropping and reopening the log.
    #[test]
    fn logging_basics() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let batch_a: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let batch_b: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();

        {
            // Record batch A, then close the log (drop at end of scope).
            let mut log = create_logged(&dir);
            for msg in batch_a.iter() {
                assert!(log.check_for_replay(msg).is_ok(), "False positive");
            }
        }

        {
            // After a reload, batch A must still be present; then record batch B.
            let mut log = create_logged(&dir);
            for msg in batch_a.iter() {
                assert!(log.check_for_replay(msg).is_err());
            }
            for msg in batch_b.iter() {
                assert!(log.check_for_replay(msg).is_ok(), "False positive");
            }
        }

        // A final reload must remember both batches.
        let mut log = create_logged(&dir);
        for msg in batch_a.iter().chain(batch_b.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }
468

            
469
    /// Test for a log that gets truncated mid-write.
    #[test]
    fn test_truncated() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        // Truncate the file by 7 bytes.
        // (7 < OUTPUT_LEN, so this damages exactly the last record.)
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let path = dir.join(format!("hss/allium/iptreplay/{}.bin", IptLocalId::dummy(1)));
            let file = OpenOptions::new().write(true).open(path).unwrap();
            // Make sure that the file has the length we expect.
            let expected_len = MAGIC_LEN + OUTPUT_LEN * group_1.len();
            assert_eq!(expected_len as u64, file.metadata().unwrap().len());
            file.set_len((expected_len - 7) as u64).unwrap();
        });
        // Now, reload the log. We should be able to recover every non-truncated
        // item...
        let mut log = create_logged(&dir);
        for msg in &group_1[..group_1.len() - 1] {
            assert!(log.check_for_replay(msg).is_err());
        }
        // But not the last one, which we truncated.  (Checking will add it, though.)
        assert!(
            log.check_for_replay(&group_1[group_1.len() - 1]).is_ok(),
            "False positive"
        );
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }
513

            
514
    /// Test for a partial write
    ///
    /// Re-executes itself as a subprocess, then uses `RLIMIT_FSIZE` to force
    /// real partial writes, and checks that `ReplayLog` resynchronises and that
    /// everything it claimed to record is recovered on reload.
    #[test]
    #[cfg(target_os = "linux")] // different platforms have different definitions of sigaction
    fn test_partial_write() {
        use std::env;
        use std::os::unix::process::ExitStatusExt;
        use std::process::Command;

        // TODO this contraption should perhaps be productised and put somewhere else

        const ENV_NAME: &str = "TOR_HSSERVICE_TEST_PARTIAL_WRITE_SUBPROCESS";
        // for a wait status different from any of libtest's
        const GOOD_SIGNAL: i32 = libc::SIGUSR2;

        // Build an empty libc signal set.
        let sigemptyset = || unsafe {
            let mut set = MaybeUninit::uninit();
            libc::sigemptyset(set.as_mut_ptr());
            set.assume_init()
        };

        // Check that SIGUSR2 starts out as SIG_DFL and unblocked
        //
        // We *reject* such situations, rather than fixing them up, because this is an
        // irregular and broken environment that can cause arbitrarily weird behaviours.
        // Programs on Unix are entitled to assume that their signal dispositions are
        // SIG_DFL on entry, with signals unblocked.  (With a few exceptions.)
        //
        // So we want to detect and report any such environment, not let it slide.
        unsafe {
            let mut sa = MaybeUninit::uninit();
            let r = libc::sigaction(GOOD_SIGNAL, ptr::null(), sa.as_mut_ptr());
            assert_eq!(r, 0);
            let sa = sa.assume_init();
            assert_eq!(
                sa.sa_sigaction,
                libc::SIG_DFL,
                "tests running in broken environment (SIGUSR2 not SIG_DFL)"
            );

            let empty_set = sigemptyset();
            let mut current_set = MaybeUninit::uninit();
            let r = libc::sigprocmask(
                libc::SIG_UNBLOCK,
                (&empty_set) as _,
                current_set.as_mut_ptr(),
            );
            assert_eq!(r, 0);
            let current_set = current_set.assume_init();
            let blocked = libc::sigismember((&current_set) as _, GOOD_SIGNAL);
            assert_eq!(
                blocked, 0,
                "tests running in broken environment (SIGUSR2 blocked)"
            );
        }

        // Decide whether we are the test runner (fork a child) or the child itself.
        match env::var(ENV_NAME) {
            Err(env::VarError::NotPresent) => {
                eprintln!("in test runner process, forking...");
                let output = Command::new(env::current_exe().unwrap())
                    .args(["--nocapture", "replay::test::test_partial_write"])
                    .env(ENV_NAME, "1")
                    .output()
                    .unwrap();
                let print_output = |prefix, data| match std::str::from_utf8(data) {
                    Ok(s) => {
                        for l in s.split("\n") {
                            eprintln!(" {prefix} {l}");
                        }
                    }
                    Err(e) => eprintln!(" UTF-8 ERROR {prefix} {e}"),
                };
                print_output("!", &output.stdout);
                print_output(">", &output.stderr);
                let st = output.status;
                eprintln!("reaped actual test process {st:?} (expecting signal {GOOD_SIGNAL})");
                assert_eq!(st.signal(), Some(GOOD_SIGNAL));
                return;
            }
            Ok(y) if y == "1" => {}
            other => panic!("bad env var {ENV_NAME:?} {other:?}"),
        };

        // Now we are in our own process, and can mess about with ulimit etc.

        use std::fs;
        use std::mem::MaybeUninit;
        use std::ptr;

        /// Set RLIMIT_FSIZE (the maximum file size we may write) to `size`.
        fn set_ulimit(size: usize) {
            unsafe {
                use libc::RLIMIT_FSIZE;
                let mut rlim = libc::rlimit {
                    rlim_cur: 0,
                    rlim_max: 0,
                };
                let r = libc::getrlimit(RLIMIT_FSIZE, (&mut rlim) as _);
                assert_eq!(r, 0);
                rlim.rlim_cur = size.try_into().unwrap();
                let r = libc::setrlimit(RLIMIT_FSIZE, (&rlim) as _);
                assert_eq!(r, 0);
            }
        }

        // This test is quite complicated.
        //
        // We want to test partial writes.  We could perhaps have done this by
        // parameterising IptReplayLog so it could have something other than File,
        // but that would probably leak into the public API.
        //
        // Instead, we cause *actual* partial writes.  We use the Unix setrlimit
        // call to limit the size of files our process is allowed to write.
        // This causes the underlying write(2) calls to (i) generate SIGXFSZ
        // (ii) if that doesn't kill the process, return partial writes.

        test_temp_dir!().used_by(|dir| {
            let path = dir.join("test.log");
            let lock = LockFileGuard::lock(dir.join("dummy.lock")).unwrap();
            let lock = Arc::new(lock);
            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();

            const BUF: usize = 8192; // BufWriter default; if that changes, test will break

            // We let ourselves write one whole buffer plus an odd amount of extra
            const ALLOW: usize = BUF + 37;

            // Ignore SIGXFSZ (default disposition is for exceeding the rlimit to kill us)
            unsafe {
                let sa = libc::sigaction {
                    sa_sigaction: libc::SIG_IGN,
                    sa_mask: sigemptyset(),
                    sa_flags: 0,
                    sa_restorer: None,
                };
                let r = libc::sigaction(libc::SIGXFSZ, (&sa) as _, ptr::null_mut());
                assert_eq!(r, 0);
            }

            // Assert that `e` is a ReplayError::Log wrapping EFBIG (file too large).
            let demand_efbig = |e| match e {
                // TODO MSRV 1.83: replace with io::ErrorKind::FileTooLarge?
                ReplayError::Log(e) if e.raw_os_error() == Some(libc::EFBIG) => {}
                other => panic!("expected EFBIG, got {other:?}"),
            };

            // Generate a distinct message given a phase and a counter
            #[allow(clippy::identity_op)]
            let mk_msg = |phase: u8, i: usize| {
                let i = u32::try_from(i).unwrap();
                let mut msg = [0_u8; OUTPUT_LEN];
                msg[0] = phase;
                msg[1] = phase;
                msg[4] = (i >> 24) as _;
                msg[5] = (i >> 16) as _;
                msg[6] = (i >> 8) as _;
                msg[7] = (i >> 0) as _;
                msg
            };

            // Number of hashes we can write to the file before failure occurs
            // (the rlimit allowance plus the BufWriter's buffered capacity).
            const CAN_DO: usize = (ALLOW + BUF - MAGIC_LEN) / OUTPUT_LEN;
            dbg!(MAGIC_LEN, OUTPUT_LEN, BUF, ALLOW, CAN_DO);

            // Record of the hashes that TestReplayLog tells us were OK and not replays;
            // ie, which it therefore ought to have recorded.
            let mut gave_ok = Vec::new();

            set_ulimit(ALLOW);

            for i in 0..CAN_DO {
                let h = mk_msg(b'y', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            let md = fs::metadata(&path).unwrap();
            dbg!(md.len(), &rl.file);

            // Now we have written what we can.  The next two calls will fail,
            // since the BufWriter buffer is full and can't be flushed.

            for i in 0..2 {
                eprintln!("expecting EFBIG {i}");
                demand_efbig(rl.check_for_replay(&mk_msg(b'n', i)).unwrap_err());
                let md = fs::metadata(&path).unwrap();
                assert_eq!(md.len(), u64::try_from(ALLOW).unwrap());
            }

            // Enough that we don't get any further file size exceedances
            set_ulimit(ALLOW * 10);

            // Now we should be able to recover.  We write two more hashes.
            for i in 0..2 {
                eprintln!("recovering {i}");
                let h = mk_msg(b'r', i);
                rl.check_for_replay(&h).unwrap();
                gave_ok.push(h);
            }

            // flush explicitly just so we catch any error
            // (drop would flush, but it can't report errors)
            rl.flush().unwrap();
            drop(rl);

            // Reopen the log - reading in the written data.
            // We can then check that everything the earlier IptReplayLog
            // claimed to have written, is indeed recorded.

            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();
            for msg in &gave_ok {
                match rl.check_for_replay(msg) {
                    Err(ReplayError::AlreadySeen) => {}
                    other => panic!("expected AlreadySeen, got {other:?}"),
                }
            }

            eprintln!("recovered file contents checked, all good");
        });

        // Exit with a wait status that the parent runner uniquely recognises.
        unsafe {
            libc::raise(libc::SIGUSR2);
        }
        panic!("we survived raise SIGUSR2");
    }
736
}