tor_chanmgr/event.rs
1//! Code for exporting events from the channel manager.
2#![allow(dead_code, unreachable_pub)]
3
4use educe::Educe;
5use futures::{Stream, StreamExt};
6use postage::watch;
7use std::{
8 fmt,
9 time::{Duration, Instant},
10};
11use tor_basic_utils::skip_fmt;
12
/// A summary of how well we can currently reach the internet and the Tor
/// network.
#[derive(Default, Debug, Clone)]
pub struct ConnStatus {
    /// Are our outgoing TCP connections working?
    ///
    /// `Some(true)` if we've been able to make outgoing connections recently;
    /// `Some(false)` if we've definitely been failing; `None` if we haven't
    /// succeeded yet, but it's too early to call that a problem.
    online: Option<bool>,

    /// Can we complete TLS handshakes and negotiate certificates, _leaving
    /// aside any check of whether those certificates are timely_?
    ///
    /// `Some(true)` if we've been able to make TLS handshakes and talk to Tor
    /// relays we like recently; `Some(false)` if we've definitely been
    /// failing; `None` if we haven't succeeded yet, but it's too early to
    /// call that a problem.
    auth_works: Option<bool>,

    /// Can we negotiate complete Tor handshakes?
    ///
    /// `Some(true)` if we've been able to make Tor handshakes recently;
    /// `Some(false)` if we've definitely been failing; `None` if we haven't
    /// succeeded yet, but it's too early to call that a problem.
    handshake_works: Option<bool>,
}
40
41/// A problem detected while connecting to the Tor network.
42#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
43#[non_exhaustive]
44pub enum ConnBlockage {
45 #[display("unable to connect to the internet")]
46 /// We haven't been able to make successful TCP connections.
47 NoTcp,
48 /// We've made TCP connections, but our TLS connections either failed, or
49 /// got hit by an attempted man-in-the-middle attack.
50 #[display("our internet connection seems to be filtered")]
51 NoHandshake,
52 /// We've made TCP connections, and our TLS connections mostly succeeded,
53 /// but we encountered failures that are well explained by clock skew,
54 /// or expired certificates.
55 #[display("relays all seem to be using expired certificates")]
56 CertsExpired,
57}
58
59impl ConnStatus {
60 /// Return true if this status is equal to `other`.
61 ///
62 /// Note:(This would just be a PartialEq implementation, but I'm not sure I
63 /// want to expose that PartialEq for this struct.)
64 fn eq(&self, other: &ConnStatus) -> bool {
65 self.online == other.online && self.handshake_works == other.handshake_works
66 }
67
68 /// Return true if this status indicates that we can successfully open Tor channels.
69 pub fn usable(&self) -> bool {
70 self.online == Some(true) && self.handshake_works == Some(true)
71 }
72
73 /// Return a float representing "how bootstrapped" we are with respect to
74 /// connecting to the Tor network, where 0 is "not at all" and 1 is
75 /// "successful".
76 ///
77 /// Callers _should not_ depend on the specific meaning of any particular
78 /// fraction; we may change these fractions in the future.
79 pub fn frac(&self) -> f32 {
80 match self {
81 Self {
82 online: Some(true),
83 auth_works: Some(true),
84 handshake_works: Some(true),
85 } => 1.0,
86 Self {
87 online: Some(true), ..
88 } => 0.5,
89 _ => 0.0,
90 }
91 }
92
93 /// Return the cause of why we aren't able to connect to the Tor network,
94 /// if we think we're stuck.
95 pub fn blockage(&self) -> Option<ConnBlockage> {
96 match self {
97 Self {
98 online: Some(false),
99 ..
100 } => Some(ConnBlockage::NoTcp),
101 Self {
102 auth_works: Some(false),
103 ..
104 } => Some(ConnBlockage::NoHandshake),
105 Self {
106 handshake_works: Some(false),
107 ..
108 } => Some(ConnBlockage::CertsExpired),
109 _ => None,
110 }
111 }
112}
113
114impl fmt::Display for ConnStatus {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
118 ConnStatus {
119 online: Some(false),
120 ..
121 } => write!(f, "unable to connect to the internet"),
122 ConnStatus {
123 handshake_works: None,
124 ..
125 } => write!(f, "handshaking with Tor relays"),
126 ConnStatus {
127 auth_works: Some(true),
128 handshake_works: Some(false),
129 ..
130 } => write!(
131 f,
132 "unable to handshake with Tor relays, possibly due to clock skew"
133 ),
134 ConnStatus {
135 handshake_works: Some(false),
136 ..
137 } => write!(f, "unable to handshake with Tor relays"),
138 ConnStatus {
139 online: Some(true),
140 handshake_works: Some(true),
141 ..
142 } => write!(f, "connecting successfully"),
143 }
144 }
145}
146
147/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
148///
149/// This stream is lossy; a reader might not see some events on the stream, if
150/// they are produced faster than the reader can consume. In that case, the
151/// reader will see more recent updates, and miss older ones.
152///
153/// Note that the bootstrap status is not monotonic: we might become less
154/// bootstrapped than we were before. (For example, the internet could go
155/// down.)
156#[derive(Clone, Educe)]
157#[educe(Debug)]
158pub struct ConnStatusEvents {
159 /// The receiver that implements this stream.
160 ///
161 /// (We wrap it in a new type here so that we can replace the implementation
162 /// later on if we need to.)
163 #[educe(Debug(method = "skip_fmt"))]
164 inner: watch::Receiver<ConnStatus>,
165}
166
167impl Stream for ConnStatusEvents {
168 type Item = ConnStatus;
169 fn poll_next(
170 mut self: std::pin::Pin<&mut Self>,
171 cx: &mut std::task::Context<'_>,
172 ) -> std::task::Poll<Option<Self::Item>> {
173 self.inner.poll_next_unpin(cx)
174 }
175}
176
/// The crate's internal record of "how connected are we to the internet?"
///
/// This structure is more complex and costly than `ConnStatus`, so we keep
/// the details here and expose only the minimum, as a `ConnStatus` sent over
/// a `postage::watch`.  Later, we might want to expose more of this
/// information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Debug, Clone)]
struct ChanMgrStatus {
    /// The time at which we were first initialized.
    startup: Instant,

