tor_events/
lib.rs

1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
45
46pub mod events;
47
48use crate::events::{TorEvent, TorEventKind};
49use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
50use futures::channel::mpsc;
51use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
52use futures::future::Either;
53use futures::StreamExt;
54use std::pin::Pin;
55use std::sync::atomic::{AtomicUsize, Ordering};
56use std::sync::OnceLock;
57use std::task::{Context, Poll};
58use thiserror::Error;
59use tracing::{error, warn};
60
61/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
62static EVENT_SENDER: OnceLock<UnboundedSender<TorEvent>> = OnceLock::new();
63/// An inactive receiver for the currently active broadcast channel, if there is one.
64static CURRENT_RECEIVER: OnceLock<InactiveReceiver<TorEvent>> = OnceLock::new();
65/// The number of `TorEventKind`s there are.
66const EVENT_KIND_COUNT: usize = 1;
67/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
68///
69/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
70/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe).
71/// This lets event emitters check whether there are any subscribers, and avoid emitting events
72/// if there aren't.
73static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];
74
75/// The size of the internal broadcast channel used to implement event subscription.
76pub static BROADCAST_CAPACITY: usize = 512;
77
78/// A reactor used to forward events to make the event reporting system work.
79///
80/// # Note
81///
82/// Currently, this type is a singleton; there is one event reporting system used for the entire
83/// program. This is not stable, and may change in future.
84pub struct EventReactor {
85    /// A receiver that the reactor uses to learn about incoming events.
86    ///
87    /// This is unbounded so that event publication doesn't have to be async.
88    receiver: UnboundedReceiver<TorEvent>,
89    /// A sender that the reactor uses to publish events.
90    ///
91    /// Events are only sent here if at least one subscriber currently wants them.
92    broadcast: Sender<TorEvent>,
93}
94
95impl EventReactor {
96    /// Initialize the event reporting system, returning a reactor that must be run for it to work,
97    /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
98    /// has already been initialized, returns `None` instead of a reactor.
99    ///
100    /// # Warnings
101    ///
102    /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
103    /// If it is not, the event system might consume unbounded amounts of memory.
104    pub fn new() -> Option<Self> {
105        let (tx, rx) = mpsc::unbounded();
106        if EVENT_SENDER.set(tx).is_ok() {
107            let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
108            CURRENT_RECEIVER
109                .set(brx.deactivate())
110                .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
111            Some(Self {
112                receiver: rx,
113                broadcast: btx,
114            })
115        } else {
116            None
117        }
118    }
119    /// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
120    /// running somewhere. (If it isn't, returns `None`.)
121    ///
122    /// As noted in the type-level documentation, this function might not always work this way.
123    pub fn receiver() -> Option<TorEventReceiver> {
124        CURRENT_RECEIVER
125            .get()
126            .map(|rx| TorEventReceiver::wrap(rx.clone()))
127    }
128    /// Run the event forwarding reactor.
129    ///
130    /// You *must* call this function once a reactor is created.
131    pub async fn run(mut self) {
132        while let Some(event) = self.receiver.next().await {
133            match self.broadcast.try_broadcast(event) {
134                Ok(_) => {}
135                Err(TrySendError::Closed(_)) => break,
136                Err(TrySendError::Full(event)) => {
137                    // If the channel is full, do a blocking broadcast to wait for it to be
138                    // not full, and log a warning about receivers lagging behind.
139                    warn!("TorEventReceivers aren't receiving events fast enough!");
140                    if self.broadcast.broadcast(event).await.is_err() {
141                        break;
142                    }
143                }
144                Err(TrySendError::Inactive(_)) => {
145                    // no active receivers, so just drop the event on the floor.
146                }
147            }
148        }
149        // It shouldn't be possible to get here, since we have globals keeping the channels
150        // open. Still, if we somehow do, log an error about it.
151        error!("event reactor shutting down; this shouldn't ever happen");
152    }
153}
154
155/// An error encountered when trying to receive a `TorEvent`.
156#[derive(Clone, Debug, Error)]
157#[non_exhaustive]
158pub enum ReceiverError {
159    /// The receiver isn't subscribed to anything, so wouldn't ever return any events.
160    #[error("No event subscriptions")]
161    NoSubscriptions,
162    /// The internal broadcast channel was closed, which shouldn't ever happen.
163    #[error("Internal event broadcast channel closed")]
164    ChannelClosed,
165}
166
167/// A receiver for `TorEvent`s emitted by other users of this crate.
168///
169/// To use this type, first subscribe to some kinds of event by calling
170/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
171/// `futures::stream::Stream`.
172///
173/// # Warning
174///
175/// Once interest in events has been signalled with `subscribe`, events must be continuously
176/// read from the receiver in order to avoid excessive memory consumption.
177#[derive(Clone, Debug)]
178pub struct TorEventReceiver {
179    /// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
180    /// it's a `Receiver`.
181    inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
182    /// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
183    /// subscribed to `kind`).
184    subscribed: [bool; EVENT_KIND_COUNT],
185}
186
187impl futures::stream::Stream for TorEventReceiver {
188    type Item = TorEvent;
189
190    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191        let this = self.get_mut();
192        match this.inner {
193            Either::Left(ref mut active) => loop {
194                match Pin::new(&mut *active).poll_next(cx) {
195                    Poll::Ready(Some(e)) => {
196                        if this.subscribed[e.kind() as usize] {
197                            return Poll::Ready(Some(e));
198                        }
199                        // loop, since we weren't subscribed to that event
200                    }
201                    x => return x,
202                }
203            },
204            Either::Right(_) => {
205                warn!("TorEventReceiver::poll_next() called without subscriptions!");
206                Poll::Ready(None)
207            }
208        }
209    }
210}
211
212impl TorEventReceiver {
213    /// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
214    pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
215        Self {
216            inner: Either::Right(rx),
217            subscribed: [false; EVENT_KIND_COUNT],
218        }
219    }
220    /// Subscribe to a given kind of `TorEvent`.
221    ///
222    /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
223    /// This function is idempotent (subscribing twice has the same effect as doing so once).
224    pub fn subscribe(&mut self, kind: TorEventKind) {
225        if !self.subscribed[kind as usize] {
226            EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
227            self.subscribed[kind as usize] = true;
228        }
229        // FIXME(eta): cloning is ungood, but hard to avoid
230        if let Either::Right(inactive) = self.inner.clone() {
231            self.inner = Either::Left(inactive.activate());
232        }
233    }
234    /// Unsubscribe from a given kind of `TorEvent`.
235    ///
236    /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
237    /// kind.
238    /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
239    pub fn unsubscribe(&mut self, kind: TorEventKind) {
240        if self.subscribed[kind as usize] {
241            EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
242            self.subscribed[kind as usize] = false;
243        }
244        // If we're now not subscribed to anything, deactivate our channel.
245        if self.subscribed.iter().all(|x| !*x) {
246            // FIXME(eta): cloning is ungood, but hard to avoid
247            if let Either::Left(active) = self.inner.clone() {
248                self.inner = Either::Right(active.deactivate());
249            }
250        }
251    }
252}
253
254impl Drop for TorEventReceiver {
255    fn drop(&mut self) {
256        for (i, subscribed) in self.subscribed.iter().enumerate() {
257            // FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
258            //             to go from a `usize` to a `TorEventKind`
259            if *subscribed {
260                EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
261            }
262        }
263    }
264}
265
266/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
267/// whether `TorEventReceiver::subscribe` has been called with that event kind).
268///
269/// This is useful to avoid doing work to generate events that might be computationally expensive
270/// to generate.
271pub fn event_has_subscribers(kind: TorEventKind) -> bool {
272    EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
273}
274
275/// Broadcast the given `TorEvent` to any interested subscribers.
276///
277/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
278/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
279///
280/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
281/// shouldn't broadcast events!).
282pub fn broadcast(event: TorEvent) {
283    if !event_has_subscribers(event.kind()) {
284        return;
285    }
286    if let Some(sender) = EVENT_SENDER.get() {
287        // If this fails, there isn't much we can really do about it!
288        let _ = sender.unbounded_send(event);
289    }
290}
291
292#[cfg(test)]
293mod test {
294    // @@ begin test lint list maintained by maint/add_warning @@
295    #![allow(clippy::bool_assert_comparison)]
296    #![allow(clippy::clone_on_copy)]
297    #![allow(clippy::dbg_macro)]
298    #![allow(clippy::mixed_attributes_style)]
299    #![allow(clippy::print_stderr)]
300    #![allow(clippy::print_stdout)]
301    #![allow(clippy::single_char_pattern)]
302    #![allow(clippy::unwrap_used)]
303    #![allow(clippy::unchecked_duration_subtraction)]
304    #![allow(clippy::useless_vec)]
305    #![allow(clippy::needless_pass_by_value)]
306    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
307    use crate::{
308        broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
309    };
310    use std::sync::{Mutex, MutexGuard, OnceLock};
311    use std::time::Duration;
312    use tokio::runtime::Runtime;
313
314    // HACK(eta): these tests need to run effectively singlethreaded, since they mutate global
315    //            state. They *also* need to share the same tokio runtime, which the
316    //            #[tokio::test] thing doesn't do (it makes a new runtime per test), because of
317    //            the need to have a background singleton EventReactor.
318    //
319    //            To hack around this, we just have a global runtime protected by a mutex!
320    static TEST_MUTEX: OnceLock<Mutex<Runtime>> = OnceLock::new();
321
322    /// Locks the mutex, and makes sure the event reactor is initialized.
323    fn test_setup() -> MutexGuard<'static, Runtime> {
324        let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(Runtime::new().unwrap()));
325        let runtime = mutex
326            .lock()
327            .expect("mutex poisoned, probably by other failing tests");
328        if let Some(reactor) = EventReactor::new() {
329            runtime.handle().spawn(reactor.run());
330        }
331        runtime
332    }
333
334    #[test]
335    fn subscriptions() {
336        let rt = test_setup();
337
338        rt.block_on(async move {
339            // shouldn't have any subscribers at the start
340            assert!(!event_has_subscribers(TorEventKind::Empty));
341
342            let mut rx = EventReactor::receiver().unwrap();
343            // creating a receiver shouldn't result in any subscriptions
344            assert!(!event_has_subscribers(TorEventKind::Empty));
345
346            rx.subscribe(TorEventKind::Empty);
347            // subscription should work
348            assert!(event_has_subscribers(TorEventKind::Empty));
349
350            rx.unsubscribe(TorEventKind::Empty);
351            // unsubscribing should work
352            assert!(!event_has_subscribers(TorEventKind::Empty));
353
354            // subscription should be idempotent
355            rx.subscribe(TorEventKind::Empty);
356            rx.subscribe(TorEventKind::Empty);
357            rx.subscribe(TorEventKind::Empty);
358            assert!(event_has_subscribers(TorEventKind::Empty));
359
360            rx.unsubscribe(TorEventKind::Empty);
361            assert!(!event_has_subscribers(TorEventKind::Empty));
362
363            rx.subscribe(TorEventKind::Empty);
364            assert!(event_has_subscribers(TorEventKind::Empty));
365
366            std::mem::drop(rx);
367            // dropping the receiver should auto-unsubscribe
368            assert!(!event_has_subscribers(TorEventKind::Empty));
369        });
370    }
371
372    #[test]
373    fn empty_recv() {
374        let rt = test_setup();
375
376        rt.block_on(async move {
377            let mut rx = EventReactor::receiver().unwrap();
378            // attempting to read from a receiver with no subscriptions should return None
379            let result = rx.next().await;
380            assert!(result.is_none());
381        });
382    }
383
384    #[test]
385    fn receives_events() {
386        let rt = test_setup();
387
388        rt.block_on(async move {
389            let mut rx = EventReactor::receiver().unwrap();
390            rx.subscribe(TorEventKind::Empty);
391            // HACK(eta): give the event reactor time to run
392            tokio::time::sleep(Duration::from_millis(100)).await;
393            broadcast(TorEvent::Empty);
394
395            let result = rx.next().await;
396            assert_eq!(result, Some(TorEvent::Empty));
397        });
398    }
399
400    #[test]
401    fn does_not_send_to_no_subscribers() {
402        let rt = test_setup();
403
404        rt.block_on(async move {
405            // this event should just get dropped on the floor, because no subscribers exist
406            broadcast(TorEvent::Empty);
407
408            let mut rx = EventReactor::receiver().unwrap();
409            rx.subscribe(TorEventKind::Empty);
410
411            // this shouldn't have an event to receive now
412            let result = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
413            assert!(result.is_err());
414        });
415    }
416}