1
//! Code for exporting events from the channel manager.
2
#![allow(dead_code, unreachable_pub)]
3

            
4
use educe::Educe;
5
use futures::{Stream, StreamExt};
6
use postage::watch;
7
use std::{
8
    fmt,
9
    time::{Duration, Instant},
10
};
11
use tor_basic_utils::skip_fmt;
12

            
13
/// The status of our connection to the internet.
14
#[derive(Default, Debug, Clone)]
15
pub struct ConnStatus {
16
    /// Have we been able to make TCP connections?
17
    ///
18
    /// True if we've been able to make outgoing connections recently.
19
    /// False if we've definitely been failing.
20
    /// None if we haven't succeeded yet, but it's too early to say if
21
    /// that's a problem.
22
    online: Option<bool>,
23

            
24
    /// Have we ever been able to make TLS handshakes and negotiate
25
    /// certificates, _not including timeliness checking_?
26
    ///
27
    /// True if we've been able to make TLS handshakes and talk to Tor relays we
28
    /// like recently. False if we've definitely been failing. None if we
29
    /// haven't succeeded yet, but it's too early to say if that's a problem.
30
    auth_works: Option<bool>,
31

            
32
    /// Have we been able to successfully negotiate full Tor handshakes?
33
    ///
34
    /// True if we've been able to make Tor handshakes recently.
35
    /// False if we've definitely been failing.
36
    /// None if we haven't succeeded yet, but it's too early to say if
37
    /// that's a problem.
38
    handshake_works: Option<bool>,
39
}
40

            
41
/// A problem detected while connecting to the Tor network.
42
#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
43
#[non_exhaustive]
44
pub enum ConnBlockage {
45
    #[display("unable to connect to the internet")]
46
    /// We haven't been able to make successful TCP connections.
47
    NoTcp,
48
    /// We've made TCP connections, but our TLS connections either failed, or
49
    /// got hit by an attempted man-in-the-middle attack.
50
    #[display("our internet connection seems to be filtered")]
51
    NoHandshake,
52
    /// We've made TCP connections, and our TLS connections mostly succeeded,
53
    /// but we encountered failures that are well explained by clock skew,
54
    /// or expired certificates.
55
    #[display("relays all seem to be using expired certificates")]
56
    CertsExpired,
57
}
58

            
59
impl ConnStatus {
60
    /// Return true if this status is equal to `other`.
61
    ///
62
    /// Note:(This would just be a PartialEq implementation, but I'm not sure I
63
    /// want to expose that PartialEq for this struct.)
64
40
    fn eq(&self, other: &ConnStatus) -> bool {
65
40
        self.online == other.online && self.handshake_works == other.handshake_works
66
40
    }
67

            
68
    /// Return true if this status indicates that we can successfully open Tor channels.
69
10
    pub fn usable(&self) -> bool {
70
10
        self.online == Some(true) && self.handshake_works == Some(true)
71
10
    }
72

            
73
    /// Return a float representing "how bootstrapped" we are with respect to
74
    /// connecting to the Tor network, where 0 is "not at all" and 1 is
75
    /// "successful".
76
    ///
77
    /// Callers _should not_ depend on the specific meaning of any particular
78
    /// fraction; we may change these fractions in the future.
79
392
    pub fn frac(&self) -> f32 {
80
6
        match self {
81
            Self {
82
                online: Some(true),
83
                auth_works: Some(true),
84
                handshake_works: Some(true),
85
4
            } => 1.0,
86
            Self {
87
                online: Some(true), ..
88
4
            } => 0.5,
89
384
            _ => 0.0,
90
        }
91
392
    }
92

            
93
    /// Return the cause of why we aren't able to connect to the Tor network,
94
    /// if we think we're stuck.
95
392
    pub fn blockage(&self) -> Option<ConnBlockage> {
96
384
        match self {
97
            Self {
98
                online: Some(false),
99
                ..
100
4
            } => Some(ConnBlockage::NoTcp),
101
            Self {
102
                auth_works: Some(false),
103
                ..
104
4
            } => Some(ConnBlockage::NoHandshake),
105
            Self {
106
                handshake_works: Some(false),
107
                ..
108
            } => Some(ConnBlockage::CertsExpired),
109
384
            _ => None,
110
        }
111
392
    }
112
}
113

            
114
impl fmt::Display for ConnStatus {
115
388
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116
2
        match self {
117
380
            ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
118
            ConnStatus {
119
                online: Some(false),
120
                ..
121
2
            } => write!(f, "unable to connect to the internet"),
122
            ConnStatus {
123
                handshake_works: None,
124
                ..
125
2
            } => write!(f, "handshaking with Tor relays"),
126
            ConnStatus {
127
                auth_works: Some(true),
128
                handshake_works: Some(false),
129
                ..
130
            } => write!(
131
                f,
132
                "unable to handshake with Tor relays, possibly due to clock skew"
133
            ),
134
            ConnStatus {
135
                handshake_works: Some(false),
136
                ..
137
2
            } => write!(f, "unable to handshake with Tor relays"),
138
            ConnStatus {
139
                online: Some(true),
140
                handshake_works: Some(true),
141
                ..
142
2
            } => write!(f, "connecting successfully"),
143
        }
144
388
    }
