1use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
4use crate::util::defer::Defer;
5use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
6
7use crate::factory::BootstrapReporter;
8use async_trait::async_trait;
9use futures::future::Shared;
10use oneshot_fused_workaround as oneshot;
11use std::result::Result as StdResult;
12use std::sync::Arc;
13use std::time::Duration;
14use tor_error::{error_report, internal};
15use tor_linkspec::HasRelayIds;
16use tor_netdir::params::NetParameters;
17use tor_proto::channel::kist::KistParams;
18use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
19use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
20
21mod select;
22mod state;
23
/// The subset of channel behavior that [`AbstractChanMgr`] depends on.
///
/// Every implementor must also expose relay identities through [`HasRelayIds`].
pub(crate) trait AbstractChannel: HasRelayIds {
    /// Report whether this channel may still be handed out to callers.
    fn is_usable(&self) -> bool;
    /// Return how long this channel has gone unused, if that is known.
    ///
    /// NOTE(review): `None` presumably means "currently in use" — confirm against implementors.
    fn duration_unused(&self) -> Option<Duration>;

    /// Ask this channel to adopt the given channel-padding instruction updates.
    fn reparameterize(
        &self,
        updates: Arc<ChannelPaddingInstructionsUpdates>,
    ) -> tor_proto::Result<()>;

    /// Ask this channel to adopt the given KIST parameters.
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;

    /// Tell this channel to begin its padding-related activities.
    ///
    /// `AbstractChanMgr::get_or_launch` calls this for `ChannelUsage::UserTraffic`.
    fn engage_padding_activities(&self);
}
60
/// A factory that builds channels on behalf of [`AbstractChanMgr`].
#[async_trait]
pub(crate) trait AbstractChannelFactory {
    /// The kind of channel this factory produces.
    type Channel: AbstractChannel;
    /// Description of the target to build a channel to; it must expose relay identities.
    type BuildSpec: HasRelayIds;
    /// The stream type accepted by `build_channel_using_incoming`.
    type Stream;

    /// Build a new channel to `target`.
    ///
    /// The manager supplies a clone of its `reporter` and a fresh `memquota`
    /// account for each build.
    async fn build_channel(
        &self,
        target: &Self::BuildSpec,
        reporter: BootstrapReporter,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;

    /// Build a channel from an incoming `stream` whose remote address is `peer`.
    ///
    /// Only compiled when the `relay` feature is enabled.
    #[cfg(feature = "relay")]
    async fn build_channel_using_incoming(
        &self,
        peer: std::net::SocketAddr,
        stream: Self::Stream,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;
}
97
/// A channel manager that is generic over the factory used to build channels.
///
/// Callers obtain channels through `get_or_launch`, which either returns a
/// channel already tracked in `channels` or builds one with the factory.
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
    /// Bookkeeping for open and pending channels; also holds the factory.
    pub(crate) channels: state::MgrState<CF>,

    /// Reporter that is cloned and passed to the factory for each outgoing build.
    pub(crate) reporter: BootstrapReporter,

    /// Top-level memory-quota account from which per-channel accounts are created.
    pub(crate) memquota: ToplevelAccount,
}
119
/// Shareable future that a waiter awaits while another task builds a channel.
///
/// It yields `Ok(())` when the builder reports success, or the build error otherwise.
type Pending = Shared<oneshot::Receiver<Result<()>>>;
123
/// Sender through which the task building a channel reports its result to waiters.
type Sending = oneshot::Sender<Result<()>>;
127
128impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
129 pub(crate) fn new(
131 connector: CF,
132 config: &ChannelConfig,
133 dormancy: Dormancy,
134 netparams: &NetParameters,
135 reporter: BootstrapReporter,
136 memquota: ToplevelAccount,
137 ) -> Self {
138 AbstractChanMgr {
139 channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
140 reporter,
141 memquota,
142 }
143 }
144
145 #[allow(dead_code)]
147 pub(crate) fn with_mut_builder<F>(&self, func: F)
148 where
149 F: FnOnce(&mut CF),
150 {
151 self.channels.with_mut_builder(func);
152 }
153
    /// Test-only helper: ask the channel state to remove its unusable entries.
    ///
    /// Returns whatever `remove_unusable` on the channel state returns.
    #[cfg(test)]
    pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
        self.channels.remove_unusable()
    }
