1
//! RPC connection support, mainloop, and protocol implementation.
2

            
3
pub(crate) mod auth;
4

            
5
use std::{
6
    collections::HashMap,
7
    io::Error as IoError,
8
    pin::Pin,
9
    sync::{Arc, Mutex, RwLock, Weak},
10
};
11

            
12
use asynchronous_codec::JsonCodecError;
13
use derive_deftly::Deftly;
14
use futures::{
15
    channel::mpsc,
16
    stream::{FusedStream, FuturesUnordered},
17
    FutureExt, Sink, SinkExt as _, StreamExt,
18
};
19
use rpc::dispatch::BoxedUpdateSink;
20
use serde_json::error::Category as JsonErrorCategory;
21
use tor_async_utils::SinkExt as _;
22

            
23
use crate::{
24
    cancel::{Cancel, CancelHandle},
25
    err::RequestParseError,
26
    globalid::{GlobalId, MacKey},
27
    msgs::{BoxedResponse, FlexibleRequest, Request, RequestId, ResponseBody},
28
    objmap::{GenIdx, ObjMap},
29
    RpcMgr,
30
};
31

            
32
use tor_rpcbase as rpc;
33
use tor_rpcbase::templates::*;
34

            
35
/// An open connection from an RPC client.
36
///
37
/// Tracks information that persists from one request to another.
38
///
39
/// The client might not have authenticated;
40
/// access and permissions control is handled via the capability system.
41
/// Specifically, the `objects` table in `Inner` hold capabilities
42
/// that the client will use to do things,
43
/// including an `RpcSession`.
44
#[derive(Deftly)]
45
#[derive_deftly(Object)]
46
pub struct Connection {
47
    /// The mutable state of this connection.
48
    inner: Mutex<Inner>,
49

            
50
    /// Lookup table to find the implementations for methods
51
    /// based on RPC object and method types.
52
    ///
53
    /// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
54
    dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
55

            
56
    /// A unique identifier for this connection.
57
    ///
58
    /// This kind of ID is used to refer to the connection from _outside_ of the
59
    /// context of an RPC connection: it can uniquely identify the connection
60
    /// from e.g. a SOCKS session so that clients can attach streams to it.
61
    connection_id: ConnectionId,
62

            
63
    /// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
64
    /// need to exist outside this connection.
65
    global_id_mac_key: MacKey,
66

            
67
    /// A reference to the manager associated with this session.
68
    mgr: Weak<RpcMgr>,
69

            
70
    /// A reference to this connection itself.
71
    ///
72
    /// Used when we're looking up the connection within the RPC system as an object.
73
    ///
74
    /// TODO RPC: Maybe there is an easier way to do this while keeping `context` object-save?
75
    this_connection: Weak<Connection>,
76
}
77

            
78
/// The inner, lock-protected part of an RPC connection.
79
struct Inner {
80
    /// Map from request ID to handles; used when we need to cancel a request.
81
    //
82
    // TODO: We have two options here for handling colliding IDs.  We can either turn
83
    // this into a multimap, or we can declare that cancelling a request only
84
    // cancels the most recent request sent with that ID.
85
    inflight: HashMap<RequestId, CancelHandle>,
86

            
87
    /// An object map used to look up most objects by ID, and keep track of
88
    /// which objects are owned by this connection.
89
    objects: ObjMap,
90
}
91

            
92
/// Capacity of the per-connection response channel.
///
/// This is how many responses (updates included) can be pending on one
/// connection before senders start to block.
const UPDATE_CHAN_SIZE: usize = 128;
94

            
95
/// A type-erased [`FusedStream`] yielding [`Request`]s.
96
//
97
// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
98
// nice and simple.)
99
pub(crate) type BoxedRequestStream = Pin<
100
    Box<dyn FusedStream<Item = Result<FlexibleRequest, asynchronous_codec::JsonCodecError>> + Send>,
