tor_chanmgr/
mgr.rs

1//! Abstract implementation of a channel manager
2
3use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
4use crate::util::defer::Defer;
5use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
6
7use crate::factory::BootstrapReporter;
8use async_trait::async_trait;
9use futures::future::Shared;
10use oneshot_fused_workaround as oneshot;
11use std::result::Result as StdResult;
12use std::sync::Arc;
13use std::time::Duration;
14use tor_error::{error_report, internal};
15use tor_linkspec::HasRelayIds;
16use tor_netdir::params::NetParameters;
17use tor_proto::channel::kist::KistParams;
18use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
19use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
20
21mod select;
22mod state;
23
24/// Trait to describe as much of a
25/// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr`
26/// needs to use.
27pub(crate) trait AbstractChannel: HasRelayIds {
28    /// Return true if this channel is usable.
29    ///
30    /// A channel might be unusable because it is closed, because it has
31    /// hit a bug, or for some other reason.  We don't return unusable
32    /// channels back to the user.
33    fn is_usable(&self) -> bool;
34    /// Return the amount of time a channel has not been in use.
35    /// Return None if the channel is currently in use.
36    fn duration_unused(&self) -> Option<Duration>;
37
38    /// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates`
39    ///
40    /// The changed parameters may not be implemented "immediately",
41    /// but this will be done "reasonably soon".
42    fn reparameterize(
43        &self,
44        updates: Arc<ChannelPaddingInstructionsUpdates>,
45    ) -> tor_proto::Result<()>;
46
47    /// Update the KIST parameters.
48    ///
49    /// The changed parameters may not be implemented "immediately",
50    /// but this will be done "reasonably soon".
51    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;
52
53    /// Specify that this channel should do activities related to channel padding
54    ///
55    /// See [`Channel::engage_padding_activities`]
56    ///
57    /// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities
58    fn engage_padding_activities(&self);
59}
60
61/// Trait to describe how channels-like objects are created.
62///
63/// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that
64/// it's a purely crate-internal type that we use to decouple the
65/// AbstractChanMgr code from actual "what is a channel" concerns.
66#[async_trait]
67pub(crate) trait AbstractChannelFactory {
68    /// The type of channel that this factory can build.
69    type Channel: AbstractChannel;
70    /// Type that explains how to build an outgoing channel.
71    type BuildSpec: HasRelayIds;
72    /// The type of byte stream that's required to build channels for incoming connections.
73    type Stream;
74
75    /// Construct a new channel to the destination described at `target`.
76    ///
77    /// This function must take care of all timeouts, error detection,
78    /// and so on.
79    ///
80    /// It should not retry; that is handled at a higher level.
81    async fn build_channel(
82        &self,
83        target: &Self::BuildSpec,
84        reporter: BootstrapReporter,
85        memquota: ChannelAccount,
86    ) -> Result<Arc<Self::Channel>>;
87
88    /// Construct a new channel for an incoming connection.
89    #[cfg(feature = "relay")]
90    async fn build_channel_using_incoming(
91        &self,
92        peer: std::net::SocketAddr,
93        stream: Self::Stream,
94        memquota: ChannelAccount,
95    ) -> Result<Arc<Self::Channel>>;
96}
97
98/// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
99///
100/// This type does the work of keeping track of open channels and pending
101/// channel requests, launching requests as needed, waiting for pending
102/// requests, and so forth.
103///
104/// The actual job of launching connections is deferred to an
105/// `AbstractChannelFactory` type.
106pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
107    /// All internal state held by this channel manager.
108    ///
109    /// The most important part is the map from relay identity to channel, or
110    /// to pending channel status.
111    pub(crate) channels: state::MgrState<CF>,
112
113    /// A bootstrap reporter to give out when building channels.
114    pub(crate) reporter: BootstrapReporter,
115
116    /// The memory quota account that every channel will be a child of
117    pub(crate) memquota: ToplevelAccount,
118}
119
120/// Type alias for a future that we wait on to see when a pending
121/// channel is done or failed.
122type Pending = Shared<oneshot::Receiver<Result<()>>>;
123
124/// Type alias for the sender we notify when we complete a channel (or fail to
125/// complete it).
126type Sending = oneshot::Sender<Result<()>>;
127
128impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
129    /// Make a new empty channel manager.
130    pub(crate) fn new(
131        connector: CF,
132        config: &ChannelConfig,
133        dormancy: Dormancy,
134        netparams: &NetParameters,
135        reporter: BootstrapReporter,
136        memquota: ToplevelAccount,
137    ) -> Self {
138        AbstractChanMgr {
139            channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
140            reporter,
141            memquota,
142        }
143    }
144
145    /// Run a function to modify the channel builder in this object.
146    #[allow(dead_code)]
147    pub(crate) fn with_mut_builder<F>(&self, func: F)
148    where
149        F: FnOnce(&mut CF),
150    {
151        self.channels.with_mut_builder(func);
152    }
153
154    /// Remove every unusable entry from this channel manager.
155    #[cfg(test)]
156    pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
157        self.channels.remove_unusable()
158    }
159
160    /// Build a channel for an incoming stream. See
161    /// [`ChanMgr::handle_incoming`](crate::ChanMgr::handle_incoming).
162    #[cfg(feature = "relay")]
163    pub(crate) async fn handle_incoming(
164        &self,
165        src: std::net::SocketAddr,
166        stream: CF::Stream,
167    ) -> Result<Arc<CF::Channel>> {
168        let chan_builder = self.channels.builder();
169        let memquota = ChannelAccount::new(&self.memquota)?;
170        let _outcome = chan_builder
171            .build_channel_using_incoming(src, stream, memquota)
172            .await?;
173
174        // TODO RELAY: we need to do something with the channel here now that we've created it
175        todo!();
176    }
177
178    /// Get a channel corresponding to the identities of `target`.
179    ///
180    /// If a usable channel exists with that identity, return it.
181    ///
182    /// If no such channel exists already, and none is in progress,
183    /// launch a new request using `target`.
184    ///
185    /// If no such channel exists already, but we have one that's in
186    /// progress, wait for it to succeed or fail.
187    pub(crate) async fn get_or_launch(
188        &self,
189        target: CF::BuildSpec,
190        usage: ChannelUsage,
191    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
192        use ChannelUsage as CU;
193
194        let chan = self.get_or_launch_internal(target).await?;
195
196        match usage {
197            CU::Dir | CU::UselessCircuit => {}
198            CU::UserTraffic => chan.0.engage_padding_activities(),
199        }
200
201        Ok(chan)
202    }
203
204    /// Get a channel whose identity is `ident` - internal implementation
205    async fn get_or_launch_internal(
206        &self,
207        target: CF::BuildSpec,
208    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
209        /// How many times do we try?
210        const N_ATTEMPTS: usize = 2;
211        let mut attempts_so_far = 0;
212        let mut final_attempt = false;
213        let mut provenance = ChanProvenance::Preexisting;
214
215        // TODO(nickm): It would be neat to use tor_retry instead.
216        let mut last_err = None;
217
218        while attempts_so_far < N_ATTEMPTS || final_attempt {
219            attempts_so_far += 1;
220
221            // For each attempt, we _first_ look at the state of the channel map
222            // to decide on an `Action`, and _then_ we execute that action.
223
224            // First, see what state we're in, and what we should do about it.
225            let action = self.choose_action(&target, final_attempt)?;
226
227            // We are done deciding on our Action! It's time act based on the
228            // Action that we chose.
229            match action {
230                // If this happens, we were trying to make one final check of our state, but
231                // we would have had to make additional attempts.
232                None => {
233                    if !final_attempt {
234                        return Err(Error::Internal(internal!(
235                            "No action returned while not on final attempt"
236                        )));
237                    }
238                    break;
239                }
240                // Easy case: we have an error or a channel to return.
241                Some(Action::Return(v)) => {
242                    return v.map(|chan| (chan, provenance));
243                }
244                // There's an in-progress channel.  Wait for it.
245                Some(Action::Wait(pend)) => {
246                    match pend.await {
247                        Ok(Ok(())) => {
248                            // We were waiting for a channel, and it succeeded, or it
249                            // got cancelled.  But it might have gotten more
250                            // identities while negotiating than it had when it was
251                            // launched, or it might have failed to get all the
252                            // identities we want. Check for this.
253                            final_attempt = true;
254                            provenance = ChanProvenance::NewlyCreated;
255                            last_err.get_or_insert(Error::RequestCancelled);
256                        }
257                        Ok(Err(e)) => {
258                            last_err = Some(e);
259                        }
260                        Err(_) => {
261                            last_err =
262                                Some(Error::Internal(internal!("channel build task disappeared")));
263                        }
264                    }
265                }
266                // We need to launch a channel.
267                Some(Action::Launch((handle, send))) => {
268                    // If the remainder of this code returns early or is cancelled, we still want to
269                    // clean up our pending entry in the channel map. The following closure will be
270                    // run when dropped to ensure that it's cleaned up properly.
271                    //
272                    // The `remove_pending_channel` will acquire the lock within `MgrState`, but
273                    // this won't lead to deadlocks since the lock is only ever acquired within
274                    // methods of `MgrState`. When this `Defer` is being dropped, no other
275                    // `MgrState` methods will be running on this thread, so the lock will not have
276                    // already been acquired.
277                    let defer_remove_pending = Defer::new(handle, |handle| {
278                        if let Err(e) = self.channels.remove_pending_channel(handle) {
279                            // Just log an error if we're unable to remove it, since there's
280                            // nothing else we can do here, and returning the error would
281                            // hide the actual error that we care about (the channel build
282                            // failure).
283                            #[allow(clippy::missing_docs_in_private_items)]
284                            const MSG: &str = "Unable to remove the pending channel";
285                            error_report!(internal!("{e}"), "{}", MSG);
286                        }
287                    });
288
289                    let connector = self.channels.builder();
290                    let memquota = ChannelAccount::new(&self.memquota)?;
291
292                    let outcome = connector
293                        .build_channel(&target, self.reporter.clone(), memquota)
294                        .await;
295
296                    match outcome {
297                        Ok(ref chan) => {
298                            // Replace the pending channel with the newly built channel.
299                            let handle = defer_remove_pending.cancel();
300                            self.channels
301                                .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
302                        }
303                        Err(_) => {
304                            // Remove the pending channel.
305                            drop(defer_remove_pending);
306                        }
307                    }
308
309                    // It's okay if all the receivers went away:
310                    // that means that nobody was waiting for this channel.
311                    let _ignore_err = send.send(outcome.clone().map(|_| ()));
312
313                    match outcome {
314                        Ok(chan) => {
315                            return Ok((chan, ChanProvenance::NewlyCreated));
316                        }
317                        Err(e) => last_err = Some(e),
318                    }
319                }
320            }
321
322            // End of this attempt. We will try again...
323        }
324
325        Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
326    }
327
328    /// Helper: based on our internal state, decide which action to take when
329    /// asked for a channel, and update our internal state accordingly.
330    ///
331    /// If `final_attempt` is true, then we will not pick any action that does
332    /// not result in an immediate result. If we would pick such an action, we
333    /// instead return `Ok(None)`.  (We could instead have the caller detect
334    /// such actions, but it's less efficient to construct them, insert them,
335    /// and immediately revert them.)
336    fn choose_action(
337        &self,
338        target: &CF::BuildSpec,
339        final_attempt: bool,
340    ) -> Result<Option<Action<CF::Channel>>> {
341        // don't create new channels on the final attempt
342        let response = self.channels.request_channel(
343            target,
344            /* add_new_entry_if_not_found= */ !final_attempt,
345        );
346
347        match response {
348            Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
349            Ok(Some(ChannelForTarget::Pending(pending))) => {
350                if !final_attempt {
351                    Ok(Some(Action::Wait(pending)))
352                } else {
353                    // don't return a pending channel on the final attempt
354                    Ok(None)
355                }
356            }
357            Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
358                // do not drop the handle if refactoring; see `PendingChannelHandle` for details
359                Ok(Some(Action::Launch((handle, send))))
360            }
361            Ok(None) => Ok(None),
362            Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
363            Err(e) => Err(e),
364        }
365    }
366
367    /// Update the netdir
368    pub(crate) fn update_netparams(
369        &self,
370        netparams: Arc<dyn AsRef<NetParameters>>,
371    ) -> StdResult<(), tor_error::Bug> {
372        self.channels.reconfigure_general(None, None, netparams)
373    }
374
375    /// Notifies the chanmgr to be dormant like dormancy
376    pub(crate) fn set_dormancy(
377        &self,
378        dormancy: Dormancy,
379        netparams: Arc<dyn AsRef<NetParameters>>,
380    ) -> StdResult<(), tor_error::Bug> {
381        self.channels
382            .reconfigure_general(None, Some(dormancy), netparams)
383    }
384
385    /// Reconfigure all channels
386    pub(crate) fn reconfigure(
387        &self,
388        config: &ChannelConfig,
389        netparams: Arc<dyn AsRef<NetParameters>>,
390    ) -> StdResult<(), tor_error::Bug> {
391        self.channels
392            .reconfigure_general(Some(config), None, netparams)
393    }
394
395    /// Expire any channels that have been unused longer than
396    /// their maximum unused duration assigned during creation.
397    ///
398    /// Return a duration from now until next channel expires.
399    ///
400    /// If all channels are in use or there are no open channels,
401    /// return 180 seconds which is the minimum value of
402    /// max_unused_duration.
403    pub(crate) fn expire_channels(&self) -> Duration {
404        self.channels.expire_channels()
405    }
406
407    /// Test only: return the open usable channels with a given `ident`.
408    #[cfg(test)]
409    pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
410    where
411        T: Into<tor_linkspec::RelayIdRef<'a>>,
412    {
413        use state::ChannelState::*;
414        self.channels
415            .with_channels(|channel_map| {
416                channel_map
417                    .by_id(ident)
418                    .filter_map(|entry| match entry {
419                        Open(ref ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
420                        _ => None,
421                    })
422                    .collect()
423            })
424            .expect("Poisoned lock")
425    }
426}
427
428/// Possible actions that we'll decide to take when asked for a channel.
429#[allow(clippy::large_enum_variant)]
430enum Action<C: AbstractChannel> {
431    /// We found no channel.  We're going to launch a new one,
432    /// then tell everybody about it.
433    Launch((PendingChannelHandle, Sending)),
434    /// We found an in-progress attempt at making a channel.
435    /// We're going to wait for it to finish.
436    Wait(Pending),
437    /// We found a usable channel.  We're going to return it.
438    Return(Result<Arc<C>>),
439}
440
441#[cfg(test)]
442mod test {
443    // @@ begin test lint list maintained by maint/add_warning @@
444    #![allow(clippy::bool_assert_comparison)]
445    #![allow(clippy::clone_on_copy)]
446    #![allow(clippy::dbg_macro)]
447    #![allow(clippy::mixed_attributes_style)]
448    #![allow(clippy::print_stderr)]
449    #![allow(clippy::print_stdout)]
450    #![allow(clippy::single_char_pattern)]
451    #![allow(clippy::unwrap_used)]
452    #![allow(clippy::unchecked_duration_subtraction)]
453    #![allow(clippy::useless_vec)]
454    #![allow(clippy::needless_pass_by_value)]
455    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
456    use super::*;
457    use crate::Error;
458
459    use futures::join;
460    use std::sync::atomic::{AtomicBool, Ordering};
461    use std::sync::Arc;
462    use std::time::Duration;
463    use tor_error::bad_api_usage;
464    use tor_llcrypto::pk::ed25519::Ed25519Identity;
465    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
466
467    use crate::ChannelUsage as CU;
468    use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
469
470    #[derive(Clone)]
471    struct FakeChannelFactory<RT> {
472        runtime: RT,
473    }
474
475    #[derive(Clone, Debug)]
476    struct FakeChannel {
477        ed_ident: Ed25519Identity,
478        mood: char,
479        closing: Arc<AtomicBool>,
480        detect_reuse: Arc<char>,
481        // last_params: Option<ChannelPaddingInstructionsUpdates>,
482    }
483
484    impl PartialEq for FakeChannel {
485        fn eq(&self, other: &Self) -> bool {
486            Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
487        }
488    }
489
490    impl AbstractChannel for FakeChannel {
491        fn is_usable(&self) -> bool {
492            !self.closing.load(Ordering::SeqCst)
493        }
494        fn duration_unused(&self) -> Option<Duration> {
495            None
496        }
497        fn reparameterize(
498            &self,
499            _updates: Arc<ChannelPaddingInstructionsUpdates>,
500        ) -> tor_proto::Result<()> {
501            // *self.last_params.lock().unwrap() = Some((*updates).clone());
502            Ok(())
503        }
504        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
505            Ok(())
506        }
507        fn engage_padding_activities(&self) {}
508    }
509
510    impl HasRelayIds for FakeChannel {
511        fn identity(
512            &self,
513            key_type: tor_linkspec::RelayIdType,
514        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
515            match key_type {
516                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
517                _ => None,
518            }
519        }
520    }
521
522    impl FakeChannel {
523        fn start_closing(&self) {
524            self.closing.store(true, Ordering::SeqCst);
525        }
526    }
527
528    impl<RT: Runtime> FakeChannelFactory<RT> {
529        fn new(runtime: RT) -> Self {
530            FakeChannelFactory { runtime }
531        }
532    }
533
534    fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
535        let cf = FakeChannelFactory::new(runtime);
536        AbstractChanMgr::new(
537            cf,
538            &ChannelConfig::default(),
539            Default::default(),
540            &Default::default(),
541            BootstrapReporter::fake(),
542            ToplevelAccount::new_noop(),
543        )
544    }
545
546    #[derive(Clone, Debug)]
547    struct FakeBuildSpec(u32, char, Ed25519Identity);
548
549    impl HasRelayIds for FakeBuildSpec {
550        fn identity(
551            &self,
552            key_type: tor_linkspec::RelayIdType,
553        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
554            match key_type {
555                tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
556                _ => None,
557            }
558        }
559    }
560
561    /// Helper to make a fake Ed identity from a u32.
562    fn u32_to_ed(n: u32) -> Ed25519Identity {
563        let mut bytes = [0; 32];
564        bytes[0..4].copy_from_slice(&n.to_be_bytes());
565        bytes.into()
566    }
567
568    #[async_trait]
569    impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
570        type Channel = FakeChannel;
571        type BuildSpec = FakeBuildSpec;
572        type Stream = ();
573
574        async fn build_channel(
575            &self,
576            target: &Self::BuildSpec,
577            _reporter: BootstrapReporter,
578            _memquota: ChannelAccount,
579        ) -> Result<Arc<FakeChannel>> {
580            yield_now().await;
581            let FakeBuildSpec(ident, mood, id) = *target;
582            let ed_ident = u32_to_ed(ident);
583            assert_eq!(ed_ident, id);
584            match mood {
585                // "X" means never connect.
586                '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
587                // "zzz" means wait for 15 seconds then succeed.
588                '💤' => {
589                    self.runtime.sleep(Duration::new(15, 0)).await;
590                }
591                _ => {}
592            }
593            Ok(Arc::new(FakeChannel {
594                ed_ident,
595                mood,
596                closing: Arc::new(AtomicBool::new(false)),
597                detect_reuse: Default::default(),
598                // last_params: None,
599            }))
600        }
601
602        #[cfg(feature = "relay")]
603        async fn build_channel_using_incoming(
604            &self,
605            _peer: std::net::SocketAddr,
606            _stream: Self::Stream,
607            _memquota: ChannelAccount,
608        ) -> Result<Arc<Self::Channel>> {
609            unimplemented!()
610        }
611    }
612
613    #[test]
614    fn connect_one_ok() {
615        test_with_one_runtime!(|runtime| async {
616            let mgr = new_test_abstract_chanmgr(runtime);
617            let target = FakeBuildSpec(413, '!', u32_to_ed(413));
618            let chan1 = mgr
619                .get_or_launch(target.clone(), CU::UserTraffic)
620                .await
621                .unwrap()
622                .0;
623            let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
624
625            assert_eq!(chan1, chan2);
626            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
627        });
628    }
629
630    #[test]
631    fn connect_one_fail() {
632        test_with_one_runtime!(|runtime| async {
633            let mgr = new_test_abstract_chanmgr(runtime);
634
635            // This is set up to always fail.
636            let target = FakeBuildSpec(999, '❌', u32_to_ed(999));
637            let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
638            assert!(matches!(res1, Err(Error::UnusableTarget(_))));
639
640            assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
641        });
642    }
643
644    #[test]
645    fn test_concurrent() {
646        test_with_one_runtime!(|runtime| async {
647            let mgr = new_test_abstract_chanmgr(runtime);
648
649            // TODO(nickm): figure out how to make these actually run
650            // concurrently. Right now it seems that they don't actually
651            // interact.
652            let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
653                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
654                mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic),
655                mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic),
656                mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic),
657                mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic),
658                mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic),
659            );
660            let ch3a = ch3a.unwrap();
661            let ch3b = ch3b.unwrap();
662            let ch44a = ch44a.unwrap();
663            let ch44b = ch44b.unwrap();
664            let err_a = ch86a.unwrap_err();
665            let err_b = ch86b.unwrap_err();
666
667            assert_eq!(ch3a, ch3b);
668            assert_eq!(ch44a, ch44b);
669            assert_ne!(ch44a, ch3a);
670
671            assert!(matches!(err_a, Error::UnusableTarget(_)));
672            assert!(matches!(err_b, Error::UnusableTarget(_)));
673        });
674    }
675
676    #[test]
677    fn unusable_entries() {
678        test_with_one_runtime!(|runtime| async {
679            let mgr = new_test_abstract_chanmgr(runtime);
680
681            let (ch3, ch4, ch5) = join!(
682                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
683                mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
684                mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
685            );
686
687            let ch3 = ch3.unwrap().0;
688            let _ch4 = ch4.unwrap();
689            let ch5 = ch5.unwrap().0;
690
691            ch3.start_closing();
692            ch5.start_closing();
693
694            let ch3_new = mgr
695                .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
696                .await
697                .unwrap()
698                .0;
699            assert_ne!(ch3, ch3_new);
700            assert_eq!(ch3_new.mood, 'b');
701
702            mgr.remove_unusable_entries().unwrap();
703
704            assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
705            assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
706            assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
707        });
708    }
709}