arti_rpcserver/connection.rs
1//! RPC connection support, mainloop, and protocol implementation.
2
3pub(crate) mod auth;
4mod methods;
5use std::{
6 collections::HashMap,
7 io::Error as IoError,
8 pin::Pin,
9 sync::{Arc, Mutex, RwLock, Weak},
10};
11
12use asynchronous_codec::JsonCodecError;
13use derive_deftly::Deftly;
14use futures::{
15 channel::mpsc,
16 stream::{FusedStream, FuturesUnordered},
17 AsyncWriteExt as _, FutureExt, Sink, SinkExt as _, StreamExt,
18};
19use rpc::dispatch::BoxedUpdateSink;
20use serde_json::error::Category as JsonErrorCategory;
21use tor_async_utils::{mpsc_channel_no_memquota, SinkExt as _};
22
23use crate::{
24 cancel::{self, Cancel, CancelHandle},
25 err::RequestParseError,
26 globalid::{GlobalId, MacKey},
27 msgs::{BoxedResponse, FlexibleRequest, ReqMeta, Request, RequestId, ResponseBody},
28 objmap::{GenIdx, ObjMap},
29 RpcMgr,
30};
31
32use tor_rpcbase::templates::*;
33use tor_rpcbase::{self as rpc, RpcError};
34
/// An open connection from an RPC client.
///
/// Tracks information that persists from one request to another.
///
/// The client might not have authenticated;
/// access and permissions control is handled via the capability system.
/// Specifically, the `objects` table in `Inner` holds capabilities
/// that the client will use to do things,
/// including an `RpcSession`.
///
/// # In the Arti RPC System
///
/// A connection to Arti.
///
/// This object is available as soon as you open a connection to Arti RPC,
/// even before you authenticate.  Its ObjectId is always `"connection"`.
///
/// Because this object is available before authentication,
/// it provides only those methods that you need
/// in order to perform authentication
/// and receive an `RpcSession`.
#[derive(Deftly)]
#[derive_deftly(Object)]
pub struct Connection {
    /// The mutable state of this connection.
    inner: Mutex<Inner>,

    /// Lookup table to find the implementations for methods
    /// based on RPC object and method types.
    ///
    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,

    /// A unique identifier for this connection.
    ///
    /// This kind of ID is used to refer to the connection from _outside_ of the
    /// context of an RPC connection: it can uniquely identify the connection
    /// from e.g. a SOCKS session so that clients can attach streams to it.
    connection_id: ConnectionId,

    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
    /// need to exist outside this connection.
    global_id_mac_key: MacKey,

    /// A reference to the manager associated with this connection.
    ///
    /// Held weakly; use [`Connection::mgr`] to obtain a strong reference.
    mgr: Weak<RpcMgr>,

    /// The authentication type that's required in order to get a session.
    require_auth: tor_rpc_connect::auth::RpcAuth,
}
85
/// The inner, lock-protected part of an RPC connection.
struct Inner {
    /// Map from request ID to handles; used when we need to cancel a request.
    ///
    /// A value of `None` marks a request that is in flight but cannot be cancelled.
    //
    // TODO: We have two options here for handling colliding IDs.  We can either turn
    // this into a multimap, or we can declare that cancelling a request only
    // cancels the most recent request sent with that ID.
    inflight: HashMap<RequestId, Option<CancelHandle>>,

    /// An object map used to look up most objects by ID, and keep track of
    /// which objects are owned by this connection.
    objects: ObjMap,

