1
//! Objects that can become or wrap a [`arti_client::DataStream`].
2

            
3
use arti_client::rpc::{
4
    ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5
};
6
use derive_deftly::Deftly;
7
use std::{
8
    net::IpAddr,
9
    sync::{Arc, Mutex},
10
};
11
use tor_error::into_internal;
12
use tor_proto::stream::DataStreamCtrl;
13
use tor_rpcbase::{self as rpc, templates::*};
14

            
15
use crate::RpcSession;
16

            
17
/// An RPC object representing a single-use client that captures a data-stream.
18
///
19
/// This object is returned by the `arti:new_oneshot_client` method, and starts out with
20
/// enough information to know how to create a `DataStream`, or to respond
21
/// to some other SOCKS request.
22
/// When this object is the target of a SOCKS request,
23
/// it takes its target address, port, and isolation parameters from the SOCKS handshake,
24
/// and launches a data stream.
25
/// It then becomes interchangeable with the stream that was launched.
26
///
27
/// This object is single-use: once a SOCKS request has referred to it,
28
/// it cannot be used for any other SOCKS request.
29
/// (Otherwise, it could not be useable interchangeably with the `DataStream` it creates.)
30
///
31
/// The ObjectID for this object can be used as the target of a SOCKS request.
32
#[derive(Deftly)]
33
#[derive_deftly(Object)]
34
#[deftly(rpc(expose_outside_of_session))]
35
pub(crate) struct OneshotClient {
36
    /// The inner state of this object.
37
    inner: Mutex<Inner>,
38
}
39

            
40
/// The inner state of an `OneshotClient`.
41
///
42
/// A stream is created in the "Unused" state.
43
enum Inner {
44
    /// Newly constructed: Waiting for a SOCKS command.
45
    ///
46
    /// This is the initial state for every OneshotClient.
47
    ///
48
    /// It may become `Launching` or `UsedToResolve`.
49
    Unused(Arc<dyn rpc::Object>),
50

            
51
    /// The actual connection is being made, ie we are within `connect_with_prefs`
52
    ///
53
    /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
54
    ///
55
    /// From this state, a stream may become `Stream`, or `StreamFailed`.
56
    Launching,
57

            
58
    /// Stream constructed; may or may not be connected.
59
    ///
60
    /// A stream does not exit this state.  Even if the stream is closed or fails,
61
    /// its `DataStreamCtrl` remains until it is dropped.
62
    Stream(Arc<DataStreamCtrl>),
63

            
64
    /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
65
    ///
66
    /// A stream does not exit this state, even if resolve request fails.
67
    //
68
    // TODO RPC: We may want to make this state hold more information if someday we
69
    // make DNS requests into objects that we can inspect while they are running.
70
    UsedToResolve,
71

            
72
    /// Failed to construct the tor_proto::DataStream object.
73
    ///
74
    /// A stream does not exit this state.
75
    StreamFailed,
76
}
77

            
78
/// Error returned by an operations from OneshotClient.
79
#[derive(Debug, Clone, thiserror::Error)]
80
enum OneshotClientError {
81
    /// Application tried to open a stream using a OneshotClient,
82
    /// but that OneshotClient had already been used previously.
83
    #[error("Data stream object already used")]
84
    AlreadyUsed,
85
}
86

            
87
impl tor_error::HasKind for OneshotClientError {
88
    fn kind(&self) -> tor_error::ErrorKind {
89
        use tor_error::ErrorKind as EK;
90
        use OneshotClientError as E;
91
        match self {
92
            E::AlreadyUsed => EK::BadApiUsage, // TODO RPC: is this the correct ErrorKind?
93
        }
94
    }
95
}
96

            
97
impl OneshotClient {
98
    /// Construct a new unused OneshotClient that will make its connection
99
    /// with `connector`.
100
    ///
101
    /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
102
    /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
103
    pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
104
        Self {
105
            inner: Mutex::new(Inner::Unused(connector)),
106
        }
107
    }
108

            
109
    /// If this `OneshotClient` is in state Unused, replace its state with `new_state`
110
    /// and return the ClientConnectionTarget.  Otherwise, leave its state unchanged
111
    /// and return an error.
112
    fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, OneshotClientError> {
113
        let mut inner = self.inner.lock().expect("poisoned lock");
114
        let val = std::mem::replace(&mut *inner, new_state);
115
        if let Inner::Unused(conn) = val {
116
            Ok(conn)
117
        } else {
118
            *inner = val;
119
            Err(OneshotClientError::AlreadyUsed)
120
        }
121
    }
122

            
123
    /// Return the `DataStreamCtrl` for this stream, if it has one.
124
    #[allow(dead_code)]
125
    fn get_ctrl(&self) -> Option<Arc<DataStreamCtrl>> {
126
        let inner = self.inner.lock().expect("poisoned lock");
127
        if let Inner::Stream(s) = &*inner {
128
            Some(s.clone())
129
        } else {
130
            None
131
        }
132
    }
