arti_rpcserver/
connection.rs

1//! RPC connection support, mainloop, and protocol implementation.
2
3pub(crate) mod auth;
4mod methods;
5use std::{
6    collections::HashMap,
7    io::Error as IoError,
8    pin::Pin,
9    sync::{Arc, Mutex, RwLock, Weak},
10};
11
12use asynchronous_codec::JsonCodecError;
13use derive_deftly::Deftly;
14use futures::{
15    AsyncWriteExt as _, FutureExt, Sink, SinkExt as _, StreamExt,
16    channel::mpsc,
17    stream::{FusedStream, FuturesUnordered},
18};
19use rpc::dispatch::BoxedUpdateSink;
20use serde_json::error::Category as JsonErrorCategory;
21use tor_async_utils::{SinkExt as _, mpsc_channel_no_memquota};
22
23use crate::{
24    RpcMgr,
25    cancel::{self, Cancel, CancelHandle},
26    err::RequestParseError,
27    globalid::{GlobalId, MacKey},
28    msgs::{BoxedResponse, FlexibleRequest, ReqMeta, Request, RequestId, ResponseBody},
29    objmap::{GenIdx, ObjMap},
30};
31
32use tor_rpcbase::templates::*;
33use tor_rpcbase::{self as rpc, RpcError};
34
35/// An open connection from an RPC client.
36///
37/// Tracks information that persists from one request to another.
38///
39/// The client might not have authenticated;
40/// access and permissions control is handled via the capability system.
41/// Specifically, the `objects` table in `Inner` hold capabilities
42/// that the client will use to do things,
43/// including an `RpcSession`.
44///
45/// # In the Arti RPC System
46///
47/// A connection to Arti.
48///
49/// This object is available as soon as you open a connection to Arti RPC,
50/// even before you authenticate.  Its ObjectId is always `"connection"`.
51///
52/// Because this object is available before authentication,
53/// it provides only those methods that you need
54/// in order to perform authentication
55/// and receive an `RpcSession`.
56#[derive(Deftly)]
57#[derive_deftly(Object)]
58pub struct Connection {
59    /// The mutable state of this connection.
60    inner: Mutex<Inner>,
61
62    /// Lookup table to find the implementations for methods
63    /// based on RPC object and method types.
64    ///
65    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
66    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
67
68    /// A unique identifier for this connection.
69    ///
70    /// This kind of ID is used to refer to the connection from _outside_ of the
71    /// context of an RPC connection: it can uniquely identify the connection
72    /// from e.g. a SOCKS session so that clients can attach streams to it.
73    connection_id: ConnectionId,
74
75    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
76    /// need to exist outside this connection.
77    global_id_mac_key: MacKey,
78
79    /// A reference to the manager associated with this session.
80    mgr: Weak<RpcMgr>,
81
82    /// The authentication type that's required in order to get a session.
83    require_auth: tor_rpc_connect::auth::RpcAuth,
84}
85
86/// The inner, lock-protected part of an RPC connection.
87struct Inner {
88    /// Map from request ID to handles; used when we need to cancel a request.
89    //
90    // TODO: We have two options here for handling colliding IDs.  We can either turn
91    // this into a multimap, or we can declare that cancelling a request only
92    // cancels the most recent request sent with that ID.
93    inflight: HashMap<RequestId, Option<CancelHandle>>,
94
95    /// An object map used to look up most objects by ID, and keep track of
96    /// which objects are owned by this connection.
97    objects: ObjMap,
98
99    /// A reference to this connection itself.
100    ///
101    /// Used when we're looking up the connection within the RPC system as an object.
102    ///
103    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-save?
104    this_connection: Option<Weak<Connection>>,
105}
106
/// Size passed when creating each connection's channel of pending responses
/// (updates and final responses alike); once that many are queued,
/// further sends start to block.
const UPDATE_CHAN_SIZE: usize = 128;
109
110/// A type-erased [`FusedStream`] yielding [`Request`]s.
111//
112// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
113// nice and simple.)
114pub(crate) type BoxedRequestStream = Pin<
115    Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
116>;
117
118/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
119pub(crate) type BoxedResponseSink =
120    Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
121
122/// A random value used to identify an connection.
123#[derive(
124    Copy,
125    Clone,
126    Debug,
127    Eq,
128    PartialEq,
129    Hash,
130    derive_more::From,
131    derive_more::Into,
132    derive_more::AsRef,
133)]
134pub(crate) struct ConnectionId([u8; 16]);
135
136impl ConnectionId {
137    /// The length of a ConnectionId.
138    pub(crate) const LEN: usize = 16;
139}
140
141impl Connection {
142    /// A special object ID that indicates the connection itself.
143    ///
144    /// On a fresh connection, this is the only ObjectId that exists.
145    //
146    // TODO: We might want to move responsibility for tracking this ID and its value into ObjMap.
147    const CONNECTION_OBJ_ID: &'static str = "connection";
148
149    /// Create a new connection.
150    pub(crate) fn new(
151        connection_id: ConnectionId,
152        dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
153        global_id_mac_key: MacKey,
154        mgr: Weak<RpcMgr>,
155        require_auth: tor_rpc_connect::auth::RpcAuth,
156    ) -> Arc<Self> {
157        Arc::new_cyclic(|this_connection| Self {
158            inner: Mutex::new(Inner {
159                inflight: HashMap::new(),
160                objects: ObjMap::new(),
161                this_connection: Some(Weak::clone(this_connection)),
162            }),
163            dispatch_table,
164            connection_id,
165            global_id_mac_key,
166            mgr,
167            require_auth,
168        })
169    }
170
171    /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
172    /// this connection's ObjMap.
173    fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
174        // Design note: It's not really necessary from a security POV to
175        // check the MAC here; any possible GenIdx we return will either
176        // refer to some object we're allowed to name in this session, or to
177        // no object at all.  Still, we check anyway, since it shouldn't
178        // hurt to do so.
179        if let Some(global_id) = GlobalId::try_decode(&self.global_id_mac_key, id)? {
180            // We have a GlobalId with a valid MAC. Let's make sure it applies
181            // to this connection's ObjMap.  (We do not support referring to
182            // anyone else's objects.)
183            //
184            // Design note: As above, this check is a protection against
185            // accidental misuse, not a security feature: even if we removed
186            // this check, we would still only allow objects that this session
187            // is allowed to name.
188            if global_id.connection == self.connection_id {
189                Ok(global_id.local_id)
190            } else {
191                Err(rpc::LookupError::NoObject(id.clone()))
192            }
193        } else {
194            // It's not a GlobalId; let's see if we can make sense of it as an
195            // ObjMap index.
196            Ok(GenIdx::try_decode(id)?)
197        }
198    }
199
200    /// Look up a given object by its object ID relative to this connection.
201    pub(crate) fn lookup_object(
202        &self,
203        id: &rpc::ObjectId,
204    ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
205        if id.as_ref() == Self::CONNECTION_OBJ_ID {
206            let this = self
207                .inner
208                .lock()
209                .expect("lock poisoned")
210                .this_connection
211                .as_ref()
212                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?
213                .upgrade()
214                .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?;
215            Ok(this as Arc<_>)
216        } else {
217            let local_id = self.id_into_local_idx(id)?;
218
219            self.lookup_by_idx(local_id)
220                .ok_or(rpc::LookupError::NoObject(id.clone()))
221        }
222    }
223
224    /// As `lookup_object`, but expect a `GenIdx`.
225    pub(crate) fn lookup_by_idx(&self, idx: crate::objmap::GenIdx) -> Option<Arc<dyn rpc::Object>> {
226        let inner = self.inner.lock().expect("lock poisoned");
227        inner.objects.lookup(idx)
228    }
229
230    /// Un-register the request `id` and stop tracking its information.
231    fn remove_request(&self, id: &RequestId) {
232        let mut inner = self.inner.lock().expect("lock poisoned");
233        inner.inflight.remove(id);
234    }
235
236    /// Register the request `id` as a cancellable request.
237    ///
238    /// If `handle` is none, register it as an uncancellable request.
239    fn register_request(&self, id: RequestId, handle: Option<CancelHandle>) {
240        let mut inner = self.inner.lock().expect("lock poisoned");
241        inner.inflight.insert(id, handle);
242    }
243
244    /// Try to cancel the request `id`.
245    ///
246    /// Return an error when `id` cannot be found, or cannot be cancelled.
247    /// (These cases are indistinguishable.)
248    fn cancel_request(&self, id: &RequestId) -> Result<(), CancelError> {
249        let mut inner = self.inner.lock().expect("lock poisoned");
250        match inner.inflight.remove(id) {
251            Some(Some(handle)) => {
252                drop(inner);
253                handle.cancel()?;
254                Ok(())
255            }
256            Some(None) => {
257                // Put it back in case somebody tries again.
258                inner.inflight.insert(id.clone(), None);
259                Err(CancelError::CannotCancelRequest)
260            }
261            None => Err(CancelError::RequestNotFound),
262        }
263    }
264
265    /// Run in a loop, decoding JSON requests from `input` and
266    /// writing JSON responses onto `output`.
267    pub async fn run<IN, OUT>(
268        self: Arc<Self>,
269        input: IN,
270        mut output: OUT,
271    ) -> Result<(), ConnectionError>
272    where
273        IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
274        OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
275    {
276        /// Banner line to send, indicating that Arti is ready to receive requests.
277        ///
278        /// The key in this json object is mandatory; the value can be anything.
279        const BANNER: &[u8] = b"{\"arti_rpc\":{}}\n";
280
281        output
282            .write_all(BANNER)
283            .await
284            .map_err(|e| ConnectionError::WriteFailed(Arc::new(e)))?;
285
286        let write = Box::pin(asynchronous_codec::FramedWrite::new(
287            output,
288            crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
289        ));
290
291        let read = Box::pin(
292            asynchronous_codec::FramedRead::new(
293                input,
294                asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
295            )
296            .fuse(),
297        );
298
299        self.run_loop(read, write).await
300    }
301
302    /// Run in a loop, handling requests from `request_stream` and writing
303    /// responses onto `response_stream`.
304    ///
305    /// After this returns, even if it returns `Ok(())`, the connection must no longer be used.
306    pub(crate) async fn run_loop(
307        self: Arc<Self>,
308        mut request_stream: BoxedRequestStream,
309        mut response_sink: BoxedResponseSink,
310    ) -> Result<(), ConnectionError> {
311        // This function will multiplex on three streams:
312        // * `request_stream` -- a stream of incoming requests from the client.
313        // * `finished_requests` -- a stream of requests that are done.
314        // * `rx_response` -- a stream of updates and final responses sent from
315        //   in-progress tasks. (We put updates and final responsese onto the
316        //   same channel to ensure that they stay in-order for each method
317        //   invocation.
318        //
319        // Note that the blocking behavior here is deliberate: We want _all_ of
320        // these reads to start blocking when response_sink.send is blocked.
321
322        // TODO RPC should this queue participate in memquota?
323        let (tx_response, mut rx_response) =
324            mpsc_channel_no_memquota::<BoxedResponse>(UPDATE_CHAN_SIZE);
325
326        let mut finished_requests = FuturesUnordered::new();
327        finished_requests.push(futures::future::pending().boxed());
328
329        /// Helper: enforce an explicit "continue".
330        struct Continue;
331
332        // We create a separate async block here and immediately await it,
333        // so that any internal `returns` and `?`s do not escape the function.
334        let outcome = async {
335            loop {
336                let _: Continue = futures::select! {
337                    r = finished_requests.next() => {
338                        // A task is done, so we can forget about it.
339                        let () = r.expect("Somehow, future::pending() terminated.");
340                        Continue
341                    }
342
343                    r = rx_response.next() => {
344                        // The future for some request has sent a response (success,
345                        // failure, or update), so we can inform the client.
346                        let update = r.expect("Somehow, tx_update got closed.");
347                        debug_assert!(! update.body.is_final());
348                        // Calling `await` here (and below) is deliberate: we _want_
349                        // to stop reading the client's requests if the client is
350                        // not reading their responses (or not) reading them fast
351                        // enough.
352                        response_sink.send(update).await.map_err(ConnectionError::writing)?;
353                        Continue
354                    }
355
356                    req = request_stream.next() => {
357                        match req {
358                            None => {
359                                // We've reached the end of the stream of requests;
360                                // time to close.
361                                return Ok(());
362                            }
363                            Some(Err(e)) => {
364                                // We got a non-recoverable error from the JSON codec.
365                                return Err(ConnectionError::from_read_error(e));
366
367                            }
368                            Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
369                                // We decoded the request as Json, but not as a `Valid`` request.
370                                // Send back a response indicating what was wrong with it.
371                                let response = BoxedResponse::from_error(
372                                    bad_req.id().cloned(), bad_req.error()
373                                );
374                                response_sink
375                                    .send(response)
376                                    .await
377                                    .map_err( ConnectionError::writing)?;
378                                if bad_req.id().is_none() {
379                                    // The spec says we must close the connection in this case.
380                                    return Err(bad_req.error().into());
381                                }
382                                Continue
383
384                            }
385                            Some(Ok(FlexibleRequest::Valid(req))) => {
386                                // We have a request. Time to launch it!
387                                let tx = tx_response.clone();
388                                let fut = self.run_method_and_deliver_response(tx, req);
389                                finished_requests.push(fut.boxed());
390                                Continue
391                            }
392                        }
393                    }
394                };
395            }
396        }
397        .await;
398
399        match outcome {
400            Err(e) if e.is_connection_close() => Ok(()),
401            other => other,
402        }
403    }
404
405    /// Invoke `request` and send all of its responses to `tx_response`.
406    async fn run_method_and_deliver_response(
407        self: &Arc<Self>,
408        mut tx_response: mpsc::Sender<BoxedResponse>,
409        request: Request,
410    ) {
411        let Request {
412            id,
413            obj,
414            meta,
415            method,
416        } = request;
417
418        let update_sender: BoxedUpdateSink = if meta.updates {
419            let id_clone = id.clone();
420            let sink =
421                tx_response
422                    .clone()
423                    .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
424                        Result::<BoxedResponse, _>::Ok(BoxedResponse {
425                            id: Some(id_clone.clone()),
426                            body: ResponseBody::Update(obj),
427                        })
428                    });
429            Box::pin(sink)
430        } else {
431            let sink = futures::sink::drain().sink_err_into();
432            Box::pin(sink)
433        };
434
435        let is_cancellable = method.is_cancellable();
436
437        // Create `run_method_lowlevel` future, and make it cancellable.
438        let fut = self.run_method_lowlevel(update_sender, obj, method, meta);
439
440        // Optionally register the future as cancellable.  Then run it to completion.
441        let outcome = if is_cancellable {
442            let (handle, fut) = Cancel::new(fut);
443            self.register_request(id.clone(), Some(handle));
444            fut.await
445        } else {
446            self.register_request(id.clone(), None);
447            Ok(fut.await)
448        };
449
450        // Figure out how to respond.
451        let body = match outcome {
452            Ok(Ok(value)) => ResponseBody::Success(value),
453            // TODO: If we're going to box this, let's do so earlier.
454            Ok(Err(err)) => {
455                if err.is_internal() {
456                    tracing::warn!(
457                        "Reporting an internal error on an RPC connection: {:?}",
458                        err
459                    );
460                }
461                ResponseBody::Error(Box::new(err))
462            }
463            Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
464        };
465
466        // Send the response.
467        //
468        // (It's okay to ignore the error here, since it can only mean that the
469        // RPC connection has closed.)
470        let _ignore_err = tx_response
471            .send(BoxedResponse {
472                id: Some(id.clone()),
473                body,
474            })
475            .await;
476
477        // Unregister the request.
478        //
479        // TODO: This may unregister a different request if the user sent
480        // in another request with the same ID.
481        self.remove_request(&id);
482    }
483
484    /// Run a single method, and return its final response.
485    ///
486    /// If `tx_updates` is provided, and this method generates updates, it
487    /// should send those updates on `tx_updates`
488    ///
489    /// Note that this function is able to send responses with IDs that do not
490    /// match the original.  It should enforce correct IDs on whatever response
491    /// it generates.
492    async fn run_method_lowlevel(
493        self: &Arc<Self>,
494        tx_updates: rpc::dispatch::BoxedUpdateSink,
495        obj_id: rpc::ObjectId,
496        method: Box<dyn rpc::DeserMethod>,
497        meta: ReqMeta,
498    ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
499        let obj = self.lookup_object(&obj_id)?;
500
501        if !meta.require.is_empty() {
502            // TODO RPC: Eventually, we will need a way to tell which "features" are actually
503            // available.  But for now, we have no features, so if the require list is nonempty,
504            // we can safely reject the request.
505            return Err(MissingFeaturesError(meta.require).into());
506        }
507
508        let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
509
510        let invoke_future =
511            rpc::invoke_rpc_method(context, &obj_id, obj, method.upcast_box(), tx_updates)?;
512
513        // Note that we drop the read lock before we await this future!
514        invoke_future.await
515    }
516
517    /// Try to get a strong reference to the RpcMgr for this connection, and
518    /// return an error if we can't.
519    pub(crate) fn mgr(&self) -> Result<Arc<RpcMgr>, MgrDisappearedError> {
520        self.mgr
521            .upgrade()
522            .ok_or(MgrDisappearedError::RpcMgrDisappeared)
523    }
524}
525
526/// An error returned when an RPC request lists some feature as required,
527/// but we don't have every such feature.
528#[derive(Clone, Debug, thiserror::Error)]
529#[error("Required features not available")]
530struct MissingFeaturesError(
531    /// A list of the features that were requested but not available.
532    Vec<String>,
533);
534
535impl From<MissingFeaturesError> for RpcError {
536    fn from(err: MissingFeaturesError) -> Self {
537        let mut e = RpcError::new(
538            err.to_string(),
539            tor_rpcbase::RpcErrorKind::FeatureNotPresent,
540        );
541        e.set_datum("rpc:unsupported_features".to_string(), err.0)
542            .expect("invalid keyword");
543        e
544    }
545}
546
547/// A failure that results in closing a [`Connection`].
548#[derive(Clone, Debug, thiserror::Error)]
549#[non_exhaustive]
550pub enum ConnectionError {
551    /// Unable to write to our connection.
552    #[error("Could not write to connection")]
553    WriteFailed(#[source] Arc<IoError>),
554    /// Read error from connection.
555    #[error("Problem reading from connection")]
556    ReadFailed(#[source] Arc<IoError>),
557    /// Read something that we could not decode.
558    #[error("Unable to decode request from connection")]
559    DecodeFailed(#[source] Arc<serde_json::Error>),
560    /// Unable to write our response as json.
561    #[error("Unable to encode response onto connection")]
562    EncodeFailed(#[source] Arc<serde_json::Error>),
563    /// We encountered a problem when parsing a request that was (in our judgment)
564    /// too severe to recover from.
565    #[error("Unrecoverable problem from parsed request")]
566    RequestParseFailed(#[from] RequestParseError),
567}
568
569impl ConnectionError {
570    /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
571    fn writing(error: JsonCodecError) -> Self {
572        match error {
573            JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
574            JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
575        }
576    }
577
578    /// Return true if this error is (or might be) due to the peer closing the connection.
579    ///
580    /// Such errors should be tolerated without much complaint;
581    /// other errors should at least be logged somewhere.
582    fn is_connection_close(&self) -> bool {
583        use JsonErrorCategory as JK;
584        use std::io::ErrorKind as IK;
585        #[allow(clippy::match_like_matches_macro)]
586        match self {
587            Self::ReadFailed(e) | Self::WriteFailed(e) => match e.kind() {
588                IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => true,
589                _ => false,
590            },
591            Self::DecodeFailed(e) => match e.classify() {
592                JK::Eof => true,
593                _ => false,
594            },
595            _ => false,
596        }
597    }
598
599    /// Construct a `ConnectionError` from a JsonCodecError that occurred while reading.
600    fn from_read_error(error: JsonCodecError) -> Self {
601        match error {
602            JsonCodecError::Io(e) => Self::ReadFailed(Arc::new(e)),
603            JsonCodecError::Json(e) => Self::DecodeFailed(Arc::new(e)),
604        }
605    }
606}
607
608/// A failure from trying to upgrade a `Weak<RpcMgr>`.
609#[derive(Clone, Debug, thiserror::Error, serde::Serialize)]
610pub(crate) enum MgrDisappearedError {
611    /// We tried to upgrade our reference to the RpcMgr, and failed.
612    #[error("RPC manager disappeared; Arti is shutting down?")]
613    RpcMgrDisappeared,
614}
615impl tor_error::HasKind for MgrDisappearedError {
616    fn kind(&self) -> tor_error::ErrorKind {
617        tor_error::ErrorKind::ArtiShuttingDown
618    }
619}
620
621impl rpc::Context for Connection {
622    fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
623        Connection::lookup_object(self, id)
624    }
625
626    fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
627        let use_global_id = object.expose_outside_of_session();
628        let local_id = self
629            .inner
630            .lock()
631            .expect("Lock poisoned")
632            .objects
633            .insert_strong(object);
634
635        // Design note: It is a deliberate decision to _always_ use GlobalId for
636        // objects whose IDs are _ever_ exported for use in SOCKS requests.  Some
637        // alternatives would be to use GlobalId conditionally, or to have a
638        // separate Method to create a new GlobalId given an existing LocalId.
639        if use_global_id {
640            GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
641        } else {
642            local_id.encode()
643        }
644    }
645
646    fn release_owned(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
647        let removed_some = if id.as_ref() == Self::CONNECTION_OBJ_ID {
648            self.inner
649                .lock()
650                .expect("Lock poisoned")
651                .this_connection
652                .take()
653                .is_some()
654        } else {
655            let idx = self.id_into_local_idx(id)?;
656
657            if !idx.is_strong() {
658                return Err(rpc::LookupError::WrongType(id.clone()));
659            }
660
661            self.inner
662                .lock()
663                .expect("Lock poisoned")
664                .objects
665                .remove(idx)
666                .is_some()
667        };
668
669        if removed_some {
670            Ok(())
671        } else {
672            Err(rpc::LookupError::NoObject(id.clone()))
673        }
674    }
675
676    fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
677        &self.dispatch_table
678    }
679}
680
681/// An error given when an RPC request is cancelled.
682///
683/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
684/// we want to move that type into a general-purpose location, and make it not
685/// RPC-specific.
686#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
687#[error("RPC request was cancelled")]
688pub(crate) struct RequestCancelled;
689
690impl From<RequestCancelled> for RpcError {
691    fn from(_: RequestCancelled) -> Self {
692        RpcError::new(
693            "Request cancelled".into(),
694            rpc::RpcErrorKind::RequestCancelled,
695        )
696    }
697}
698
699/// An error given when we attempt to cancel an RPC request, but cannot.
700///
701#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
702pub(crate) enum CancelError {
703    /// We didn't find any request with the provided ID.
704    ///
705    /// Since we don't keep track of requests after they finish or are cancelled,
706    /// we cannot distinguish the cases where a request has finished,
707    /// where the request has been cancelled,
708    /// or where the request never existed.
709    /// Therefore we collapse them into a single error type.
710    #[error("RPC request not found")]
711    RequestNotFound,
712
713    /// This kind of request cannot be cancelled.
714    #[error("Uncancellable request")]
715    CannotCancelRequest,
716
717    /// We tried to cancel a request but found out it was already cancelled.
718    ///
719    /// This error should be impossible.
720    #[error("Request somehow cancelled twice!")]
721    AlreadyCancelled,
722}
723
724impl From<cancel::CannotCancel> for CancelError {
725    fn from(value: cancel::CannotCancel) -> Self {
726        use CancelError as CE;
727        use cancel::CannotCancel as CC;
728        match value {
729            CC::Cancelled => CE::AlreadyCancelled,
730            // We map "finished" to RequestNotFound since it is not in the general case
731            // distinguishable from it; see documentation on RequestNotFound.
732            CC::Finished => CE::RequestNotFound,
733        }
734    }
735}
736
737impl From<CancelError> for RpcError {
738    fn from(err: CancelError) -> Self {
739        use CancelError as CE;
740        use rpc::RpcErrorKind as REK;
741        let code = match err {
742            CE::RequestNotFound => REK::RequestError,
743            CE::CannotCancelRequest => REK::RequestError,
744            CE::AlreadyCancelled => REK::InternalError,
745        };
746        RpcError::new(err.to_string(), code)
747    }
748}