159
160 #[cfg(feature = "relay")]
163 pub(crate) async fn handle_incoming(
164 &self,
165 src: std::net::SocketAddr,
166 stream: CF::Stream,
167 ) -> Result<Arc<CF::Channel>> {
168 let chan_builder = self.channels.builder();
169 let memquota = ChannelAccount::new(&self.memquota)?;
170 let _outcome = chan_builder
171 .build_channel_using_incoming(src, stream, memquota)
172 .await?;
173
174 todo!();
176 }
177
178 pub(crate) async fn get_or_launch(
188 &self,
189 target: CF::BuildSpec,
190 usage: ChannelUsage,
191 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
192 use ChannelUsage as CU;
193
194 let chan = self.get_or_launch_internal(target).await?;
195
196 match usage {
197 CU::Dir | CU::UselessCircuit => {}
198 CU::UserTraffic => chan.0.engage_padding_activities(),
199 }
200
201 Ok(chan)
202 }
203
204 async fn get_or_launch_internal(
206 &self,
207 target: CF::BuildSpec,
208 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
209 const N_ATTEMPTS: usize = 2;
211 let mut attempts_so_far = 0;
212 let mut final_attempt = false;
213 let mut provenance = ChanProvenance::Preexisting;
214
215 let mut last_err = None;
217
218 while attempts_so_far < N_ATTEMPTS || final_attempt {
219 attempts_so_far += 1;
220
221 let action = self.choose_action(&target, final_attempt)?;
226
227 match action {
230 None => {
233 if !final_attempt {
234 return Err(Error::Internal(internal!(
235 "No action returned while not on final attempt"
236 )));
237 }
238 break;
239 }
240 Some(Action::Return(v)) => {
242 return v.map(|chan| (chan, provenance));
243 }
244 Some(Action::Wait(pend)) => {
246 match pend.await {
247 Ok(Ok(())) => {
248 final_attempt = true;
254 provenance = ChanProvenance::NewlyCreated;
255 last_err.get_or_insert(Error::RequestCancelled);
256 }
257 Ok(Err(e)) => {
258 last_err = Some(e);
259 }
260 Err(_) => {
261 last_err =
262 Some(Error::Internal(internal!("channel build task disappeared")));
263 }
264 }
265 }
266 Some(Action::Launch((handle, send))) => {
268 let defer_remove_pending = Defer::new(handle, |handle| {
278 if let Err(e) = self.channels.remove_pending_channel(handle) {
279 #[allow(clippy::missing_docs_in_private_items)]
284 const MSG: &str = "Unable to remove the pending channel";
285 error_report!(internal!("{e}"), "{}", MSG);
286 }
287 });
288
289 let connector = self.channels.builder();
290 let memquota = ChannelAccount::new(&self.memquota)?;
291
292 let outcome = connector
293 .build_channel(&target, self.reporter.clone(), memquota)
294 .await;
295
296 match outcome {
297 Ok(ref chan) => {
298 let handle = defer_remove_pending.cancel();
300 self.channels
301 .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
302 }
303 Err(_) => {
304 drop(defer_remove_pending);
306 }
307 }
308
309 let _ignore_err = send.send(outcome.clone().map(|_| ()));
312
313 match outcome {
314 Ok(chan) => {
315 return Ok((chan, ChanProvenance::NewlyCreated));
316 }
317 Err(e) => last_err = Some(e),
318 }
319 }
320 }
321
322 }
324
325 Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
326 }
327
328 fn choose_action(
337 &self,
338 target: &CF::BuildSpec,
339 final_attempt: bool,
340 ) -> Result<Option<Action<CF::Channel>>> {
341 let response = self.channels.request_channel(
343 target,
344 !final_attempt,
345 );
346
347 match response {
348 Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
349 Ok(Some(ChannelForTarget::Pending(pending))) => {
350 if !final_attempt {
351 Ok(Some(Action::Wait(pending)))
352 } else {
353 Ok(None)
355 }
356 }
357 Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
358 Ok(Some(Action::Launch((handle, send))))
360 }
361 Ok(None) => Ok(None),
362 Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
363 Err(e) => Err(e),
364 }
365 }
366
367 pub(crate) fn update_netparams(
369 &self,
370 netparams: Arc<dyn AsRef<NetParameters>>,
371 ) -> StdResult<(), tor_error::Bug> {
372 self.channels.reconfigure_general(None, None, netparams)
373 }
374
375 pub(crate) fn set_dormancy(
377 &self,
378 dormancy: Dormancy,
379 netparams: Arc<dyn AsRef<NetParameters>>,
380 ) -> StdResult<(), tor_error::Bug> {
381 self.channels
382 .reconfigure_general(None, Some(dormancy), netparams)
383 }
384
385 pub(crate) fn reconfigure(
387 &self,
388 config: &ChannelConfig,
389 netparams: Arc<dyn AsRef<NetParameters>>,
390 ) -> StdResult<(), tor_error::Bug> {
391 self.channels
392 .reconfigure_general(Some(config), None, netparams)
393 }
394
    /// Ask the channel state to expire channels, and return the `Duration` it reports.
    ///
    /// NOTE(review): the returned duration is presumably how long to wait before
    /// the next expiry pass — confirm against `MgrState::expire_channels`.
    pub(crate) fn expire_channels(&self) -> Duration {
        self.channels.expire_channels()
    }
