arti_rpcserver/stream.rs
1//! Objects that can become or wrap a [`arti_client::DataStream`].
2
3use arti_client::rpc::{
4 ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5};
6use derive_deftly::Deftly;
7use std::{
8 net::IpAddr,
9 sync::{Arc, Mutex},
10};
11use tor_error::into_internal;
12use tor_proto::stream::ClientDataStreamCtrl;
13use tor_rpcbase::{self as rpc, templates::*};
14
15use crate::RpcSession;
16
17/// An RPC object representing a single-use client that captures a data-stream.
18///
19/// This object is returned by the `arti:new_oneshot_client` method, and starts out with
20/// enough information to know how to create a `DataStream`, or to respond
21/// to some other SOCKS request.
22/// When this object is the target of a SOCKS request,
23/// it takes its target address, port, and isolation parameters from the SOCKS handshake,
24/// and launches a data stream.
25/// It then becomes interchangeable with the stream that was launched.
26///
27/// This object is single-use: once a SOCKS request has referred to it,
28/// it cannot be used for any other SOCKS request.
29/// (Otherwise, it could not be usable interchangeably with the `DataStream` it creates.)
30///
31/// The ObjectID for this object can be used as the target of a SOCKS request.
32#[derive(Deftly)]
33#[derive_deftly(Object)]
34#[deftly(rpc(expose_outside_of_session))]
35pub(crate) struct OneshotClient {
36 /// The inner state of this object.
37 inner: Mutex<Inner>,
38}
39
40/// The inner state of an `OneshotClient`.
41///
42/// A stream is created in the "Unused" state.
43enum Inner {
44 /// Newly constructed: Waiting for a SOCKS command.
45 ///
46 /// This is the initial state for every OneshotClient.
47 ///
48 /// It may become `Launching` or `UsedToResolve`.
49 Unused(Arc<dyn rpc::Object>),
50
51 /// The actual connection is being made, ie we are within `connect_with_prefs`
52 ///
53 /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
54 ///
55 /// From this state, a stream may become `Stream`, or `StreamFailed`.
56 Launching,
57
58 /// Stream constructed; may or may not be connected.
59 ///
60 /// A stream does not exit this state. Even if the stream is closed or fails,
61 /// its `ClientDataStreamCtrl` remains until it is dropped.
62 Stream(Arc<ClientDataStreamCtrl>),
63
64 /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
65 ///
66 /// A stream does not exit this state, even if resolve request fails.
67 //
68 // TODO RPC: We may want to make this state hold more information if someday we
69 // make DNS requests into objects that we can inspect while they are running.
70 UsedToResolve,
71
72 /// Failed to construct the tor_proto::DataStream object.
73 ///
74 /// A stream does not exit this state.
75 StreamFailed,
76}
77
78/// Error returned by an operations from OneshotClient.
79#[derive(Debug, Clone, thiserror::Error)]
80enum OneshotClientError {
81 /// Application tried to open a stream using a OneshotClient,
82 /// but that OneshotClient had already been used previously.
83 #[error("Data stream object already used")]
84 AlreadyUsed,
85}
86
87impl tor_error::HasKind for OneshotClientError {
88 fn kind(&self) -> tor_error::ErrorKind {
89 use tor_error::ErrorKind as EK;
90 use OneshotClientError as E;
91 match self {
92 E::AlreadyUsed => EK::BadApiUsage, // TODO RPC: is this the correct ErrorKind?
93 }
94 }
95}
96
97impl OneshotClient {
98 /// Construct a new unused OneshotClient that will make its connection
99 /// with `connector`.
100 ///
101 /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
102 /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
103 pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
104 Self {
105 inner: Mutex::new(Inner::Unused(connector)),
106 }
107 }
108
109 /// If this `OneshotClient` is in state Unused, replace its state with `new_state`
110 /// and return the ClientConnectionTarget. Otherwise, leave its state unchanged
111 /// and return an error.
112 fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
113 let mut inner = self.inner.lock().expect("poisoned lock");
114 let val = std::mem::replace(&mut *inner, new_state);
115 if let Inner::Unused(conn) = val {
116 Ok(conn)
117 } else {
118 *inner = val;
119 Err(OneshotClientError::AlreadyUsed)
120 }
121 }
122
123 /// Return the `ClientDataStreamCtrl` for this stream, if it has one.
124 #[allow(dead_code)]
125 fn get_ctrl(&self) -> Option<Arc<ClientDataStreamCtrl>> {
126 let inner = self.inner.lock().expect("poisoned lock");
127 if let Inner::Stream(s) = &*inner {
128 Some(s.clone())
129 } else {
130 None
131 }
132 }
133}
134
135/// Invoke ConnectWithPrefs on an OneshotClient.
136///
137/// Unlike the other methods on OneshotClient, this one is somewhat complex, since it must
138/// re-register the resulting datastream once it has one.
139async fn oneshot_client_connect_with_prefs(
140 rpc_data_stream: Arc<OneshotClient>,
141 mut method: Box<ConnectWithPrefs>,
142 ctx: Arc<dyn rpc::Context>,
143) -> ClientConnectionResult<arti_client::DataStream> {
144 // Extract the connector.
145 //
146 // As we do this, we put this OneshotClient into a Launching state.
147 //
148 // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
149 // but it isn't synchronous, so `Launching` is an observable state.)
150 let connector = rpc_data_stream
151 .take_connector(Inner::Launching)
152 .map_err(|e| Box::new(e) as _)?;
153
154 // Internally, we're going to tell tor-proto to make an optimistic stream.
155 // The only effect here is that the DataStream will be returned immediately by
156 // our invoke_special_method call, which would otherwise call `wait_for_connection`
157 // if the stream was _not_ originally optimistic.
158 //
159 // We use `was_optimistic` to remember whether the prefs was _originally_
160 // configured to give an optimistic stream,
161 // so that we know whether _we_ should do the `wait_for_connection``.
162 //
163 // From the POV of the SOCKS proxy code that is calling this function,
164 // it will still receive the requested optimistic or non-optimistic behavior,
165 // since the `wait_for_connection` call will still happen (or not happen)
166 // as requested, causing _this_ function to possibly wait.
167 //
168 // The only observable impact here is that this object
169 // will immediately transition to its new state,
170 // so that other RPC calls will see a `DataStreamCtrl` object.
171 let was_optimistic = method.prefs.is_optimistic();
172 method.prefs.optimistic();
173
174 // Now, launch the connection. Since we marked it as optimistic,
175 // this call should return almost immediately.
176 let stream: Result<arti_client::DataStream, _> =
177 *rpc::invoke_special_method(ctx, connector, method)
178 .await
179 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
180
181 // Pick the new state for this object, and install it.
182 let new_obj = match &stream {
183 Ok(s) => Inner::Stream(
184 s.client_stream_ctrl()
185 .expect("Created a client stream with no ClientDataStreamCtrl!?")
186 .clone(),
187 ),
188 Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
189 };
190 {
191 let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
192 *inner = new_obj;
193 }
194 // Return early on failure.
195 let mut stream = stream?;
196
197 if !was_optimistic {
198 // Implement non-optimistic behavior, if that is what was originally configured.
199 stream
200 .wait_for_connection()
201 .await
202 .map_err(|e| Box::new(e) as _)?;
203 }
204
205 // Return the stream; the SOCKS layer will take it from here.
206 Ok(stream)
207}
208
209/// Invoke ResolveWithPrefs on an OneshotClient
210async fn oneshot_client_resolve_with_prefs(
211 rpc_data_stream: Arc<OneshotClient>,
212 method: Box<ResolveWithPrefs>,
213 ctx: Arc<dyn rpc::Context>,
214) -> ClientConnectionResult<Vec<IpAddr>> {
215 let connector = rpc_data_stream
216 .take_connector(Inner::UsedToResolve)
217 .map_err(|e| Box::new(e) as _)?;
218
219 let result = rpc::invoke_special_method(ctx, connector, method)
220 .await
221 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
222
223 *result
224}
225
226/// Invoke ResolvePtrWithPrefs on an OneshotClient
227async fn oneshot_client_resolve_ptr_with_prefs(
228 rpc_data_stream: Arc<OneshotClient>,
229 method: Box<ResolvePtrWithPrefs>,
230 ctx: Arc<dyn rpc::Context>,
231) -> ClientConnectionResult<Vec<String>> {
232 let connector = rpc_data_stream
233 .take_connector(Inner::UsedToResolve)
234 .map_err(|e| Box::new(e) as _)?;
235
236 let result = rpc::invoke_special_method(ctx, connector, method)
237 .await
238 .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
239
240 *result
241}
242
243/// Create a new `RpcOneshotClient` to wait for a SOCKS request.
244///
245/// The resulting ObjectID will be a handle to an `RpcOneshotClient`.
246/// It can be used as the target of a single SOCKS request.
247///
248/// Once used for a SOCKS connect request,
249/// the object will become a handle for the the underlying DataStream
250/// that was created with the request.
251#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
252#[derive_deftly(DynMethod)]
253#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
254pub(crate) struct NewOneshotClient {}
255
256impl rpc::RpcMethod for NewOneshotClient {
257 type Output = rpc::SingleIdResponse;
258 type Update = rpc::NoUpdates; // TODO actually, updates are quite suitable here.
259}
260
261/// Helper: construct and register an OneshotClient.
262fn new_oneshot_client_impl(
263 connector: Arc<dyn rpc::Object>,
264 ctx: &dyn rpc::Context,
265) -> rpc::ObjectId {
266 let rpc_stream = Arc::new(OneshotClient::new(connector));
267 ctx.register_owned(rpc_stream as _)
268}
269
270/// Implement NewOneshotClient for clients.
271pub(crate) async fn new_oneshot_client_on_client<R: tor_rtcompat::Runtime>(
272 client: Arc<arti_client::TorClient<R>>,
273 _method: Box<NewOneshotClient>,
274 ctx: Arc<dyn rpc::Context>,
275) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
276 Ok(new_oneshot_client_impl(client, ctx.as_ref()).into())
277}
278
279/// Implement NewOneshotClient for RpcSession.
280async fn new_oneshot_client_on_session(
281 session: Arc<RpcSession>,
282 _method: Box<NewOneshotClient>,
283 ctx: Arc<dyn rpc::Context>,
284) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
285 Ok(new_oneshot_client_impl(session, ctx.as_ref()).into())
286}
287rpc::static_rpc_invoke_fn! {
288 new_oneshot_client_on_session;
289 @special oneshot_client_connect_with_prefs;
290 @special oneshot_client_resolve_with_prefs;
291 @special oneshot_client_resolve_ptr_with_prefs;
292}