1//! Extension trait for more efficient use of [`postage::watch`].
2use std::ops::{Deref, DerefMut};
3use void::{ResultVoidExt as _, Void};
45/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
6///
7/// Ideally these, or something like them, would be upstream:
8/// See <https://github.com/austinjones/postage-rs/issues/56>.
9///
10/// We provide this as an extension trait became the implementation is a bit fiddly.
11/// This lets us concentrate on the actual logic, when we use it.
12pub trait PostageWatchSenderExt<T> {
13/// Update, by calling a fallible function, sending only if necessary
14 ///
15 /// Calls `update` on the current value in the watch, to obtain a new value.
16 /// If the new value doesn't compare equal, updates the watch, notifying receivers.
17fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
18where
19T: PartialEq,
20 F: FnOnce(&T) -> Result<T, E>;
2122/// Update, by calling a function, sending only if necessary
23 ///
24 /// Calls `update` on the current value in the watch, to obtain a new value.
25 /// If the new value doesn't compare equal, updates the watch, notifying receivers.
26fn maybe_send<F>(&mut self, update: F)
27where
28T: PartialEq,
29 F: FnOnce(&T) -> T,
30 {
31self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
32 .void_unwrap();
33 }
34}
3536impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
37fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
38where
39T: PartialEq,
40 F: FnOnce(&T) -> Result<T, E>,
41 {
42let lock = self.borrow();
43let new = update(&*lock)?;
44if new != *lock {
45// We must drop the lock guard, because otherwise borrow_mut will deadlock.
46 // There is no race, because we hold &mut self, so no-one else can get a look in.
47 // (postage::watch::Sender is not one of those facilities which is mereely a
48 // handle, and Clone.)
49drop(lock);
50*self.borrow_mut() = new;
51 }
52Ok(())
53 }
54}
5556#[derive(Debug)]
57/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
58///
59/// Derefs to the inner `Sender`.
60///
61/// Ideally this would be behaviour promised by upstream, or something
62/// See <https://github.com/austinjones/postage-rs/issues/57>.
63pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
6465/// Values that can signal EOF
66///
67/// Implemented for `Option`, which is usually what you want to use.
68pub trait DropNotifyEofSignallable {
69/// Generate the EOF value
70fn eof() -> Self;
7172/// Does this value indicate EOF?
73 ///
74 /// ### Deprecated
75 ///
76 /// This method is deprecated.
77 /// It should not be called, or defined, in new programs.
78 /// It is not required by [`DropNotifyWatchSender`].
79 /// The provided implementation always returns `false`.
80#[deprecated]
81fn is_eof(&self) -> bool {
82false
83}
84}
8586impl<T> DropNotifyEofSignallable for Option<T> {
87fn eof() -> Self {
88None
89}
9091fn is_eof(&self) -> bool {
92self.is_none()
93 }
94}
9596impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
97/// Arrange to send `T::Default` when `inner` is dropped
98pub fn new(inner: postage::watch::Sender<T>) -> Self {
99 DropNotifyWatchSender(Some(inner))
100 }
101102/// Unwrap the inner sender, defusing the drop notification
103pub fn into_inner(mut self) -> postage::watch::Sender<T> {
104self.0.take().expect("inner was None")
105 }
106}
107108impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
109type Target = postage::watch::Sender<T>;
110fn deref(&self) -> &Self::Target {
111self.0.as_ref().expect("inner was None")
112 }
113}
114115impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
116fn deref_mut(&mut self) -> &mut Self::Target {
117self.0.as_mut().expect("inner was None")
118 }
119}
120121impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
122fn drop(&mut self) {
123if let Some(mut inner) = self.0.take() {
124// None means into_inner() was called
125*inner.borrow_mut() = DropNotifyEofSignallable::eof();
126 }
127 }
128}
129130#[cfg(test)]
131mod test {
132// @@ begin test lint list maintained by maint/add_warning @@
133#![allow(clippy::bool_assert_comparison)]
134 #![allow(clippy::clone_on_copy)]
135 #![allow(clippy::dbg_macro)]
136 #![allow(clippy::mixed_attributes_style)]
137 #![allow(clippy::print_stderr)]
138 #![allow(clippy::print_stdout)]
139 #![allow(clippy::single_char_pattern)]
140 #![allow(clippy::unwrap_used)]
141 #![allow(clippy::unchecked_duration_subtraction)]
142 #![allow(clippy::useless_vec)]
143 #![allow(clippy::needless_pass_by_value)]
144//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
145146use super::*;
147use futures::select_biased;
148use futures_await_test::async_test;
149150#[async_test]
151async fn postage_sender_ext() {
152use futures::stream::StreamExt;
153use futures::FutureExt;
154155let (mut s, mut r) = postage::watch::channel_with(20);
156// Receiver of a fresh watch wakes once, but let's not rely on this
157select_biased! {
158 i = r.next().fuse() => assert_eq!(i, Some(20)),
159_ = futures::future::ready(()) => { }, // tolerate nothing
160};
161// Now, not ready
162select_biased! {
163_ = r.next().fuse() => panic!(),
164_ = futures::future::ready(()) => { },
165 };
166167 s.maybe_send(|i| *i);
168// Still not ready
169select_biased! {
170_ = r.next().fuse() => panic!(),
171_ = futures::future::ready(()) => { },
172 };
173174 s.maybe_send(|i| *i + 1);
175// Ready, with 21
176select_biased! {
177 i = r.next().fuse() => assert_eq!(i, Some(21)),
178_ = futures::future::ready(()) => panic!(),
179 };
180181let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
182// Not ready
183select_biased! {
184_ = r.next().fuse() => panic!(),
185_ = futures::future::ready(()) => { },
186 };
187 }
188189#[async_test]
190async fn postage_drop() {
191#[derive(Clone, Copy, Debug, Eq, PartialEq)]
192struct I(i32);
193194impl DropNotifyEofSignallable for I {
195fn eof() -> I {
196 I(0)
197 }
198fn is_eof(&self) -> bool {
199self.0 == 0
200}
201 }
202203let (s, r) = postage::watch::channel_with(I(20));
204let s = DropNotifyWatchSender::new(s);
205206assert_eq!(*r.borrow(), I(20));
207 drop(s);
208assert_eq!(*r.borrow(), I(0));
209210let (s, r) = postage::watch::channel_with(I(44));
211let s = DropNotifyWatchSender::new(s);
212213assert_eq!(*r.borrow(), I(44));
214 drop(s.into_inner());
215assert_eq!(*r.borrow(), I(44));
216 }
217}