101
>;
102

            
103
/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
104
pub(crate) type BoxedResponseSink =
105
    Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
106

            
107
/// A random value used to identify an connection.
108
#[derive(
109
    Copy,
110
    Clone,
111
    Debug,
112
    Eq,
113
    PartialEq,
114
    Hash,
115
    derive_more::From,
116
    derive_more::Into,
117
8
    derive_more::AsRef,
118
)]
119

            
120
// TODO RPC: Document this, and make it participate in the Reader/Writer API
121
// enough that we can stop referring to its internals elsewhere.
122
pub(crate) struct ConnectionId([u8; 16]);
123

            
124
impl ConnectionId {
125
    /// The length of a ConnectionId.
126
    pub(crate) const LEN: usize = 16;
127
}
128

            
129
impl Connection {
130
    /// Create a new connection.
131
    pub(crate) fn new(
132
        connection_id: ConnectionId,
133
        dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
134
        global_id_mac_key: MacKey,
135
        mgr: Weak<RpcMgr>,
136
    ) -> Arc<Self> {
137
        Arc::new_cyclic(|this_connection| Self {
138
            inner: Mutex::new(Inner {
139
                inflight: HashMap::new(),
140
                objects: ObjMap::new(),
141
            }),
142
            dispatch_table,
143
            connection_id,
144
            global_id_mac_key,
145
            mgr,
146
            this_connection: Weak::clone(this_connection),
147
        })
148
    }
149

            
150
    /// If possible, convert an `ObjectId` into a `GenIdx` that can be used in
151
    /// this connection's ObjMap.
152
    fn id_into_local_idx(&self, id: &rpc::ObjectId) -> Result<GenIdx, rpc::LookupError> {
153
        // TODO RPC: Use a tag byte instead of a magic length.
154

            
155
        if id.as_ref().len() == GlobalId::B64_ENCODED_LEN {
156
            // This is the right length to be a GlobalId; let's see if it really
157
            // is one.
158
            //
159
            // Design note: It's not really necessary from a security POV to
160
            // check the MAC here; any possible GenIdx we return will either
161
            // refer to some object we're allowed to name in this session, or to
162
            // no object at all.  Still, we check anyway, since it shouldn't
163
            // hurt to do so.
164
            let global_id = GlobalId::try_decode(&self.global_id_mac_key, id)?;
165
            // We have a GlobalId with a valid MAC. Let's make sure it applies
166
            // to this connection's ObjMap.  (We do not support referring to
167
            // anyone else's objects.)
168
            //
169
            // Design note: As above, this check is a protection against
170
            // accidental misuse, not a security feature: even if we removed
171
            // this check, we would still only allow objects that this session
172
            // is allowed to name.
173
            if global_id.connection == self.connection_id {
174
                Ok(global_id.local_id)
175
            } else {
176
                Err(rpc::LookupError::NoObject(id.clone()))
177
            }
178
        } else {
179
            // It's not a GlobalId; let's see if we can make sense of it as an
180
            // ObjMap index.
181
            Ok(GenIdx::try_decode(id)?)
182
        }
183
    }
184

            
185
    /// Look up a given object by its object ID relative to this connection.
186
    pub(crate) fn lookup_object(
187
        &self,
188
        id: &rpc::ObjectId,
189
    ) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
190
        if id.as_ref() == "connection" {
191
            let this = self
192
                .this_connection
193
                .upgrade()
194
                .ok_or(rpc::LookupError::NoObject(id.clone()))?;
195
            Ok(this as Arc<_>)
196
        } else {
197
            let local_id = self.id_into_local_idx(id)?;
198

            
199
            self.lookup_by_idx(local_id)
200
                .ok_or(rpc::LookupError::NoObject(id.clone()))
201
        }
202
    }
203

            
204
    /// As `lookup_object`, but expect a `GenIdx`.
205
    pub(crate) fn lookup_by_idx(&self, idx: crate::objmap::GenIdx) -> Option<Arc<dyn rpc::Object>> {
206
        let inner = self.inner.lock().expect("lock poisoned");
207
        inner.objects.lookup(idx)
208
    }