145
}
146

            
147
/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
148
///
149
/// This stream is lossy; a reader might not see some events on the stream, if
150
/// they are produced faster than the reader can consume.  In that case, the
151
/// reader will see more recent updates, and miss older ones.
152
///
153
/// Note that the bootstrap status is not monotonic: we might become less
154
/// bootstrapped than we were before.  (For example, the internet could go
155
/// down.)
156
#[derive(Clone, Educe)]
157
#[educe(Debug)]
158
pub struct ConnStatusEvents {
159
    /// The receiver that implements this stream.
160
    ///
161
    /// (We wrap it in a new type here so that we can replace the implementation
162
    /// later on if we need to.)
163
    #[educe(Debug(method = "skip_fmt"))]
164
    inner: watch::Receiver<ConnStatus>,
165
}
166

            
167
impl Stream for ConnStatusEvents {
168
    type Item = ConnStatus;
169
336
    fn poll_next(
170
336
        mut self: std::pin::Pin<&mut Self>,
171
336
        cx: &mut std::task::Context<'_>,
172
336
    ) -> std::task::Poll<Option<Self::Item>> {
173
336
        self.inner.poll_next_unpin(cx)
174
336
    }
175
}
176

            
177
/// Crate-internal view of "how connected are we to the internet?"
178
///
179
/// This is a more complex and costly structure than ConnStatus, so we track
180
/// this here, and only expose the minimum via ConnStatus over a
181
/// `postage::watch`.  Later, we might want to expose more of this information.
182
//
183
// TODO: Eventually we should add some ability to reset our bootstrap status, if
184
// our connections start failing.
185
#[derive(Debug, Clone)]
186
struct ChanMgrStatus {
187
    /// When did we first get initialized?
188
    startup: Instant,
189

            
190
    /// Since we started, how many channels have we tried to build?
191
    n_attempts: usize,
192

            
193
    /// When (if ever) have we made a TCP connection to (what we hoped was) a
194
    /// Tor relay?
195
    ///
196
    /// If we don't reach this point, we're probably not on the internet.
197
    ///
198
    /// If we get no further than this, we're probably having our TCP
199
    /// connections captured or replaced.
200
    last_tcp_success: Option<Instant>,
201

            
202
    /// When (if ever) have we successfully finished a TLS handshake to (what we
203
    /// hoped was) a Tor relay?
204
    ///
205
    /// If we get no further than this, we might be facing a TLS MITM attack.
206
    //
207
    // TODO: We don't actually use this information yet: our output doesn't
208
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
209
    // where TLS fails.
210
    last_tls_success: Option<Instant>,
211

            
212
    /// When (if ever) have we ever finished the inner Tor handshake with a relay,
213
    /// up to the point where we check for certificate timeliness?
214
    last_chan_auth_success: Option<Instant>,
215

            
216
    /// When (if ever) have we successfully finished the inner Tor handshake
217
    /// with a relay?
218
    ///
219
    /// If we get to this point, we can successfully talk to something that
220
    /// holds the private key that it's supposed to.
221
    last_chan_success: Option<Instant>,
222
}
223

            
224
impl ChanMgrStatus {
225
    /// Construct a new ChanMgr status.
226
    ///
227
    /// It will be built as having been initialized at the time `now`.
228
946
    fn new_at(now: Instant) -> ChanMgrStatus {
229
946
        ChanMgrStatus {
230
946
            startup: now,
231
946
            n_attempts: 0,
232
946
            last_tcp_success: None,
233
946
            last_tls_success: None,
234
946
            last_chan_auth_success: None,
235
946
            last_chan_success: None,
236
946
        }
237
946
    }
238

            
239
    /// Return a [`ConnStatus`] for the current state, at time `now`.
240
    ///
241
    /// (The time is necessary because a lack of success doesn't indicate a
242
    /// problem until enough time has passed.)
243
30
    fn conn_status_at(&self, now: Instant) -> ConnStatus {
244
        /// How long do we need to be online before we'll acknowledge failure?
245
        const MIN_DURATION: Duration = Duration::from_secs(60);
246
        /// How many attempts do we need to launch before we'll acknowledge failure?
247
        const MIN_ATTEMPTS: usize = 6;
248

            
249
        // If set, it's too early to determine failure.
250
30
        let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
251

            
252
30
        let online = match (self.last_tcp_success.is_some(), early) {
253
18
            (true, _) => Some(true),
254
10
            (_, true) => None,
255
2
            (false, false) => Some(false),
256
        };
257

            
258
30
        let auth_works = match (self.last_chan_auth_success.is_some(), early) {
259
6
            (true, _) => Some(true),
260
20
            (_, true) => None,
261
4
            (false, false) => Some(false),
262
        };
263

            
264
30
        let handshake_works = match (self.last_chan_success.is_some(), early) {
265
6
            (true, _) => Some(true),
266
20
            (_, true) => None,
267
4
            (false, false) => Some(false),
268
        };
269

            
270
30
        ConnStatus {
271
30
            online,
272
30
            auth_works,
273
30
            handshake_works,
274
30
        }
275
30
    }
276

            
277
    /// Note that an attempt to connect has been started.
278
24
    fn record_attempt(&mut self) {
279
24
        self.n_attempts += 1;
280
24
    }
281

            
282
    /// Note that we've successfully done a TCP handshake with an alleged relay.
283
6
    fn record_tcp_success(&mut self, now: Instant) {
284
6
        self.last_tcp_success = Some(now);
285
6
    }
286

            
287
    /// Note that we've completed a TLS handshake with an alleged relay.
288
    ///
289
    /// (Its identity won't be verified till the next step.)
290
4
    fn record_tls_finished(&mut self, now: Instant) {
291
4
        self.last_tls_success = Some(now);
292
4
    }
293

            
294
    /// Note that we've completed a Tor handshake with a relay, _but failed to
295
    /// verify the certificates in a way that could indicate clock skew_.
296
    fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
297
        self.last_chan_auth_success = Some(now);
298
    }
