//! Implement a concrete type to build channels over a transport.

use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
use crate::transport::TransportImplHelper;
use crate::{event::ChanMgrEventSender, Error};

use tor_error::internal;
use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
use tor_proto::channel::kist::KistParams;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_proto::memquota::ChannelAccount;
use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};

use async_trait::async_trait;
use futures::task::SpawnExt;

21
/// TLS-based channel builder.
22
///
23
/// This is a separate type so that we can keep our channel management code
24
/// network-agnostic.
25
///
26
/// It uses a provided `TransportHelper` type to make a connection (possibly
27
/// directly over TCP, and possibly over some other protocol).  It then
28
/// negotiates TLS over that connection, and negotiates a Tor channel over that
29
/// TLS session.
30
///
31
/// This channel builder does not retry on failure, but it _does_ implement a
32
/// time-out.
33
pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
34
where
35
    R: tor_rtcompat::TlsProvider<H::Stream>,
36
{
37
    /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
38
    runtime: R,
39
    /// The transport object that we use to construct streams.
40
    transport: H,
41
    /// Object to build TLS connections.
42
    tls_connector: <R as TlsProvider<H::Stream>>::Connector,
43
}
44

            
45
impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
46
where
47
    R: TlsProvider<H::Stream>,
48
{
49
    /// Construct a new ChanBuilder.
50
46
    pub fn new(runtime: R, transport: H) -> Self {
51
46
        let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
52
46
        ChanBuilder {
53
46
            runtime,
54
46
            transport,
55
46
            tls_connector,
56
46
        }
57
46
    }
58
}
59
#[async_trait]
60
impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
61
where
62
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
63
    H: Send + Sync,
64
{
65
    async fn connect_via_transport(
66
        &self,
67
        target: &OwnedChanTarget,
68
        reporter: BootstrapReporter,
69
        memquota: ChannelAccount,
70
2
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
71
        use tor_rtcompat::SleepProviderExt;
72

            
73
        // TODO: make this an option.  And make a better value.
74
2
        let delay = if target.chan_method().is_direct() {
75
2
            std::time::Duration::new(5, 0)
76
        } else {
77
            std::time::Duration::new(10, 0)
78
        };
79

            
80
2
        let connect_future = self.connect_no_timeout(target, reporter.0, memquota);
81
2
        self.runtime
82
2
            .timeout(delay, connect_future)
83
2
            .await
84
2
            .map_err(|_| Error::ChanTimeout {
85
                peer: target.to_logged(),
86
2
            })?
87
4
    }
88
}
89

            
90
#[async_trait]
91
impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
92
where
93
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
94
    H: Send + Sync,
95
{
96
    type Stream = H::Stream;
97

            
98
    #[cfg(feature = "relay")]
99
    async fn accept_from_transport(
100
        &self,
101
        peer: std::net::SocketAddr,
102
        stream: Self::Stream,
103
        _memquota: ChannelAccount,
104
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
105
        let map_ioe = |ioe, action| Error::Io {
106
            action,
107
            peer: Some(BridgeAddr::new_addr_from_sockaddr(peer).into()),
108
            source: ioe,
109
        };
110

            
111
        let _tls = self
112
            .tls_connector
113
            .negotiate_unvalidated(stream, "ignored")
114
            .await
115
            .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
116

            
117
        // TODO RELAY: do handshake and build channel
118
        todo!();
119
    }
120
}
121

            
122
impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
123
where
124
    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
125
    H: Send + Sync,
