tor_chanmgr/
event.rs

1//! Code for exporting events from the channel manager.
2#![allow(dead_code, unreachable_pub)]
3
4use educe::Educe;
5use futures::{Stream, StreamExt};
6use postage::watch;
7use std::{
8    fmt,
9    time::{Duration, Instant},
10};
11use tor_basic_utils::skip_fmt;
12
/// A coarse summary of how well we are able to reach the Tor network.
///
/// Every field is a tri-state: `Some(true)` means that the step has been
/// observed to work, `Some(false)` means that we have concluded that it is
/// failing, and `None` means that it has not worked yet but it is too early
/// to treat that as a problem.
#[derive(Clone, Debug, Default)]
pub struct ConnStatus {
    /// Are outgoing TCP connections working?
    ///
    /// `Some(true)` if we have made outgoing connections recently,
    /// `Some(false)` if we have definitely been failing, and `None` if we
    /// have not succeeded yet but cannot yet call that a failure.
    online: Option<bool>,

    /// Can we complete TLS handshakes and negotiate certificates with relays,
    /// _leaving certificate timeliness checks aside_?
    ///
    /// `Some(true)` if we have recently handshaken with Tor relays that we
    /// like, `Some(false)` if we have definitely been failing, and `None` if
    /// we have not succeeded yet but cannot yet call that a failure.
    auth_works: Option<bool>,

    /// Can we complete full Tor handshakes?
    ///
    /// `Some(true)` if we have completed Tor handshakes recently,
    /// `Some(false)` if we have definitely been failing, and `None` if we
    /// have not succeeded yet but cannot yet call that a failure.
    handshake_works: Option<bool>,
}
40
41/// A problem detected while connecting to the Tor network.
42#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
43#[non_exhaustive]
44pub enum ConnBlockage {
45    #[display("unable to connect to the internet")]
46    /// We haven't been able to make successful TCP connections.
47    NoTcp,
48    /// We've made TCP connections, but our TLS connections either failed, or
49    /// got hit by an attempted man-in-the-middle attack.
50    #[display("our internet connection seems to be filtered")]
51    NoHandshake,
52    /// We've made TCP connections, and our TLS connections mostly succeeded,
53    /// but we encountered failures that are well explained by clock skew,
54    /// or expired certificates.
55    #[display("relays all seem to be using expired certificates")]
56    CertsExpired,
57}
58
59impl ConnStatus {
60    /// Return true if this status is equal to `other`.
61    ///
62    /// Note:(This would just be a PartialEq implementation, but I'm not sure I
63    /// want to expose that PartialEq for this struct.)
64    fn eq(&self, other: &ConnStatus) -> bool {
65        self.online == other.online && self.handshake_works == other.handshake_works
66    }
67
68    /// Return true if this status indicates that we can successfully open Tor channels.
69    pub fn usable(&self) -> bool {
70        self.online == Some(true) && self.handshake_works == Some(true)
71    }
72
73    /// Return a float representing "how bootstrapped" we are with respect to
74    /// connecting to the Tor network, where 0 is "not at all" and 1 is
75    /// "successful".
76    ///
77    /// Callers _should not_ depend on the specific meaning of any particular
78    /// fraction; we may change these fractions in the future.
79    pub fn frac(&self) -> f32 {
80        match self {
81            Self {
82                online: Some(true),
83                auth_works: Some(true),
84                handshake_works: Some(true),
85            } => 1.0,
86            Self {
87                online: Some(true), ..
88            } => 0.5,
89            _ => 0.0,
90        }
91    }
92
93    /// Return the cause of why we aren't able to connect to the Tor network,
94    /// if we think we're stuck.
95    pub fn blockage(&self) -> Option<ConnBlockage> {
96        match self {
97            Self {
98                online: Some(false),
99                ..
100            } => Some(ConnBlockage::NoTcp),
101            Self {
102                auth_works: Some(false),
103                ..
104            } => Some(ConnBlockage::NoHandshake),
105            Self {
106                handshake_works: Some(false),
107                ..
108            } => Some(ConnBlockage::CertsExpired),
109            _ => None,
110        }
111    }
112}
113
114impl fmt::Display for ConnStatus {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
118            ConnStatus {
119                online: Some(false),
120                ..
121            } => write!(f, "unable to connect to the internet"),
122            ConnStatus {
123                handshake_works: None,
124                ..
125            } => write!(f, "handshaking with Tor relays"),
126            ConnStatus {
127                auth_works: Some(true),
128                handshake_works: Some(false),
129                ..
130            } => write!(
131                f,
132                "unable to handshake with Tor relays, possibly due to clock skew"
133            ),
134            ConnStatus {
135                handshake_works: Some(false),
136                ..
137            } => write!(f, "unable to handshake with Tor relays"),
138            ConnStatus {
139                online: Some(true),
140                handshake_works: Some(true),
141                ..
142            } => write!(f, "connecting successfully"),
143        }
144    }
145}
146
147/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
148///
149/// This stream is lossy; a reader might not see some events on the stream, if
150/// they are produced faster than the reader can consume.  In that case, the
151/// reader will see more recent updates, and miss older ones.
152///
153/// Note that the bootstrap status is not monotonic: we might become less
154/// bootstrapped than we were before.  (For example, the internet could go
155/// down.)
156#[derive(Clone, Educe)]
157#[educe(Debug)]
158pub struct ConnStatusEvents {
159    /// The receiver that implements this stream.
160    ///
161    /// (We wrap it in a new type here so that we can replace the implementation
162    /// later on if we need to.)
163    #[educe(Debug(method = "skip_fmt"))]
164    inner: watch::Receiver<ConnStatus>,
165}
166
167impl Stream for ConnStatusEvents {
168    type Item = ConnStatus;
169    fn poll_next(
170        mut self: std::pin::Pin<&mut Self>,
171        cx: &mut std::task::Context<'_>,
172    ) -> std::task::Poll<Option<Self::Item>> {
173        self.inner.poll_next_unpin(cx)
174    }
175}
176
/// The crate-internal record of "how connected are we to the internet?"
///
/// This holds more detail, and is costlier, than [`ConnStatus`]; we keep the
/// detail here and publish only the minimum, as a `ConnStatus` over a
/// `postage::watch`.  Later, we might want to expose more of this information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Clone, Debug)]
struct ChanMgrStatus {
    /// The time at which this status was first initialized.
    startup: Instant,

