1
//! Tests for bridge descriptor downloading
2

            
3
// @@ begin test lint list maintained by maint/add_warning @@
4
#![allow(clippy::bool_assert_comparison)]
5
#![allow(clippy::clone_on_copy)]
6
#![allow(clippy::dbg_macro)]
7
#![allow(clippy::mixed_attributes_style)]
8
#![allow(clippy::print_stderr)]
9
#![allow(clippy::print_stdout)]
10
#![allow(clippy::single_char_pattern)]
11
#![allow(clippy::unwrap_used)]
12
#![allow(clippy::unchecked_duration_subtraction)]
13
#![allow(clippy::useless_vec)]
14
#![allow(clippy::needless_pass_by_value)]
15
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
16

            
17
use std::future::Future;
18
use std::iter;
19
use std::time::UNIX_EPOCH;
20

            
21
use futures::select_biased;
22
use futures::stream::FusedStream;
23
use futures::Stream;
24
use itertools::{chain, Itertools};
25
use tempfile::TempDir;
26
use time::OffsetDateTime;
27
use tracing_test::traced_test;
28

            
29
use tor_linkspec::HasAddrs;
30
use tor_rtcompat::SleepProvider;
31
use tor_rtmock::simple_time::SimpleMockTimeProvider;
32
use tor_rtmock::MockRuntime;
33

            
34
use super::*;
35

            
36
const EXAMPLE_DESCRIPTOR: &str = include_str!("../../testdata/routerdesc1.txt");
37
const EXAMPLE_PORT: u16 = 9001;
38

            
39
24
fn example_validity() -> (SystemTime, SystemTime) {
40
24
    let (_, (t, u)) = RouterDesc::parse(EXAMPLE_DESCRIPTOR)
41
24
        .unwrap()
42
24
        .dangerously_assume_wellsigned()
43
24
        .dangerously_into_parts();
44
60
    let ret = |tb| match tb {
45
48
        Some(t) => t,
46
        None => panic!("Time range does not have a starting bound"),
47
48
    };
48
24
    (ret(t), ret(u))
49
24
}
50
16
fn example_wallclock() -> SystemTime {
51
16
    example_validity().0 + Duration::from_secs(10)
52
16
}
53

            
54
type R = MockRuntime;
55
type M = Mock;
56
type Bdm = BridgeDescMgr<R, M>;
57
type RT = RetryTime;
58
use Error::TestError as TE;
59

            
60
#[derive(Debug, Clone)]
61
struct Mock {
62
    sleep: SimpleMockTimeProvider,
63

            
64
    // Using an async mutex lets us block a call to `download`
65
    // so we can see what the state is mid-download.
66
    mstate: Arc<futures::lock::Mutex<MockState>>,
67
}
68

            
69
/// Prefix which marks a mock document as a "not modified" reply.
///
/// See the `docs` field of `MockState` for how the rest of the string is interpreted.
const MOCK_NOT_MODIFIED: &str = "IF-MODIFIED-SINCE ";
70

            
71
struct MockState {
72
    /// Maps the port number for a download, to what we should return
73
    ///
74
    /// If the Ok string starts with `MOCK_NOT_MODIFIED` then the rest is the Debug
75
    /// output from a SystemTime.   In this case the manager is supposed to pass
76
    /// `if_modified_since` as `Some(that SystemTime)`, and we will actually return `None`.
77
    ///
78
    /// Otherwise the `if_modified_since` from the manager will be ignored
79
    /// and we always give it Some.
80
    docs: HashMap<u16, Result<String, Error>>,
81

            
82
    download_calls: usize,
83
}
84

            
85
// Empty impl: nothing from `Mockable` is overridden for the mock.
impl Mockable<R> for Mock {}
86

            
87
#[async_trait]
88
impl mockable::MockableAPI<R> for Mock {
89
    type CircMgr = ();
90

            
91
    async fn download(
92
        self,
93
        _runtime: &R,
94
        _circmgr: &Self::CircMgr,
95
        bridge: &BridgeConfig,
96
        if_modified_since: Option<SystemTime>,
97
88
    ) -> Result<Option<String>, Error> {
98
88
        eprint!("download ...");
99
88
        let mut mstate = self.mstate.lock().await;
100
88
        mstate.download_calls += 1;
101
88
        eprintln!("#{} {:?}", mstate.download_calls, bridge);
102
88
        let addr = bridge
103
88
            .addrs()
104
88
            .first()
105
88
            .ok_or(TE("bridge has no error", RT::Never))?;
106
88
        let doc = mstate
107
88
            .docs
108
88
            .get(&addr.port())
109
88
            .ok_or(TE("no document", RT::AfterWaiting))?;
110
24
        doc.clone().map(|text| {
111
24
            if let Some(expect_ims) = text.strip_prefix(MOCK_NOT_MODIFIED) {
112
4
                eprintln!("#{} {:?}", mstate.download_calls, text);
113
4
                assert_eq!(format!("{:?}", if_modified_since.unwrap()), expect_ims,);
114
4
                None
115
            } else {
116
20
                Some(text)
117
            }
118
24
        })
119
176
    }
120
}
121

            
122
impl Mock {
123
60
    async fn expect_download_calls(&self, expected: usize) {
124
40
        let mut mstate = self.mstate.lock().await;
125
40
        assert_eq!(mstate.download_calls, expected);
126
40
        mstate.download_calls = 0;
127
40
    }
128
}
129

            
130
16
fn setup(runtime: MockRuntime) -> (TempDir, Bdm, R, M, BridgeKey, rusqlite::Connection) {
131
16
    let sleep = runtime.mock_sleep().clone();
132
16
    sleep.jump_wallclock(example_wallclock());
133
16

            
134
16
    let mut docs = HashMap::new();
135
16
    docs.insert(EXAMPLE_PORT, Ok(EXAMPLE_DESCRIPTOR.into()));
136
16

            
137
16
    let mstate = Arc::new(futures::lock::Mutex::new(MockState {
138
16
        docs,
139
16
        download_calls: 0,
140
16
    }));
141
16

            
142
16
    let mock = Mock { sleep, mstate };
143
16

            
144
16
    let (db_tmp_dir, store) = crate::storage::sqlite::test::new_empty().unwrap();
145
16
    let store = Arc::new(Mutex::new(Box::new(store) as _));
146
16

            
147
16
    let sql_path = db_tmp_dir.path().join("db.sql");
148
16
    let conn = rusqlite::Connection::open(sql_path).unwrap();
149
16

            
150
16
    let bdm = BridgeDescMgr::<R, M>::new_internal(
151
16
        runtime.clone(),
152
16
        (),
153
16
        store,
154
16
        &Default::default(),
155
16
        Dormancy::Active,
156
16
        mock.clone(),
157
16
    )
158
16
    .unwrap();
159
16

            
160
16
    let bridge = "51.68.172.83:9001 EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1"
161
16
        .parse()
162
16
        .unwrap();
163
16

            
164
16
    (db_tmp_dir, bdm, runtime, mock, bridge, conn)
165
16
}
166

            
167
20
/// Consume every item that `s` can yield right now, and return how many there were.
///
/// After each item we yield to the executor once before polling again.
/// Returns as soon as a poll of the stream does not immediately produce an item.
async fn stream_drain_ready<S>(s: &mut S) -> usize
where
    S: Stream + Unpin + FusedStream,
{
    let mut drained = 0;
    loop {
        // Biased: prefer the stream; fall through to the always-ready arm otherwise.
        let got_item = select_biased! {
            _ = s.next() => true,
            () = future::ready(()) => false,
        };
        if !got_item {
            break drained;
        }
        tor_rtcompat::task::yield_now().await;
        drained += 1;
    }
}
178

            
179
48
async fn stream_drain_until<S, F, FF, Y>(attempts: usize, s: &mut S, mut f: F) -> Y
180
48
where
181
48
    S: Stream + Unpin + FusedStream,