126
{
127
    /// Perform the work of `connect_via_transport`, but without enforcing a timeout.
128
2
    async fn connect_no_timeout(
129
2
        &self,
130
2
        target: &OwnedChanTarget,
131
2
        event_sender: Arc<Mutex<ChanMgrEventSender>>,
132
2
        memquota: ChannelAccount,
133
2
    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
134
        use tor_proto::channel::ChannelBuilder;
135
        use tor_rtcompat::tls::CertifiedConn;
136

            
137
2
        {
138
2
            event_sender.lock().expect("Lock poisoned").record_attempt();
139
2
        }
140

            
141
        // 1a. Negotiate the TCP connection or other stream.
142

            
143
2
        let (using_target, stream) = self.transport.connect(target).await?;
144
2
        let using_method = using_target.chan_method();
145
2
        let peer = using_target.chan_method().target_addr();
146
2
        let peer_ref = &peer;
147
2

            
148
4
        let map_ioe = |action: &'static str| {
149
4
            let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
150
4
                let peer: Option<BridgeAddr> = peer.clone().into();
151
4
                peer
152
4
            });
153
            move |ioe: io::Error| Error::Io {
154
                action,
155
                peer: peer.map(Into::into),
156
                source: ioe.into(),
157
            }
158
4
        };
159

            
160
2
        {
161
2
            // TODO(nickm): At some point, it would be helpful to the
162
2
            // bootstrapping logic if we could distinguish which
163
2
            // transport just succeeded.
164
2
            event_sender
165
2
                .lock()
166
2
                .expect("Lock poisoned")
167
2
                .record_tcp_success();
168
2
        }
169

            
170
        // 1b. Negotiate TLS.
171

            
172
        // TODO: add a random hostname here if it will be used for SNI?
173
2
        let tls = self
174
2
            .tls_connector
175
2
            .negotiate_unvalidated(stream, "ignored")
176
2
            .await
177
2
            .map_err(map_ioe("TLS negotiation"))?;
178

            
179
2
        let peer_cert = tls
180
2
            .peer_certificate()
181
2
            .map_err(map_ioe("TLS certs"))?
182
2
            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
183

            
184
2
        {
185
2
            event_sender
186
2
                .lock()
187
2
                .expect("Lock poisoned")
188
2
                .record_tls_finished();
189
2
        }
190
2

            
191
2
        // 2. Set up the channel.
192
2
        let mut builder = ChannelBuilder::new();
193
2
        builder.set_declared_method(using_method);
194
2
        let chan = builder
195
2
            .launch(
196
2
                tls,
197
2
                self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
198
2
                memquota,
199
2
            )
200
2
            .connect(|| self.runtime.wallclock())
201
2
            .await
202
2
            .map_err(|e| Error::from_proto_no_skew(e, &using_target))?;
203
2
        let clock_skew = Some(chan.clock_skew()); // Not yet authenticated; can't use it till `check` is done.
204
2
        let now = self.runtime.wallclock();
205
2
        let chan = chan
206
2
            .check(target, &peer_cert, Some(now))
207
2
            .map_err(|source| match &source {
208
                tor_proto::Error::HandshakeCertsExpired { .. } => {
209
                    event_sender
210
                        .lock()
211
                        .expect("Lock poisoned")
212
                        .record_handshake_done_with_skewed_clock();
213
                    Error::Proto {
214
                        source,
215
                        peer: using_target.to_logged(),
216
                        clock_skew,
217
                    }
218
                }
219
                _ => Error::from_proto_no_skew(source, &using_target),
220
2
            })?;
221
2
        let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
222
            source,
223
            peer: target.to_logged(),
224
            clock_skew,
225
2
        })?;
226

            
227
2
        {
228
2
            event_sender
229
2
                .lock()
230
2
                .expect("Lock poisoned")
231
2
                .record_handshake_done();
232
2
        }
233
2

            
234
2
        // 3. Launch a task to run the channel reactor.
235
2
        self.runtime
236
2
            .spawn(async {
237
2
                let _ = reactor.run().await;
238
2
            })
239
2
            .map_err(|e| Error::from_spawn("channel reactor", e))?;
240
2
        Ok(chan)
241
2
    }
242
}
243

            
244
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
245
2
    fn is_usable(&self) -> bool {
246
2
        !self.is_closing()
247
2
    }