406
407 #[cfg(test)]
409 pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
410 where
411 T: Into<tor_linkspec::RelayIdRef<'a>>,
412 {
413 use state::ChannelState::*;
414 self.channels
415 .with_channels(|channel_map| {
416 channel_map
417 .by_id(ident)
418 .filter_map(|entry| match entry {
419 Open(ref ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
420 _ => None,
421 })
422 .collect()
423 })
424 .expect("Poisoned lock")
425 }
426}
427
/// What `get_or_launch_internal` should do next, as decided by `choose_action`.
#[allow(clippy::large_enum_variant)]
enum Action<C: AbstractChannel> {
    /// Build a channel ourselves: carries the handle of our pending entry and
    /// the sender used to report the build result to waiters.
    Launch((PendingChannelHandle, Sending)),
    /// Another task is already building a channel: wait on this future.
    Wait(Pending),
    /// Return this result (a channel or an error) right away.
    Return(Result<Arc<C>>),
}
440
441#[cfg(test)]
442mod test {
443 #![allow(clippy::bool_assert_comparison)]
445 #![allow(clippy::clone_on_copy)]
446 #![allow(clippy::dbg_macro)]
447 #![allow(clippy::mixed_attributes_style)]
448 #![allow(clippy::print_stderr)]
449 #![allow(clippy::print_stdout)]
450 #![allow(clippy::single_char_pattern)]
451 #![allow(clippy::unwrap_used)]
452 #![allow(clippy::unchecked_duration_subtraction)]
453 #![allow(clippy::useless_vec)]
454 #![allow(clippy::needless_pass_by_value)]
455 use super::*;
457 use crate::Error;
458
459 use futures::join;
460 use std::sync::atomic::{AtomicBool, Ordering};
461 use std::sync::Arc;
462 use std::time::Duration;
463 use tor_error::bad_api_usage;
464 use tor_llcrypto::pk::ed25519::Ed25519Identity;
465 use tor_memquota::ArcMemoryQuotaTrackerExt as _;
466
467 use crate::ChannelUsage as CU;
468 use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
469
    /// A channel factory for tests that produces `FakeChannel`s.
    #[derive(Clone)]
    struct FakeChannelFactory<RT> {
        /// Runtime used to sleep when a build is asked to be slow.
        runtime: RT,
    }
474
    /// A stand-in channel used by the tests in this module.
    #[derive(Clone, Debug)]
    struct FakeChannel {
        /// The Ed25519 identity this channel reports through `HasRelayIds`.
        ed_ident: Ed25519Identity,
        /// The "mood" character copied from the `FakeBuildSpec` that built this channel.
        mood: char,
        /// Set once `start_closing` is called; makes `is_usable` return false.
        closing: Arc<AtomicBool>,
        /// Freshly allocated per build; compared by pointer to tell channels apart.
        detect_reuse: Arc<char>,
    }
483
484 impl PartialEq for FakeChannel {
485 fn eq(&self, other: &Self) -> bool {
486 Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
487 }
488 }
489
    /// Minimal `AbstractChannel` implementation: only usability is modeled.
    impl AbstractChannel for FakeChannel {
        /// Usable until `start_closing` has set the `closing` flag.
        fn is_usable(&self) -> bool {
            !self.closing.load(Ordering::SeqCst)
        }
        /// Always `None`: the fake channel does not track idle time.
        fn duration_unused(&self) -> Option<Duration> {
            None
        }
        /// Accepts and ignores padding updates.
        fn reparameterize(
            &self,
            _updates: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            Ok(())
        }
        /// Accepts and ignores KIST parameters.
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
        }
        /// Does nothing.
        fn engage_padding_activities(&self) {}
    }
509
510 impl HasRelayIds for FakeChannel {
511 fn identity(
512 &self,
513 key_type: tor_linkspec::RelayIdType,
514 ) -> Option<tor_linkspec::RelayIdRef<'_>> {
515 match key_type {
516 tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
517 _ => None,
518 }
519 }
520 }
521
    impl FakeChannel {
        /// Mark this channel as closing, so that `is_usable` returns false from now on.
        fn start_closing(&self) {
            self.closing.store(true, Ordering::SeqCst);
        }
    }
527
528 impl<RT: Runtime> FakeChannelFactory<RT> {
529 fn new(runtime: RT) -> Self {
530 FakeChannelFactory { runtime }
531 }
532 }
533
534 fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
535 let cf = FakeChannelFactory::new(runtime);
536 AbstractChanMgr::new(
537 cf,
538 &ChannelConfig::default(),
539 Default::default(),
540 &Default::default(),
541 BootstrapReporter::fake(),
542 ToplevelAccount::new_noop(),
543 )
544 }
545
    /// Build target for the fake factory.
    ///
    /// The fields are, in order: a number from which the channel's identity is
    /// derived, a "mood" character selecting how the build behaves, and the
    /// Ed25519 identity (which the factory asserts matches the number).
    #[derive(Clone, Debug)]
    struct FakeBuildSpec(u32, char, Ed25519Identity);