182
48
    S::Item: Debug,
183
48
    F: FnMut() -> FF,
184
48
    FF: Future<Output = Option<Y>>,
185
48
{
186
48
    for _ in 0..attempts {
187
72
        let event = s.next().await;
188
72
        eprintln!("stream_drain_until, got {:?}", event);
189

            
190
72
        if let Some(y) = f().await {
191
48
            return y;
192
24
        }
193
    }
194
    panic!("untilness didn't occur");
195
48
}
196

            
197
12
fn queues_are_empty(bdm: &Bdm) -> Option<()> {
198
12
    let state = bdm.mgr.lock_only();
199
12
    (state.running.is_empty() && state.queued.is_empty()).then_some(())
200
12
}
201

            
202
24
fn in_results(bdm: &Bdm, bridge: &BridgeKey, wanted: Option<Result<(), ()>>) -> Option<()> {
203
24
    let bridges = bdm.bridges();
204
24
    let got = bridges.get(bridge);
205
32
    let got = got.map(|got| got.as_ref().map(|_| ()).map_err(|_| ()));
206
24
    (got == wanted).then_some(())
207
24
}
208

            
209
8
async fn clear_and_re_request<S>(bdm: &Bdm, events: &mut S, bridge: &BridgeKey)
210
8
where
211
8
    S: Stream + Unpin + FusedStream,