    /// A reference to this connection itself.
    ///
    /// Used when we're looking up the connection within the RPC system as an object.
    /// Becomes `None` once the `"connection"` object has been released.
    ///
    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-safe?
    this_connection: Option<Weak<Connection>>,
}
106
/// How many responses can be pending, per connection, before senders start to block?
///
/// This is the size passed to the channel created in `Connection::run_loop`,
/// which carries both updates and final responses.
const UPDATE_CHAN_SIZE: usize = 128;
109
110/// A type-erased [`FusedStream`] yielding [`Request`]s.
111//
112// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
113// nice and simple.)
114pub(crate) type BoxedRequestStream = Pin<
115 Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
116>;
117
118/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
119pub(crate) type BoxedResponseSink =
120 Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
121
/// A random value used to identify a connection.
///
/// Convertible to and from a raw `[u8; 16]`.
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Hash,
    derive_more::From,
    derive_more::Into,
    derive_more::AsRef,
)]
pub(crate) struct ConnectionId([u8; 16]);
135
impl ConnectionId {
    /// The length of a ConnectionId, in bytes.
    pub(crate) const LEN: usize = 16;
}
140
141impl Connection {
    /// A special object ID that indicates the connection itself.
    ///
    /// On a fresh connection, this is the only ObjectId that exists.
    /// It is matched by exact string comparison when looking up or releasing objects.
    //
    // TODO: We might want to move responsibility for tracking this ID and its value into ObjMap.
    const CONNECTION_OBJ_ID: &'static str = "connection";
148
149 /// Create a new connection.
150 pub(crate) fn new(
151 connection_id: ConnectionId,
152 dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
153 global_id_mac_key: MacKey,
154 mgr: Weak<RpcMgr>,
155 require_auth: tor_rpc_connect::auth::RpcAuth,
156 ) -> Arc<Self> {
157 Arc::new_cyclic(|this_connection| Self {
158 inner: Mutex::new(Inner {
159 inflight: HashMap::new(),
160 objects: ObjMap::new(),
161 this_connection: Some(Weak::clone(this_connection)),
162 }),
163 dispatch_table,
164 connection_id,
165 global_id_mac_key,
166 mgr,
167 require_auth,
168 })
169 }
170
171 /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
172 /// this connection's ObjMap.
173 fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
174 // TODO RPC: Use a tag byte instead of a magic length.
175
176 if id.as_ref().len() == GlobalId::B64_ENCODED_LEN {
177 // This is the right length to be a GlobalId; let's see if it really
178 // is one.
179 //
180 // Design note: It's not really necessary from a security POV to
181 // check the MAC here; any possible GenIdx we return will either
182 // refer to some object we're allowed to name in this session, or to
183 // no object at all. Still, we check anyway, since it shouldn't
184 // hurt to do so.
185 let global_id = GlobalId::try_decode(&self.global_id_mac_key, id)?;
186 // We have a GlobalId with a valid MAC. Let's make sure it applies
187 // to this connection's ObjMap. (We do not support referring to
188 // anyone else's objects.)
189 //
190 // Design note: As above, this check is a protection against
191 // accidental misuse, not a security feature: even if we removed
192 // this check, we would still only allow objects that this session
193 // is allowed to name.
194 if global_id.connection == self.connection_id {
195 Ok(global_id.local_id)
196 } else {
197 Err(rpc::LookupError::NoObject(id.clone()))
198 }
199 } else {
200 // It's not a GlobalId; let's see if we can make sense of it as an
201 // ObjMap index.
202 Ok(GenIdx::try_decode(id)?)
203 }
204 }
205
206 /// Look up a given object by its object ID relative to this connection.
207 pub(crate) fn lookup_object(
208 &self,
209 id: &rpc::ObjectId,
210 ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
211 if id.as_ref() == Self::CONNECTION_OBJ_ID {
212 let this = self
213 .inner
214 .lock()
215 .expect("lock poisoned")
216 .this_connection
217 .as_ref()
218 .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?
219 .upgrade()
220 .ok_or_else(|| rpc::LookupError::NoObject(id.clone()))?;
221 Ok(this as Arc<_>)
222 } else {
223 let local_id = self.id_into_local_idx(id)?;
224
225 self.lookup_by_idx(local_id)
226 .ok_or(rpc::LookupError::NoObject(id.clone()))
227 }
228 }
229
230 /// As `lookup_object`, but expect a `GenIdx`.
231 pub(crate) fn lookup_by_idx(&self, idx: crate::objmap::GenIdx) -> Option<Arc<dyn rpc::Object>> {
232 let inner = self.inner.lock().expect("lock poisoned");
233 inner.objects.lookup(idx)
234 }
235
236 /// Un-register the request `id` and stop tracking its information.
237 fn remove_request(&self, id: &RequestId) {
238 let mut inner = self.inner.lock().expect("lock poisoned");
239 inner.inflight.remove(id);
240 }
241
242 /// Register the request `id` as a cancellable request.
243 ///
244 /// If `handle` is none, register it as an uncancellable request.
245 fn register_request(&self, id: RequestId, handle: Option<CancelHandle>) {
246 let mut inner = self.inner.lock().expect("lock poisoned");
247 inner.inflight.insert(id, handle);
248 }
249
250 /// Try to cancel the request `id`.
251 ///
252 /// Return an error when `id` cannot be found, or cannot be cancelled.
253 /// (These cases are indistinguishable.)
254 fn cancel_request(&self, id: &RequestId) -> Result<(), CancelError> {
255 let mut inner = self.inner.lock().expect("lock poisoned");
256 match inner.inflight.remove(id) {
257 Some(Some(handle)) => {
258 drop(inner);
259 handle.cancel()?;
260 Ok(())
261 }
262 Some(None) => {
263 // Put it back in case somebody tries again.
264 inner.inflight.insert(id.clone(), None);
265 Err(CancelError::CannotCancelRequest)
266 }
267 None => Err(CancelError::RequestNotFound),
268 }
269 }
270
271 /// Run in a loop, decoding JSON requests from `input` and
272 /// writing JSON responses onto `output`.
273 pub async fn run<IN, OUT>(
274 self: Arc<Self>,
275 input: IN,
276 mut output: OUT,
277 ) -> Result<(), ConnectionError>
278 where
279 IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
280 OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
281 {
282 /// Banner line to send, indicating that Arti is ready to receive requests.
283 ///
284 /// The key in this json object is mandatory; the value can be anything.
285 const BANNER: &[u8] = b"{\"arti_rpc\":{}}\n";
286
287 output
288 .write_all(BANNER)
289 .await
290 .map_err(|e| ConnectionError::WriteFailed(Arc::new(e)))?;
291
292 let write = Box::pin(asynchronous_codec::FramedWrite::new(
293 output,
294 crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
295 ));
296
297 let read = Box::pin(
298 asynchronous_codec::FramedRead::new(
299 input,
300 asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
301 )
302 .fuse(),
303 );
304
305 self.run_loop(read, write).await
306 }
307
308 /// Run in a loop, handling requests from `request_stream` and writing
309 /// responses onto `response_stream`.
310 ///
311 /// After this returns, even if it returns `Ok(())`, the connection must no longer be used.
312 pub(crate) async fn run_loop(
313 self: Arc<Self>,
314 mut request_stream: BoxedRequestStream,
315 mut response_sink: BoxedResponseSink,
316 ) -> Result<(), ConnectionError> {
317 // This function will multiplex on three streams:
318 // * `request_stream` -- a stream of incoming requests from the client.
319 // * `finished_requests` -- a stream of requests that are done.
320 // * `rx_response` -- a stream of updates and final responses sent from
321 // in-progress tasks. (We put updates and final responsese onto the
322 // same channel to ensure that they stay in-order for each method
323 // invocation.
324 //
325 // Note that the blocking behavior here is deliberate: We want _all_ of
326 // these reads to start blocking when response_sink.send is blocked.
327
328 // TODO RPC should this queue participate in memquota?
329 let (tx_response, mut rx_response) =
330 mpsc_channel_no_memquota::<BoxedResponse>(UPDATE_CHAN_SIZE);
331
332 let mut finished_requests = FuturesUnordered::new();
333 finished_requests.push(futures::future::pending().boxed());
334
335 /// Helper: enforce an explicit "continue".
336 struct Continue;
337
338 // We create a separate async block here and immediately await it,
339 // so that any internal `returns` and `?`s do not escape the function.
340 let outcome = async {
341 loop {
342 let _: Continue = futures::select! {
343 r = finished_requests.next() => {
344 // A task is done, so we can forget about it.
345 let () = r.expect("Somehow, future::pending() terminated.");
346 Continue
347 }
348
349 r = rx_response.next() => {
350 // The future for some request has sent a response (success,
351 // failure, or update), so we can inform the client.
352 let update = r.expect("Somehow, tx_update got closed.");
353 debug_assert!(! update.body.is_final());
354 // Calling `await` here (and below) is deliberate: we _want_
355 // to stop reading the client's requests if the client is
356 // not reading their responses (or not) reading them fast
357 // enough.
358 response_sink.send(update).await.map_err(ConnectionError::writing)?;
359 Continue
360 }
361
362 req = request_stream.next() => {
363 match req {
364 None => {
365 // We've reached the end of the stream of requests;
366 // time to close.
367 return Ok(());
368 }
369 Some(Err(e)) => {
370 // We got a non-recoverable error from the JSON codec.
371 return Err(ConnectionError::from_read_error(e));
372
373 }
374 Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
375 // We decoded the request as Json, but not as a `Valid`` request.
376 // Send back a response indicating what was wrong with it.
377 let response = BoxedResponse::from_error(
378 bad_req.id().cloned(), bad_req.error()
379 );
380 response_sink
381 .send(response)
382 .await
383 .map_err( ConnectionError::writing)?;
384 if bad_req.id().is_none() {
385 // The spec says we must close the connection in this case.
386 return Err(bad_req.error().into());
387 }
388 Continue
389
390 }
391 Some(Ok(FlexibleRequest::Valid(req))) => {
392 // We have a request. Time to launch it!
393 let tx = tx_response.clone();
394 let fut = self.run_method_and_deliver_response(tx, req);
395 finished_requests.push(fut.boxed());
396 Continue
397 }
398 }
399 }
400 };
401 }
402 }
403 .await;
404
405 match outcome {
406 Err(e) if e.is_connection_close() => Ok(()),
407 other => other,
408 }
409 }
410
411 /// Invoke `request` and send all of its responses to `tx_response`.
412 async fn run_method_and_deliver_response(
413 self: &Arc<Self>,
414 mut tx_response: mpsc::Sender<BoxedResponse>,
415 request: Request,
416 ) {
417 let Request {
418 id,
419 obj,
420 meta,
421 method,
422 } = request;
423
424 let update_sender: BoxedUpdateSink = if meta.updates {
425 let id_clone = id.clone();
426 let sink =
427 tx_response
428 .clone()
429 .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
430 Result::<BoxedResponse, _>::Ok(BoxedResponse {
431 id: Some(id_clone.clone()),
432 body: ResponseBody::Update(obj),
433 })
434 });
435 Box::pin(sink)
436 } else {
437 let sink = futures::sink::drain().sink_err_into();
438 Box::pin(sink)
439 };
440
441 let is_cancellable = method.is_cancellable();
442
443 // Create `run_method_lowlevel` future, and make it cancellable.
444 let fut = self.run_method_lowlevel(update_sender, obj, method, meta);
445
446 // Optionally register the future as cancellable. Then run it to completion.
447 let outcome = if is_cancellable {
448 let (handle, fut) = Cancel::new(fut);
449 self.register_request(id.clone(), Some(handle));
450 fut.await
451 } else {
452 self.register_request(id.clone(), None);
453 Ok(fut.await)
454 };
455
456 // Figure out how to respond.
457 let body = match outcome {
458 Ok(Ok(value)) => ResponseBody::Success(value),
459 // TODO: If we're going to box this, let's do so earlier.
460 Ok(Err(err)) => {
461 if err.is_internal() {
462 tracing::warn!(
463 "Reporting an internal error on an RPC connection: {:?}",
464 err
465 );
466 }
467 ResponseBody::Error(Box::new(err))
468 }
469 Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
470 };
471
472 // Send the response.
473 //
474 // (It's okay to ignore the error here, since it can only mean that the
475 // RPC connection has closed.)
476 let _ignore_err = tx_response
477 .send(BoxedResponse {
478 id: Some(id.clone()),
479 body,
480 })
481 .await;
482
483 // Unregister the request.
484 //
485 // TODO: This may unregister a different request if the user sent
486 // in another request with the same ID.
487 self.remove_request(&id);
488 }
489
490 /// Run a single method, and return its final response.
491 ///
492 /// If `tx_updates` is provided, and this method generates updates, it
493 /// should send those updates on `tx_updates`
494 ///
495 /// Note that this function is able to send responses with IDs that do not
496 /// match the original. It should enforce correct IDs on whatever response
497 /// it generates.
498 async fn run_method_lowlevel(
499 self: &Arc<Self>,
500 tx_updates: rpc::dispatch::BoxedUpdateSink,
501 obj_id: rpc::ObjectId,
502 method: Box<dyn rpc::DeserMethod>,
503 meta: ReqMeta,
504 ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
505 let obj = self.lookup_object(&obj_id)?;
506
507 if !meta.require.is_empty() {
508 // TODO RPC: Eventually, we will need a way to tell which "features" are actually
509 // available. But for now, we have no features, so if the require list is nonempty,
510 // we can safely reject the request.
511 return Err(MissingFeaturesError(meta.require).into());
512 }
513
514 let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
515
516 let invoke_future =
517 rpc::invoke_rpc_method(context, &obj_id, obj, method.upcast_box(), tx_updates)?;
518
519 // Note that we drop the read lock before we await this future!
520 invoke_future.await
521 }
522
523 /// Try to get a strong reference to the RpcMgr for this connection, and
524 /// return an error if we can't.
525 pub(crate) fn mgr(&self) -> Result<Arc<RpcMgr>, MgrDisappearedError> {
526 self.mgr
527 .upgrade()
528 .ok_or(MgrDisappearedError::RpcMgrDisappeared)
529 }
530}
531
/// An error returned when an RPC request lists some feature as required,
/// but we don't have every such feature.
///
/// When converted to an `RpcError`, the feature list is attached as the
/// `rpc:unsupported_features` datum.
#[derive(Clone, Debug, thiserror::Error)]
#[error("Required features not available")]
struct MissingFeaturesError(
    /// A list of the features that were requested but not available.
    Vec<String>,
);
540
541impl From<MissingFeaturesError> for RpcError {
542 fn from(err: MissingFeaturesError) -> Self {
543 let mut e = RpcError::new(
544 err.to_string(),
545 tor_rpcbase::RpcErrorKind::FeatureNotPresent,
546 );
547 e.set_datum("rpc:unsupported_features".to_string(), err.0)
548 .expect("invalid keyword");
549 e
550 }
551}
552
/// A failure that results in closing a [`Connection`].
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ConnectionError {
    /// Unable to write to our connection.
    ///
    /// Produced when writing the banner fails, or when the response sink
    /// reports an I/O error.
    #[error("Could not write to connection")]
    WriteFailed(#[source] Arc<IoError>),
    /// Read error from connection.
    ///
    /// Produced when the request stream reports an I/O error.
    #[error("Problem reading from connection")]
    ReadFailed(#[source] Arc<IoError>),
    /// Read something that we could not decode.
    ///
    /// Produced when the request stream reports a JSON error.
    #[error("Unable to decode request from connection")]
    DecodeFailed(#[source] Arc<serde_json::Error>),
    /// Unable to write our response as json.
    ///
    /// Produced when the response sink reports a JSON error.
    #[error("Unable to encode response onto connection")]
    EncodeFailed(#[source] Arc<serde_json::Error>),
    /// We encountered a problem when parsing a request that was (in our judgment)
    /// too severe to recover from.
    ///
    /// Produced for an invalid request that has no usable request ID.
    #[error("Unrecoverable problem from parsed request")]
    RequestParseFailed(#[from] RequestParseError),
}
574
575impl ConnectionError {
576 /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
577 fn writing(error: JsonCodecError) -> Self {
578 match error {
579 JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
580 JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
581 }
582 }
583
584 /// Return true if this error is (or might be) due to the peer closing the connection.
585 ///
586 /// Such errors should be tolerated without much complaint;
587 /// other errors should at least be logged somewhere.
588 fn is_connection_close(&self) -> bool {
589 use std::io::ErrorKind as IK;
590 use JsonErrorCategory as JK;
591 #[allow(clippy::match_like_matches_macro)]
592 match self {
593 Self::ReadFailed(e) | Self::WriteFailed(e) => match e.kind() {
594 IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => true,
595 _ => false,
596 },
597 Self::DecodeFailed(e) => match e.classify() {
598 JK::Eof => true,
599 _ => false,
600 },
601 _ => false,
602 }
603 }
604
605 /// Construct a `ConnectionError` from a JsonCodecError that occurred while reading.
606 fn from_read_error(error: JsonCodecError) -> Self {
607 match error {
608 JsonCodecError::Io(e) => Self::ReadFailed(Arc::new(e)),
609 JsonCodecError::Json(e) => Self::DecodeFailed(Arc::new(e)),
610 }
611 }
612}
613
/// A failure from trying to upgrade a `Weak<RpcMgr>`.
///
/// Returned by [`Connection::mgr`].
#[derive(Clone, Debug, thiserror::Error, serde::Serialize)]
pub(crate) enum MgrDisappearedError {
    /// We tried to upgrade our reference to the RpcMgr, and failed.
    #[error("RPC manager disappeared; Arti is shutting down?")]
    RpcMgrDisappeared,
}
621impl tor_error::HasKind for MgrDisappearedError {
622 fn kind(&self) -> tor_error::ErrorKind {
623 tor_error::ErrorKind::ArtiShuttingDown
624 }
625}
626
627impl rpc::Context for Connection {
628 fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
629 Connection::lookup_object(self, id)
630 }
631
632 fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
633 let use_global_id = object.expose_outside_of_session();
634 let local_id = self
635 .inner
636 .lock()
637 .expect("Lock poisoned")
638 .objects
639 .insert_strong(object);
640
641 // Design note: It is a deliberate decision to _always_ use GlobalId for
642 // objects whose IDs are _ever_ exported for use in SOCKS requests. Some
643 // alternatives would be to use GlobalId conditionally, or to have a
644 // separate Method to create a new GlobalId given an existing LocalId.
645 if use_global_id {
646 GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
647 } else {
648 local_id.encode()
649 }
650 }
651
652 fn release_owned(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
653 let removed_some = if id.as_ref() == Self::CONNECTION_OBJ_ID {
654 self.inner
655 .lock()
656 .expect("Lock poisoned")
657 .this_connection
658 .take()
659 .is_some()
660 } else {
661 let idx = self.id_into_local_idx(id)?;
662
663 if !idx.is_strong() {
664 return Err(rpc::LookupError::WrongType(id.clone()));
665 }
666
667 self.inner
668 .lock()
669 .expect("Lock poisoned")
670 .objects
671 .remove(idx)
672 .is_some()
673 };
674
675 if removed_some {
676 Ok(())
677 } else {
678 Err(rpc::LookupError::NoObject(id.clone()))
679 }
680 }
681
682 fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
683 &self.dispatch_table
684 }
685}
686
/// An error given when an RPC request is cancelled.
///
/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
/// we want to move that type into a general-purpose location, and make it not
/// RPC-specific.
///
/// It is converted into an `RpcError` to form the final response of a
/// cancelled request.
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
#[error("RPC request was cancelled")]
pub(crate) struct RequestCancelled;
695
696impl From<RequestCancelled> for RpcError {
697 fn from(_: RequestCancelled) -> Self {
698 RpcError::new(
699 "Request cancelled".into(),
700 rpc::RpcErrorKind::RequestCancelled,
701 )
702 }
703}
704
/// An error given when we attempt to cancel an RPC request, but cannot.
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
pub(crate) enum CancelError {
    /// We didn't find any request with the provided ID.
    ///
    /// Since we don't keep track of requests after they finish or are cancelled,
    /// we cannot distinguish the cases where a request has finished,
    /// where the request has been cancelled,
    /// or where the request never existed.
    /// Therefore we collapse them into a single error type.
    #[error("RPC request not found")]
    RequestNotFound,

    /// This kind of request cannot be cancelled.
    ///
    /// The request stays registered after this error is reported.
    #[error("Uncancellable request")]
    CannotCancelRequest,

    /// We tried to cancel a request but found out it was already cancelled.
    ///
    /// This error should be impossible.
    #[error("Request somehow cancelled twice!")]
    AlreadyCancelled,
}
729
730impl From<cancel::CannotCancel> for CancelError {
731 fn from(value: cancel::CannotCancel) -> Self {
732 use cancel::CannotCancel as CC;
733 use CancelError as CE;
734 match value {
735 CC::Cancelled => CE::AlreadyCancelled,
736 // We map "finished" to RequestNotFound since it is not in the general case
737 // distinguishable from it; see documentation on RequestNotFound.
738 CC::Finished => CE::RequestNotFound,
739 }
740 }
741}
742
743impl From<CancelError> for RpcError {
744 fn from(err: CancelError) -> Self {
745 use rpc::RpcErrorKind as REK;
746 use CancelError as CE;
747 let code = match err {
748 CE::RequestNotFound => REK::RequestError,
749 CE::CannotCancelRequest => REK::RequestError,
750 CE::AlreadyCancelled => REK::InternalError,
751 };
752 RpcError::new(err.to_string(), code)
753 }
754}