1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3// @@ begin lint list maintained by maint/add_warning @@
4#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6#![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39#![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43#![allow(clippy::needless_lifetimes)] // See arti#1765
44#![allow(mismatched_lifetime_syntaxes)] // temporary workaround for arti#2060
45//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
4647pub mod events;
4849use crate::events::{TorEvent, TorEventKind};
50use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
51use futures::channel::mpsc;
52use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
53use futures::future::Either;
54use futures::StreamExt;
55use std::pin::Pin;
56use std::sync::atomic::{AtomicUsize, Ordering};
57use std::sync::OnceLock;
58use std::task::{Context, Poll};
59use thiserror::Error;
60use tracing::{error, warn};
6162/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
63static EVENT_SENDER: OnceLock<UnboundedSender<TorEvent>> = OnceLock::new();
64/// An inactive receiver for the currently active broadcast channel, if there is one.
65static CURRENT_RECEIVER: OnceLock<InactiveReceiver<TorEvent>> = OnceLock::new();
66/// The number of `TorEventKind`s there are.
67const EVENT_KIND_COUNT: usize = 1;
68/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
69///
70/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
71/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe).
72/// This lets event emitters check whether there are any subscribers, and avoid emitting events
73/// if there aren't.
74static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];
7576/// The size of the internal broadcast channel used to implement event subscription.
77pub static BROADCAST_CAPACITY: usize = 512;
7879/// A reactor used to forward events to make the event reporting system work.
80///
81/// # Note
82///
83/// Currently, this type is a singleton; there is one event reporting system used for the entire
84/// program. This is not stable, and may change in future.
85pub struct EventReactor {
86/// A receiver that the reactor uses to learn about incoming events.
87 ///
88 /// This is unbounded so that event publication doesn't have to be async.
89receiver: UnboundedReceiver<TorEvent>,
90/// A sender that the reactor uses to publish events.
91 ///
92 /// Events are only sent here if at least one subscriber currently wants them.
93broadcast: Sender<TorEvent>,
94}
9596impl EventReactor {
97/// Initialize the event reporting system, returning a reactor that must be run for it to work,
98 /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
99 /// has already been initialized, returns `None` instead of a reactor.
100 ///
101 /// # Warnings
102 ///
103 /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
104 /// If it is not, the event system might consume unbounded amounts of memory.
105pub fn new() -> Option<Self> {
106let (tx, rx) = mpsc::unbounded();
107if EVENT_SENDER.set(tx).is_ok() {
108let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
109 CURRENT_RECEIVER
110 .set(brx.deactivate())
111 .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
112Some(Self {
113 receiver: rx,
114 broadcast: btx,
115 })
116 } else {
117None
118}
119 }
120/// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
121 /// running somewhere. (If it isn't, returns `None`.)
122 ///
123 /// As noted in the type-level documentation, this function might not always work this way.
124pub fn receiver() -> Option<TorEventReceiver> {
125 CURRENT_RECEIVER
126 .get()
127 .map(|rx| TorEventReceiver::wrap(rx.clone()))
128 }
129/// Run the event forwarding reactor.
130 ///
131 /// You *must* call this function once a reactor is created.
132pub async fn run(mut self) {
133while let Some(event) = self.receiver.next().await {
134match self.broadcast.try_broadcast(event) {
135Ok(_) => {}
136Err(TrySendError::Closed(_)) => break,
137Err(TrySendError::Full(event)) => {
138// If the channel is full, do a blocking broadcast to wait for it to be
139 // not full, and log a warning about receivers lagging behind.
140warn!("TorEventReceivers aren't receiving events fast enough!");
141if self.broadcast.broadcast(event).await.is_err() {
142break;
143 }
144 }
145Err(TrySendError::Inactive(_)) => {
146// no active receivers, so just drop the event on the floor.
147}
148 }
149 }
150// It shouldn't be possible to get here, since we have globals keeping the channels
151 // open. Still, if we somehow do, log an error about it.
152error!("event reactor shutting down; this shouldn't ever happen");
153 }
154}
155156/// An error encountered when trying to receive a `TorEvent`.
157#[derive(Clone, Debug, Error)]
158#[non_exhaustive]
159pub enum ReceiverError {
160/// The receiver isn't subscribed to anything, so wouldn't ever return any events.
161#[error("No event subscriptions")]
162NoSubscriptions,
163/// The internal broadcast channel was closed, which shouldn't ever happen.
164#[error("Internal event broadcast channel closed")]
165ChannelClosed,
166}
167168/// A receiver for `TorEvent`s emitted by other users of this crate.
169///
170/// To use this type, first subscribe to some kinds of event by calling
171/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
172/// `futures::stream::Stream`.
173///
174/// # Warning
175///
176/// Once interest in events has been signalled with `subscribe`, events must be continuously
177/// read from the receiver in order to avoid excessive memory consumption.
178#[derive(Clone, Debug)]
179pub struct TorEventReceiver {
180/// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
181 /// it's a `Receiver`.
182inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
183/// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
184 /// subscribed to `kind`).
185subscribed: [bool; EVENT_KIND_COUNT],
186}
187188impl futures::stream::Stream for TorEventReceiver {
189type Item = TorEvent;
190191fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192let this = self.get_mut();
193match this.inner {
194 Either::Left(ref mut active) => loop {
195match Pin::new(&mut *active).poll_next(cx) {
196 Poll::Ready(Some(e)) => {
197if this.subscribed[e.kind() as usize] {
198return Poll::Ready(Some(e));
199 }
200// loop, since we weren't subscribed to that event
201}
202 x => return x,
203 }
204 },
205 Either::Right(_) => {
206warn!("TorEventReceiver::poll_next() called without subscriptions!");
207 Poll::Ready(None)
208 }
209 }
210 }
211}
212213impl TorEventReceiver {
214/// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
215pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
216Self {
217 inner: Either::Right(rx),
218 subscribed: [false; EVENT_KIND_COUNT],
219 }
220 }
221/// Subscribe to a given kind of `TorEvent`.
222 ///
223 /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
224 /// This function is idempotent (subscribing twice has the same effect as doing so once).
225pub fn subscribe(&mut self, kind: TorEventKind) {
226if !self.subscribed[kind as usize] {
227 EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
228self.subscribed[kind as usize] = true;
229 }
230// FIXME(eta): cloning is ungood, but hard to avoid
231if let Either::Right(inactive) = self.inner.clone() {
232self.inner = Either::Left(inactive.activate());
233 }
234 }
235/// Unsubscribe from a given kind of `TorEvent`.
236 ///
237 /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
238 /// kind.
239 /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
240pub fn unsubscribe(&mut self, kind: TorEventKind) {
241if self.subscribed[kind as usize] {
242 EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
243self.subscribed[kind as usize] = false;
244 }
245// If we're now not subscribed to anything, deactivate our channel.
246if self.subscribed.iter().all(|x| !*x) {
247// FIXME(eta): cloning is ungood, but hard to avoid
248if let Either::Left(active) = self.inner.clone() {
249self.inner = Either::Right(active.deactivate());
250 }
251 }
252 }
253}
254255impl Drop for TorEventReceiver {
256fn drop(&mut self) {
257for (i, subscribed) in self.subscribed.iter().enumerate() {
258// FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
259 // to go from a `usize` to a `TorEventKind`
260if *subscribed {
261 EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
262 }
263 }
264 }
265}
266267/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
268/// whether `TorEventReceiver::subscribe` has been called with that event kind).
269///
270/// This is useful to avoid doing work to generate events that might be computationally expensive
271/// to generate.
272pub fn event_has_subscribers(kind: TorEventKind) -> bool {
273 EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
274}
275276/// Broadcast the given `TorEvent` to any interested subscribers.
277///
278/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
279/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
280///
281/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
282/// shouldn't broadcast events!).
283pub fn broadcast(event: TorEvent) {
284if !event_has_subscribers(event.kind()) {
285return;
286 }
287if let Some(sender) = EVENT_SENDER.get() {
288// If this fails, there isn't much we can really do about it!
289let _ = sender.unbounded_send(event);
290 }
291}
292293#[cfg(test)]
294mod test {
295// @@ begin test lint list maintained by maint/add_warning @@
296#![allow(clippy::bool_assert_comparison)]
297 #![allow(clippy::clone_on_copy)]
298 #![allow(clippy::dbg_macro)]
299 #![allow(clippy::mixed_attributes_style)]
300 #![allow(clippy::print_stderr)]
301 #![allow(clippy::print_stdout)]
302 #![allow(clippy::single_char_pattern)]
303 #![allow(clippy::unwrap_used)]
304 #![allow(clippy::unchecked_duration_subtraction)]
305 #![allow(clippy::useless_vec)]
306 #![allow(clippy::needless_pass_by_value)]
307//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
308use crate::{
309 broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
310 };
311use std::sync::{Mutex, MutexGuard, OnceLock};
312use std::time::Duration;
313use tokio::runtime::Runtime;
314315// HACK(eta): these tests need to run effectively singlethreaded, since they mutate global
316 // state. They *also* need to share the same tokio runtime, which the
317 // #[tokio::test] thing doesn't do (it makes a new runtime per test), because of
318 // the need to have a background singleton EventReactor.
319 //
320 // To hack around this, we just have a global runtime protected by a mutex!
321static TEST_MUTEX: OnceLock<Mutex<Runtime>> = OnceLock::new();
322323/// Locks the mutex, and makes sure the event reactor is initialized.
324fn test_setup() -> MutexGuard<'static, Runtime> {
325let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(Runtime::new().unwrap()));
326let runtime = mutex
327 .lock()
328 .expect("mutex poisoned, probably by other failing tests");
329if let Some(reactor) = EventReactor::new() {
330 runtime.handle().spawn(reactor.run());
331 }
332 runtime
333 }
334335#[test]
336fn subscriptions() {
337let rt = test_setup();
338339 rt.block_on(async move {
340// shouldn't have any subscribers at the start
341assert!(!event_has_subscribers(TorEventKind::Empty));
342343let mut rx = EventReactor::receiver().unwrap();
344// creating a receiver shouldn't result in any subscriptions
345assert!(!event_has_subscribers(TorEventKind::Empty));
346347 rx.subscribe(TorEventKind::Empty);
348// subscription should work
349assert!(event_has_subscribers(TorEventKind::Empty));
350351 rx.unsubscribe(TorEventKind::Empty);
352// unsubscribing should work
353assert!(!event_has_subscribers(TorEventKind::Empty));
354355// subscription should be idempotent
356rx.subscribe(TorEventKind::Empty);
357 rx.subscribe(TorEventKind::Empty);
358 rx.subscribe(TorEventKind::Empty);
359assert!(event_has_subscribers(TorEventKind::Empty));
360361 rx.unsubscribe(TorEventKind::Empty);
362assert!(!event_has_subscribers(TorEventKind::Empty));
363364 rx.subscribe(TorEventKind::Empty);
365assert!(event_has_subscribers(TorEventKind::Empty));
366367 std::mem::drop(rx);
368// dropping the receiver should auto-unsubscribe
369assert!(!event_has_subscribers(TorEventKind::Empty));
370 });
371 }
372373#[test]
374fn empty_recv() {
375let rt = test_setup();
376377 rt.block_on(async move {
378let mut rx = EventReactor::receiver().unwrap();
379// attempting to read from a receiver with no subscriptions should return None
380let result = rx.next().await;
381assert!(result.is_none());
382 });
383 }
384385#[test]
386fn receives_events() {
387let rt = test_setup();
388389 rt.block_on(async move {
390let mut rx = EventReactor::receiver().unwrap();
391 rx.subscribe(TorEventKind::Empty);
392// HACK(eta): give the event reactor time to run
393tokio::time::sleep(Duration::from_millis(100)).await;
394 broadcast(TorEvent::Empty);
395396let result = rx.next().await;
397assert_eq!(result, Some(TorEvent::Empty));
398 });
399 }
400401#[test]
402fn does_not_send_to_no_subscribers() {
403let rt = test_setup();
404405 rt.block_on(async move {
406// this event should just get dropped on the floor, because no subscribers exist
407broadcast(TorEvent::Empty);
408409let mut rx = EventReactor::receiver().unwrap();
410 rx.subscribe(TorEventKind::Empty);
411412// this shouldn't have an event to receive now
413let result = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
414assert!(result.is_err());
415 });
416 }
417}