1use std::io;
4use std::sync::{Arc, Mutex};
5
6use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
7use crate::transport::TransportImplHelper;
8use crate::{Error, event::ChanMgrEventSender};
9
10use async_trait::async_trait;
11use std::time::Duration;
12use tor_basic_utils::rand_hostname;
13use tor_error::internal;
14use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
15use tor_proto::channel::ChannelType;
16use tor_proto::channel::kist::KistParams;
17use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
18use tor_proto::memquota::ChannelAccount;
19use tor_rtcompat::SpawnExt;
20use tor_rtcompat::{Runtime, TlsProvider, tls::TlsConnector};
21use tracing::instrument;
22
23#[cfg(feature = "relay")]
24use {safelog::Sensitive, tor_proto::RelayIdentities};
25
26pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
39where
40 R: tor_rtcompat::TlsProvider<H::Stream>,
41{
42 runtime: R,
44 transport: H,
46 tls_connector: <R as TlsProvider<H::Stream>>::Connector,
48 #[cfg(feature = "relay")]
50 identities: Option<Arc<RelayIdentities>>,
51}
52
53impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
54where
55 R: TlsProvider<H::Stream>,
56{
57 pub fn new(runtime: R, transport: H) -> Self {
59 let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
60 ChanBuilder {
61 runtime,
62 transport,
63 tls_connector,
64 #[cfg(feature = "relay")]
65 identities: None,
66 }
67 }
68
69 #[cfg(feature = "relay")]
71 pub fn with_identities(mut self, ids: Arc<RelayIdentities>) -> Self {
72 self.identities = Some(ids);
73 self
74 }
75
76 fn outbound_chan_type(&self) -> ChannelType {
84 #[cfg(feature = "relay")]
85 if self.identities.is_some() {
86 return ChannelType::RelayInitiator;
87 }
88 ChannelType::ClientInitiator
90 }
91}
92
93#[async_trait]
94impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
95where
96 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
97 H: Send + Sync,
98{
99 #[instrument(skip_all, level = "trace")]
100 async fn connect_via_transport(
101 &self,
102 target: &OwnedChanTarget,
103 reporter: BootstrapReporter,
104 memquota: ChannelAccount,
105 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
106 use tor_rtcompat::SleepProviderExt;
107
108 let delay = if target.chan_method().is_direct() {
110 std::time::Duration::new(5, 0)
111 } else {
112 std::time::Duration::new(10, 0)
113 };
114
115 self.runtime
116 .timeout(delay, self.connect_no_timeout(target, reporter.0, memquota))
117 .await
118 .map_err(|_| Error::ChanTimeout {
119 peer: target.to_logged(),
120 })?
121 }
122}
123
124#[async_trait]
125impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
126where
127 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
128 H: Send + Sync,
129{
130 type Stream = H::Stream;
131
132 #[cfg(feature = "relay")]
133 async fn accept_from_transport(
134 &self,
135 peer: Sensitive<std::net::SocketAddr>,
136 stream: Self::Stream,
137 _memquota: ChannelAccount,
138 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
139 let map_ioe = |ioe, action| Error::Io {
140 action,
141 peer: Some(BridgeAddr::new_addr_from_sockaddr(peer.into_inner()).into()),
142 source: ioe,
143 };
144
145 let _tls = self
146 .tls_connector
147 .negotiate_unvalidated(stream, "ignored")
148 .await
149 .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
150
151 todo!();
153 }
154}
155
156impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
157where
158 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
159 H: Send + Sync,
160{
161 #[instrument(skip_all, level = "trace")]
165 async fn connect_no_timeout(
166 &self,
167 target: &OwnedChanTarget,
168 event_sender: Arc<Mutex<ChanMgrEventSender>>,
169 memquota: ChannelAccount,
170 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
171 use tor_rtcompat::tls::CertifiedConn;
172
173 {
174 event_sender.lock().expect("Lock poisoned").record_attempt();
175 }
176
177 let (using_target, stream) = self.transport.connect(target).await?;
180 let using_method = using_target.chan_method();
181 let peer = using_method.target_addr();
182 let peer_ref = &peer;
183
184 let map_ioe = |action: &'static str| {
185 let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
186 let peer: Option<BridgeAddr> = peer.clone().into();
187 peer
188 });
189 move |ioe: io::Error| Error::Io {
190 action,
191 peer: peer.map(Into::into),
192 source: ioe.into(),
193 }
194 };
195
196 {
197 event_sender
201 .lock()
202 .expect("Lock poisoned")
203 .record_tcp_success();
204 }
205
206 let hostname = rand_hostname::random_hostname(&mut rand::rng());
209
210 let tls = self
211 .tls_connector
212 .negotiate_unvalidated(stream, hostname.as_str())
213 .await
214 .map_err(map_ioe("TLS negotiation"))?;
215
216 let peer_cert = tls
217 .peer_certificate()
218 .map_err(map_ioe("TLS certs"))?
219 .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
220
221 {
222 event_sender
223 .lock()
224 .expect("Lock poisoned")
225 .record_tls_finished();
226 }
227
228 let outbound_chan_type = self.outbound_chan_type();
230 let chan = match outbound_chan_type {
231 ChannelType::ClientInitiator => {
232 let mut builder = tor_proto::ClientChannelBuilder::new();
234 builder.set_declared_method(target.chan_method());
235
236 builder
237 .launch(
238 tls,
239 self.runtime.clone(), memquota,
241 )
242 .connect(|| self.runtime.wallclock())
243 .await
244 .map_err(|e| Error::from_proto_no_skew(e, target))?
245 }
246 #[cfg(feature = "relay")]
247 ChannelType::RelayInitiator => {
248 let builder = tor_proto::RelayChannelBuilder::new();
249 let identities = self
250 .identities
251 .as_ref()
252 .ok_or(internal!(
253 "Unable to build relay channel without identities"
254 ))?
255 .clone();
256
257 builder
258 .launch(
259 tls,
260 self.runtime.clone(), identities,
262 memquota,
263 )
264 .connect(|| self.runtime.wallclock())
265 .await
266 .map_err(|e| Error::from_proto_no_skew(e, &using_target))?
267 }
268 _ => {
269 return Err(Error::Internal(internal!(
270 "Unusable channel type for outbound: {outbound_chan_type}",
271 )));
272 }
273 };
274
275 let clock_skew = Some(chan.clock_skew()); let now = self.runtime.wallclock();
277 let chan = chan
278 .check(target, &peer_cert, Some(now))
279 .map_err(|source| match &source {
280 tor_proto::Error::HandshakeCertsExpired { .. } => {
281 event_sender
282 .lock()
283 .expect("Lock poisoned")
284 .record_handshake_done_with_skewed_clock();
285 Error::Proto {
286 source,
287 peer: using_target.to_logged(),
288 clock_skew,
289 }
290 }
291 _ => Error::from_proto_no_skew(source, &using_target),
292 })?;
293 let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
294 source,
295 peer: target.to_logged(),
296 clock_skew,
297 })?;
298
299 {
300 event_sender
301 .lock()
302 .expect("Lock poisoned")
303 .record_handshake_done();
304 }
305
306 self.runtime
308 .spawn(async {
309 let _ = reactor.run().await;
310 })
311 .map_err(|e| Error::from_spawn("channel reactor", e))?;
312
313 Ok(chan)
314 }
315}
316
317impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
318 fn is_usable(&self) -> bool {
319 !self.is_closing()
320 }
321 fn duration_unused(&self) -> Option<Duration> {
322 self.duration_unused()
323 }
324 fn reparameterize(
325 &self,
326 updates: Arc<ChannelPaddingInstructionsUpdates>,
327 ) -> tor_proto::Result<()> {
328 tor_proto::channel::Channel::reparameterize(self, updates)
329 }
330 fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
331 tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
332 }
333 fn engage_padding_activities(&self) {
334 tor_proto::channel::Channel::engage_padding_activities(self);
335 }
336}
337
// Unit tests for `ChanBuilder`, run against a mock network.
#[cfg(test)]
mod test {
    // Lint allowances for test code.
    // NOTE(review): this list looks like it is maintained by a project
    // script; confirm before editing it by hand.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::{
        Result,
        mgr::{AbstractChannel, AbstractChannelFactory},
    };
    use futures::StreamExt as _;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_llcrypto::pk::rsa::RsaIdentity;
    use tor_proto::channel::Channel;
    use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
    use tor_rtcompat::{NetStreamListener, test_with_one_runtime};
    use tor_rtmock::{io::LocalStream, net::MockNetwork};

    // `MockSleepRuntime` is deprecated; the allowance keeps the import
    // warning-free.
    #[allow(deprecated)]
    use tor_rtmock::MockSleepRuntime;

    /// Build a channel from a mock client to a mock relay and check that the
    /// resulting channel has the expected identity and is usable.
    ///
    /// The relay side is played by `crate::testing::answer_channel_req`, and
    /// addresses, identities, certificate, and timestamp all come from
    /// `crate::testing::msgs`.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;
        // Fixture data for the relay we pretend to talk to.
        let orport: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed: Ed25519Identity = msgs::ED_ID.into();
        let rsa: RsaIdentity = msgs::RSA_ID.into();
        let client_addr = "192.0.2.17".parse().unwrap();
        let tls_cert = msgs::X509_CERT.into();
        // The target uses a direct channel method to the relay's ORPort.
        let target = OwnedChanTarget::builder()
            .addrs(vec![orport])
            .method(ChannelMethod::Direct(vec![orport]))
            .ed_identity(ed)
            .rsa_identity(rsa)
            .build()
            .unwrap();
        // Wallclock time the mock client is moved to before connecting.
        // NOTE(review): presumably chosen so the canned handshake in
        // `msgs` validates at this time; confirm against the fixture.
        let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);

        test_with_one_runtime!(|rt| async move {
            // Shared mock network joining the client and the relay.
            let network = MockNetwork::new();

            // Client runtime: has `client_addr` on the mock network and is
            // wrapped so that its clock can be set.
            let client_rt = network
                .builder()
                .add_address(client_addr)
                .runtime(rt.clone());
            #[allow(deprecated)]
            let client_rt = MockSleepRuntime::new(client_rt);

            // Relay runtime: has the ORPort's IP on the mock network.
            let relay_rt = network
                .builder()
                .add_address(orport.ip())
                .runtime(rt.clone());

            // TLS listener on the relay side, using the fixture certificate.
            let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();

            // Set the client's clock to the fixture time.
            client_rt.jump_to(now);

            // The builder under test, using the default transport.
            let transport = crate::transport::DefaultTransport::new(client_rt.clone());
            let builder = ChanBuilder::new(client_rt, transport);

            // Drive both ends concurrently: r1 is the client's result and
            // r2 is the relay side's accepted connection.
            let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
                async {
                    // Client side: build the channel.
                    builder
                        .build_channel(
                            &target,
                            BootstrapReporter::fake(),
                            ChannelAccount::new_noop(),
                        )
                        .await
                },
                async {
                    // Relay side: accept one connection, check that it came
                    // from the client's address, then answer the channel
                    // request.
                    let (mut con, addr) = lis
                        .incoming()
                        .next()
                        .await
                        .expect("Closed?")
                        .expect("accept failed");
                    assert_eq!(client_addr, addr.ip());
                    crate::testing::answer_channel_req(&mut con)
                        .await
                        .expect("answer failed");
                    Ok(con)
                }
            );

            // The channel must carry the fixture Ed25519 identity and be
            // usable.
            let chan = r1.unwrap();
            assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
            assert!(chan.is_usable());
            // Three successive readings, the middle one via the
            // `AbstractChannel` trait, must be non-decreasing.
            let dur_unused = Channel::duration_unused(&chan);
            let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
            let dur_unused_3 = Channel::duration_unused(&chan);
            assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
            assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());

            // The relay side must have completed without error.
            r2.unwrap();
            Ok(())
        })
    }
}