209

            
210
    /// Un-register the request `id` and stop tracking its information.
211
    fn remove_request(&self, id: &RequestId) {
212
        let mut inner = self.inner.lock().expect("lock poisoned");
213
        inner.inflight.remove(id);
214
    }
215

            
216
    /// Register the request `id` as a cancellable request.
217
    fn register_request(&self, id: RequestId, handle: CancelHandle) {
218
        let mut inner = self.inner.lock().expect("lock poisoned");
219
        inner.inflight.insert(id, handle);
220
    }
221

            
222
    /// Run in a loop, decoding JSON requests from `input` and
223
    /// writing JSON responses onto `output`.
224
    pub async fn run<IN, OUT>(
225
        self: Arc<Self>,
226
        input: IN,
227
        output: OUT,
228
    ) -> Result<(), ConnectionError>
229
    where
230
        IN: futures::AsyncRead + Send + Sync + Unpin + 'static,
231
        OUT: futures::AsyncWrite + Send + Sync + Unpin + 'static,
232
    {
233
        let write = Box::pin(asynchronous_codec::FramedWrite::new(
234
            output,
235
            crate::codecs::JsonLinesEncoder::<BoxedResponse>::default(),
236
        ));
237

            
238
        let read = Box::pin(
239
            asynchronous_codec::FramedRead::new(
240
                input,
241
                asynchronous_codec::JsonCodec::<(), FlexibleRequest>::new(),
242
            )
243
            .fuse(),
244
        );
245

            
246
        self.run_loop(read, write).await
247
    }
248

            
249
    /// Run in a loop, handling requests from `request_stream` and writing
250
    /// responses onto `response_stream`.
251
    pub(crate) async fn run_loop(
252
        self: Arc<Self>,
253
        mut request_stream: BoxedRequestStream,
254
        mut response_sink: BoxedResponseSink,
255
    ) -> Result<(), ConnectionError> {
256
        // This function will multiplex on three streams:
257
        // * `request_stream` -- a stream of incoming requests from the client.
258
        // * `finished_requests` -- a stream of requests that are done.
259
        // * `rx_response` -- a stream of updates and final responses sent from
260
        //   in-progress tasks. (We put updates and final responsese onto the
261
        //   same channel to ensure that they stay in-order for each method
262
        //   invocation.
263
        //
264
        // Note that the blocking behavior here is deliberate: We want _all_ of
265
        // these reads to start blocking when response_sink.send is blocked.
266

            
267
        let (tx_response, mut rx_response) = mpsc::channel::<BoxedResponse>(UPDATE_CHAN_SIZE);
268
        let mut finished_requests = FuturesUnordered::new();
269
        finished_requests.push(futures::future::pending().boxed());
270

            
271
        'outer: loop {
272
            futures::select! {
273
                r = finished_requests.next() => {
274
                    // A task is done, so we can forget about it.
275
                    let () = r.expect("Somehow, future::pending() terminated.");
276
                }
277

            
278
                r = rx_response.next() => {
279
                    // The future for some request has sent a response (success,
280
                    // failure, or update), so we can inform the client.
281
                    let update = r.expect("Somehow, tx_update got closed.");
282
                    debug_assert!(! update.body.is_final());
283
                    // Calling `await` here (and below) is deliberate: we _want_
284
                    // to stop reading the client's requests if the client is
285
                    // not reading their responses (or not) reading them fast
286
                    // enough.
287
                    response_sink.send(update).await.map_err(ConnectionError::writing)?;
288
                }
289

            
290
                req = request_stream.next() => {
291
                    match req {
292
                        None => {
293
                            // We've reached the end of the stream of requests;
294
                            // time to close.
295
                            break 'outer;
296
                        }
297
                        Some(Err(e)) => {
298
                            // We got a non-recoverable error from the JSON codec.
299

            
300
                            let (reply_with_error, return_error) = match ConnectionError::classify_read_error(e) {
301
                                Ok(()) => break 'outer,
302
                                Err(e) => if let Some(r) = e.as_request_parse_err() {
303
                                    (r, e)
304
                                } else {
305
                                    return Err(e);
306
                                },
307
                            };
308

            
309
                            response_sink
310
                                .send(
311
                                    BoxedResponse::from_error(None, reply_with_error)
312
                                ).await.map_err(ConnectionError::writing)?;
313

            
314
                            // TODO RPC: Perhaps we should keep going on the NotAnObject case?
315
                            //      (InvalidJson is not recoverable!)
316
                            return Err(return_error);
317
                        }
318
                        Some(Ok(FlexibleRequest::Invalid(bad_req))) => {
319
                            response_sink
320
                                .send(
321
                                    BoxedResponse::from_error(bad_req.id().cloned(), bad_req.error())
322
                                ).await.map_err( ConnectionError::writing)?;
323
                            if bad_req.id().is_none() {
324
                                // The spec says we must close the connection in this case.
325
                                break 'outer;
326
                            }
327
                        }
328
                        Some(Ok(FlexibleRequest::Valid(req))) => {
329
                            // We have a request. Time to launch it!
330
                            let fut = self.run_method_and_deliver_response(tx_response.clone(), req);
331
                            finished_requests.push(fut.boxed());
332
                        }
333
                    }
334
                }
335
            }
336
        }