548
549 impl HasRelayIds for FakeBuildSpec {
550 fn identity(
551 &self,
552 key_type: tor_linkspec::RelayIdType,
553 ) -> Option<tor_linkspec::RelayIdRef<'_>> {
554 match key_type {
555 tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
556 _ => None,
557 }
558 }
559 }
560
561 fn u32_to_ed(n: u32) -> Ed25519Identity {
563 let mut bytes = [0; 32];
564 bytes[0..4].copy_from_slice(&n.to_be_bytes());
565 bytes.into()
566 }
567
568 #[async_trait]
569 impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
570 type Channel = FakeChannel;
571 type BuildSpec = FakeBuildSpec;
572 type Stream = ();
573
574 async fn build_channel(
575 &self,
576 target: &Self::BuildSpec,
577 _reporter: BootstrapReporter,
578 _memquota: ChannelAccount,
579 ) -> Result<Arc<FakeChannel>> {
580 yield_now().await;
581 let FakeBuildSpec(ident, mood, id) = *target;
582 let ed_ident = u32_to_ed(ident);
583 assert_eq!(ed_ident, id);
584 match mood {
585 '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
587 '💤' => {
589 self.runtime.sleep(Duration::new(15, 0)).await;
590 }
591 _ => {}
592 }
593 Ok(Arc::new(FakeChannel {
594 ed_ident,
595 mood,
596 closing: Arc::new(AtomicBool::new(false)),
597 detect_reuse: Default::default(),
598 }))
600 }
601
602 #[cfg(feature = "relay")]
603 async fn build_channel_using_incoming(
604 &self,
605 _peer: std::net::SocketAddr,
606 _stream: Self::Stream,
607 _memquota: ChannelAccount,
608 ) -> Result<Arc<Self::Channel>> {
609 unimplemented!()
610 }
611 }
612
613 #[test]
614 fn connect_one_ok() {
615 test_with_one_runtime!(|runtime| async {
616 let mgr = new_test_abstract_chanmgr(runtime);
617 let target = FakeBuildSpec(413, '!', u32_to_ed(413));
618 let chan1 = mgr
619 .get_or_launch(target.clone(), CU::UserTraffic)
620 .await
621 .unwrap()
622 .0;
623 let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
624
625 assert_eq!(chan1, chan2);
626 assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
627 });
628 }
629
630 #[test]
631 fn connect_one_fail() {
632 test_with_one_runtime!(|runtime| async {
633 let mgr = new_test_abstract_chanmgr(runtime);
634
635 let target = FakeBuildSpec(999, '❌', u32_to_ed(999));
637 let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
638 assert!(matches!(res1, Err(Error::UnusableTarget(_))));
639
640 assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
641 });
642 }
643
644 #[test]
645 fn test_concurrent() {
646 test_with_one_runtime!(|runtime| async {
647 let mgr = new_test_abstract_chanmgr(runtime);
648
649 let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
653 mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
654 mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic),
655 mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic),
656 mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic),
657 mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic),
658 mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic),
659 );
660 let ch3a = ch3a.unwrap();
661 let ch3b = ch3b.unwrap();
662 let ch44a = ch44a.unwrap();
663 let ch44b = ch44b.unwrap();
664 let err_a = ch86a.unwrap_err();
665 let err_b = ch86b.unwrap_err();
666
667 assert_eq!(ch3a, ch3b);
668 assert_eq!(ch44a, ch44b);
669 assert_ne!(ch44a, ch3a);
670
671 assert!(matches!(err_a, Error::UnusableTarget(_)));
672 assert!(matches!(err_b, Error::UnusableTarget(_)));
673 });
674 }
675
676 #[test]
677 fn unusable_entries() {
678 test_with_one_runtime!(|runtime| async {
679 let mgr = new_test_abstract_chanmgr(runtime);
680
681 let (ch3, ch4, ch5) = join!(
682 mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
683 mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
684 mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
685 );
686
687 let ch3 = ch3.unwrap().0;
688 let _ch4 = ch4.unwrap();
689 let ch5 = ch5.unwrap().0;
690
691 ch3.start_closing();
692 ch5.start_closing();
693
694 let ch3_new = mgr
695 .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
696 .await
697 .unwrap()
698 .0;
699 assert_ne!(ch3, ch3_new);
700 assert_eq!(ch3_new.mood, 'b');
701
702 mgr.remove_unusable_entries().unwrap();
703
704 assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
705 assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
706 assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
707 });
708 }
709}