1
//! Objects that can become or wrap a [`arti_client::DataStream`].
2

            
3
use arti_client::rpc::{
4
    ClientConnectionResult, ConnectWithPrefs, ResolvePtrWithPrefs, ResolveWithPrefs,
5
};
6
use derive_deftly::Deftly;
7
use std::{
8
    net::IpAddr,
9
    sync::{Arc, Mutex},
10
};
11
use tor_proto::stream::DataStreamCtrl;
12
use tor_rpcbase::{self as rpc, templates::*};
13

            
14
use crate::RpcSession;
15

            
16
/// An RPC object representing a (possibly unconstructed) DataStream.
17
///
18
/// This object is created from an RPC method, and starts out with
19
/// enough information to know how to create a DataStream or to respond
20
/// to some other SOCKS request.
21
///
22
/// This object is single-use: once a SOCKS request has referred to it,
23
/// it cannot be used for any other SOCKS request.
24
//
25
// TODO RPC: This object's name is questionable.
26
#[derive(Deftly)]
27
#[derive_deftly(Object)]
28
#[deftly(rpc(expose_outside_of_session))]
29
pub(crate) struct RpcDataStream {
30
    /// The inner state of this object.
31
    inner: Mutex<Inner>,
32
}
33

            
34
/// The inner state of an `RpcDataStream`.
35
///
36
/// A stream is created in the "Unused" state.
37
enum Inner {
38
    /// Newly constructed: Waiting for a SOCKS command.
39
    ///
40
    /// This is the initial state for every RpcDataStream.
41
    ///
42
    /// It may become `Launching` or `UsedToResolve`.
43
    Unused(Arc<dyn rpc::Object>),
44

            
45
    /// The actual connection is being made, ie we are within `connect_with_prefs`
46
    ///
47
    /// If the state is `Launching`, no one except `connect_with_prefs` may change it.
48
    ///
49
    /// From this state, a stream may become `Stream`, or `StreamFailed`.
50
    Launching,
51

            
52
    /// Stream constructed; may or may not be connected.
53
    ///
54
    /// A stream does not exit this state.  Even if the stream is closed or fails,
55
    /// its `DataStreamCtrl` remains until it is dropped.
56
    Stream(Arc<DataStreamCtrl>),
57

            
58
    /// Stream was used for a resolve or resolve_ptr request; there is no underlying stream.
59
    ///
60
    /// A stream does not exit this state, even if resolve request fails.
61
    //
62
    // TODO RPC: We may want to make this state hold more information if someday we
63
    // make DNS requests into objects that we can inspect while they are running.
64
    UsedToResolve,
65

            
66
    /// Failed to construct the tor_proto::DataStream object.
67
    ///
68
    /// A stream does not exit this state.
69
    StreamFailed,
70
}
71

            
72
/// Error returned by an operations from RpcDataStream.
73
#[derive(Debug, Clone, thiserror::Error)]
74
enum DataStreamError {
75
    /// Application tried to provide an identifier for an RpcDataStream,
76
    /// but that RpcDataStream had already been used previously.
77
    #[error("Data stream object already used")]
78
    AlreadyUsed,
79
}
80

            
81
impl tor_error::HasKind for DataStreamError {
    /// Map each `DataStreamError` variant to its `tor_error::ErrorKind`.
    fn kind(&self) -> tor_error::ErrorKind {
        match self {
            // TODO RPC: is this the correct ErrorKind?
            Self::AlreadyUsed => tor_error::ErrorKind::BadApiUsage,
        }
    }
}
90

            
91
impl RpcDataStream {
92
    /// Construct a new unused RpcDataStream that will make its connection
93
    /// with `connector`.
94
    ///
95
    /// The `connector` object should implement at least one of ConnectWithPrefs, ResolveWithPrefs,
96
    /// or ResolvePtrWithPrefs, or else it won't actually be useful for anything.
97
    pub(crate) fn new(connector: Arc<dyn rpc::Object>) -> Self {
98
        Self {
99
            inner: Mutex::new(Inner::Unused(connector)),
100
        }
101
    }
102

            
103
    /// If this DataStream is in state Unused, replace its state with `new_state`
104
    /// and return the ClientConnectionTarget.  Otherwise, leave its state unchanged
105
    /// and return an error.
106
    fn take_connector(&self, new_state: Inner) -> Result<Arc<dyn rpc::Object>, DataStreamError> {
107
        let mut inner = self.inner.lock().expect("poisoned lock");
108
        let val = std::mem::replace(&mut *inner, new_state);
109
        if let Inner::Unused(conn) = val {
110
            Ok(conn)
111
        } else {
112
            *inner = val;
113
            Err(DataStreamError::AlreadyUsed)
114
        }
115
    }
116

            
117
    /// Return the `DataStreamCtrl` for this stream, if it has one.
118
    #[allow(dead_code)]
119
    fn get_ctrl(&self) -> Option<Arc<DataStreamCtrl>> {
120
        let inner = self.inner.lock().expect("poisoned lock");
121
        if let Inner::Stream(s) = &*inner {
122
            Some(s.clone())
123
        } else {
124
            None
125
        }
126
    }
127
}
128

            
129
/// Invoke ConnectWithPrefs on an RpcDataStream.
130
///
131
/// Unlike the other methods on RpcDataStream, this one is somewhat complex, since it must
132
/// re-register the resulting datastream once it has one.
133
async fn rpcdatastream_connect_with_prefs(
134
    rpc_data_stream: Arc<RpcDataStream>,
135
    mut method: Box<ConnectWithPrefs>,
136
    ctx: Arc<dyn rpc::Context>,
137
) -> ClientConnectionResult<arti_client::DataStream> {
138
    // Extract the connector.
139
    //
140
    // As we do this, we put this RpcDataStream into a Launching state.
141
    //
142
    // (`Launching`` wouldn't need to exist if we `connect_with_prefs` were synchronous,
143
    // but it isn't synchronous, so `Launching` is an observable state.)
144
    let connector = rpc_data_stream
145
        .take_connector(Inner::Launching)
146
        .map_err(|e| Box::new(e) as _)?;
147

            
148
    let was_optimistic = method.prefs.is_optimistic();
149
    // We want this to be treated internally as an "optimistic" connection,
150
    // so that inner connect_with_prefs() will return ASAP.
151
    method.prefs.optimistic();
152

            
153
    // Now, launch the connection.  Since we marked it as optimistic,
154
    // this call should return almost immediately.
155
    let stream: Result<arti_client::DataStream, _> =
156
        *rpc::invoke_special_method(ctx, connector, method)
157
            .await
158
            .map_err(|e| Box::new(e) as _)?;
159

            
160
    // Pick the new state for this object, and install it.
161
    let new_obj = match &stream {
162
        Ok(s) => Inner::Stream(s.ctrl().clone()),
163
        Err(_) => Inner::StreamFailed, // TODO RPC: Remember some error information here.
164
    };
165
    {
166
        let mut inner = rpc_data_stream.inner.lock().expect("poisoned lock");
167
        *inner = new_obj;
168
    }
169
    // Return early on failure.
170
    let mut stream = stream?;
171

            
172
    if !was_optimistic {
173
        // Implement non-optimistic behavior, if that is what was originally configured.
174
        stream
175
            .wait_for_connection()
176
            .await
177
            .map_err(|e| Box::new(e) as _)?;
178
    }
179

            
180
    // Return the stream; the SOCKS layer will take it from here.
181
    Ok(stream)
182
}
183

            
184
/// Invoke ResolveWithPrefs on an RpcDataStream
185
async fn rpcdatastream_resolve_with_prefs(
186
    rpc_data_stream: Arc<RpcDataStream>,
187
    method: Box<ResolveWithPrefs>,
188
    ctx: Arc<dyn rpc::Context>,
189
) -> ClientConnectionResult<Vec<IpAddr>> {
190
    let connector = rpc_data_stream
191
        .take_connector(Inner::UsedToResolve)
192
        .map_err(|e| Box::new(e) as _)?;
193

            
194
    let result = rpc::invoke_special_method(ctx, connector, method)
195
        .await
196
        .map_err(|e| Box::new(e) as _)?;
197

            
198
    *result
199
}
200

            
201
/// Invoke ResolvePtrWithPrefs on an RpcDataStream
202
async fn rpcdatastream_resolve_ptr_with_prefs(
203
    rpc_data_stream: Arc<RpcDataStream>,
204
    method: Box<ResolvePtrWithPrefs>,
205
    ctx: Arc<dyn rpc::Context>,
206
) -> ClientConnectionResult<Vec<String>> {
207
    let connector = rpc_data_stream
208
        .take_connector(Inner::UsedToResolve)
209
        .map_err(|e| Box::new(e) as _)?;
210

            
211
    let result = rpc::invoke_special_method(ctx, connector, method)
212
        .await
213
        .map_err(|e| Box::new(e) as _)?;
214

            
215
    *result
216
}
217

            
218
/// Method to create a stream handle.
219
#[derive(Debug, serde::Deserialize, serde::Serialize, Deftly)]
220
#[derive_deftly(DynMethod)]
221
#[deftly(rpc(method_name = "arti:new_stream_handle"))]
222
pub(crate) struct NewStreamHandle {}
223

            
224
impl rpc::RpcMethod for NewStreamHandle {
    /// On success, the method yields the ID of the newly created handle.
    type Output = rpc::SingletonId;
    // TODO actually, updates are quite suitable here.
    /// No incremental updates are sent for this method.
    type Update = rpc::NoUpdates;
}
228

            
229
/// Helper: construct and register an RpcDataStream.
230
fn new_stream_handle_impl(
231
    connector: Arc<dyn rpc::Object>,
232
    ctx: &dyn rpc::Context,
233
) -> rpc::ObjectId {
234
    let rpc_stream = Arc::new(RpcDataStream::new(connector));
235
    ctx.register_owned(rpc_stream as _)
236
}
237

            
238
/// Implement NewStreamHandle for clients.
239
pub(crate) async fn new_stream_handle_on_client<R: tor_rtcompat::Runtime>(
240
    client: Arc<arti_client::TorClient<R>>,
241
    _method: Box<NewStreamHandle>,
242
    ctx: Arc<dyn rpc::Context>,
243
) -> Result<rpc::SingletonId, rpc::RpcError> {
244
    Ok(new_stream_handle_impl(client, ctx.as_ref()).into())
245
}
246

            
247
/// Implement NewStreamHandle for RpcSession.
248
async fn new_stream_handle_on_session(
249
    session: Arc<RpcSession>,
250
    _method: Box<NewStreamHandle>,
251
    ctx: Arc<dyn rpc::Context>,
252
) -> Result<rpc::SingletonId, rpc::RpcError> {
253
    Ok(new_stream_handle_impl(session, ctx.as_ref()).into())
254
}
255
// Register this module's RPC method implementations.
//
// NOTE(review): the `@special` entries are presumably the ones reached through
// `rpc::invoke_special_method` rather than by ordinary RPC dispatch -- confirm
// against the `static_rpc_invoke_fn!` documentation.
rpc::static_rpc_invoke_fn! {
    new_stream_handle_on_session;
    @special rpcdatastream_connect_with_prefs;
    @special rpcdatastream_resolve_with_prefs;
    @special rpcdatastream_resolve_ptr_with_prefs;
}