//! Facility for detecting and preventing replays on introduction requests.
//!
//! If we were to permit the introduction point to replay the same request
//! multiple times, it would cause the service to contact the rendezvous point
//! again with the same rendezvous cookie as before, which could help with
//! traffic analysis.
//!
//! (This could also be a DoS vector if the introduction point decided to
//! overload the service.)
//!
//! Because we use the same introduction point keys across restarts, we need to
//! make sure that our replay logs are also persistent.  We do this by using
//! a file on disk.

mod ipt;
#[cfg(feature = "hs-pow-full")]
mod pow;

use crate::internal_prelude::*;

/// A probabilistic data structure to record fingerprints of observed Introduce2
/// messages.
///
/// We need to record these fingerprints to prevent replay attacks; see the
/// module documentation for an explanation of why that would be bad.
///
/// A ReplayLog should correspond to a `KP_hss_ntor` key, and should have the
/// same lifespan: dropping it sooner will enable replays, but dropping it later
/// will waste disk and memory.
///
/// False positives are allowed, to conserve on space.
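///
/// # Example (sketch)
///
/// A minimal usage sketch, not compiled as a doctest; `SomeReplayLogType` here is
/// a stand-in for a real [`ReplayLogType`] implementation whose `Message` values
/// are easy to construct.
///
/// ```ignore
/// let mut log = ReplayLog::<SomeReplayLogType>::new_ephemeral();
///
/// // The first time we see a message it is accepted...
/// assert!(log.check_for_replay(&msg).is_ok());
/// // ...and presenting the same message again is reported as a (probable) replay.
/// assert!(matches!(
///     log.check_for_replay(&msg),
///     Err(ReplayError::AlreadySeen)
/// ));
/// ```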
pub(crate) struct ReplayLog<T> {
    /// The inner probabilistic data structure.
    seen: data::Filter,
    /// Persistent state file etc., if we're persistent
    ///
    /// If it is `None`, this ReplayLog is ephemeral.
    file: Option<PersistFile>,
    /// [`PhantomData`] so rustc doesn't complain about the unused type param.
    ///
    /// This type represents the type of data that we're storing, as well as the type of the
    /// key/name for that data.
    replay_log_type: PhantomData<T>,
}

/// A [`ReplayLog`] for [`Introduce2`](tor_cell::relaycell::msg::Introduce2) messages.
pub(crate) type IptReplayLog = ReplayLog<ipt::IptReplayLogType>;

/// A [`ReplayLog`] for Proof-of-Work [`Nonce`](tor_hscrypto::pow::v1::Nonce)s.
#[cfg(feature = "hs-pow-full")]
pub(crate) type PowNonceReplayLog = ReplayLog<pow::PowNonceReplayLogType>;

/// The length of the [`ReplayLogType::MAGIC`] constant.
///
// TODO: If Rust's constant expressions supported generics we wouldn't need this at all.
const MAGIC_LEN: usize = 32;

/// The length of the message that we store on disk, in bytes.
///
/// If the message is longer than this, then we will need to hash or truncate it before storing it
/// to disk.
///
// TODO: Once const generics are good, this should be an associated constant for ReplayLogType.
pub(crate) const OUTPUT_LEN: usize = 16;
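
// A sketch of the on-disk layout, as implemented by `new_logged_inner` and
// `check_for_replay` below and repaired by `truncate_to_multiple`:
//
//     [ ReplayLogType::MAGIC : MAGIC_LEN bytes ][ record 0 ][ record 1 ] ...
//
// where each record is exactly OUTPUT_LEN bytes (a fingerprint of one message).
// A trailing partial record (for example, one left by an interrupted write) is
// discarded when the file is reopened.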

/// A trait to represent a set of types that ReplayLog can be used with.
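///
/// # Example (sketch)
///
/// A hypothetical implementation, for illustration only; the real implementations
/// live in the `ipt` and `pow` submodules, and `WidgetId`/`WidgetMsg` are made-up
/// types assumed to implement `Display`/`FromStr` and a digest method.
///
/// ```ignore
/// struct WidgetReplayLogType;
///
/// impl ReplayLogType for WidgetReplayLogType {
///     type Name = WidgetId;
///     type Message = WidgetMsg;
///
///     const MAGIC: &'static [u8; MAGIC_LEN] = b"<widget replay>\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
///
///     fn format_filename(name: &WidgetId) -> String {
///         format!("{name}{REPLAY_LOG_SUFFIX}")
///     }
///
///     fn transform_message(message: &WidgetMsg) -> [u8; OUTPUT_LEN] {
///         // Hash or truncate the message down to exactly OUTPUT_LEN bytes.
///         message.digest()
///     }
///
///     fn parse_log_leafname(leaf: &OsStr) -> Result<WidgetId, Cow<'static, str>> {
///         let leaf = leaf.to_str().ok_or("not proper unicode")?;
///         let id = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
///         id.parse().map_err(|e| Cow::from(e.to_string()))
///     }
/// }
/// ```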
pub(crate) trait ReplayLogType {
    // TODO: It would be nice to encode the directory name as an associated constant here, rather
    // than having the external code pass it in to us.

    /// The name of this item, used for the log filename.
    type Name;

    /// The type of the messages that we are ensuring the uniqueness of.
    type Message;

    /// A magic string that we put at the start of each log file, to make sure that
    /// we don't confuse this file format with others.
    const MAGIC: &'static [u8; MAGIC_LEN];

    /// Convert [`Self::Name`] to a [`String`]
    fn format_filename(name: &Self::Name) -> String;

    /// Convert [`Self::Message`] to bytes that will be stored in the log.
    fn transform_message(message: &Self::Message) -> [u8; OUTPUT_LEN];

    /// Parse a filename into [`Self::Name`].
    fn parse_log_leafname(leaf: &OsStr) -> Result<Self::Name, Cow<'static, str>>;
}

/// Persistent state file, and associated data
///
/// Stored as `ReplayLog.file`.
#[derive(Debug)]
pub(crate) struct PersistFile {
    /// A file logging fingerprints of the messages we have seen.
    file: BufWriter<File>,
    /// Whether we had a possible partial write
    ///
    /// See the comment inside [`ReplayLog::check_for_replay`].
    /// `Ok` means all is well.
    /// `Err` means we may have written partial data to the actual file,
    /// and need to make sure we're back at a record boundary.
    needs_resynch: Result<(), ()>,
    /// Filesystem lock which must not be released until after we finish writing
    ///
    /// Must come last so that the drop order is correct
    #[allow(dead_code)] // Held just so we unlock on drop
    lock: Arc<LockFileGuard>,
}

/// Replay log files have a `.bin` suffix.
///
/// The name of the file is determined by [`ReplayLogType::format_filename`].
const REPLAY_LOG_SUFFIX: &str = ".bin";

impl<T: ReplayLogType> ReplayLog<T> {
    /// Create a new ReplayLog not backed by any data storage.
    #[allow(dead_code)] // TODO #1186 Remove once something uses ReplayLog.
    pub(crate) fn new_ephemeral() -> Self {
        Self {
            seen: data::Filter::new(),
            file: None,
            replay_log_type: PhantomData,
        }
    }

    /// Create a ReplayLog backed by the file at a given path.
    ///
    /// If the file already exists, load its contents and append any new
    /// contents to it; otherwise, create the file.
    ///
    /// **`lock` must already have been locked** and this
    /// *cannot be assured by the type system*.
    ///
    /// # Limitations
    ///
    /// It is the caller's responsibility to make sure that there are never two
    /// `ReplayLogs` open at once for the same path, or for two paths that
    /// resolve to the same file.
    pub(crate) fn new_logged(
        dir: &InstanceRawSubdir,
        name: &T::Name,
    ) -> Result<Self, OpenReplayLogError> {
        let leaf = T::format_filename(name);
        let path = dir.as_path().join(leaf);
        let lock_guard = dir.raw_lock_guard();

        Self::new_logged_inner(&path, lock_guard).map_err(|error| OpenReplayLogError {
            file: path,
            error: error.into(),
        })
    }

    /// Inner function for `new_logged`, with reified arguments and raw error type
    fn new_logged_inner(path: impl AsRef<Path>, lock: Arc<LockFileGuard>) -> io::Result<Self> {
        let mut file = {
            let mut options = OpenOptions::new();
            options.read(true).write(true).create(true);

            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::OpenOptionsExt as _;
                options.mode(0o600);
            }

            options.open(path)?
        };

        // If the file is new, we need to write the magic string. Else we must
        // read it.
        let file_len = file.metadata()?.len();
        if file_len == 0 {
            file.write_all(T::MAGIC)?;
        } else {
            let mut m = [0_u8; MAGIC_LEN];
            file.read_exact(&mut m)?;
            if &m != T::MAGIC {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    LogContentError::UnrecognizedFormat,
                ));
            }

            Self::truncate_to_multiple(&mut file, file_len)?;
        }

        // Now read the rest of the file.
        let mut seen = data::Filter::new();
        let mut r = BufReader::new(file);
        loop {
            let mut msg = [0_u8; OUTPUT_LEN];
            match r.read_exact(&mut msg) {
                Ok(()) => {
                    let _ = seen.test_and_add(&msg); // ignore error.
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        let mut file = r.into_inner();
        file.seek(SeekFrom::End(0))?;

        let file = PersistFile {
            file: BufWriter::new(file),
            needs_resynch: Ok(()),
            lock,
        };

        Ok(Self {
            seen,
            file: Some(file),
            replay_log_type: PhantomData,
        })
    }

    /// Truncate `file` to contain a whole number of records
    ///
    /// `current_len` should have come from `file.metadata()`.
    // If the file's length is not an even multiple of OUTPUT_LEN after the MAGIC, truncate it.
    fn truncate_to_multiple(file: &mut File, current_len: u64) -> io::Result<()> {
        let excess = (current_len - T::MAGIC.len() as u64) % (OUTPUT_LEN as u64);
        if excess != 0 {
            file.set_len(current_len - excess)?;
        }
        Ok(())
    }

    /// Test whether we have already seen `message`.
    ///
    /// If we have seen it, return `Err(ReplayError::AlreadySeen)`.  (Since this
    /// is a probabilistic data structure, there is a chance of returning this
    /// error even if we have _not_ seen this particular message.)
    ///
    /// Otherwise, return `Ok(())`.
    pub(crate) fn check_for_replay(&mut self, message: &T::Message) -> Result<(), ReplayError> {
        let h = T::transform_message(message);
        self.seen.test_and_add(&h)?;
        if let Some(f) = self.file.as_mut() {
            (|| {
                // If write_all fails, it might have written part of the data;
                // in that case, we must truncate the file to resynchronise.
                // We set a note to truncate just before we call write_all
                // and clear it again afterwards.
                //
                // But, first, we need to deal with any previous note we left ourselves.

                // (With the current implementation of std::io::BufWriter, this is
                // unnecessary, because if the argument to write_all is smaller than
                // the buffer size, BufWriter::write_all always just copies to the buffer,
                // flushing first if necessary; and when it flushes, it uses write,
                // not write_all.  So the use of write_all never causes "lost" data.
                // However, this is not a documented guarantee.)
                match f.needs_resynch {
                    Ok(()) => {}
                    Err(()) => {
                        // We're going to reach behind the BufWriter, so we need to make
                        // sure it's in synch with the underlying File.
                        f.file.flush()?;
                        let inner = f.file.get_mut();
                        let len = inner.metadata()?.len();
                        Self::truncate_to_multiple(inner, len)?;
                        // cursor is now past end, must reset (see std::fs::File::set_len)
                        inner.seek(SeekFrom::End(0))?;
                    }
                }
                f.needs_resynch = Err(());

                f.file.write_all(&h[..])?;

                f.needs_resynch = Ok(());

                Ok(())
            })()
            .map_err(|e| ReplayError::Log(Arc::new(e)))?;
        }
        Ok(())
    }

    /// Flush any buffered data to disk.
    #[allow(dead_code)] // TODO #1208
    pub(crate) fn flush(&mut self) -> Result<(), io::Error> {
        if let Some(f) = self.file.as_mut() {
            f.file.flush()?;
        }
        Ok(())
    }

    /// Tries to parse a filename in the replay logs directory
    ///
    /// If the leafname refers to a file that would be created by
    /// [`ReplayLog::new_logged`], returns the name as a Rust type.
    ///
    /// Otherwise returns an error explaining why it isn't,
    /// as a plain string (for logging).
    pub(crate) fn parse_log_leafname(leaf: &OsStr) -> Result<T::Name, Cow<'static, str>> {
        T::parse_log_leafname(leaf)
    }
}

/// Wrapper around a fast-ish data structure for detecting replays with some
/// false positive rate.  Bloom filters, cuckoo filters, and xorf filters are all
/// an option here.  You could even use a HashSet.
///
/// We isolate this code to make it easier to replace.
mod data {
    use super::{OUTPUT_LEN, ReplayError};
    use growable_bloom_filter::GrowableBloom;

    /// A probabilistic membership filter.
    pub(super) struct Filter(pub(crate) GrowableBloom);

    impl Filter {
        /// Create a new empty filter
        pub(super) fn new() -> Self {
            // TODO: Perhaps we should make the capacity here tunable, based on
            // the number of entries we expect.  These values are more or less
            // pulled out of thin air.
            let desired_error_prob = 1.0 / 100_000.0;
            let est_insertions = 100_000;
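            // A rough sketch of what these numbers mean in practice: with a
            // target false-positive probability of 1/100_000, checking 1_000
            // never-before-seen messages against a reasonably full filter is
            // expected to yield about 1_000 / 100_000 = 0.01 spurious
            // `AlreadySeen` results, so false positives are rare but possible.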
            Filter(GrowableBloom::new(desired_error_prob, est_insertions))
        }

        /// Try to add `msg` to this filter if it isn't already there.
        ///
        /// Return Ok(()) or Err(AlreadySeen).
        pub(super) fn test_and_add(&mut self, msg: &[u8; OUTPUT_LEN]) -> Result<(), ReplayError> {
            if self.0.insert(&msg[..]) {
                Ok(())
            } else {
                Err(ReplayError::AlreadySeen)
            }
        }
    }
}

/// A problem that prevents us from reading a ReplayLog from disk.
///
/// (This only exists so we can wrap it up in an [`io::Error`])
#[derive(thiserror::Error, Clone, Debug)]
enum LogContentError {
    /// The magic number on the log file was incorrect.
    #[error("unrecognized data format")]
    UnrecognizedFormat,
}

/// An error occurred while checking whether we've seen an element before.
#[derive(thiserror::Error, Clone, Debug)]
pub(crate) enum ReplayError {
    /// We have already seen this item.
    #[error("Already seen")]
    AlreadySeen,

    /// We were unable to record this item in the log.
    #[error("Unable to log data")]
    Log(Arc<std::io::Error>),
}

/// Error occurred while opening replay log.
#[derive(thiserror::Error, Clone, Debug)]
#[error("unable to open replay log: {file:?}")]
pub struct OpenReplayLogError {
    /// What filesystem object we tried to do it to
    pub(crate) file: PathBuf,
    /// What happened
    #[source]
    pub(crate) error: Arc<io::Error>,
}

#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::test::mk_state_instance;
    use rand::Rng;
    use test_temp_dir::{TestTempDir, TestTempDirGuard, test_temp_dir};

    struct TestReplayLogType;

    type TestReplayLog = ReplayLog<TestReplayLogType>;

    impl ReplayLogType for TestReplayLogType {
        type Name = IptLocalId;
        type Message = [u8; OUTPUT_LEN];

        const MAGIC: &'static [u8; MAGIC_LEN] = b"<tor test replay>\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0";

        fn format_filename(name: &IptLocalId) -> String {
            format!("{name}{REPLAY_LOG_SUFFIX}")
        }

        fn transform_message(message: &[u8; OUTPUT_LEN]) -> [u8; OUTPUT_LEN] {
            message.clone()
        }

        fn parse_log_leafname(leaf: &OsStr) -> Result<IptLocalId, Cow<'static, str>> {
            let leaf = leaf.to_str().ok_or("not proper unicode")?;
            let lid = leaf.strip_suffix(REPLAY_LOG_SUFFIX).ok_or("not *.bin")?;
            let lid: IptLocalId = lid
                .parse()
                .map_err(|e: crate::InvalidIptLocalId| e.to_string())?;
            Ok(lid)
        }
    }

    fn rand_msg<R: Rng>(rng: &mut R) -> [u8; OUTPUT_LEN] {
        rng.random()
    }

    /// Basic tests on an ephemeral IptReplayLog.
    #[test]
    fn simple_usage() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let mut log = TestReplayLog::new_ephemeral();
        // Add everything in group 1.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        // Make sure that everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Make sure that group 2 is detected as not-there.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
    }

    const TEST_TEMP_SUBDIR: &str = "replaylog";

    fn create_logged(dir: &TestTempDir) -> TestTempDirGuard<TestReplayLog> {
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let inst = mk_state_instance(&dir, "allium");
            let raw = inst.raw_subdir("iptreplay").unwrap();
            TestReplayLog::new_logged(&raw, &IptLocalId::dummy(1)).unwrap()
        })
    }

    /// Basic tests on a persistent IptReplayLog.
    #[test]
    fn logging_basics() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        // Add everything in group 1, then close and reload.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure everything in group 1 is still there.
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_err());
        }
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a log that gets truncated mid-write.
    #[test]
    fn test_truncated() {
        let mut rng = tor_basic_utils::test_rng::testing_rng();
        let group_1: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();
        let group_2: Vec<_> = (0..=100).map(|_| rand_msg(&mut rng)).collect();

        let dir = test_temp_dir!();
        let mut log = create_logged(&dir);
        for msg in &group_1 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        // Truncate the file by 7 bytes.
        dir.subdir_used_by(TEST_TEMP_SUBDIR, |dir| {
            let path = dir.join(format!("hss/allium/iptreplay/{}.bin", IptLocalId::dummy(1)));
            let file = OpenOptions::new().write(true).open(path).unwrap();
            // Make sure that the file has the length we expect.
            let expected_len = MAGIC_LEN + OUTPUT_LEN * group_1.len();
            assert_eq!(expected_len as u64, file.metadata().unwrap().len());
            file.set_len((expected_len - 7) as u64).unwrap();
        });
        // Now, reload the log. We should be able to recover every non-truncated
        // item...
        let mut log = create_logged(&dir);
        for msg in &group_1[..group_1.len() - 1] {
            assert!(log.check_for_replay(msg).is_err());
        }
        // But not the last one, which we truncated.  (Checking will add it, though.)
        assert!(
            log.check_for_replay(&group_1[group_1.len() - 1]).is_ok(),
            "False positive"
        );
        // Now add everything in group 2, then close and reload.
        for msg in &group_2 {
            assert!(log.check_for_replay(msg).is_ok(), "False positive");
        }
        drop(log);
        let mut log = create_logged(&dir);
        // Make sure that groups 1 and 2 are still there.
        for msg in group_1.iter().chain(group_2.iter()) {
            assert!(log.check_for_replay(msg).is_err());
        }
    }

    /// Test for a partial write
    #[test]
    #[cfg(target_os = "linux")] // different platforms have different definitions of sigaction
    fn test_partial_write() {
        use std::env;
        use std::os::unix::process::ExitStatusExt;
        use std::process::Command;

        // TODO this contraption should perhaps be productised and put somewhere else

        const ENV_NAME: &str = "TOR_HSSERVICE_TEST_PARTIAL_WRITE_SUBPROCESS";
        // for a wait status different from any of libtest's
        const GOOD_SIGNAL: i32 = libc::SIGUSR2;

        let sigemptyset = || unsafe {
            let mut set = MaybeUninit::uninit();
            libc::sigemptyset(set.as_mut_ptr());
            set.assume_init()
        };

        // Check that SIGUSR2 starts out as SIG_DFL and unblocked
        //
        // We *reject* such situations, rather than fixing them up, because this is an
        // irregular and broken environment that can cause arbitrarily weird behaviours.
        // Programs on Unix are entitled to assume that their signal dispositions are
        // SIG_DFL on entry, with signals unblocked.  (With a few exceptions.)
        //
        // So we want to detect and report any such environment, not let it slide.
        unsafe {
            let mut sa = MaybeUninit::uninit();
            let r = libc::sigaction(GOOD_SIGNAL, ptr::null(), sa.as_mut_ptr());
            assert_eq!(r, 0);
            let sa = sa.assume_init();
            assert_eq!(
                sa.sa_sigaction,
                libc::SIG_DFL,
                "tests running in broken environment (SIGUSR2 not SIG_DFL)"
            );

            let empty_set = sigemptyset();
            let mut current_set = MaybeUninit::uninit();
            let r = libc::sigprocmask(
                libc::SIG_UNBLOCK,
                (&empty_set) as _,
                current_set.as_mut_ptr(),
            );
            assert_eq!(r, 0);
            let current_set = current_set.assume_init();
            let blocked = libc::sigismember((&current_set) as _, GOOD_SIGNAL);
            assert_eq!(
                blocked, 0,
                "tests running in broken environment (SIGUSR2 blocked)"
            );
        }

        match env::var(ENV_NAME) {
            Err(env::VarError::NotPresent) => {
                eprintln!("in test runner process, forking..,");
589
                let output = Command::new(env::current_exe().unwrap())
590
                    .args(["--nocapture", "replay::test::test_partial_write"])
591
                    .env(ENV_NAME, "1")
592
                    .output()
593
                    .unwrap();
594
                let print_output = |prefix, data| match std::str::from_utf8(data) {
595
                    Ok(s) => {
596
                        for l in s.split("\n") {
597
                            eprintln!(" {prefix} {l}");
598
                        }
599
                    }
600
                    Err(e) => eprintln!(" UTF-8 ERROR {prefix} {e}"),
601
                };
602
                print_output("!", &output.stdout);
603
                print_output(">", &output.stderr);
604
                let st = output.status;
605
                eprintln!("reaped actual test process {st:?} (expecting signal {GOOD_SIGNAL})");
606
                assert_eq!(st.signal(), Some(GOOD_SIGNAL));
607
                return;
608
            }
609
            Ok(y) if y == "1" => {}
610
            other => panic!("bad env var {ENV_NAME:?} {other:?}"),
611
        };
612

            
613
        // Now we are in our own process, and can mess about with ulimit etc.
614

            
615
        use std::fs;
616
        use std::mem::MaybeUninit;
617
        use std::ptr;
618

            
619
        fn set_ulimit(size: usize) {
620
            unsafe {
621
                use libc::RLIMIT_FSIZE;
622
                let mut rlim = libc::rlimit {
623
                    rlim_cur: 0,
624
                    rlim_max: 0,
625
                };
626
                let r = libc::getrlimit(RLIMIT_FSIZE, (&mut rlim) as _);
627
                assert_eq!(r, 0);
628
                rlim.rlim_cur = size.try_into().unwrap();
629
                let r = libc::setrlimit(RLIMIT_FSIZE, (&rlim) as _);
630
                assert_eq!(r, 0);
631
            }
632
        }
633

            
634
        // This test is quite complicated.
635
        //
636
        // We want to test partial writes.  We could perhaps have done this by
637
        // parameterising IptReplayLog so it could have something other than File,
638
        // but that would probably leak into the public API.
639
        //
640
        // Instead, we cause *actual* partial writes.  We use the Unix setrlimit
641
        // call to limit the size of files our process is allowed to write.
642
        // This causes the underlying write(2) calls to (i) generate SIGXFSZ
643
        // (ii) if that doesn't kill the process, return partial writes.
644

            
645
        test_temp_dir!().used_by(|dir| {
646
            let path = dir.join("test.log");
647
            let lock = LockFileGuard::lock(dir.join("dummy.lock")).unwrap();
648
            let lock = Arc::new(lock);
649
            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();
650

            
651
            const BUF: usize = 8192; // BufWriter default; if that changes, test will break
652

            
653
            // We let ourselves write one whole buffer plus an odd amount of extra
654
            const ALLOW: usize = BUF + 37;
655

            
656
            // Ignore SIGXFSZ (default disposition is for exceeding the rlimit to kill us)
657
            unsafe {
658
                let sa = libc::sigaction {
659
                    sa_sigaction: libc::SIG_IGN,
660
                    sa_mask: sigemptyset(),
661
                    sa_flags: 0,
662
                    sa_restorer: None,
663
                };
664
                let r = libc::sigaction(libc::SIGXFSZ, (&sa) as _, ptr::null_mut());
665
                assert_eq!(r, 0);
666
            }
667

            
668
            let demand_efbig = |e| match e {
669
                ReplayError::Log(e) if e.kind() == io::ErrorKind::FileTooLarge => {}
670
                other => panic!("expected EFBIG, got {other:?}"),
671
            };
672

            
673
            // Generate a distinct message given a phase and a counter
674
            #[allow(clippy::identity_op)]
675
            let mk_msg = |phase: u8, i: usize| {
676
                let i = u32::try_from(i).unwrap();
677
                let mut msg = [0_u8; OUTPUT_LEN];
678
                msg[0] = phase;
679
                msg[1] = phase;
680
                msg[4] = (i >> 24) as _;
681
                msg[5] = (i >> 16) as _;
682
                msg[6] = (i >> 8) as _;
683
                msg[7] = (i >> 0) as _;
684
                msg
685
            };
686

            
687
            // Number of hashes we can write to the file before failure occurs
688
            const CAN_DO: usize = (ALLOW + BUF - MAGIC_LEN) / OUTPUT_LEN;
689
            dbg!(MAGIC_LEN, OUTPUT_LEN, BUF, ALLOW, CAN_DO);
690

            
691
            // Record of the hashes that TestReplayLog tells us were OK and not replays;
692
            // ie, which it therefore ought to have recorded.
693
            let mut gave_ok = Vec::new();
694

            
695
            set_ulimit(ALLOW);
696

            
697
            for i in 0..CAN_DO {
698
                let h = mk_msg(b'y', i);
699
                rl.check_for_replay(&h).unwrap();
700
                gave_ok.push(h);
701
            }
702

            
703
            let md = fs::metadata(&path).unwrap();
704
            dbg!(md.len(), &rl.file);
705

            
706
            // Now we have written what we can.  The next two calls will fail,
707
            // since the BufWriter buffer is full and can't be flushed.
708

            
709
            for i in 0..2 {
710
                eprintln!("expecting EFBIG {i}");
711
                demand_efbig(rl.check_for_replay(&mk_msg(b'n', i)).unwrap_err());
712
                let md = fs::metadata(&path).unwrap();
713
                assert_eq!(md.len(), u64::try_from(ALLOW).unwrap());
714
            }
715

            
716
            // Enough that we don't get any further file size exceedances
717
            set_ulimit(ALLOW * 10);
718

            
719
            // Now we should be able to recover.  We write two more hashes.
720
            for i in 0..2 {
721
                eprintln!("recovering {i}");
722
                let h = mk_msg(b'r', i);
723
                rl.check_for_replay(&h).unwrap();
724
                gave_ok.push(h);
725
            }
726

            
727
            // flush explicitly just so we catch any error
728
            // (drop would flush, but it can't report errors)
729
            rl.flush().unwrap();
730
            drop(rl);
731

            
732
            // Reopen the log - reading in the written data.
733
            // We can then check that everything the earlier IptReplayLog
734
            // claimed to have written, is indeed recorded.
735

            
736
            let mut rl = TestReplayLog::new_logged_inner(&path, lock.clone()).unwrap();
737
            for msg in &gave_ok {
738
                match rl.check_for_replay(msg) {
739
                    Err(ReplayError::AlreadySeen) => {}
740
                    other => panic!("expected AlreadySeen, got {other:?}"),
741
                }
742
            }
743

            
744
            eprintln!("recovered file contents checked, all good");
745
        });
746

            
747
        unsafe {
748
            libc::raise(libc::SIGUSR2);
749
        }
750
        panic!("we survived raise SIGUSR2");
751
    }
752
}