337

            
338
        Ok(())
339
    }
340

            
341
    /// Invoke `request` and send all of its responses to `tx_response`.
342
    async fn run_method_and_deliver_response(
343
        self: &Arc<Self>,
344
        mut tx_response: mpsc::Sender<BoxedResponse>,
345
        request: Request,
346
    ) {
347
        let Request {
348
            id,
349
            obj,
350
            meta,
351
            method,
352
        } = request;
353

            
354
        let update_sender: BoxedUpdateSink = if meta.updates {
355
            let id_clone = id.clone();
356
            let sink =
357
                tx_response
358
                    .clone()
359
                    .with_fn(move |obj: Box<dyn erased_serde::Serialize + Send>| {
360
                        Result::<BoxedResponse, _>::Ok(BoxedResponse {
361
                            id: Some(id_clone.clone()),
362
                            body: ResponseBody::Update(obj),
363
                        })
364
                    });
365
            Box::pin(sink)
366
        } else {
367
            let sink = futures::sink::drain().sink_err_into();
368
            Box::pin(sink)
369
        };
370

            
371
        // Create `run_method_lowlevel` future, and make it cancellable.
372
        let fut = self.run_method_lowlevel(update_sender, obj, method);
373
        let (handle, fut) = Cancel::new(fut);
374
        self.register_request(id.clone(), handle);
375

            
376
        // Run the cancellable future to completion, and figure out how to respond.
377
        let body = match fut.await {
378
            Ok(Ok(value)) => ResponseBody::Success(value),
379
            // TODO: If we're going to box this, let's do so earlier.
380
            Ok(Err(err)) => {
381
                if err.is_internal() {
382
                    tracing::warn!(
383
                        "Reporting an internal error on an RPC connection: {:?}",
384
                        err
385
                    );
386
                }
387
                ResponseBody::Error(Box::new(err))
388
            }
389
            Err(_cancelled) => ResponseBody::Error(Box::new(rpc::RpcError::from(RequestCancelled))),
390
        };
391

            
392
        // Send the response.
393
        //
394
        // (It's okay to ignore the error here, since it can only mean that the
395
        // RPC connection has closed.)
396
        let _ignore_err = tx_response
397
            .send(BoxedResponse {
398
                id: Some(id.clone()),
399
                body,
400
            })
401
            .await;
402

            
403
        // Unregister the request.
404
        self.remove_request(&id);
405
    }
