1use std::io;
4use std::sync::{Arc, Mutex};
5
6use crate::factory::{BootstrapReporter, ChannelFactory, IncomingChannelFactory};
7use crate::transport::TransportImplHelper;
8use crate::{event::ChanMgrEventSender, Error};
9
10use std::time::Duration;
11use tor_error::internal;
12use tor_linkspec::{BridgeAddr, HasChanMethod, IntoOwnedChanTarget, OwnedChanTarget};
13use tor_proto::channel::kist::KistParams;
14use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
15use tor_proto::memquota::ChannelAccount;
16use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
17
18use async_trait::async_trait;
19use futures::task::SpawnExt;
20
21pub struct ChanBuilder<R: Runtime, H: TransportImplHelper>
34where
35 R: tor_rtcompat::TlsProvider<H::Stream>,
36{
37 runtime: R,
39 transport: H,
41 tls_connector: <R as TlsProvider<H::Stream>>::Connector,
43}
44
45impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
46where
47 R: TlsProvider<H::Stream>,
48{
49 pub fn new(runtime: R, transport: H) -> Self {
51 let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
52 ChanBuilder {
53 runtime,
54 transport,
55 tls_connector,
56 }
57 }
58}
59#[async_trait]
60impl<R: Runtime, H: TransportImplHelper> ChannelFactory for ChanBuilder<R, H>
61where
62 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
63 H: Send + Sync,
64{
65 async fn connect_via_transport(
66 &self,
67 target: &OwnedChanTarget,
68 reporter: BootstrapReporter,
69 memquota: ChannelAccount,
70 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
71 use tor_rtcompat::SleepProviderExt;
72
73 let delay = if target.chan_method().is_direct() {
75 std::time::Duration::new(5, 0)
76 } else {
77 std::time::Duration::new(10, 0)
78 };
79
80 let connect_future = self.connect_no_timeout(target, reporter.0, memquota);
81 self.runtime
82 .timeout(delay, connect_future)
83 .await
84 .map_err(|_| Error::ChanTimeout {
85 peer: target.to_logged(),
86 })?
87 }
88}
89
90#[async_trait]
91impl<R: Runtime, H: TransportImplHelper> IncomingChannelFactory for ChanBuilder<R, H>
92where
93 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
94 H: Send + Sync,
95{
96 type Stream = H::Stream;
97
98 #[cfg(feature = "relay")]
99 async fn accept_from_transport(
100 &self,
101 peer: std::net::SocketAddr,
102 stream: Self::Stream,
103 _memquota: ChannelAccount,
104 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
105 let map_ioe = |ioe, action| Error::Io {
106 action,
107 peer: Some(BridgeAddr::new_addr_from_sockaddr(peer).into()),
108 source: ioe,
109 };
110
111 let _tls = self
112 .tls_connector
113 .negotiate_unvalidated(stream, "ignored")
114 .await
115 .map_err(|e| map_ioe(e.into(), "TLS negotiation"))?;
116
117 todo!();
119 }
120}
121
122impl<R: Runtime, H: TransportImplHelper> ChanBuilder<R, H>
123where
124 R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
125 H: Send + Sync,
126{
127 async fn connect_no_timeout(
129 &self,
130 target: &OwnedChanTarget,
131 event_sender: Arc<Mutex<ChanMgrEventSender>>,
132 memquota: ChannelAccount,
133 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
134 use tor_proto::channel::ChannelBuilder;
135 use tor_rtcompat::tls::CertifiedConn;
136
137 {
138 event_sender.lock().expect("Lock poisoned").record_attempt();
139 }
140
141 let (using_target, stream) = self.transport.connect(target).await?;
144 let using_method = using_target.chan_method();
145 let peer = using_target.chan_method().target_addr();
146 let peer_ref = &peer;
147
148 let map_ioe = |action: &'static str| {
149 let peer: Option<BridgeAddr> = peer_ref.as_ref().and_then(|peer| {
150 let peer: Option<BridgeAddr> = peer.clone().into();
151 peer
152 });
153 move |ioe: io::Error| Error::Io {
154 action,
155 peer: peer.map(Into::into),
156 source: ioe.into(),
157 }
158 };
159
160 {
161 event_sender
165 .lock()
166 .expect("Lock poisoned")
167 .record_tcp_success();
168 }
169
170 let tls = self
174 .tls_connector
175 .negotiate_unvalidated(stream, "ignored")
176 .await
177 .map_err(map_ioe("TLS negotiation"))?;
178
179 let peer_cert = tls
180 .peer_certificate()
181 .map_err(map_ioe("TLS certs"))?
182 .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
183
184 {
185 event_sender
186 .lock()
187 .expect("Lock poisoned")
188 .record_tls_finished();
189 }
190
191 let mut builder = ChannelBuilder::new();
193 builder.set_declared_method(using_method);
194 let chan = builder
195 .launch(
196 tls,
197 self.runtime.clone(), memquota,
199 )
200 .connect(|| self.runtime.wallclock())
201 .await
202 .map_err(|e| Error::from_proto_no_skew(e, &using_target))?;
203 let clock_skew = Some(chan.clock_skew()); let now = self.runtime.wallclock();
205 let chan = chan
206 .check(target, &peer_cert, Some(now))
207 .map_err(|source| match &source {
208 tor_proto::Error::HandshakeCertsExpired { .. } => {
209 event_sender
210 .lock()
211 .expect("Lock poisoned")
212 .record_handshake_done_with_skewed_clock();
213 Error::Proto {
214 source,
215 peer: using_target.to_logged(),
216 clock_skew,
217 }
218 }
219 _ => Error::from_proto_no_skew(source, &using_target),
220 })?;
221 let (chan, reactor) = chan.finish().await.map_err(|source| Error::Proto {
222 source,
223 peer: target.to_logged(),
224 clock_skew,
225 })?;
226
227 {
228 event_sender
229 .lock()
230 .expect("Lock poisoned")
231 .record_handshake_done();
232 }
233
234 self.runtime
236 .spawn(async {
237 let _ = reactor.run().await;
238 })
239 .map_err(|e| Error::from_spawn("channel reactor", e))?;
240 Ok(chan)
241 }
242}
243
244impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
245 fn is_usable(&self) -> bool {
246 !self.is_closing()
247 }
248 fn duration_unused(&self) -> Option<Duration> {
249 self.duration_unused()
250 }
251 fn reparameterize(
252 &self,
253 updates: Arc<ChannelPaddingInstructionsUpdates>,
254 ) -> tor_proto::Result<()> {
255 tor_proto::channel::Channel::reparameterize(self, updates)
256 }
257 fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()> {
258 tor_proto::channel::Channel::reparameterize_kist(self, kist_params)
259 }
260 fn engage_padding_activities(&self) {
261 tor_proto::channel::Channel::engage_padding_activities(self);
262 }
263}
264
265#[cfg(test)]
266mod test {
267 #![allow(clippy::bool_assert_comparison)]
269 #![allow(clippy::clone_on_copy)]
270 #![allow(clippy::dbg_macro)]
271 #![allow(clippy::mixed_attributes_style)]
272 #![allow(clippy::print_stderr)]
273 #![allow(clippy::print_stdout)]
274 #![allow(clippy::single_char_pattern)]
275 #![allow(clippy::unwrap_used)]
276 #![allow(clippy::unchecked_duration_subtraction)]
277 #![allow(clippy::useless_vec)]
278 #![allow(clippy::needless_pass_by_value)]
279 use super::*;
281 use crate::{
282 mgr::{AbstractChannel, AbstractChannelFactory},
283 Result,
284 };
285 use futures::StreamExt as _;
286 use std::net::SocketAddr;
287 use std::time::{Duration, SystemTime};
288 use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
289 use tor_llcrypto::pk::ed25519::Ed25519Identity;
290 use tor_llcrypto::pk::rsa::RsaIdentity;
291 use tor_proto::channel::Channel;
292 use tor_proto::memquota::{ChannelAccount, SpecificAccount as _};
293 use tor_rtcompat::{test_with_one_runtime, NetStreamListener};
294 use tor_rtmock::{io::LocalStream, net::MockNetwork};
295
296 #[allow(deprecated)] use tor_rtmock::MockSleepRuntime;
298
299 #[test]
304 fn build_ok() -> Result<()> {
305 use crate::testing::msgs;
306 let orport: SocketAddr = msgs::ADDR.parse().unwrap();
307 let ed: Ed25519Identity = msgs::ED_ID.into();
308 let rsa: RsaIdentity = msgs::RSA_ID.into();
309 let client_addr = "192.0.2.17".parse().unwrap();
310 let tls_cert = msgs::X509_CERT.into();
311 let target = OwnedChanTarget::builder()
312 .addrs(vec![orport])
313 .method(ChannelMethod::Direct(vec![orport]))
314 .ed_identity(ed)
315 .rsa_identity(rsa)
316 .build()
317 .unwrap();
318 let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);
319
320 test_with_one_runtime!(|rt| async move {
321 let network = MockNetwork::new();
323
324 let client_rt = network
326 .builder()
327 .add_address(client_addr)
328 .runtime(rt.clone());
329 #[allow(deprecated)] let client_rt = MockSleepRuntime::new(client_rt);
332
333 let relay_rt = network
335 .builder()
336 .add_address(orport.ip())
337 .runtime(rt.clone());
338
339 let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();
341
342 client_rt.jump_to(now);
344
345 let transport = crate::transport::DefaultTransport::new(client_rt.clone());
347 let builder = ChanBuilder::new(client_rt, transport);
348
349 let (r1, r2): (Result<Arc<Channel>>, Result<LocalStream>) = futures::join!(
350 async {
351 builder
353 .build_channel(
354 &target,
355 BootstrapReporter::fake(),
356 ChannelAccount::new_noop(),
357 )
358 .await
359 },
360 async {
361 let (mut con, addr) = lis
364 .incoming()
365 .next()
366 .await
367 .expect("Closed?")
368 .expect("accept failed");
369 assert_eq!(client_addr, addr.ip());
370 crate::testing::answer_channel_req(&mut con)
371 .await
372 .expect("answer failed");
373 Ok(con)
374 }
375 );
376
377 let chan = r1.unwrap();
378 assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
379 assert!(chan.is_usable());
380 let dur_unused = Channel::duration_unused(&chan);
383 let dur_unused_2 = AbstractChannel::duration_unused(chan.as_ref());
384 let dur_unused_3 = Channel::duration_unused(&chan);
385 assert!(dur_unused.unwrap() <= dur_unused_2.unwrap());
386 assert!(dur_unused_2.unwrap() <= dur_unused_3.unwrap());
387
388 r2.unwrap();
389 Ok(())
390 })
391 }
392
393 }