tor_chanmgr/
builder.rs

1//! Implement a concrete type to build channels over a transport.
2
3use std::io;
4use std::sync::{Arc, Mutex};
5
6use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
7use crate::transport::TransportImplHelper;
8use crate::{Error, event::ChanMgrEventSender};
9
10use async_trait::async_trait;
11use std::time::Duration;
12use tor_basic_utils::rand_hostname;
13use tor_error::internal;
14use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
15use tor_proto::channel::ChannelType;
16use tor_proto::channel::kist::KistParams;
17use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
18use tor_proto::memquota::ChannelAccount;
19use tor_rtcompat::SpawnExt;
20use tor_rtcompat::{Runtime, TlsProvider, tls::TlsConnector};
21use tracing::instrument;
22
23#[cfg(feature = "relay")]
24use {safelog::Sensitive, tor_proto::RelayIdentities};
25
26/// TLS-based channel builder.
27///
28/// This is a separate type so that we can keep our channel management code
29/// network-agnostic.
30///
31/// It uses a provided `TransportHelper` type to make a connection (possibly
32/// directly over TCP, and possibly over some other protocol).  It then
33/// negotiates TLS over that connection, and negotiates a Tor channel over that
34/// TLS session.
35///
36/// This channel builder does not retry on failure, but it _does_ implement a
37/// time-out.
38pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
39where
40    R: tor_rtcompat::TlsProvider<H::Stream>,
41{
42    /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
43    runtime: R,
44    /// The transport object that we use to construct streams.
45    transport: H,
46    /// Object to build TLS connections.
47    tls_connector: <R as TlsProvider<H::Stream>>::Connector,
48    /// Relay identities needed for relay channels.
49    #[cfg(feature = "relay")]
50    identities: Option<Arc<RelayIdentities>>,
51}
52
53impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
54where
55    R: TlsProvider<H::Stream>,
56{
57    /// Construct a new ChanBuilder.
58    pub fn new(runtime: R, transport: H) -> Self {
59        let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
60        ChanBuilder {
61            runtime,
62            transport,
63            tls_connector,
64            #[cfg(feature = "relay")]
65            identities: None,
66        }
67    }
68
69    /// Set the relay identities and return itself.
70    #[cfg(feature = "relay")]
71    pub fn with_identities(mut self, ids: Arc<RelayIdentities>) -> Self {
72        self.identities = Some(ids);
73        self
74    }
75
76    /// Return the outbound channel type of this config.
77    ///
78    /// The channel type is used when creating outbound channels. Relays always initiate channels
79    /// as "relay initiator" while client and bridges behave like a "client initiator".
80    ///
81    /// Important: The wrong channel type is returned if this is called before `with_identities()`
82    /// is called.
83    fn outbound_chan_type(&self) -> ChannelType {
84        #[cfg(feature = "relay")]
85        if self.identities.is_some() {
86            return ChannelType::RelayInitiator;
87        }
88        // No relay built in, always client.
89        ChannelType::ClientInitiator
90    }
91}
92
93#[async_trait]
94impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
95where
96    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
97    H: Send + Sync,
98{
99    #[instrument(skip_all, level = "trace")]
100    async fn connect_via_transport(
101        &self,
102        target: &OwnedChanTarget,
103        reporter: BootstrapReporter,
104        memquota: ChannelAccount,
105    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
106        use tor_rtcompat::SleepProviderExt;
107
108        // TODO: make this an option.  And make a better value.
109        let delay = if target.chan_method().is_direct() {
110            std::time::Duration::new(5, 0)
111        } else {
112            std::time::Duration::new(10, 0)
113        };
114
115        self.runtime
116            .timeout(delay, self.connect_no_timeout(target, reporter.0, memquota))
117            .await
118            .map_err(|_| Error::ChanTimeout {
119                peer: target.to_logged(),
120            })?
121    }
122}
123
124#[async_trait]
125impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
126where
127    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
128    H: Send + Sync,
129{
130    type Stream = H::Stream;
131
132    #[cfg(feature = "relay")]
133    async fn accept_from_transport(
134        &self,
135        peer: Sensitive<std::net::SocketAddr>,
136        stream: Self::Stream,
137        _memquota: ChannelAccount,
138    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
139        let map_ioe = |ioe, action| Error::Io {
140            action,
141            peer: Some(BridgeAddr::new_addr_from_sockaddr(peer.into_inner()).into()),
142            source: ioe,
143        };
144
145        let _tls = self
146            .tls_connector
147            .negotiate_unvalidated(stream, "ignored")
148            .await
149            .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
150
151        // TODO RELAY: do handshake and build channel
152        todo!();
153    }
154}
155
156impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
157where
158    R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
159    H: Send + Sync,
160{
161    /// Perform the work of `connect_via_transport`, but without enforcing a timeout.
162    ///
163    /// Return a [`Channel`](tor_proto::channel::Channel) on success.
164    #[instrument(skip_all, level = "trace")]
165    async fn connect_no_timeout(
166        &self,
167        target: &OwnedChanTarget,
168        event_sender: Arc<Mutex<ChanMgrEventSender>>,
169        memquota: ChannelAccount,
170    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
171        use tor_rtcompat::tls::CertifiedConn;
172
173        {
174            event_sender.lock().expect("Lock poisoned").record_attempt();
175        }
176
177        // 1a. Negotiate the TCP connection or other stream.
178
179        let (using_target, stream) = self.transport.connect(target).await?;
180        let using_method = using_target.chan_method();
181        let peer = using_method.target_addr();
182        let peer_ref = &peer;
183
184        let map_ioe = |action: &'static str| {
185            let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
186                let peer: Option<BridgeAddr> = peer.clone().into();
187                peer
188            });
189            move |ioe: io::Error| Error::Io {
190                action,
191                peer: peer.map(Into::into),
192                source: ioe.into(),
193            }
194        };
195
196        {
197            // TODO(nickm): At some point, it would be helpful to the
198            // bootstrapping logic if we could distinguish which
199            // transport just succeeded.
200            event_sender
201                .lock()
202                .expect("Lock poisoned")
203                .record_tcp_success();
204        }
205
206        // 1b. Negotiate TLS.
207
208        let hostname = rand_hostname::random_hostname(&mut rand::rng());
209
210        let tls = self
211            .tls_connector
212            .negotiate_unvalidated(stream, hostname.as_str())
213            .await
214            .map_err(map_ioe("TLS negotiation"))?;
215
216        let peer_cert = tls
217            .peer_certificate()
218            .map_err(map_ioe("TLS certs"))?
219            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
220
221        {
222            event_sender
223                .lock()
224                .expect("Lock poisoned")
225                .record_tls_finished();
226        }
227
228        // Store this so we can log it in case we don't recognize it.
229        let outbound_chan_type = self.outbound_chan_type();
230        let chan = match outbound_chan_type {
231            ChannelType::ClientInitiator => {
232                // Get the client specific channel builder.
233                let mut builder = tor_proto::ClientChannelBuilder::new();
234                builder.set_declared_method(target.chan_method());
235
236                builder
237                    .launch(
238                        tls,
239                        self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
240                        memquota,
241                    )
242                    .connect(|| self.runtime.wallclock())
243                    .await
244                    .map_err(|e| Error::from_proto_no_skew(e, target))?
245            }
246            #[cfg(feature = "relay")]
247            ChannelType::RelayInitiator => {
248                let builder = tor_proto::RelayChannelBuilder::new();
249                let identities = self
250                    .identities
251                    .as_ref()
252                    .ok_or(internal!(
253                        "Unable to build relay channel without identities"
254                    ))?
255                    .clone();
256
257                builder
258                    .launch(
259                        tls,
260                        self.runtime.clone(), /* TODO provide ZST SleepProvider instead */
261                        identities,
262                        memquota,
263                    )
264                    .connect(|| self.runtime.wallclock())
265                    .await
266                    .map_err(|e| Error::from_proto_no_skew(e, &using_target))?
267            }
268            _ => {
269                return Err(Error::Internal(internal!(
270                    "Unusable channel type for outbound: {outbound_chan_type}",
271                )));
272            }
273        };
274
275        let clock_skew = Some(chan.clock_skew()); // Not yet authenticated; can't use it till `check` is done.
276        let now = self.runtime.wallclock();
277        let chan = chan
278            .check(target, &peer_cert, Some(now))
279            .map_err(|source| match &source {
280                tor_proto::Error::HandshakeCertsExpired { .. } => {
281                    event_sender
282                        .lock()
283                        .expect("Lock poisoned")
284                        .record_handshake_done_with_skewed_clock();
285                    Error::Proto {
286                        source,
287                        peer: using_target.to_logged(),
288                        clock_skew,
289                    }
290                }
291                _ => Error::from_proto_no_skew(source, &using_target),
292            })?;
293        let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
294            source,
295            peer: target.to_logged(),
296            clock_skew,
297        })?;
298
299        {
300            event_sender
301                .lock()
302                .expect("Lock poisoned")
303                .record_handshake_done();
304        }
305
306        // 3. Launch a task to run the channel reactor.
307        self.runtime
308            .spawn(async {
309                let _ = reactor.run().await;
310            })
311            .map_err(|e| Error::from_spawn("channel reactor", e))?;
312
313        Ok(chan)
314    }
315}
316
317impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
318    fn is_usable(&self) -> bool {
319        !self.is_closing()
320    }
321    fn duration_unused(&self) -> Option<Duration> {
322        self.duration_unused()
323    }
324    fn reparameterize(
325        &self,
326        updates: Arc<ChannelPaddingInstructionsUpdates>,
327    ) -> tor_proto::Result<()> {
328        tor_proto::channel::Channel::reparameterize(self, updates)
329    }
330    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
331        tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
332    }
333    fn engage_padding_activities(&self) {
334        tor_proto::channel::Channel::engage_padding_activities(self);
335    }
336}
337
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        Result,
        mgr::{AbstractChannel, AbstractChannelFactory},
    };
    use futures::StreamExt as _;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_llcrypto::pk::rsa::RsaIdentity;
    use tor_proto::channel::Channel;
    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
    use tor_rtcompat::{NetStreamListener, test_with_one_runtime};
    use tor_rtmock::{io::LocalStream, net::MockNetwork};

    #[allow(deprecated)] // TODO #1885
    use tor_rtmock::MockSleepRuntime;

    // Check that the builder is able to build a real channel.  We do this
    // with a listener that pretends to own the right IP address, a faked
    // current time, and a canned response from the [`testing::msgs`] crate.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;

        // Address and identities of the pretend relay.
        let relay_addr: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed_id: Ed25519Identity = msgs::ED_ID.into();
        let rsa_id: RsaIdentity = msgs::RSA_ID.into();
        let client_ip = "192.0.2.17".parse().unwrap();
        let relay_cert = msgs::X509_CERT.into();
        let target = OwnedChanTarget::builder()
            .addrs(vec![relay_addr])
            .method(ChannelMethod::Direct(vec![relay_addr]))
            .ed_identity(ed_id)
            .rsa_identity(rsa_id)
            .build()
            .unwrap();
        // The wallclock time that the client will be told to believe in.
        let canned_now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);

        test_with_one_runtime!(|rt| async move {
            // A fake internet, so that the connection below can succeed.
            let network = MockNetwork::new();

            // The client's runtime lives at `client_ip`...
            let client_rt = network
                .builder()
                .add_address(client_ip)
                .runtime(rt.clone());
            // ...and has a clock that we control.
            #[allow(deprecated)] // TODO #1885
            let client_rt = MockSleepRuntime::new(client_rt);

            // The relay's runtime lives at the relay's own address.
            let relay_rt = network
                .builder()
                .add_address(relay_addr.ip())
                .runtime(rt.clone());

            // Start a fake TLS listener, ready to handle one request.
            let listener = relay_rt
                .mock_net()
                .listen_tls(&relay_addr, relay_cert)
                .unwrap();

            // Move the client's clock to the canned timestamp.
            client_rt.jump_to(canned_now);

            // This is the channel builder under test.
            let transport = crate::transport::DefaultTransport::new(client_rt.clone());
            let builder = ChanBuilder::new(client_rt, transport);

            let (client_outcome, relay_outcome): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
                async {
                    // Client side: build a channel to the target.
                    builder
                        .build_channel(
                            &target,
                            BootstrapReporter::fake(),
                            ChannelAccount::new_noop(),
                        )
                        .await
                },
                async {
                    // Relay side: accept the connection, and answer it
                    // (pretending that we know what we're doing).
                    let (mut conn, peer_addr) = listener
                        .incoming()
                        .next()
                        .await
                        .expect("Closed?")
                        .expect("accept failed");
                    assert_eq!(client_ip, peer_addr.ip());
                    crate::testing::answer_channel_req(&mut conn)
                        .await
                        .expect("answer failed");
                    Ok(conn)
                }
            );

            let chan = client_outcome.unwrap();
            assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed_id).into()));
            assert!(chan.is_usable());
            // Time may pass between these calls, so the readings can only
            // be compared for ordering, never for equality.
            let unused_first = Channel::duration_unused(&chan);
            let unused_second = AbstractChannel::duration_unused(chan.as_ref());
            let unused_third = Channel::duration_unused(&chan);
            assert!(unused_first.unwrap() <= unused_second.unwrap());
            assert!(unused_second.unwrap() <= unused_third.unwrap());

            relay_outcome.unwrap();
            Ok(())
        })
    }

    // TODO: Write tests for timeout logic, once there is smarter logic.
}
465
466    // TODO: Write tests for timeout logic, once there is smarter logic.
467}