    /// The number of channels we have tried to build since startup.
    n_attempts: usize,

    /// The time (if any) at which we made a TCP connection to (what we hoped
    /// was) a Tor relay.
    ///
    /// If we never reach this point, we're probably not on the internet.
    ///
    /// If we reach this point but get no further, our TCP connections are
    /// probably being captured or replaced.
    last_tcp_success: Option<Instant>,

    /// The time (if any) at which we finished a TLS handshake with (what we
    /// hoped was) a Tor relay.
    ///
    /// If we get no further than this, we might be facing a TLS MITM attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// The time (if any) at which we finished the inner Tor handshake with a
    /// relay, up to the point where we check for certificate timeliness.
    last_chan_auth_success: Option<Instant>,

    /// The time (if any) at which we successfully finished the inner Tor
    /// handshake with a relay.
    ///
    /// Reaching this point means that we can successfully talk to something
    /// that holds the private key that it's supposed to.
    last_chan_success: Option<Instant>,
}
223
224impl ChanMgrStatus {
225    /// Construct a new ChanMgr status.
226    ///
227    /// It will be built as having been initialized at the time `now`.
228    fn new_at(now: Instant) -> ChanMgrStatus {
229        ChanMgrStatus {
230            startup: now,
231            n_attempts: 0,
232            last_tcp_success: None,
233            last_tls_success: None,
234            last_chan_auth_success: None,
235            last_chan_success: None,
236        }
237    }
238
239    /// Return a [`ConnStatus`] for the current state, at time `now`.
240    ///
241    /// (The time is necessary because a lack of success doesn't indicate a
242    /// problem until enough time has passed.)
243    fn conn_status_at(&self, now: Instant) -> ConnStatus {
244        /// How long do we need to be online before we'll acknowledge failure?
245        const MIN_DURATION: Duration = Duration::from_secs(60);
246        /// How many attempts do we need to launch before we'll acknowledge failure?
247        const MIN_ATTEMPTS: usize = 6;
248
249        // If set, it's too early to determine failure.
250        let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
251
252        let online = match (self.last_tcp_success.is_some(), early) {
253            (true, _) => Some(true),
254            (_, true) => None,
255            (false, false) => Some(false),
256        };
257
258        let auth_works = match (self.last_chan_auth_success.is_some(), early) {
259            (true, _) => Some(true),
260            (_, true) => None,
261            (false, false) => Some(false),
262        };
263
264        let handshake_works = match (self.last_chan_success.is_some(), early) {
265            (true, _) => Some(true),
266            (_, true) => None,
267            (false, false) => Some(false),
268        };
269
270        ConnStatus {
271            online,
272            auth_works,
273            handshake_works,
274        }
275    }
276
277    /// Note that an attempt to connect has been started.
278    fn record_attempt(&mut self) {
279        self.n_attempts += 1;
280    }
281
282    /// Note that we've successfully done a TCP handshake with an alleged relay.
283    fn record_tcp_success(&mut self, now: Instant) {
284        self.last_tcp_success = Some(now);
285    }
286
287    /// Note that we've completed a TLS handshake with an alleged relay.
288    ///
289    /// (Its identity won't be verified till the next step.)
290    fn record_tls_finished(&mut self, now: Instant) {
291        self.last_tls_success = Some(now);
292    }
293
294    /// Note that we've completed a Tor handshake with a relay, _but failed to
295    /// verify the certificates in a way that could indicate clock skew_.
296    fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
297        self.last_chan_auth_success = Some(now);
298    }
299
300    /// Note that we've completed a Tor handshake with a relay.
301    ///
302    /// (This includes performing the TLS handshake, and verifying that the
303    /// relay was indeed the one that we wanted to reach.)
304    fn record_handshake_done(&mut self, now: Instant) {
305        self.last_chan_auth_success = Some(now);
306        self.last_chan_success = Some(now);
307    }
308}
309
310/// Object that manages information about a `ChanMgr`'s status, and sends
311/// information about connectivity changes over an asynchronous channel
312pub(crate) struct ChanMgrEventSender {
313    /// The last ConnStatus that we sent over the channel.
314    last_conn_status: ConnStatus,
315    /// The unsummarized status information from the ChanMgr.
316    mgr_status: ChanMgrStatus,
317    /// The channel that we use for sending ConnStatus information.
318    sender: watch::Sender<ConnStatus>,
319}
320
321impl ChanMgrEventSender {
322    /// If the status has changed as of `now`, tell any listeners.
323    ///
324    /// (This takes a time because we need to know how much time has elapsed
325    /// without successful attempts.)
326    ///
327    /// # Limitations
328    ///
329    /// We are dependent on calls to `record_attempt()` and similar methods to
330    /// actually invoke this function; if they were never called, we'd never
331    /// notice that we had gone too long without building connections.  That's
332    /// okay for now, though, since any Tor client will immediately start
333    /// building circuits, which will launch connection attempts until one
334    /// succeeds or the client gives up entirely.  
335    fn push_at(&mut self, now: Instant) {
336        let status = self.mgr_status.conn_status_at(now);
337        if !status.eq(&self.last_conn_status) {
338            self.last_conn_status = status.clone();
339            let mut b = self.sender.borrow_mut();
340            *b = status;
341        }
342    }
343
344    /// Note that an attempt to connect has been started.
345    pub(crate) fn record_attempt(&mut self) {
346        self.mgr_status.record_attempt();
347        self.push_at(Instant::now());
348    }
349
350    /// Note that we've successfully done a TCP handshake with an alleged relay.
351    pub(crate) fn record_tcp_success(&mut self) {
352        let now = Instant::now();
353        self.mgr_status.record_tcp_success(now);
354        self.push_at(now);
355    }
356
357    /// Note that we've completed a TLS handshake with an alleged relay.
358    ///
359    /// (Its identity won't be verified till the next step.)
360    pub(crate) fn record_tls_finished(&mut self) {
361        let now = Instant::now();
362        self.mgr_status.record_tls_finished(now);
363        self.push_at(now);
364    }
365
366    /// Record that a handshake has succeeded _except for the certificate
367    /// timeliness check, which may indicate a skewed clock.
368    pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
369        let now = Instant::now();
370        self.mgr_status.record_handshake_done_with_skewed_clock(now);
371        self.push_at(now);
372    }
373
374    /// Note that we've completed a Tor handshake with a relay.
375    ///
376    /// (This includes performing the TLS handshake, and verifying that the
377    /// relay was indeed the one that we wanted to reach.)
378    pub(crate) fn record_handshake_done(&mut self) {
379        let now = Instant::now();
380        self.mgr_status.record_handshake_done(now);
381        self.push_at(now);
382    }
383}
384
385/// Create a new channel for sending connectivity status events to other crates.
386pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
387    let (sender, receiver) = watch::channel();
388    let receiver = ConnStatusEvents { inner: receiver };
389    let sender = ChanMgrEventSender {
390        last_conn_status: ConnStatus::default(),
391        mgr_status: ChanMgrStatus::new_at(Instant::now()),
392        sender,
393    };
394    (sender, receiver)
395}
396
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Check `Display`, `frac`, `eq`, `blockage` and `usable` on hand-built
    /// statuses.
    #[test]
    fn status_basics() {
        let s1 = ConnStatus::default();
        assert_eq!(s1.to_string(), "connecting to the internet");
        assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
        assert!(s1.eq(&s1));
        assert!(s1.blockage().is_none());
        assert!(!s1.usable());

        let s2 = ConnStatus {
            online: Some(false),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s2.to_string(), "unable to connect to the internet");
        assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
        assert!(s2.eq(&s2));
        assert!(!s2.eq(&s1));
        assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            s2.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );
        assert!(!s2.usable());

        let s3 = ConnStatus {
            online: Some(true),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s3.to_string(), "handshaking with Tor relays");
        assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
        assert_eq!(s3.blockage(), None);
        assert!(!s3.eq(&s1));
        assert!(!s3.usable());

        let s4 = ConnStatus {
            online: Some(true),
            auth_works: Some(false),
            handshake_works: Some(false),
        };
        assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
        assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            s4.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!s4.eq(&s1));
        assert!(!s4.eq(&s2));
        assert!(!s4.eq(&s3));
        assert!(s4.eq(&s4));
        assert!(!s4.usable());

        let s5 = ConnStatus {
            online: Some(true),
            auth_works: Some(true),
            handshake_works: Some(true),
        };
        assert_eq!(s5.to_string(), "connecting successfully");
        assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
        assert!(s5.blockage().is_none());
        assert!(s5.eq(&s5));
        assert!(!s5.eq(&s4));
        assert!(s5.usable());
    }

    /// Check how `ChanMgrStatus` summarizes attempts, elapsed time and
    /// recorded successes into a `ConnStatus`.
    #[test]
    fn derive_status() {
        let start = Instant::now();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);

        // when we start, we're unable to reach any conclusions.
        let s0 = ms.conn_status_at(start);
        assert!(s0.online.is_none());
        assert!(s0.handshake_works.is_none());

        // Time won't let us make conclusions either, unless there have been
        // attempts.
        let s = ms.conn_status_at(start + hour);
        assert!(s.eq(&s0));

        // But if there have been attempts, _and_ time has passed, we notice
        // failure.
        for _ in 0..10 {
            ms.record_attempt();
        }
        // (Not immediately...)
        let s = ms.conn_status_at(start);
        assert!(s.eq(&s0));
        // (... but after a while.)
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(false));
        assert_eq!(s.handshake_works, Some(false));

        // If TCP has succeeded, we should notice that.
        ms.record_tcp_success(start + sec);
        let s = ms.conn_status_at(start + sec * 2);
        assert_eq!(s.online, Some(true));
        assert!(s.handshake_works.is_none());
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(false));

        // If the handshake succeeded, we can notice that too.
        ms.record_handshake_done(start + sec * 2);
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(true));
    }

    /// Check the clock-skew path: authentication succeeds, but the full
    /// handshake never does.
    #[test]
    fn skewed_clock_status() {
        let start = Instant::now();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);
        for _ in 0..10 {
            ms.record_attempt();
        }
        ms.record_tcp_success(start + sec);
        ms.record_tls_finished(start + sec);
        ms.record_handshake_done_with_skewed_clock(start + sec * 2);

        // Early on, authentication is known to work, but it is too soon to
        // say that the full handshake is failing.
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.auth_works, Some(true));
        assert!(s.handshake_works.is_none());
        assert!(s.blockage().is_none());
        assert_eq!(s.to_string(), "handshaking with Tor relays");

        // After enough time, the handshake failure is blamed on clock skew
        // or expired certificates rather than on filtering.
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.auth_works, Some(true));
        assert_eq!(s.handshake_works, Some(false));
        assert_eq!(s.blockage(), Some(ConnBlockage::CertsExpired));
        assert_eq!(
            s.blockage().unwrap().to_string(),
            "relays all seem to be using expired certificates"
        );
        assert_eq!(
            s.to_string(),
            "unable to handshake with Tor relays, possibly due to clock skew"
        );
        assert_float_eq!(s.frac(), 0.5, abs <= TOL);
        assert!(!s.usable());
    }

    /// Check that a status recorded through the sender becomes visible on
    /// the receiving side.
    #[test]
    fn sender() {
        let (mut snd, rcv) = channel();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 0.0, abs <= TOL);
        }

        snd.record_attempt();
        snd.record_tcp_success();
        snd.record_tls_finished();
        snd.record_handshake_done();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 1.0, abs <= TOL);
        }
    }
}