    /// The number of channels we have tried to build since we started.
    n_attempts: usize,

    /// The most recent time (if any) at which we made a TCP connection to
    /// (what we hoped was) a Tor relay.
    ///
    /// If we never reach this point, we're probably not on the internet.
    ///
    /// If we get this far and no further, our TCP connections are probably
    /// being captured or replaced.
    last_tcp_success: Option<Instant>,

    /// The most recent time (if any) at which we finished a TLS handshake
    /// with (what we hoped was) a Tor relay.
    ///
    /// If we get this far and no further, we might be facing a TLS MITM
    /// attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// The most recent time (if any) at which we finished the inner Tor
    /// handshake with a relay, up to the point where we check for
    /// certificate timeliness.
    last_chan_auth_success: Option<Instant>,

    /// The most recent time (if any) at which we successfully finished the
    /// inner Tor handshake with a relay.
    ///
    /// Getting this far means we can successfully talk to something that
    /// holds the private key that it's supposed to.
    last_chan_success: Option<Instant>,
}
223
224impl ChanMgrStatus {
225 /// Construct a new ChanMgr status.
226 ///
227 /// It will be built as having been initialized at the time `now`.
228 fn new_at(now: Instant) -> ChanMgrStatus {
229 ChanMgrStatus {
230 startup: now,
231 n_attempts: 0,
232 last_tcp_success: None,
233 last_tls_success: None,
234 last_chan_auth_success: None,
235 last_chan_success: None,
236 }
237 }
238
239 /// Return a [`ConnStatus`] for the current state, at time `now`.
240 ///
241 /// (The time is necessary because a lack of success doesn't indicate a
242 /// problem until enough time has passed.)
243 fn conn_status_at(&self, now: Instant) -> ConnStatus {
244 /// How long do we need to be online before we'll acknowledge failure?
245 const MIN_DURATION: Duration = Duration::from_secs(60);
246 /// How many attempts do we need to launch before we'll acknowledge failure?
247 const MIN_ATTEMPTS: usize = 6;
248
249 // If set, it's too early to determine failure.
250 let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
251
252 let online = match (self.last_tcp_success.is_some(), early) {
253 (true, _) => Some(true),
254 (_, true) => None,
255 (false, false) => Some(false),
256 };
257
258 let auth_works = match (self.last_chan_auth_success.is_some(), early) {
259 (true, _) => Some(true),
260 (_, true) => None,
261 (false, false) => Some(false),
262 };
263
264 let handshake_works = match (self.last_chan_success.is_some(), early) {
265 (true, _) => Some(true),
266 (_, true) => None,
267 (false, false) => Some(false),
268 };
269
270 ConnStatus {
271 online,
272 auth_works,
273 handshake_works,
274 }
275 }
276
277 /// Note that an attempt to connect has been started.
278 fn record_attempt(&mut self) {
279 self.n_attempts += 1;
280 }
281
282 /// Note that we've successfully done a TCP handshake with an alleged relay.
283 fn record_tcp_success(&mut self, now: Instant) {
284 self.last_tcp_success = Some(now);
285 }
286
287 /// Note that we've completed a TLS handshake with an alleged relay.
288 ///
289 /// (Its identity won't be verified till the next step.)
290 fn record_tls_finished(&mut self, now: Instant) {
291 self.last_tls_success = Some(now);
292 }
293
294 /// Note that we've completed a Tor handshake with a relay, _but failed to
295 /// verify the certificates in a way that could indicate clock skew_.
296 fn record_handshake_done_with_skewed_clock(&mut self, now: Instant) {
297 self.last_chan_auth_success = Some(now);
298 }
299
300 /// Note that we've completed a Tor handshake with a relay.
301 ///
302 /// (This includes performing the TLS handshake, and verifying that the
303 /// relay was indeed the one that we wanted to reach.)
304 fn record_handshake_done(&mut self, now: Instant) {
305 self.last_chan_auth_success = Some(now);
306 self.last_chan_success = Some(now);
307 }
308}
309
310/// Object that manages information about a `ChanMgr`'s status, and sends
311/// information about connectivity changes over an asynchronous channel
312pub(crate) struct ChanMgrEventSender {
313 /// The last ConnStatus that we sent over the channel.
314 last_conn_status: ConnStatus,
315 /// The unsummarized status information from the ChanMgr.
316 mgr_status: ChanMgrStatus,
317 /// The channel that we use for sending ConnStatus information.
318 sender: watch::Sender<ConnStatus>,
319}
320
321impl ChanMgrEventSender {
322 /// If the status has changed as of `now`, tell any listeners.
323 ///
324 /// (This takes a time because we need to know how much time has elapsed
325 /// without successful attempts.)
326 ///
327 /// # Limitations
328 ///
329 /// We are dependent on calls to `record_attempt()` and similar methods to
330 /// actually invoke this function; if they were never called, we'd never
331 /// notice that we had gone too long without building connections. That's
332 /// okay for now, though, since any Tor client will immediately start
333 /// building circuits, which will launch connection attempts until one
334 /// succeeds or the client gives up entirely.
335 fn push_at(&mut self, now: Instant) {
336 let status = self.mgr_status.conn_status_at(now);
337 if !status.eq(&self.last_conn_status) {
338 self.last_conn_status = status.clone();
339 let mut b = self.sender.borrow_mut();
340 *b = status;
341 }
342 }
343
344 /// Note that an attempt to connect has been started.
345 pub(crate) fn record_attempt(&mut self) {
346 self.mgr_status.record_attempt();
347 self.push_at(Instant::now());
348 }
349
350 /// Note that we've successfully done a TCP handshake with an alleged relay.
351 pub(crate) fn record_tcp_success(&mut self) {
352 let now = Instant::now();
353 self.mgr_status.record_tcp_success(now);
354 self.push_at(now);
355 }
356
357 /// Note that we've completed a TLS handshake with an alleged relay.
358 ///
359 /// (Its identity won't be verified till the next step.)
360 pub(crate) fn record_tls_finished(&mut self) {
361 let now = Instant::now();
362 self.mgr_status.record_tls_finished(now);
363 self.push_at(now);
364 }
365
366 /// Record that a handshake has succeeded _except for the certificate
367 /// timeliness check, which may indicate a skewed clock.
368 pub(crate) fn record_handshake_done_with_skewed_clock(&mut self) {
369 let now = Instant::now();
370 self.mgr_status.record_handshake_done_with_skewed_clock(now);
371 self.push_at(now);
372 }
373
374 /// Note that we've completed a Tor handshake with a relay.
375 ///
376 /// (This includes performing the TLS handshake, and verifying that the
377 /// relay was indeed the one that we wanted to reach.)
378 pub(crate) fn record_handshake_done(&mut self) {
379 let now = Instant::now();
380 self.mgr_status.record_handshake_done(now);
381 self.push_at(now);
382 }
383}
384
385/// Create a new channel for sending connectivity status events to other crates.
386pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
387 let (sender, receiver) = watch::channel();
388 let receiver = ConnStatusEvents { inner: receiver };
389 let sender = ChanMgrEventSender {
390 last_conn_status: ConnStatus::default(),
391 mgr_status: ChanMgrStatus::new_at(Instant::now()),
392 sender,
393 };
394 (sender, receiver)
395}
396
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Check the summaries (`Display`, `frac`, `blockage`, `usable`, `eq`)
    /// of hand-built `ConnStatus` values.
    #[test]
    fn status_basics() {
        let s1 = ConnStatus::default();
        assert_eq!(s1.to_string(), "connecting to the internet");
        assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
        assert!(s1.eq(&s1));
        assert!(s1.blockage().is_none());
        assert!(!s1.usable());

        let s2 = ConnStatus {
            online: Some(false),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s2.to_string(), "unable to connect to the internet");
        assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
        assert!(s2.eq(&s2));
        assert!(!s2.eq(&s1));
        assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            s2.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );
        assert!(!s2.usable());

        let s3 = ConnStatus {
            online: Some(true),
            auth_works: None,
            handshake_works: None,
        };
        assert_eq!(s3.to_string(), "handshaking with Tor relays");
        assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
        assert_eq!(s3.blockage(), None);
        assert!(!s3.eq(&s1));
        assert!(!s3.usable());

        let s4 = ConnStatus {
            online: Some(true),
            auth_works: Some(false),
            handshake_works: Some(false),
        };
        assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
        assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            s4.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!s4.eq(&s1));
        assert!(!s4.eq(&s2));
        assert!(!s4.eq(&s3));
        assert!(s4.eq(&s4));
        assert!(!s4.usable());

        let s5 = ConnStatus {
            online: Some(true),
            auth_works: Some(true),
            handshake_works: Some(true),
        };
        assert_eq!(s5.to_string(), "connecting successfully");
        assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
        assert!(s5.blockage().is_none());
        assert!(s5.eq(&s5));
        assert!(!s5.eq(&s4));
        assert!(s5.usable());
    }

    /// Check the summaries of a status in which authentication works but the
    /// full handshake does not: the "possible clock skew" case.
    #[test]
    fn status_clock_skew() {
        let skewed = ConnStatus {
            online: Some(true),
            auth_works: Some(true),
            handshake_works: Some(false),
        };
        assert_eq!(
            skewed.to_string(),
            "unable to handshake with Tor relays, possibly due to clock skew"
        );
        assert_float_eq!(skewed.frac(), 0.5, abs <= TOL);
        assert_eq!(skewed.blockage(), Some(ConnBlockage::CertsExpired));
        assert_eq!(
            skewed.blockage().unwrap().to_string(),
            "relays all seem to be using expired certificates"
        );
        assert!(skewed.eq(&skewed));
        assert!(!skewed.eq(&ConnStatus::default()));
        assert!(!skewed.usable());
    }

    /// Check how `ChanMgrStatus` turns recorded events and elapsed time into
    /// a `ConnStatus`.
    #[test]
    fn derive_status() {
        let start = Instant::now();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);

        // when we start, we're unable to reach any conclusions.
        let s0 = ms.conn_status_at(start);
        assert!(s0.online.is_none());
        assert!(s0.handshake_works.is_none());

        // Time won't let us make conclusions either, unless there have been
        // attempts.
        let s = ms.conn_status_at(start + hour);
        assert!(s.eq(&s0));

        // But if there have been attempts, _and_ time has passed, we notice
        // failure.
        for _ in 0..10 {
            ms.record_attempt();
        }
        // (Not immediately...)
        let s = ms.conn_status_at(start);
        assert!(s.eq(&s0));
        // (... but after a while.)
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(false));
        assert_eq!(s.handshake_works, Some(false));

        // If TCP has succeeded, we should notice that.
        ms.record_tcp_success(start + sec);
        let s = ms.conn_status_at(start + sec * 2);
        assert_eq!(s.online, Some(true));
        assert!(s.handshake_works.is_none());
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(false));

        // If the handshake succeeded, we can notice that too.
        ms.record_handshake_done(start + sec * 2);
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.handshake_works, Some(true));
    }

    /// Check the status derived when handshakes complete except for the
    /// certificate timeliness check.
    #[test]
    fn derive_status_clock_skew() {
        let start = Instant::now();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);
        for _ in 0..10 {
            ms.record_attempt();
        }
        ms.record_tcp_success(start + sec);
        ms.record_tls_finished(start + sec);
        ms.record_handshake_done_with_skewed_clock(start + sec * 2);

        // Early on, we know that authentication works, but it's too soon to
        // say that the full handshake is failing.
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.auth_works, Some(true));
        assert!(s.handshake_works.is_none());
        assert!(s.blockage().is_none());

        // After a while, the missing full handshake is reported as a
        // certificate-expiry blockage.
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.auth_works, Some(true));
        assert_eq!(s.handshake_works, Some(false));
        assert_eq!(s.blockage(), Some(ConnBlockage::CertsExpired));
        assert!(!s.usable());

        // A complete handshake clears the blockage.
        ms.record_handshake_done(start + hour);
        let s = ms.conn_status_at(start + hour + sec);
        assert!(s.usable());
        assert!(s.blockage().is_none());
        assert_float_eq!(s.frac(), 1.0, abs <= TOL);
    }

    /// Check that events recorded on the sender reach the receiver.
    #[test]
    fn sender() {
        let (mut snd, rcv) = channel();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 0.0, abs <= TOL);
        }

        snd.record_attempt();
        snd.record_tcp_success();
        snd.record_tls_finished();
        snd.record_handshake_done();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 1.0, abs <= TOL);
        }
    }
}
552}