406

            
407
    /// Run a single method, and return its final response.
408
    ///
409
    /// If `tx_updates` is provided, and this method generates updates, it
410
    /// should send those updates on `tx_updates`
411
    ///
412
    /// Note that this function is able to send responses with IDs that do not
413
    /// match the original.  It should enforce correct IDs on whatever response
414
    /// it generates.
415
    async fn run_method_lowlevel(
416
        self: &Arc<Self>,
417
        tx_updates: rpc::dispatch::BoxedUpdateSink,
418
        obj: rpc::ObjectId,
419
        method: Box<dyn rpc::DeserMethod>,
420
    ) -> Result<Box<dyn erased_serde::Serialize + Send + 'static>, rpc::RpcError> {
421
        let obj = self.lookup_object(&obj)?;
422

            
423
        let context: Arc<dyn rpc::Context> = self.clone() as Arc<_>;
424
        let invoke_future = rpc::invoke_rpc_method(context, obj, method.upcast_box(), tx_updates)?;
425

            
426
        // Note that we drop the read lock before we await this future!
427
        invoke_future.await
428
    }
429

            
430
    /// Try to get a strong reference to the RpcMgr for this connection, and
431
    /// return an error if we can't.
432
    pub(crate) fn mgr(&self) -> Result<Arc<RpcMgr>, MgrDisappearedError> {
433
        self.mgr
434
            .upgrade()
435
            .ok_or(MgrDisappearedError::RpcMgrDisappeared)
436
    }