299

            
300
    /// Note that we've completed a Tor handshake with a relay.
301
    ///
302
    /// (This includes performing the TLS handshake, and verifying that the
303
    /// relay was indeed the one that we wanted to reach.)
304
6
    fn record_handshake_done(&mut self, now: Instant) {
305
6
        self.last_chan_auth_success = Some(now);
306
6
        self.last_chan_success = Some(now);
307
6
    }
308
}
309

            
310
/// Object that manages information about a `ChanMgr`'s status, and sends
311
/// information about connectivity changes over an asynchronous channel
312
pub(crate) struct ChanMgrEventSender {
313
    /// The last ConnStatus that we sent over the channel.
314
    last_conn_status: ConnStatus,
315
    /// The unsummarized status information from the ChanMgr.
316
    mgr_status: ChanMgrStatus,
317
    /// The channel that we use for sending ConnStatus information.
318
    sender: watch::Sender<ConnStatus>,
319
}
320

            
321
impl ChanMgrEventSender {
322
    /// If the status has changed as of `now`, tell any listeners.
323
    ///
324
    /// (This takes a time because we need to know how much time has elapsed
325
    /// without successful attempts.)
326
    ///
327
    /// # Limitations
328
    ///
329
    /// We are dependent on calls to `record_attempt()` and similar methods to
330
    /// actually invoke this function; if they were never called, we'd never
331
    /// notice that we had gone too long without building connections.  That's
332
    /// okay for now, though, since any Tor client will immediately start
333
    /// building circuits, which will launch connection attempts until one
334
    /// succeeds or the client gives up entirely.  
335
16
    fn push_at(&mut self, now: Instant) {
336
16
        let status = self.mgr_status.conn_status_at(now);
337
16
        if !status.eq(&self.last_conn_status) {
338
8
            self.last_conn_status = status.clone();
339
8
            let mut b = self.sender.borrow_mut();
340
8
            *b = status;
341
8
        }
342
16
    }
343

            
344
    /// Note that an attempt to connect has been started.
345
4
    pub(crate) fn record_attempt(&mut self) {
346
4
        self.mgr_status.record_attempt();
347
4
        self.push_at(Instant::now());
348
4
    }
349

            
350
    /// Note that we've successfully done a TCP handshake with an alleged relay.
351
4
    pub(crate) fn record_tcp_success(&mut self) {
352
4
        let now = Instant::now();
353
4
        self.mgr_status.record_tcp_success(now);
354
4
        self.push_at(now);
355
4
    }
356

            
357
    /// Note that we've completed a TLS handshake with an alleged relay.
358
    ///
359
    /// (Its identity won't be verified till the next step.)
360
4
    pub(crate) fn record_tls_finished(&mut self) {
361
4
        let now = Instant::now();
362
4
        self.mgr_status.record_tls_finished(now);
363
4
        self.push_at(now);
364
4
    }
365

            
366
    /// Record that a handshake has succeeded _except for the certificate
367
    /// timeliness check, which may indicate a skewed clock.
368
    pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
369
        let now = Instant::now();
370
        self.mgr_status.record_handshake_done_with_skewed_clock(now);
371
        self.push_at(now);
372
    }
