tor_chanmgr/
builder.rs

1//! Implement a concrete type to build channels over a transport.
2
3use std::io;
4use std::sync::{Arc, Mutex};
5
6use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
7use crate::transport::TransportImplHelper;
8use crate::{event::ChanMgrEventSender, Error};
9
10use std::time::Duration;
11use tor_error::internal;
12use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
13use tor_proto::channel::kist::KistParams;
14use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
15use tor_proto::memquota::ChannelAccount;
16use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
17
18use async_trait::async_trait;
19use futures::task::SpawnExt;
20
21/// TLS-based channel builder.
22///
23/// This is a separate type so that we can keep our channel management code
24/// network-agnostic.
25///
26/// It uses a provided `TransportHelper` type to make a connection (possibly
27/// directly over TCP, and possibly over some other protocol).  It then
28/// negotiates TLS over that connection, and negotiates a Tor channel over that
29/// TLS session.
30///
31/// This channel builder does not retry on failure, but it _does_ implement a
32/// time-out.
33pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
34where
35    R: tor_rtcompat::TlsProvider<H::Stream>,
36{
37    /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
38    runtime: R,
39    /// The transport object that we use to construct streams.
40    transport: H,
41    /// Object to build TLS connections.
42    tls_connector: <R as TlsProvider<H::Stream>>::Connector,
43}
44
45impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
46where
47    R: TlsProvider<H::Stream>,
48{
49    /// Construct a new ChanBuilder.
50    pub fn new(runtime: R, transport: H) -> Self {
51        let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
52        ChanBuilder {
53            runtime,
54            transport,
55            tls_connector,
56        }
57    }
58}
59#[async_trait]
60impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
61where
62    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
63    H: Send + Sync,
64{
65    async fn connect_via_transport(
66        &self,
67        target: &OwnedChanTarget,
68        reporter: BootstrapReporter,
69        memquota: ChannelAccount,
70    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
71        use tor_rtcompat::SleepProviderExt;
72
73        // TODO: make this an option.  And make a better value.
74        let delay = if target.chan_method().is_direct() {
75            std::time::Duration::new(5, 0)
76        } else {
77            std::time::Duration::new(10, 0)
78        };
79
80        let connect_future = self.connect_no_timeout(target, reporter.0, memquota);
81        self.runtime
82            .timeout(delay, connect_future)
83            .await
84            .map_err(|_| Error::ChanTimeout {
85                peer: target.to_logged(),
86            })?
87    }
88}
89
90#[async_trait]
91impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
92where
93    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
94    H: Send + Sync,
95{
96    type Stream = H::Stream;
97
98    #[cfg(feature = "relay")]
99    async fn accept_from_transport(
100        &self,
101        peer: std::net::SocketAddr,
102        stream: Self::Stream,
103        _memquota: ChannelAccount,
104    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
105        let map_ioe = |ioe, action| Error::Io {
106            action,
107            peer: Some(BridgeAddr::new_addr_from_sockaddr(peer).into()),
108            source: ioe,
109        };
110
111        let _tls = self
112            .tls_connector
113            .negotiate_unvalidated(stream, "ignored")
114            .await
115            .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
116
117        // TODO RELAY: do handshake and build channel
118        todo!();
119    }
120}
121
122impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
123where
124    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
125    H: Send + Sync,
126{
127    /// Perform the work of `connect_via_transport`, but without enforcing a timeout.
128    async fn connect_no_timeout(
129        &self,
130        target: &OwnedChanTarget,
131        event_sender: Arc<Mutex<ChanMgrEventSender>>,
132        memquota: ChannelAccount,
133    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
134        use tor_proto::channel::ChannelBuilder;
135        use tor_rtcompat::tls::CertifiedConn;
136
137        {
138            event_sender.lock().expect("Lock poisoned").record_attempt();
139        }
140
141        // 1a. Negotiate the TCP connection or other stream.
142
143        let (using_target, stream) = self.transport.connect(target).await?;
144        let using_method = using_target.chan_method();
145        let peer = using_target.chan_method().target_addr();
146        let peer_ref = &peer;
147
148        let map_ioe = |action: &'static str| {
149            let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
150                let peer: Option<BridgeAddr> = peer.clone().into();
151                peer
152            });
153            move |ioe: io::Error| Error::Io {
154                action,
155                peer: peer.map(Into::into),
156                source: ioe.into(),
157            }
158        };
159
160        {
161            // TODO(nickm): At some point, it would be helpful to the
162            // bootstrapping logic if we could distinguish which
163            // transport just succeeded.
164            event_sender
165                .lock()
166                .expect("Lock poisoned")
167                .record_tcp_success();
168        }
169
170        // 1b. Negotiate TLS.
171
172        // TODO: add a random hostname here if it will be used for SNI?
173        let tls = self
174            .tls_connector
175            .negotiate_unvalidated(stream, "ignored")
176            .await
177            .map_err(map_ioe("TLS negotiation"))?;
178
179        let peer_cert = tls
180            .peer_certificate()
181            .map_err(map_ioe("TLS certs"))?
182            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
183
184        {
185            event_sender
186                .lock()
187                .expect("Lock poisoned")
188                .record_tls_finished();
189        }
190
191        // 2. Set up the channel.
192        let mut builder = ChannelBuilder::new();
193        builder.set_declared_method(using_method);
194        let chan = builder
195            .launch(
196                tls,
197                self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
198                memquota,
199            )
200            .connect(|| self.runtime.wallclock())
201            .await
202            .map_err(|e| Error::from_proto_no_skew(e, &using_target))?;
203        let clock_skew = Some(chan.clock_skew()); // Not yet authenticated; can't use it till `check` is done.
204        let now = self.runtime.wallclock();
205        let chan = chan
206            .check(target, &peer_cert, Some(now))
207            .map_err(|source| match &source {
208                tor_proto::Error::HandshakeCertsExpired { .. } => {
209                    event_sender
210                        .lock()
211                        .expect("Lock poisoned")
212                        .record_handshake_done_with_skewed_clock();
213                    Error::Proto {
214                        source,
215                        peer: using_target.to_logged(),
216                        clock_skew,
217                    }
218                }
219                _ => Error::from_proto_no_skew(source, &using_target),
220            })?;
221        let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
222            source,
223            peer: target.to_logged(),
224            clock_skew,
225        })?;
226
227        {
228            event_sender
229                .lock()
230                .expect("Lock poisoned")
231                .record_handshake_done();
232        }
233
234        // 3. Launch a task to run the channel reactor.
235        self.runtime
236            .spawn(async {
237                let _ = reactor.run().await;
238            })
239            .map_err(|e| Error::from_spawn("channel reactor", e))?;
240        Ok(chan)
241    }
242}
243
244impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
245    fn is_usable(&self) -> bool {
246        !self.is_closing()
247    }
248    fn duration_unused(&self) -> Option<Duration> {
249        self.duration_unused()
250    }
251    fn reparameterize(
252        &self,
253        updates: Arc<ChannelPaddingInstructionsUpdates>,
254    ) -> tor_proto::Result<()> {
255        tor_proto::channel::Channel::reparameterize(self, updates)
256    }
257    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
258        tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
259    }
260    fn engage_padding_activities(&self) {
261        tor_proto::channel::Channel::engage_padding_activities(self);
262    }
263}
264
265#[cfg(test)]
266mod test {
267    // @@ begin test lint list maintained by maint/add_warning @@
268    #![allow(clippy::bool_assert_comparison)]
269    #![allow(clippy::clone_on_copy)]
270    #![allow(clippy::dbg_macro)]
271    #![allow(clippy::mixed_attributes_style)]
272    #![allow(clippy::print_stderr)]
273    #![allow(clippy::print_stdout)]
274    #![allow(clippy::single_char_pattern)]
275    #![allow(clippy::unwrap_used)]
276    #![allow(clippy::unchecked_duration_subtraction)]
277    #![allow(clippy::useless_vec)]
278    #![allow(clippy::needless_pass_by_value)]
279    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
280    use super::*;
281    use crate::{
282        mgr::{AbstractChannel, AbstractChannelFactory},
283        Result,
284    };
285    use futures::StreamExt as _;
286    use std::net::SocketAddr;
287    use std::time::{Duration, SystemTime};
288    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
289    use tor_llcrypto::pk::ed25519::Ed25519Identity;
290    use tor_llcrypto::pk::rsa::RsaIdentity;
291    use tor_proto::channel::Channel;
292    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
293    use tor_rtcompat::{test_with_one_runtime, NetStreamListener};
294    use tor_rtmock::{io::LocalStream, net::MockNetwork};
295
296    #[allow(deprecated)] // TODO #1885
297    use tor_rtmock::MockSleepRuntime;
298
299    // Make sure that the builder can build a real channel.  To test
300    // this out, we set up a listener that pretends to have the right
301    // IP, fake the current time, and use a canned response from
302    // [`testing::msgs`] crate.
303    #[test]
304    fn build_ok() -> Result<()> {
305        use crate::testing::msgs;
306        let orport: SocketAddr = msgs::ADDR.parse().unwrap();
307        let ed: Ed25519Identity = msgs::ED_ID.into();
308        let rsa: RsaIdentity = msgs::RSA_ID.into();
309        let client_addr = "192.0.2.17".parse().unwrap();
310        let tls_cert = msgs::X509_CERT.into();
311        let target = OwnedChanTarget::builder()
312            .addrs(vec![orport])
313            .method(ChannelMethod::Direct(vec![orport]))
314            .ed_identity(ed)
315            .rsa_identity(rsa)
316            .build()
317            .unwrap();
318        let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);
319
320        test_with_one_runtime!(|rt| async move {
321            // Stub out the internet so that this connection can work.
322            let network = MockNetwork::new();
323
324            // Set up a client runtime with a given IP
325            let client_rt = network
326                .builder()
327                .add_address(client_addr)
328                .runtime(rt.clone());
329            // Mock the current time too
330            #[allow(deprecated)] // TODO #1885
331            let client_rt = MockSleepRuntime::new(client_rt);
332
333            // Set up a relay runtime with a different IP
334            let relay_rt = network
335                .builder()
336                .add_address(orport.ip())
337                .runtime(rt.clone());
338
339            // open a fake TLS listener and be ready to handle a request.
340            let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();
341
342            // Tell the client to believe in a different timestamp.
343            client_rt.jump_to(now);
344
345            // Create the channel builder that we want to test.
346            let transport = crate::transport::DefaultTransport::new(client_rt.clone());
347            let builder = ChanBuilder::new(client_rt, transport);
348
349            let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
350                async {
351                    // client-side: build a channel!
352                    builder
353                        .build_channel(
354                            &target,
355                            BootstrapReporter::fake(),
356                            ChannelAccount::new_noop(),
357                        )
358                        .await
359                },
360                async {
361                    // relay-side: accept the channel
362                    // (and pretend to know what we're doing).
363                    let (mut con, addr) = lis
364                        .incoming()
365                        .next()
366                        .await
367                        .expect("Closed?")
368                        .expect("accept failed");
369                    assert_eq!(client_addr, addr.ip());
370                    crate::testing::answer_channel_req(&mut con)
371                        .await
372                        .expect("answer failed");
373                    Ok(con)
374                }
375            );
376
377            let chan = r1.unwrap();
378            assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
379            assert!(chan.is_usable());
380            // In theory, time could pass here, so we can't just use
381            // "assert_eq!(dur_unused, dur_unused2)".
382            let dur_unused = Channel::duration_unused(&chan);
383            let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
384            let dur_unused_3 = Channel::duration_unused(&chan);
385            assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
386            assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());
387
388            r2.unwrap();
389            Ok(())
390        })
391    }
392
393    // TODO: Write tests for timeout logic, once there is smarter logic.
394}