pub trait SinkPrepareExt<'w, OS, OM>where
OS: Sink<OM>,{
// Required method
fn prepare_send_from<IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM> ⓘ
where IF: Future<Output = IM>;
}
Expand description
Extension trait for [Sink
] to add a method for cancel-safe usage.
Required Methods§
Sourcefn prepare_send_from<IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM> ⓘwhere
IF: Future<Output = IM>,
fn prepare_send_from<IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM> ⓘwhere
IF: Future<Output = IM>,
For processing an item obtained from a future, avoiding async cancel lossage
let (message, sendable) = sink.prepare_send_from(
message_generator_future
).await?;
let message = process_message(message)?;
sendable.send(message);
Prepares to send a output message1 OM
to an output sink OS
(self
),
where the OM
is made from an input message IM
,
and the IM
is obtained from a future, generator: IF
.
When successfully run, prepare_send_from
gives (IM, SinkSendable)
.
After processing IM
into OM
,
use the SinkSendable
to send
the OM
to OS
.
§Why use this
This avoids the an async cancellation hazard
which exists with naive use of select!
followed by OS.send().await
. You might write this:
select!{
message = input_stream.next() => {
if let Some(message) = message {
let message = do_our_processing(message);
output_sink(message).await; // <---**BUG**
}
}
control = something_else() => { .. }
}
If, when we reach BUG
, the output sink is not ready to receive the message,
the future for that particular select!
branch will be suspended.
But when select!
finds that any one of the branches is ready,
it drops the futures for the other branches.
That drops all the local variables, including possibly message
, losing it.
For more about cancellation safety, see Rust for the Polyglot Programmer which has a general summary, and Matthias Einwag’s extensive discussion in his gist with comparisons to other languages.
§Alternatives
Unbounded mpsc channels, and certain other primitives,
do not suffer from this problem because they do not block.
UnboundedSender
offers
unbounded_send
but only as an inherent method, so this does not compose with Sink
combinators.
And of course unbounded channels do not implement any backpressure.
The problem can otherwise be avoided by completely eschewing use of select!
and writing manual implementations of Future
, Sink
, and so on,
However, such code is typically considerably more complex and involves
entangling the primary logic with future machinery.
It is normally better to write primary functionality in async { }
using utilities (often “futures combinators”) such as this one.
§Example
This comprehensive example demonstrates how to read from possibly multiple sources and also be able to process other events:
use futures::select;
use futures::{SinkExt as _, StreamExt as _};
use tor_async_utils::SinkPrepareExt as _;
let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
input_w.send(42).await;
select!{
ret = output_w.prepare_send_from(async {
select!{
got_input = input_r.next() => got_input.expect("input stream ended!"),
() = futures::future::pending() => panic!(), // other branches are OK here
}
}) => {
let (input_msg, sendable) = ret.unwrap();
let output_msg = input_msg.to_string();
let () = sendable.send(output_msg).unwrap();
},
() = futures::future::pending() => panic!(), // other branches are OK here
}
assert_eq!(output_r.next().await.unwrap(), "42");
§Formally
prepare_send_from
returns a SinkPrepareSendFuture
which, when awaited:
- Waits for
OS
to be ready to receive an item. - Runs
message_generator
to obtain aIM
. - Returns the
IM
(for processing), and aSinkSendable
.
The caller should then:
- Check the error from
prepare_send_from
(which came from the output sink). - Process the
IM
, making anOM
out of it. - Call
sendable.send()
(and check its error).
§Flushing
prepare_send_from
will (when awaited)
flush
the output sink
when it finds the input is not ready yet.
Until then items may be buffered
(as if they had been written with feed
).
§Errors
§Output sink errors
The call site can experience output sink errors in two places,
prepare_send_from()
and SinkSendable::send()
.
The caller should typically handle them the same way regardless of when they occurred.
If the error happens at SinkSendable::send()
,
the call site will usually be forced to discard the item being processed.
This will only occur if the sink is actually broken.
§Errors specific to the call site: faillible input, and fallible processing
At some call sites, the input future may yield errors
(perhaps it is reading from a Stream
of Result
s).
in that case the value from the input future will be a Result
.
Then IM
is a Result
, and is provided in the .0
element
of the “successful” return from prepare_send_from
.
And, at some call sites, the processing of an IM
into an OM
is fallible.
Handling these latter two error caess is up to the caller,
in the code which processes IM
.
The call site will often want to deal with such an error
without sending anything into the output sink,
and can then just drop the SinkSendable
.
§Implementations
This is an extension trait and you are not expected to need to implement it.
There are provided implementations for Pin<&mut impl Sink>
and &mut impl Sink + Unpin
, for your convenience.
We sometimes use slightly inconsistent terminology, “item” vs “message”. This avoids having to have the generic parameters by named
OI
andII
whereI
is sometimes “item” and sometimes “input”. ↩
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.