373

            
374
    /// Note that we've completed a Tor handshake with a relay.
375
    ///
376
    /// (This includes performing the TLS handshake, and verifying that the
377
    /// relay was indeed the one that we wanted to reach.)
378
4
    pub(crate) fn record_handshake_done(&mut self) {
379
4
        let now = Instant::now();
380
4
        self.mgr_status.record_handshake_done(now);
381
4
        self.push_at(now);
382
4
    }
383
}
384

            
385
/// Create a new channel for sending connectivity status events to other crates.
386
944
pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
387
944
    let (sender, receiver) = watch::channel();
388
944
    let receiver = ConnStatusEvents { inner: receiver };
389
944
    let sender = ChanMgrEventSender {
390
944
        last_conn_status: ConnStatus::default(),
391
944
        mgr_status: ChanMgrStatus::new_at(Instant::now()),
392
944
        sender,
393
944
    };
394
944
    (sender, receiver)
395
944
}
396

            
397
#[cfg(test)]
398
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
399
mod test {
400
    // @@ begin test lint list maintained by maint/add_warning @@
401
    #![allow(clippy::bool_assert_comparison)]
402
    #![allow(clippy::clone_on_copy)]
403
    #![allow(clippy::dbg_macro)]
404
    #![allow(clippy::mixed_attributes_style)]
405
    #![allow(clippy::print_stderr)]
406
    #![allow(clippy::print_stdout)]
407
    #![allow(clippy::single_char_pattern)]
408
    #![allow(clippy::unwrap_used)]
409
    #![allow(clippy::unchecked_duration_subtraction)]
410
    #![allow(clippy::useless_vec)]
411
    #![allow(clippy::needless_pass_by_value)]
412
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
413
    use super::*;
414
    use float_eq::assert_float_eq;
415

            
416
    /// Tolerance for float comparison.
417
    const TOL: f32 = 0.00001;
418

            
419
    #[test]
420
    fn status_basics() {
421
        let s1 = ConnStatus::default();
422
        assert_eq!(s1.to_string(), "connecting to the internet");
423
        assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
424
        assert!(s1.eq(&s1));
425
        assert!(s1.blockage().is_none());
426
        assert!(!s1.usable());
427

            
428
        let s2 = ConnStatus {
429
            online: Some(false),
430
            auth_works: None,
431
            handshake_works: None,
432
        };
433
        assert_eq!(s2.to_string(), "unable to connect to the internet");
434
        assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
435
        assert!(s2.eq(&s2));
436
        assert!(!s2.eq(&s1));
437
        assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
438
        assert_eq!(
439
            s2.blockage().unwrap().to_string(),
440
            "unable to connect to the internet"
441
        );
442
        assert!(!s2.usable());
443

            
444
        let s3 = ConnStatus {
445
            online: Some(true),
446
            auth_works: None,
447
            handshake_works: None,
448
        };
449
        assert_eq!(s3.to_string(), "handshaking with Tor relays");
450
        assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
451
        assert_eq!(s3.blockage(), None);
452
        assert!(!s3.eq(&s1));
453
        assert!(!s3.usable());
454

            
455
        let s4 = ConnStatus {
456
            online: Some(true),
457
            auth_works: Some(false),
458
            handshake_works: Some(false),
459
        };
460
        assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
461
        assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
462
        assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
463
        assert_eq!(
464
            s4.blockage().unwrap().to_string(),
465
            "our internet connection seems to be filtered"
466
        );
467
        assert!(!s4.eq(&s1));
468
        assert!(!s4.eq(&s2));
469
        assert!(!s4.eq(&s3));
470
        assert!(s4.eq(&s4));
471
        assert!(!s4.usable());
472

            
473
        let s5 = ConnStatus {
474
            online: Some(true),
475
            auth_works: Some(true),
476
            handshake_works: Some(true),
477
        };
478
        assert_eq!(s5.to_string(), "connecting successfully");
479
        assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
480
        assert!(s5.blockage().is_none());
481
        assert!(s5.eq(&s5));
482
        assert!(!s5.eq(&s4));
483
        assert!(s5.usable());
484
    }
485

            
486
    #[test]
487
    fn derive_status() {
488
        let start = Instant::now();
489
        let sec = Duration::from_secs(1);
490
        let hour = Duration::from_secs(3600);
491

            
492
        let mut ms = ChanMgrStatus::new_at(start);
493

            
494
        // when we start, we're unable to reach any conclusions.
495
        let s0 = ms.conn_status_at(start);
496
        assert!(s0.online.is_none());
497
        assert!(s0.handshake_works.is_none());
498

            
499
        // Time won't let us make conclusions either, unless there have been
500
        // attempts.
501
        let s = ms.conn_status_at(start + hour);
502
        assert!(s.eq(&s0));
503

            
504
        // But if there have been attempts, _and_ time has passed, we notice
505
        // failure.
506
        for _ in 0..10 {
507
            ms.record_attempt();
508
        }
509
        // (Not immediately...)
510
        let s = ms.conn_status_at(start);
511
        assert!(s.eq(&s0));
512
        // (... but after a while.)
513
        let s = ms.conn_status_at(start + hour);
514
        assert_eq!(s.online, Some(false));
515
        assert_eq!(s.handshake_works, Some(false));
516

            
517
        // If TCP has succeeded, we should notice that.
518
        ms.record_tcp_success(start + sec);
519
        let s = ms.conn_status_at(start + sec * 2);
520
        assert_eq!(s.online, Some(true));
521
        assert!(s.handshake_works.is_none());
522
        let s = ms.conn_status_at(start + hour);
523
        assert_eq!(s.online, Some(true));
524
        assert_eq!(s.handshake_works, Some(false));
525

            
526
        // If the handshake succeeded, we can notice that too.
527
        ms.record_handshake_done(start + sec * 2);
528
        let s = ms.conn_status_at(start + sec * 3);
529
        assert_eq!(s.online, Some(true));
530
        assert_eq!(s.handshake_works, Some(true));
531
    }
532

            
533
    #[test]
534
    fn sender() {
535
        let (mut snd, rcv) = channel();
536

            
537
        {
538
            let s = rcv.inner.borrow().clone();
539
            assert_float_eq!(s.frac(), 0.0, abs <= TOL);
540
        }
541

            
542
        snd.record_attempt();
543
        snd.record_tcp_success();
544
        snd.record_tls_finished();
545
        snd.record_handshake_done();
546

            
547
        {
548
            let s = rcv.inner.borrow().clone();
549
            assert_float_eq!(s.frac(), 1.0, abs <= TOL);
550
        }
551
    }
552
}