133
}
134

            
135
/// Invoke ConnectWithPrefs on an OneshotClient.
136
///
137
/// Unlike the other methods on OneshotClient, this one is somewhat complex, since it must
138
/// re-register the resulting datastream once it has one.
139
async fn oneshot_client_connect_with_prefs(
140
    rpc_data_stream: Arc<OneshotClient>,
141
    mut method: Box<ConnectWithPrefs>,
142
    ctx: Arc<dyn rpc::Context>,
143
) -> ClientConnectionResult<arti_client::DataStream> {
144
    // Extract the connector.
145
    //
146
    // As we do this, we put this OneshotClient into a Launching state.
147
    //
148
    // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
149
    // but it isn't synchronous, so `Launching` is an observable state.)
150
    let connector = rpc_data_stream
151
        .take_connector(Inner::Launching)
152
        .map_err(|e| Box::new(e) as _)?;
153

            
154
    let was_optimistic = method.prefs.is_optimistic();
155
    // We want this to be treated internally as an "optimistic" connection,
156
    // so that inner connect_with_prefs() will return ASAP.
157
    method.prefs.optimistic();
158

            
159
    // Now, launch the connection.  Since we marked it as optimistic,
160
    // this call should return almost immediately.
161
    let stream: Result<arti_client::DataStream, _> =
162
        *rpc::invoke_special_method(ctx, connector, method)
163
            .await
164
            .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
165

            
166
    // Pick the new state for this object, and install it.
167
    let new_obj = match &stream {
168
        Ok(s) => Inner::Stream(s.ctrl().clone()),
169
        Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
170
    };
171
    {
172
        let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
173
        *inner = new_obj;
174
    }
175
    // Return early on failure.
176
    let mut stream = stream?;
177

            
178
    if !was_optimistic {
179
        // Implement non-optimistic behavior, if that is what was originally configured.
180
        stream
181
            .wait_for_connection()
182
            .await
183
            .map_err(|e| Box::new(e) as _)?;
184
    }
185

            
186
    // Return the stream; the SOCKS layer will take it from here.
187
    Ok(stream)
188
}
189

            
190
/// Invoke ResolveWithPrefs on an OneshotClient
191
async fn oneshot_client_resolve_with_prefs(
192
    rpc_data_stream: Arc<OneshotClient>,
193
    method: Box<ResolveWithPrefs>,
194
    ctx: Arc<dyn rpc::Context>,
195
) -> ClientConnectionResult<Vec<IpAddr>> {
196
    let connector = rpc_data_stream
197
        .take_connector(Inner::UsedToResolve)
198
        .map_err(|e| Box::new(e) as _)?;
199

            
200
    let result = rpc::invoke_special_method(ctx, connector, method)
201
        .await
202
        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
203

            
204
    *result
205
}
206

            
207
/// Invoke ResolvePtrWithPrefs on an OneshotClient
208
async fn oneshot_client_resolve_ptr_with_prefs(
209
    rpc_data_stream: Arc<OneshotClient>,
210
    method: Box<ResolvePtrWithPrefs>,
211
    ctx: Arc<dyn rpc::Context>,
212
) -> ClientConnectionResult<Vec<String>> {
213
    let connector = rpc_data_stream
214
        .take_connector(Inner::UsedToResolve)
215
        .map_err(|e| Box::new(e) as _)?;
216

            
217
    let result = rpc::invoke_special_method(ctx, connector, method)
218
        .await
219
        .map_err(|e| Box::new(into_internal!("unable to delegate to connector")(e)) as _)?;
220

            
221
    *result
222
}
223

            
224
/// Create a new `RpcOneshotClient` to wait for a SOCKS request.
225
///
226
/// The resulting ObjectID will be a handle to an `RpcOneshotClient`.
227
/// It can be used as the target of a single SOCKS request.
228
///
229
/// Once used for a SOCKS connect request,
230
/// the object will become a handle for the the underlying DataStream
231
/// that was created with the request.
232
#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
233
#[derive_deftly(DynMethod)]
234
#[deftly(rpc(method_name = "arti:new_oneshot_client"))]
235
pub(crate) struct NewOneshotClient {}
236

            
237
impl rpc::RpcMethod for NewOneshotClient {
238
    type Output = rpc::SingleIdResponse;
239
    type Update = rpc::NoUpdates; // TODO actually, updates are quite suitable here.
240
}
241

            
242
/// Helper: construct and register an OneshotClient.
243
fn new_oneshot_client_impl(
244
    connector: Arc<dyn rpc::Object>,
245
    ctx: &dyn rpc::Context,
246
) -> rpc::ObjectId {
247
    let rpc_stream = Arc::new(OneshotClient::new(connector));
248
    ctx.register_owned(rpc_stream as _)
249
}
250

            
251
/// Implement NewOneshotClient for clients.
252
pub(crate) async fn new_oneshot_client_on_client<R: tor_rtcompat::Runtime>(
253
    client: Arc<arti_client::TorClient<R>>,
254
    _method: Box<NewOneshotClient>,
255
    ctx: Arc<dyn rpc::Context>,
256
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
257
    Ok(new_oneshot_client_impl(client, ctx.as_ref()).into())
258
}
259

            
260
/// Implement NewOneshotClient for RpcSession.
261
async fn new_oneshot_client_on_session(
262
    session: Arc<RpcSession>,
263
    _method: Box<NewOneshotClient>,
264
    ctx: Arc<dyn rpc::Context>,
265
) -> Result<rpc::SingleIdResponse, rpc::RpcError> {
266
    Ok(new_oneshot_client_impl(session, ctx.as_ref()).into())
267
}
268
rpc::static_rpc_invoke_fn! {
269
    new_oneshot_client_on_session;
270
    @special oneshot_client_connect_with_prefs;
271
    @special oneshot_client_resolve_with_prefs;
272
    @special oneshot_client_resolve_ptr_with_prefs;
273
}