tor_hsclient/proto_oneshot.rs
1//! [`oneshot`] channel between a circuit control message handler and the main code
2//!
3//! Wraps up a [`oneshot`] and deals with some error handling.
4//!
5//! Used by [`connect`](crate::connect)
6
7use oneshot_fused_workaround as oneshot;
8use tor_cell::relaycell::msg::AnyRelayMsg;
9use tor_cell::relaycell::RelayMsg;
10use tor_error::internal;
11use tor_proto::circuit::MetaCellDisposition;
12
13use crate::FailedAttemptError;
14
15/// Sender, owned by the circuit message handler
16///
17/// Also records whether the message has been sent.
18/// Forms part of the state for message handler's state machine.
19pub(crate) struct Sender<M>(
20 /// This is an `Option` so that we can `send` without consuming.
21 ///
22 /// Needed because `oneshot`'s send consumes, but the message handler gets `&mut self`.
23 Option<oneshot::Sender<Result<M, tor_proto::Error>>>,
24);
25
26/// Receiver for awaiting the protocol message when the circuit handler sends it
27pub(crate) struct Receiver<M>(
28 oneshot::Receiver<Result<M, tor_proto::Error>>, // (force rustfmt to do this like Sender)
29);
30
31/// Create a new [`proto_oneshot::Sender`](Sender) and [`proto_oneshot::Receiver`](Receiver)
32pub(crate) fn channel<M>() -> (Sender<M>, Receiver<M>) {
33 let (tx, rx) = oneshot::channel();
34 (Sender(Some(tx)), Receiver(rx))
35}
36
37impl<M> Sender<M> {
38 /// Has this `Sender` yet to be used?
39 ///
40 /// Returns `true` until the first call to `deliver_expected_message`,
41 /// then `false` .
42 pub(crate) fn still_expected(&self) -> bool {
43 self.0.is_some()
44 }
45
46 /// Try to decode `msg` as message of type `M`, and to send the outcome on the
47 /// oneshot taken from `reply_tx`.
48 ///
49 /// Gives an error if `reply_tx` is None, or if an error occurs.
50 ///
51 /// Where possible, errors are also reported via the `oneshot`.
52 pub(crate) fn deliver_expected_message(
53 &mut self,
54 msg: AnyRelayMsg,
55 disposition_on_success: MetaCellDisposition,
56 ) -> Result<MetaCellDisposition, tor_proto::Error>
57 where
58 M: RelayMsg + Clone + TryFrom<AnyRelayMsg, Error = tor_cell::Error>,
59 {
60 let reply_tx = self
61 .0
62 .take()
63 .ok_or_else(|| internal!("Tried to handle two messages of the same type"))?;
64
65 let outcome = M::try_from(msg).map_err(|err| tor_proto::Error::CellDecodeErr {
66 object: "rendezvous-related cell",
67 err,
68 });
69
70 #[allow(clippy::unnecessary_lazy_evaluations)] // want to state the Err type
71 reply_tx
72 .send(outcome.clone())
73 // If the caller went away, we just drop the outcome
74 .unwrap_or_else(|_: Result<M, _>| ());
75
76 outcome.map(|_| disposition_on_success)
77 }
78}
79
80impl<M> Receiver<M> {
81 /// Receive the message `M`
82 ///
83 /// Waits for the call to `deliver_expected_message`, and converts the
84 /// resulting error to a `FailedAttemptError` using `handle_proto_error`.
85 pub(crate) async fn recv(
86 self,
87 handle_proto_error: impl Fn(tor_proto::Error) -> FailedAttemptError + Copy,
88 ) -> Result<M, FailedAttemptError> {
89 self.0
90 .await
91 // If the circuit collapsed, we don't get an error from tor_proto; make one up
92 .map_err(|_: oneshot::Canceled| tor_proto::Error::CircuitClosed)
93 .map_err(handle_proto_error)?
94 .map_err(handle_proto_error)
95 }
96}