1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
//! Top-level `RpcMgr` to launch sessions.
use std::sync::{Arc, Mutex, RwLock, Weak};
use rand::Rng;
use rpc::InvalidMethodName;
use tor_rpcbase as rpc;
use tracing::warn;
use weak_table::WeakValueHashMap;
use crate::{
connection::{Connection, ConnectionId},
globalid::{GlobalId, MacKey},
RpcAuthentication,
};
/// A function we use to construct Session objects in response to authentication.
//
// TODO RPC: Perhaps this should return a Result?
type SessionFactory = Box<dyn Fn(&RpcAuthentication) -> Arc<dyn rpc::Object> + Send + Sync>;
/// Shared state, configuration, and data for all RPC sessions.
///
/// An RpcMgr knows how to listen for incoming RPC connections, and launch sessions based on them.
///
/// TODO RPC: Actually not all of the above functionality is implemented yet. But it should be.
pub struct RpcMgr {
/// A key that we use to ensure that identifiers are unforgeable.
///
/// When giving out a global (non-session-bound) identifier, we use this key
/// to authenticate the identifier when it's given back to us.
///
/// We make copies of this key when constructing a session.
global_id_mac_key: MacKey,
/// Our reference to the dispatch table used to look up the functions that
/// implement each object on each.
///
/// Shared with each [`Connection`].
///
/// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
/// A function that we use to construct new Session objects when authentication
/// is successful.
session_factory: SessionFactory,
/// Lock-protected view of the manager's state.
///
/// **NOTE: observe the [Lock hierarchy](crate::mgr::Inner#lock-hierarchy)**
///
/// This mutex is at an _inner_ level
/// compared to the
/// per-Connection locks.
/// You must not take any per-connection lock if you
/// hold this lock.
/// Code that holds this lock must be checked
/// to make sure that it doesn't then acquire any `Connection` lock.
inner: Mutex<Inner>,
}
/// The [`RpcMgr`]'s state. This is kept inside a lock for interior mutability.
///
/// # Lock hierarchy
///
/// This system has, relevantly to the RPC code, three locks.
/// In order from outermost (acquire earlier) to innermost (acquire later):
///
/// 1. [`Connection`]`.inner`
/// 2. [`RpcMgr`]`.inner`
/// 3. `RwLock<rpc::DispatchTable>`
/// (found in [`RpcMgr`]`.dispatch_table` *and* [`Connection`]`.dispatch_table`)
///
/// To avoid deadlock, when more than one of these locks is acquired,
/// they must be acquired in an order consistent with the order listed above.
///
/// (This ordering is slightly surprising:
/// normally a lock covering more-global state would be
/// "outside" (or "earlier")
/// compared to one covering more-narrowly-relevant state.)
// pub(crate) so we can link to the doc comment and its lock hierarchy
pub(crate) struct Inner {
/// A map from [`ConnectionId`] to weak [`Connection`] references.
///
/// We use this map to give connections a manager-global identifier that can
/// be used to identify them from a SOCKS connection (or elsewhere outside
/// of the RPC system).
///
/// We _could_ use a generational arena here, but there isn't any point:
/// since these identifiers are global, we need to keep them secure by
/// MACing anything derived from them, which in turn makes the overhead of a
/// HashMap negligible.
connections: WeakValueHashMap<ConnectionId, Weak<Connection>>,
}
/// An error from creating or using an RpcMgr.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RpcMgrError {
/// At least one method had an invalid name.
#[error("Method {1} had an invalid name")]
InvalidMethodName(#[source] InvalidMethodName, String),
}
impl RpcMgr {
/// Create a new RpcMgr.
pub fn new<F>(make_session: F) -> Result<Arc<Self>, RpcMgrError>
where
F: Fn(&RpcAuthentication) -> Arc<dyn rpc::Object> + Send + Sync + 'static,
{
let problems = rpc::check_method_names([]);
// We warn about every problem.
for (m, err) in &problems {
warn!("Internal issue: Invalid RPC method name {m:?}: {err}");
}
let fatal_problem = problems
.into_iter()
// We don't treat UnrecognizedNamespace as fatal; somebody else might be extending our methods.
.find(|(_, err)| !matches!(err, InvalidMethodName::UnrecognizedNamespace));
if let Some((name, err)) = fatal_problem {
return Err(RpcMgrError::InvalidMethodName(err, name.to_owned()));
}
Ok(Arc::new(RpcMgr {
global_id_mac_key: MacKey::new(&mut rand::thread_rng()),
dispatch_table: Arc::new(RwLock::new(rpc::DispatchTable::from_inventory())),
session_factory: Box::new(make_session),
inner: Mutex::new(Inner {
connections: WeakValueHashMap::new(),
}),
}))
}
/// Extend our method dispatch table with the method entries in `entries`.
///
/// Ignores any entries that
///
/// # Panics
///
/// Panics if any entries are conflicting, according to the logic of
/// [`DispatchTable::insert`](rpc::DispatchTable::insert)
pub fn register_rpc_methods<I>(&self, entries: I)
where
I: IntoIterator<Item = rpc::dispatch::InvokerEnt>,
{
// TODO: Conceivably we might want to get a read lock on the RPC dispatch table,
// check for the presence of these entries, and only take the write lock
// if the entries are absent. But for now, this function is called during
// RpcMgr initialization, so there's no reason to optimize it.
self.with_dispatch_table(|table| table.extend(entries));
}
/// Run `func` with a mutable reference to our dispatch table as an argument.
///
/// Used to register additional methods.
pub fn with_dispatch_table<F, T>(&self, func: F) -> T
where
F: FnOnce(&mut rpc::DispatchTable) -> T,
{
let mut table = self.dispatch_table.write().expect("poisoned lock");
func(&mut table)
}
/// Start a new session based on this RpcMgr, with a given TorClient.
pub fn new_connection(self: &Arc<Self>) -> Arc<Connection> {
let connection_id = ConnectionId::from(rand::thread_rng().gen::<[u8; 16]>());
let connection = Arc::new(Connection::new(
connection_id,
self.dispatch_table.clone(),
self.global_id_mac_key.clone(),
Arc::downgrade(self),
));
let mut inner = self.inner.lock().expect("poisoned lock");
let old = inner.connections.insert(connection_id, connection.clone());
assert!(
old.is_none(),
// Specifically, we shouldn't expect collisions until we have made on the
// order of 2^64 connections, and that shouldn't be possible on
// realistic systems.
"connection ID collision detected; this is phenomenally unlikely!",
);
connection
}
/// Look up an object in the context of this `RpcMgr`.
///
/// Some object identifiers exist in a manager-global context, so that they
/// can be used outside of a single RPC session. This function looks up an
/// object by such an identifier string. It returns an error if the
/// identifier is invalid or the object does not exist.
pub fn lookup_object(
&self,
id: &rpc::ObjectId,
) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
let global_id = GlobalId::try_decode(&self.global_id_mac_key, id)?;
self.lookup_by_global_id(&global_id)
.ok_or_else(|| rpc::LookupError::NoObject(id.clone()))
}
/// As `lookup_object`, but takes a parsed and validated [`GlobalId`].
pub(crate) fn lookup_by_global_id(&self, id: &GlobalId) -> Option<Arc<dyn rpc::Object>> {
let connection = {
let inner = self.inner.lock().expect("lock poisoned");
let connection = inner.connections.get(&id.connection)?;
// Here we release the lock on self.inner, which makes it okay to
// invoke a method on `connection` that may take its lock.
drop(inner);
connection
};
connection.lookup_by_idx(id.local_id)
}
/// Construct a new object to serve as the `session` for a connection.
pub(crate) fn create_session(&self, auth: &RpcAuthentication) -> Arc<dyn rpc::Object> {
(self.session_factory)(auth)
}
}