tor_async_utils/
watch.rs

1//! Extension trait for more efficient use of [`postage::watch`].
2use std::ops::{Deref, DerefMut};
3use void::{ResultVoidExt as _, Void};
4
5/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
6///
7/// Ideally these, or something like them, would be upstream:
8/// See <https://github.com/austinjones/postage-rs/issues/56>.
9///
10/// We provide this as an extension trait became the implementation is a bit fiddly.
11/// This lets us concentrate on the actual logic, when we use it.
12pub trait PostageWatchSenderExt<T> {
13    /// Update, by calling a fallible function, sending only if necessary
14    ///
15    /// Calls `update` on the current value in the watch, to obtain a new value.
16    /// If the new value doesn't compare equal, updates the watch, notifying receivers.
17    fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
18    where
19        T: PartialEq,
20        F: FnOnce(&T) -> Result<T, E>;
21
22    /// Update, by calling a function, sending only if necessary
23    ///
24    /// Calls `update` on the current value in the watch, to obtain a new value.
25    /// If the new value doesn't compare equal, updates the watch, notifying receivers.
26    fn maybe_send<F>(&mut self, update: F)
27    where
28        T: PartialEq,
29        F: FnOnce(&T) -> T,
30    {
31        self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
32            .void_unwrap();
33    }
34}
35
36impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
37    fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
38    where
39        T: PartialEq,
40        F: FnOnce(&T) -> Result<T, E>,
41    {
42        let lock = self.borrow();
43        let new = update(&*lock)?;
44        if new != *lock {
45            // We must drop the lock guard, because otherwise borrow_mut will deadlock.
46            // There is no race, because we hold &mut self, so no-one else can get a look in.
47            // (postage::watch::Sender is not one of those facilities which is mereely a
48            // handle, and Clone.)
49            drop(lock);
50            *self.borrow_mut() = new;
51        }
52        Ok(())
53    }
54}
55
56#[derive(Debug)]
57/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
58///
59/// Derefs to the inner `Sender`.
60///
61/// Ideally this would be behaviour promised by upstream, or something
62/// See <https://github.com/austinjones/postage-rs/issues/57>.
63pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
64
65/// Values that can signal EOF
66///
67/// Implemented for `Option`, which is usually what you want to use.
68pub trait DropNotifyEofSignallable {
69    /// Generate the EOF value
70    fn eof() -> Self;
71
72    /// Does this value indicate EOF?
73    ///
74    /// ### Deprecated
75    ///
76    /// This method is deprecated.
77    /// It should not be called, or defined, in new programs.
78    /// It is not required by [`DropNotifyWatchSender`].
79    /// The provided implementation always returns `false`.
80    #[deprecated]
81    fn is_eof(&self) -> bool {
82        false
83    }
84}
85
86impl<T> DropNotifyEofSignallable for Option<T> {
87    fn eof() -> Self {
88        None
89    }
90
91    fn is_eof(&self) -> bool {
92        self.is_none()
93    }
94}
95
96impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
97    /// Arrange to send `T::Default` when `inner` is dropped
98    pub fn new(inner: postage::watch::Sender<T>) -> Self {
99        DropNotifyWatchSender(Some(inner))
100    }
101
102    /// Unwrap the inner sender, defusing the drop notification
103    pub fn into_inner(mut self) -> postage::watch::Sender<T> {
104        self.0.take().expect("inner was None")
105    }
106}
107
108impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
109    type Target = postage::watch::Sender<T>;
110    fn deref(&self) -> &Self::Target {
111        self.0.as_ref().expect("inner was None")
112    }
113}
114
115impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
116    fn deref_mut(&mut self) -> &mut Self::Target {
117        self.0.as_mut().expect("inner was None")
118    }
119}
120
121impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
122    fn drop(&mut self) {
123        if let Some(mut inner) = self.0.take() {
124            // None means into_inner() was called
125            *inner.borrow_mut() = DropNotifyEofSignallable::eof();
126        }
127    }
128}
129
130#[cfg(test)]
131mod test {
132    // @@ begin test lint list maintained by maint/add_warning @@
133    #![allow(clippy::bool_assert_comparison)]
134    #![allow(clippy::clone_on_copy)]
135    #![allow(clippy::dbg_macro)]
136    #![allow(clippy::mixed_attributes_style)]
137    #![allow(clippy::print_stderr)]
138    #![allow(clippy::print_stdout)]
139    #![allow(clippy::single_char_pattern)]
140    #![allow(clippy::unwrap_used)]
141    #![allow(clippy::unchecked_duration_subtraction)]
142    #![allow(clippy::useless_vec)]
143    #![allow(clippy::needless_pass_by_value)]
144    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
145
146    use super::*;
147    use futures::select_biased;
148    use futures_await_test::async_test;
149
150    #[async_test]
151    async fn postage_sender_ext() {
152        use futures::stream::StreamExt;
153        use futures::FutureExt;
154
155        let (mut s, mut r) = postage::watch::channel_with(20);
156        // Receiver of a fresh watch wakes once, but let's not rely on this
157        select_biased! {
158            i = r.next().fuse() => assert_eq!(i, Some(20)),
159            _ = futures::future::ready(()) => { }, // tolerate nothing
160        };
161        // Now, not ready
162        select_biased! {
163            _ = r.next().fuse() => panic!(),
164            _ = futures::future::ready(()) => { },
165        };
166
167        s.maybe_send(|i| *i);
168        // Still not ready
169        select_biased! {
170            _ = r.next().fuse() => panic!(),
171            _ = futures::future::ready(()) => { },
172        };
173
174        s.maybe_send(|i| *i + 1);
175        // Ready, with 21
176        select_biased! {
177            i = r.next().fuse() => assert_eq!(i, Some(21)),
178            _ = futures::future::ready(()) => panic!(),
179        };
180
181        let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
182        // Not ready
183        select_biased! {
184            _ = r.next().fuse() => panic!(),
185            _ = futures::future::ready(()) => { },
186        };
187    }
188
189    #[async_test]
190    async fn postage_drop() {
191        #[derive(Clone, Copy, Debug, Eq, PartialEq)]
192        struct I(i32);
193
194        impl DropNotifyEofSignallable for I {
195            fn eof() -> I {
196                I(0)
197            }
198            fn is_eof(&self) -> bool {
199                self.0 == 0
200            }
201        }
202
203        let (s, r) = postage::watch::channel_with(I(20));
204        let s = DropNotifyWatchSender::new(s);
205
206        assert_eq!(*r.borrow(), I(20));
207        drop(s);
208        assert_eq!(*r.borrow(), I(0));
209
210        let (s, r) = postage::watch::channel_with(I(44));
211        let s = DropNotifyWatchSender::new(s);
212
213        assert_eq!(*r.borrow(), I(44));
214        drop(s.into_inner());
215        assert_eq!(*r.borrow(), I(44));
216    }
217}