212
8
    S::Item: Debug,
213
8
{
214
8
    bdm.set_bridges(&[]);
215
8
    stream_drain_until(3, events, || async {
216
8
        in_results(bdm, bridge, None)
217
8
            .and_then(|()| bdm.mgr.lock_only().running.is_empty().then_some(()))
218
8
    })
219
8
    .await;
220
8
    bdm.set_bridges(&[bridge.clone()]);
221
8
}
222

            
223
24
/// Make a bridge at `192.126.0.1` with port `i`.
///
/// The tests use ports the mock has no document for, so downloads for these bridges fail.
///
/// # Panics
///
/// Panics if the resulting bridge line does not parse (e.g. `i` is not a valid port).
fn bad_bridge(i: usize) -> BridgeKey {
    format!("192.126.0.1:{} EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1", i)
        .parse::<BridgeConfig>()
        .unwrap()
}
228

            
229
#[traced_test]
230
#[test]
231
2
fn success() -> Result<(), anyhow::Error> {
232
5
    MockRuntime::try_test_with_various(|runtime| async {
233
4
        let (_db_tmp_dir, bdm, runtime, mock, bridge, ..) = setup(runtime);
234
4

            
235
4
        bdm.check_consistency(Some([]));
236
4

            
237
4
        let mut events = bdm.events().fuse();
238
4

            
239
4
        eprintln!("----- test downloading one descriptor -----");
240
4

            
241
4
        stream_drain_ready(&mut events).await;
242

            
243
4
        let hold = mock.mstate.lock().await;
244

            
245
4
        bdm.set_bridges(&[bridge.clone()]);
246
4
        bdm.check_consistency(Some([&bridge]));
247
4

            
248
4
        drop(hold);
249

            
250
4
        let got = stream_drain_until(3, &mut events, || async {
251
4
            bdm.bridges().get(&bridge).cloned()
252
4
        })
253
4
        .await;
254

            
255
4
        dbg!(runtime.wallclock(), example_validity(),);
256
4

            
257
4
        eprintln!("got: {:?}", got.unwrap());
258
4

            
259
4
        bdm.check_consistency(Some([&bridge]));
260
4
        mock.expect_download_calls(1).await;
261

            
262
4
        eprintln!("----- add a number of failing descriptors -----");
263

            
264
        const NFAIL: usize = 6;
265

            
266
4
        let bad = (1..=NFAIL).map(bad_bridge).collect_vec();
267
4

            
268
4
        let mut bridges = chain!(iter::once(bridge.clone()), bad.iter().cloned(),).collect_vec();
269

            
270
4
        let hold = mock.mstate.lock().await;
271

            
272
4
        bdm.set_bridges(&bridges);
273
4
        bdm.check_consistency(Some(&bridges));
274
4

            
275
4
        drop(hold);
276
4

            
277
12
        let () = stream_drain_until(13, &mut events, || async {
278
12
            bdm.check_consistency(Some(&bridges));
279
12
            bridges
280
12
                .iter()
281
48
                .all(|b| bdm.bridges().contains_key(b))
282
12
                .then_some(())
283
12
        })
284
4
        .await;
285

            
286
28
        for b in &bad {
287
24
            bdm.bridges().get(b).unwrap().as_ref().unwrap_err();
288
24
        }
289

            
290
4
        bdm.check_consistency(Some(&bridges));
291
4
        mock.expect_download_calls(NFAIL).await;
292

            
293
4
        eprintln!("----- move the clock forward to do some retries ----------");
294
4

            
295
4
        mock.sleep.advance(Duration::from_secs(5000));
296
4

            
297
4
        bdm.check_consistency(Some(&bridges));
298
4

            
299
12
        let () = stream_drain_until(13, &mut events, || async {
300
12
            bdm.check_consistency(Some(&bridges));
301
12
            (mock.mstate.lock().await.download_calls == NFAIL).then_some(())
302
12
        })
303
4
        .await;
304

            
305
4
        stream_drain_ready(&mut events).await;
306

            
307
4
        bdm.check_consistency(Some(&bridges));
308
4
        mock.expect_download_calls(NFAIL).await;
309

            
310
4
        eprintln!("----- set the bridges to the ones we have already ----------");
311

            
312
4
        let hold = mock.mstate.lock().await;
313

            
314
4
        bdm.set_bridges(&bridges);
315
4
        bdm.check_consistency(Some(&bridges));
316
4

            
317
4
        drop(hold);
318

            
319
4
        let events_counted = stream_drain_ready(&mut events).await;
320
4
        assert_eq!(events_counted, 0);
321
4
        bdm.check_consistency(Some(&bridges));
322
4
        mock.expect_download_calls(0).await;
323

            
324
4
        eprintln!("----- set the bridges to one fewer than we have already ----------");
325
4

            
326
4
        let _ = bridges.pop().unwrap();
327

            
328
4
        let hold = mock.mstate.lock().await;
329

            
330
4
        bdm.set_bridges(&bridges);
331
4
        bdm.check_consistency(Some(&bridges));
332
4

            
333
4
        drop(hold);
334

            
335
4
        let events_counted = stream_drain_ready(&mut events).await;
336
4
        assert_eq!(events_counted, 1);
337
4
        bdm.check_consistency(Some(&bridges));
338
4
        mock.expect_download_calls(0).await;
339

            
340
4
        eprintln!("----- remove a bridge while we have some requeued ----------");
341

            
342
4
        let hold = mock.mstate.lock().await;
343

            
344
4
        mock.sleep.advance(Duration::from_secs(8000));
345
4
        bdm.check_consistency(Some(&bridges));
346

            
347
        // should yield, but not produce any events yet
348
4
        let count = stream_drain_ready(&mut events).await;
349
4
        assert_eq!(count, 0);
350
4
        bdm.check_consistency(Some(&bridges));
351
4

            
352
4
        let removed = bridges.pop().unwrap();
353
4
        bdm.set_bridges(&bridges);
354
4

            
355
4
        // should produce a removed bridge event
356
4
        let () = stream_drain_until(1, &mut events, || async {
357
4
            bdm.check_consistency(Some(&bridges));
358
4
            (!bdm.bridges().contains_key(&removed)).then_some(())
359
4
        })
360
4
        .await;
361

            
362
4
        drop(hold);
363
4

            
364
4
        // Check that queues become empty.
365
4
        // Depending on scheduling, there may be tasks still live from the work above.
366
4
        // For example, one of the requeues might be still running after we did the remove.
367
4
        // So we may get a number of change events.  Certainly not more than 10.
368
12
        let () = stream_drain_until(10, &mut events, || async {
369
12
            bdm.check_consistency(Some(&bridges));
370
12
            queues_are_empty(&bdm)
371
12
        })
372
4
        .await;
373

            
374
        {
375
            // When we cancel the download, we race with the manager.
376
            // Maybe the download for the one we removed was started, or maybe not.
377
4
            let mut mstate = mock.mstate.lock().await;
378
4
            assert!(
379
4
                ((NFAIL - 1)..=NFAIL).contains(&mstate.download_calls),
380
                "{:?}",
381
                mstate.download_calls
382
            );
383
4
            mstate.download_calls = 0;
384
4
        }
385
4

            
386
4
        Ok(())
387
5
    })
388
2
}
389

            
390
2
#[traced_test]
391
#[test]
392
2
fn cache() -> Result<(), anyhow::Error> {
393
5
    MockRuntime::try_test_with_various(|runtime| async {
394
4
        let (_db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
395
4
        let mut events = bdm.events().fuse();
396
4

            
397
12
        let in_results = |wanted| in_results(&bdm, &bridge, wanted);
398

            
399
4
        eprintln!("----- test that a downloaded descriptor goes into the cache -----");
400
4

            
401
4
        bdm.set_bridges(&[bridge.clone()]);
402
4
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
403

            
404
4
        mock.expect_download_calls(1).await;
405

            
406
4
        sql_conn
407
4
            .query_row("SELECT * FROM BridgeDescs", [], |row| {
408
4
                let get_time =
409
8
                    |f| -> SystemTime { row.get_unwrap::<&str, OffsetDateTime>(f).into() };
410
4
                let bline: String = row.get_unwrap("bridge_line");
411
4
                let fetched: SystemTime = get_time("fetched");
412
4
                let until: SystemTime = get_time("until");
413
4
                let contents: String = row.get_unwrap("contents");
414
4
                let now = runtime.wallclock();
415
4
                assert_eq!(bline, bridge.to_string());
416
4
                assert!(fetched <= now);
417
4
                assert!(now < until);
418
4
                assert_eq!(contents, EXAMPLE_DESCRIPTOR);
419
4
                Ok(())
420
4
            })
421
4
            .unwrap();
422
4

            
423
4
        eprintln!("----- forget the descriptor and try to reload it from the cache -----");
424
4

            
425
4
        clear_and_re_request(&bdm, &mut events, &bridge).await;
426
4
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
427

            
428
        // Should not have been re-downloaded, since the fetch time is great.
429
4
        mock.expect_download_calls(0).await;
430

            
431
4
        eprintln!("----- corrupt the cache and check we re-download -----");
432
4

            
433
4
        sql_conn
434
4
            .execute_batch("UPDATE BridgeDescs SET contents = 'garbage'")
435
4
            .unwrap();
436
4

            
437
4
        clear_and_re_request(&bdm, &mut events, &bridge).await;
438
4
        stream_drain_until(3, &mut events, || async { in_results(Some(Ok(()))) }).await;
439

            
440
4
        mock.expect_download_calls(1).await;
441

            
442
4
        eprintln!("----- advance the lock and check that we do an if-modified-since -----");
443
4

            
444
4
        let published = bdm
445
4
            .bridges()
446
4
            .get(&bridge)
447
4
            .unwrap()
448
4
            .as_ref()
449
4
            .unwrap()
450
4
            .as_ref()
451
4
            .published();
452
4

            
453
4
        mock.mstate.lock().await.docs.insert(
454
4
            EXAMPLE_PORT,
455
4
            Ok(format!("{}{:?}", MOCK_NOT_MODIFIED, published)),
456
4
        );
457
4

            
458
4
        // Exceeds default max_refetch
459
4
        mock.sleep.advance(Duration::from_secs(20000));
460
4

            
461
4
        stream_drain_until(3, &mut events, || async {
462
4
            (mock.mstate.lock().await.download_calls > 0).then_some(())
463
4
        })
464
4
        .await;
465

            
466
4
        mock.expect_download_calls(1).await;
467

            
468
4
        Ok(())
469
5
    })
470
2
}
471

            
472
#[traced_test]
473
#[test]
474
2
fn dormant() -> Result<(), anyhow::Error> {
475
5
    MockRuntime::try_test_with_various(|runtime| async {
476
4
        #[allow(unused_variables)] // avoids churn and makes all of these identical
477
4
        let (db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
478
4
        let mut events = bdm.events().fuse();
479

            
480
        use Dormancy::*;
481

            
482
4
        eprintln!("----- become dormant, but request a bridge -----");
483
4
        bdm.set_dormancy(Dormant);
484
4
        bdm.set_bridges(&[bridge.clone()]);
485
4

            
486
4
        // Drive all tasks until we are idle
487
4
        runtime.progress_until_stalled().await;
488

            
489
4
        eprintln!("----- become active -----");
490
4
        bdm.set_dormancy(Active);
491
4
        // This should immediately trigger the download:
492
4

            
493
4
        stream_drain_until(3, &mut events, || async {
494
4
            in_results(&bdm, &bridge, Some(Ok(())))
495
4
        })
496
4
        .await;
497
4
        mock.expect_download_calls(1).await;
498

            
499
4
        Ok(())
500
5
    })
501
2
}
502

            
503
#[traced_test]
504
#[test]
505
2
fn process_doc() -> Result<(), anyhow::Error> {
506
5
    MockRuntime::try_test_with_various(|runtime| async {
507
4
        #[allow(unused_variables)] // avoids churn and makes all of these identical
508
4
        let (db_tmp_path, bdm, runtime, mock, bridge, sql_conn, ..) = setup(runtime);
509
4

            
510
4
        let text = EXAMPLE_DESCRIPTOR;
511
4
        let config = BridgeDescDownloadConfig::default();
512
4
        let valid = example_validity();
513
4

            
514
88
        let pr_t = |s: &str, t: SystemTime| {
515
88
            let now = runtime.wallclock();
516
88
            eprintln!(
517
88
                "                  {:10} {:?} {:10}",
518
88
                s,
519
88
                t,
520
88
                t.duration_since(UNIX_EPOCH).unwrap().as_secs_f64()
521
88
                    - now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64(),
522
88
            );
523
88
        };
524

            
525
28
        let expecting_of = |text: &str, exp: Result<SystemTime, &str>| {
526
28
            let got = process_document(&runtime, &config, text);
527
28
            match exp {
528
16
                Ok(exp_refetch) => {
529
16
                    let refetch = got.unwrap().refetch;
530
16
                    pr_t("refetch", refetch);
531
16
                    assert_eq!(refetch, exp_refetch);
532
                }
533
12
                Err(exp_msg) => {
534
12
                    let msg = got.as_ref().expect_err(exp_msg).to_string();
535
12
                    assert!(
536
12
                        msg.contains(exp_msg),
537
                        "{:?} {:?} exp={:?}",
538
                        msg,
539
                        got,
540
                        exp_msg
541
                    );
542
                }
543
            }
544
28
        };
545

            
546
20
        let expecting_at = |now: SystemTime, exp| {
547
20
            mock.sleep.jump_wallclock(now);
548
20
            pr_t("now", now);
549
20
            pr_t("valid.0", valid.0);
550
20
            pr_t("valid.1", valid.1);
551
20
            if let Ok(exp) = exp {
552
12
                pr_t("expect", exp);
553
12
            }
554
20
            expecting_of(text, exp);
555
20
        };
556

            
557
4
        let secs = Duration::from_secs;
558
4

            
559
4
        eprintln!("----- good -----");
560
4
        expecting_of(text, Ok(runtime.wallclock() + config.max_refetch));
561
4

            
562
4
        eprintln!("----- modified under signature -----");
563
4
        expecting_of(
564
4
            &text.replace("\nbandwidth 10485760", "\nbandwidth 10485761"),
565
4
            Err("Signature check failed"),
566
4
        );
567
4

            
568
4
        eprintln!("----- doc not yet valid -----");
569
4
        expecting_at(
570
4
            valid.0 - secs(10),
571
4
            Err("Descriptor is outside its validity time"),
572
4
        );
573
4

            
574
4
        eprintln!("----- need to refetch due to doc validity expiring soon -----");
575
4
        expecting_at(valid.1 - secs(5000), Ok(valid.1 - secs(1000)));
576
4

            
577
4
        eprintln!("----- will refetch later than usual, due to min refetch interval -----");
578
4
        {
579
4
            let now = valid.1 - secs(4000); // would want to refetch at valid.1-1000 ie 30000
580
4
            expecting_at(now, Ok(now + config.min_refetch));
581
4
        }
582
4

            
583
4
        eprintln!("----- will refetch after doc validity ends, due to min refetch interval -----");
584
4
        {
585
4
            let now = valid.1 - secs(10);
586
4
            let exp = now + config.min_refetch;
587
4
            assert!(exp > valid.1);
588
4
            expecting_at(now, Ok(exp));
589
4
        }
590
4

            
591
4
        eprintln!("----- expired -----");
592
4
        expecting_at(
593
4
            valid.1 + secs(10),
594
4
            Err("Descriptor is outside its validity time"),
595
4
        );
596
4

            
597
4
        // TODO ideally we would test the unbounded case in process_download's
598
4
        // expiry time handling, but that would require making a document with unbounded
599
4
        // validity time.  Even if that is possible, I don't think we have code in-tree to
600
4
        // make signed test documents.
601
4

            
602
4
        Ok(())
603
5
    })
604
2
}