248
2
    fn duration_unused(&self) -> Option<Duration> {
249
2
        self.duration_unused()
250
2
    }
251
18
    fn reparameterize(
252
18
        &self,
253
18
        updates: Arc<ChannelPaddingInstructionsUpdates>,
254
18
    ) -> tor_proto::Result<()> {
255
18
        tor_proto::channel::Channel::reparameterize(self, updates)
256
18
    }
257
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
258
        tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
259
    }
260
6
    fn engage_padding_activities(&self) {
261
6
        tor_proto::channel::Channel::engage_padding_activities(self);
262
6
    }
263
}
264

            
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        mgr::{AbstractChannel, AbstractChannelFactory},
        Result,
    };
    use futures::StreamExt as _;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_llcrypto::pk::rsa::RsaIdentity;
    use tor_proto::channel::Channel;
    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
    use tor_rtcompat::{test_with_one_runtime, NetStreamListener};
    use tor_rtmock::{io::LocalStream, net::MockNetwork};

    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::MockSleepRuntime;

    // Make sure that the builder can build a real channel.  To test
    // this out, we set up a listener that pretends to have the right
    // IP, fake the current time, and use canned responses from the
    // `crate::testing::msgs` module.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;
        let orport: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed: Ed25519Identity = msgs::ED_ID.into();
        let rsa: RsaIdentity = msgs::RSA_ID.into();
        let client_addr = "192.0.2.17".parse().unwrap();
        let tls_cert = msgs::X509_CERT.into();
        let target = OwnedChanTarget::builder()
            .addrs(vec![orport])
            .method(ChannelMethod::Direct(vec![orport]))
            .ed_identity(ed)
            .rsa_identity(rsa)
            .build()
            .unwrap();
        let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);

        test_with_one_runtime!(|rt| async move {
            // Stub out the internet so that this connection can work.
            let network = MockNetwork::new();

            // Set up a client runtime with a given IP
            let client_rt = network
                .builder()
                .add_address(client_addr)
                .runtime(rt.clone());
            // Mock the current time too
            #[allow(deprecated)] // TODO #1885
            let client_rt = MockSleepRuntime::new(client_rt);

            // Set up a relay runtime with a different IP
            let relay_rt = network
                .builder()
                .add_address(orport.ip())
                .runtime(rt.clone());

            // open a fake TLS listener and be ready to handle a request.
            let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();

            // Tell the client to believe in a different timestamp.
            client_rt.jump_to(now);

            // Create the channel builder that we want to test.
            let transport = crate::transport::DefaultTransport::new(client_rt.clone());
            let builder = ChanBuilder::new(client_rt, transport);

            // Drive both ends of the connection concurrently: the client
            // cannot finish its handshake unless the fake relay answers.
            let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
                async {
                    // client-side: build a channel!
                    builder
                        .build_channel(
                            &target,
                            BootstrapReporter::fake(),
                            ChannelAccount::new_noop(),
                        )
                        .await
                },
                async {
                    // relay-side: accept the channel
                    // (and pretend to know what we're doing).
                    let (mut con, addr) = lis
                        .incoming()
                        .next()
                        .await
                        .expect("Closed?")
                        .expect("accept failed");
                    assert_eq!(client_addr, addr.ip());
                    crate::testing::answer_channel_req(&mut con)
                        .await
                        .expect("answer failed");
                    Ok(con)
                }
            );

            let chan = r1.unwrap();
            assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
            assert!(chan.is_usable());
            // In theory, time could pass here, so we can't just use
            // "assert_eq!(dur_unused, dur_unused2)".
            let dur_unused = Channel::duration_unused(&chan);
            let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
            let dur_unused_3 = Channel::duration_unused(&chan);
            assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
            assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());

            r2.unwrap();
            Ok(())
        })
    }

    // TODO: Write tests for timeout logic, once there is smarter logic.
}