437
}
438

            
439
/// A failure that results in closing a [`Connection`].
440
#[derive(Clone, Debug, thiserror::Error)]
441
#[non_exhaustive]
442
pub enum ConnectionError {
443
    /// Unable to write to our connection.
444
    #[error("Could not write to connection")]
445
    WriteFailed(#[source] Arc<IoError>),
446
    /// Read error from connection.
447
    #[error("Problem reading from connection")]
448
    ReadFailed(#[source] Arc<IoError>),
449
    /// Read something that we could not decode.
450
    #[error("Unable to decode request from connection")]
451
    DecodeFailed(#[source] Arc<serde_json::Error>),
452
    /// Unable to write our response as json.
453
    #[error("Unable to encode response onto connection")]
454
    EncodeFailed(#[source] Arc<serde_json::Error>),
455
}
456

            
457
impl ConnectionError {
458
    /// Construct a new `ConnectionError` from a `JsonCodecError` that has occurred while writing.
459
    fn writing(error: JsonCodecError) -> Self {
460
        match error {
461
            JsonCodecError::Io(e) => Self::WriteFailed(Arc::new(e)),
462
            JsonCodecError::Json(e) => Self::EncodeFailed(Arc::new(e)),
463
        }
464
    }
465

            
466
    /// If this error is the result of a Json decode failure, return an appropriate
467
    /// `RequestParseError`.
468
    fn as_request_parse_err(&self) -> Option<RequestParseError> {
469
        match self {
470
            ConnectionError::DecodeFailed(d) => match d.classify() {
471
                JsonErrorCategory::Syntax => Some(RequestParseError::InvalidJson),
472
                JsonErrorCategory::Data => Some(RequestParseError::NotAnObject),
473
                _ => None,
474
            },
475
            _ => None,
476
        }
477
    }
478

            
479
    /// Decide what to do with an error that has occurred while reading from a Json codec.
480
    ///
481
    /// Return Ok(()) if the error should silently be ignored, and treated as closing the session.
482
    /// Otherwise return the error object.
483
    fn classify_read_error(error: JsonCodecError) -> Result<(), Self> {
484
        use std::io::ErrorKind as IK;
485
        use JsonCodecError as E;
486
        use JsonErrorCategory as JK;
487
        match error {
488
            E::Io(e) => match e.kind() {
489
                IK::UnexpectedEof | IK::ConnectionAborted | IK::BrokenPipe => Ok(()),
490
                _ => Err(ConnectionError::ReadFailed(Arc::new(e))),
491
            },
492
            E::Json(e) => match e.classify() {
493
                JK::Eof => Ok(()),
494
                _ => Err(ConnectionError::DecodeFailed(Arc::new(e))),
495
            },
496
        }
497
    }
498
}
499

            
500
/// A failure from trying to upgrade a `Weak<RpcMgr>`.
501
#[derive(Clone, Debug, thiserror::Error, serde::Serialize)]
502
pub(crate) enum MgrDisappearedError {
503
    /// We tried to upgrade our reference to the RpcMgr, and failed.
504
    #[error("RPC manager disappeared; Arti is shutting down?")]
505
    RpcMgrDisappeared,
506
}
507
impl tor_error::HasKind for MgrDisappearedError {
    /// Report this error as `ErrorKind::ArtiShuttingDown`.
    fn kind(&self) -> tor_error::ErrorKind {
        match self {
            Self::RpcMgrDisappeared => tor_error::ErrorKind::ArtiShuttingDown,
        }
    }
}
512

            
513
impl rpc::Context for Connection {
514
    fn lookup_object(&self, id: &rpc::ObjectId) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
515
        Connection::lookup_object(self, id)
516
    }
517

            
518
    fn register_owned(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
519
        let use_global_id = object.expose_outside_of_session();
520
        let local_id = self
521
            .inner
522
            .lock()
523
            .expect("Lock poisoned")
524
            .objects
525
            .insert_strong(object);
526

            
527
        // Design note: It is a deliberate decision to _always_ use GlobalId for
528
        // objects whose IDs are _ever_ exported for use in SOCKS requests.  Some
529
        // alternatives would be to use GlobalId conditionally, or to have a
530
        // separate Method to create a new GlobalId given an existing LocalId.
531
        if use_global_id {
532
            GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
533
        } else {
534
            local_id.encode()
535
        }
536
    }
537

            
538
    fn register_weak(&self, object: Arc<dyn rpc::Object>) -> rpc::ObjectId {
539
        let use_global_id = object.expose_outside_of_session();
540
        let local_id = self
541
            .inner
542
            .lock()
543
            .expect("Lock poisoned")
544
            .objects
545
            .insert_weak(object);
546
        if use_global_id {
547
            GlobalId::new(self.connection_id, local_id).encode(&self.global_id_mac_key)
548
        } else {
549
            local_id.encode()
550
        }
551
    }
552

            
553
    fn release_owned(&self, id: &rpc::ObjectId) -> Result<(), rpc::LookupError> {
554
        let idx = self.id_into_local_idx(id)?;
555

            
556
        if !idx.is_strong() {
557
            return Err(rpc::LookupError::WrongType(id.clone()));
558
        }
559

            
560
        let removed = self
561
            .inner
562
            .lock()
563
            .expect("Lock poisoned")
564
            .objects
565
            .remove(idx);
566

            
567
        if removed.is_some() {
568
            Ok(())
569
        } else {
570
            Err(rpc::LookupError::NoObject(id.clone()))
571
        }
572
    }
573

            
574
    fn dispatch_table(&self) -> &Arc<std::sync::RwLock<rpc::DispatchTable>> {
575
        &self.dispatch_table
576
    }
577
}
578

            
579
/// An error given when an RPC request is cancelled.
580
///
581
/// This is a separate type from [`crate::cancel::Cancelled`] since eventually
582
/// we want to move that type into a general-purpose location, and make it not
583
/// RPC-specific.
584
2
#[derive(thiserror::Error, Clone, Debug, serde::Serialize)]
585
#[error("RPC request was cancelled")]
586
pub(crate) struct RequestCancelled;
587
impl tor_error::HasKind for RequestCancelled {
588
4
    fn kind(&self) -> tor_error::ErrorKind {
589
4
        // TODO RPC: Can we do better here?
590
4
        tor_error::ErrorKind::Other
591
4
    }
592
}