1
//! Abstract implementation of a channel manager
2

            
3
use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
4
use crate::util::defer::Defer;
5
use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
6

            
7
use crate::factory::BootstrapReporter;
8
use async_trait::async_trait;
9
use futures::future::Shared;
10
use oneshot_fused_workaround as oneshot;
11
use std::result::Result as StdResult;
12
use std::sync::Arc;
13
use std::time::Duration;
14
use tor_error::{error_report, internal};
15
use tor_linkspec::HasRelayIds;
16
use tor_netdir::params::NetParameters;
17
use tor_proto::channel::kist::KistParams;
18
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
19
use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
20

            
21
mod select;
22
mod state;
23

            
24
/// Trait to describe as much of a
25
/// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr`
26
/// needs to use.
27
pub(crate) trait AbstractChannel: HasRelayIds {
28
    /// Return true if this channel is usable.
29
    ///
30
    /// A channel might be unusable because it is closed, because it has
31
    /// hit a bug, or for some other reason.  We don't return unusable
32
    /// channels back to the user.
33
    fn is_usable(&self) -> bool;
34
    /// Return the amount of time a channel has not been in use.
35
    /// Return None if the channel is currently in use.
36
    fn duration_unused(&self) -> Option<Duration>;
37

            
38
    /// Reparameterize this channel according to the provided `ChannelPaddingInstructionsUpdates`
39
    ///
40
    /// The changed parameters may not be implemented "immediately",
41
    /// but this will be done "reasonably soon".
42
    fn reparameterize(
43
        &self,
44
        updates: Arc<ChannelPaddingInstructionsUpdates>,
45
    ) -> tor_proto::Result<()>;
46

            
47
    /// Update the KIST parameters.
48
    ///
49
    /// The changed parameters may not be implemented "immediately",
50
    /// but this will be done "reasonably soon".
51
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;
52

            
53
    /// Specify that this channel should do activities related to channel padding
54
    ///
55
    /// See [`Channel::engage_padding_activities`]
56
    ///
57
    /// [`Channel::engage_padding_activities`]: tor_proto::channel::Channel::engage_padding_activities
58
    fn engage_padding_activities(&self);
59
}
60

            
61
/// Trait to describe how channels-like objects are created.
62
///
63
/// This differs from [`ChannelFactory`](crate::factory::ChannelFactory) in that
64
/// it's a purely crate-internal type that we use to decouple the
65
/// AbstractChanMgr code from actual "what is a channel" concerns.
66
#[async_trait]
67
pub(crate) trait AbstractChannelFactory {
68
    /// The type of channel that this factory can build.
69
    type Channel: AbstractChannel;
70
    /// Type that explains how to build an outgoing channel.
71
    type BuildSpec: HasRelayIds;
72
    /// The type of byte stream that's required to build channels for incoming connections.
73
    type Stream;
74

            
75
    /// Construct a new channel to the destination described at `target`.
76
    ///
77
    /// This function must take care of all timeouts, error detection,
78
    /// and so on.
79
    ///
80
    /// It should not retry; that is handled at a higher level.
81
    async fn build_channel(
82
        &self,
83
        target: &Self::BuildSpec,
84
        reporter: BootstrapReporter,
85
        memquota: ChannelAccount,
86
    ) -> Result<Arc<Self::Channel>>;
87

            
88
    /// Construct a new channel for an incoming connection.
89
    #[cfg(feature = "relay")]
90
    async fn build_channel_using_incoming(
91
        &self,
92
        peer: std::net::SocketAddr,
93
        stream: Self::Stream,
94
        memquota: ChannelAccount,
95
    ) -> Result<Arc<Self::Channel>>;
96
}
97

            
98
/// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
99
///
100
/// This type does the work of keeping track of open channels and pending
101
/// channel requests, launching requests as needed, waiting for pending
102
/// requests, and so forth.
103
///
104
/// The actual job of launching connections is deferred to an
105
/// `AbstractChannelFactory` type.
106
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
107
    /// All internal state held by this channel manager.
108
    ///
109
    /// The most important part is the map from relay identity to channel, or
110
    /// to pending channel status.
111
    pub(crate) channels: state::MgrState<CF>,
112

            
113
    /// A bootstrap reporter to give out when building channels.
114
    pub(crate) reporter: BootstrapReporter,
115

            
116
    /// The memory quota account that every channel will be a child of
117
    pub(crate) memquota: ToplevelAccount,
118
}
119

            
120
/// Type alias for a future that we wait on to see when a pending
121
/// channel is done or failed.
122
type Pending = Shared<oneshot::Receiver<Result<()>>>;
123

            
124
/// Type alias for the sender we notify when we complete a channel (or fail to
125
/// complete it).
126
type Sending = oneshot::Sender<Result<()>>;
127

            
128
impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
129
    /// Make a new empty channel manager.
130
60
    pub(crate) fn new(
131
60
        connector: CF,
132
60
        config: &ChannelConfig,
133
60
        dormancy: Dormancy,
134
60
        netparams: &NetParameters,
135
60
        reporter: BootstrapReporter,
136
60
        memquota: ToplevelAccount,
137
60
    ) -> Self {
138
60
        AbstractChanMgr {
139
60
            channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
140
60
            reporter,
141
60
            memquota,
142
60
        }
143
60
    }
144

            
145
    /// Run a function to modify the channel builder in this object.
146
    #[allow(dead_code)]
147
8
    pub(crate) fn with_mut_builder<F>(&self, func: F)
148
8
    where
149
8
        F: FnOnce(&mut CF),
150
8
    {
151
8
        self.channels.with_mut_builder(func);
152
8
    }
153

            
154
    /// Remove every unusable entry from this channel manager.
155
    #[cfg(test)]
156
2
    pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
157
2
        self.channels.remove_unusable()
158
2
    }
159

            
160
    /// Build a channel for an incoming stream. See
161
    /// [`ChanMgr::handle_incoming`](crate::ChanMgr::handle_incoming).
162
    #[cfg(feature = "relay")]
163
    pub(crate) async fn handle_incoming(
164
        &self,
165
        src: std::net::SocketAddr,
166
        stream: CF::Stream,
167
    ) -> Result<Arc<CF::Channel>> {
168
        let chan_builder = self.channels.builder();
169
        let memquota = ChannelAccount::new(&self.memquota)?;
170
        let _outcome = chan_builder
171
            .build_channel_using_incoming(src, stream, memquota)
172
            .await?;
173

            
174
        // TODO RELAY: we need to do something with the channel here now that we've created it
175
        todo!();
176
    }
177

            
178
    /// Get a channel corresponding to the identities of `target`.
179
    ///
180
    /// If a usable channel exists with that identity, return it.
181
    ///
182
    /// If no such channel exists already, and none is in progress,
183
    /// launch a new request using `target`.
184
    ///
185
    /// If no such channel exists already, but we have one that's in
186
    /// progress, wait for it to succeed or fail.
187
34
    pub(crate) async fn get_or_launch(
188
34
        &self,
189
34
        target: CF::BuildSpec,
190
34
        usage: ChannelUsage,
191
34
    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
192
        use ChannelUsage as CU;
193

            
194
34
        let chan = self.get_or_launch_internal(target).await?;
195

            
196
28
        match usage {
197
2
            CU::Dir | CU::UselessCircuit => {}
198
26
            CU::UserTraffic => chan.0.engage_padding_activities(),
199
        }
200

            
201
28
        Ok(chan)
202
34
    }
203

            
204
    /// Get a channel whose identity is `ident` - internal implementation
205
34
    async fn get_or_launch_internal(
206
34
        &self,
207
34
        target: CF::BuildSpec,
208
34
    ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
209
        /// How many times do we try?
210
        const N_ATTEMPTS: usize = 2;
211
34
        let mut attempts_so_far = 0;
212
34
        let mut final_attempt = false;
213
34
        let mut provenance = ChanProvenance::Preexisting;
214
34

            
215
34
        // TODO(nickm): It would be neat to use tor_retry instead.
216
34
        let mut last_err = None;
217

            
218
50
        while attempts_so_far < N_ATTEMPTS || final_attempt {
219
44
            attempts_so_far += 1;
220

            
221
            // For each attempt, we _first_ look at the state of the channel map
222
            // to decide on an `Action`, and _then_ we execute that action.
223

            
224
            // First, see what state we're in, and what we should do about it.
225
44
            let action = self.choose_action(&target, final_attempt)?;
226

            
227
            // We are done deciding on our Action! It's time act based on the
228
            // Action that we chose.
229
44
            match action {
230
                // If this happens, we were trying to make one final check of our state, but
231
                // we would have had to make additional attempts.
232
                None => {
233
                    if !final_attempt {
234
                        return Err(Error::Internal(internal!(
235
                            "No action returned while not on final attempt"
236
                        )));
237
                    }
238
                    break;
239
                }
240
                // Easy case: we have an error or a channel to return.
241
6
                Some(Action::Return(v)) => {
242
6
                    return v.map(|chan| (chan, provenance));
243
                }
244
                // There's an in-progress channel.  Wait for it.
245
8
                Some(Action::Wait(pend)) => {
246
8
                    match pend.await {
247
4
                        Ok(Ok(())) => {
248
4
                            // We were waiting for a channel, and it succeeded, or it
249
4
                            // got cancelled.  But it might have gotten more
250
4
                            // identities while negotiating than it had when it was
251
4
                            // launched, or it might have failed to get all the
252
4
                            // identities we want. Check for this.
253
4
                            final_attempt = true;
254
4
                            provenance = ChanProvenance::NewlyCreated;
255
4
                            last_err.get_or_insert(Error::RequestCancelled);
256
4
                        }
257
4
                        Ok(Err(e)) => {
258
4
                            last_err = Some(e);
259
4
                        }
260
                        Err(_) => {
261
                            last_err =
262
                                Some(Error::Internal(internal!("channel build task disappeared")));
263
                        }
264
                    }
265
                }
266
                // We need to launch a channel.
267
30
                Some(Action::Launch((handle, send))) => {
268
30
                    // If the remainder of this code returns early or is cancelled, we still want to
269
30
                    // clean up our pending entry in the channel map. The following closure will be
270
30
                    // run when dropped to ensure that it's cleaned up properly.
271
30
                    //
272
30
                    // The `remove_pending_channel` will acquire the lock within `MgrState`, but
273
30
                    // this won't lead to deadlocks since the lock is only ever acquired within
274
30
                    // methods of `MgrState`. When this `Defer` is being dropped, no other
275
30
                    // `MgrState` methods will be running on this thread, so the lock will not have
276
30
                    // already been acquired.
277
30
                    let defer_remove_pending = Defer::new(handle, |handle| {
278
8
                        if let Err(e) = self.channels.remove_pending_channel(handle) {
279
                            // Just log an error if we're unable to remove it, since there's
280
                            // nothing else we can do here, and returning the error would
281
                            // hide the actual error that we care about (the channel build
282
                            // failure).
283
                            #[allow(clippy::missing_docs_in_private_items)]
284
                            const MSG: &str = "Unable to remove the pending channel";
285
                            error_report!(internal!("{e}"), "{}", MSG);
286
8
                        }
287
30
                    });
288
30

            
289
30
                    let connector = self.channels.builder();
290
30
                    let memquota = ChannelAccount::new(&self.memquota)?;
291

            
292
30
                    let outcome = connector
293
30
                        .build_channel(&target, self.reporter.clone(), memquota)
294
30
                        .await;
295

            
296
30
                    match outcome {
297
22
                        Ok(ref chan) => {
298
22
                            // Replace the pending channel with the newly built channel.
299
22
                            let handle = defer_remove_pending.cancel();
300
22
                            self.channels
301
22
                                .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
302
                        }
303
8
                        Err(_) => {
304
8
                            // Remove the pending channel.
305
8
                            drop(defer_remove_pending);
306
8
                        }
307
                    }
308

            
309
                    // It's okay if all the receivers went away:
310
                    // that means that nobody was waiting for this channel.
311
30
                    let _ignore_err = send.send(outcome.clone().map(|_| ()));
312
30

            
313
30
                    match outcome {
314
22
                        Ok(chan) => {
315
22
                            return Ok((chan, ChanProvenance::NewlyCreated));
316
                        }
317
8
                        Err(e) => last_err = Some(e),
318
                    }
319
                }
320
            }
321

            
322
            // End of this attempt. We will try again...
323
        }
324

            
325
6
        Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
326
34
    }
327

            
328
    /// Helper: based on our internal state, decide which action to take when
329
    /// asked for a channel, and update our internal state accordingly.
330
    ///
331
    /// If `final_attempt` is true, then we will not pick any action that does
332
    /// not result in an immediate result. If we would pick such an action, we
333
    /// instead return `Ok(None)`.  (We could instead have the caller detect
334
    /// such actions, but it's less efficient to construct them, insert them,
335
    /// and immediately revert them.)
336
44
    fn choose_action(
337
44
        &self,
338
44
        target: &CF::BuildSpec,
339
44
        final_attempt: bool,
340
44
    ) -> Result<Option<Action<CF::Channel>>> {
341
44
        // don't create new channels on the final attempt
342
44
        let response = self.channels.request_channel(
343
44
            target,
344
44
            /* add_new_entry_if_not_found= */ !final_attempt,
345
44
        );
346

            
347
44
        match response {
348
6
            Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
349
8
            Ok(Some(ChannelForTarget::Pending(pending))) => {
350
8
                if !final_attempt {
351
8
                    Ok(Some(Action::Wait(pending)))
352
                } else {
353
                    // don't return a pending channel on the final attempt
354
                    Ok(None)
355
                }
356
            }
357
30
            Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
358
30
                // do not drop the handle if refactoring; see `PendingChannelHandle` for details
359
30
                Ok(Some(Action::Launch((handle, send))))
360
            }
361
            Ok(None) => Ok(None),
362
            Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
363
            Err(e) => Err(e),
364
        }
365
44
    }
366

            
367
    /// Update the netdir
368
4
    pub(crate) fn update_netparams(
369
4
        &self,
370
4
        netparams: Arc<dyn AsRef<NetParameters>>,
371
4
    ) -> StdResult<(), tor_error::Bug> {
372
4
        self.channels.reconfigure_general(None, None, netparams)
373
4
    }
374

            
375
    /// Notifies the chanmgr to be dormant like dormancy
376
12
    pub(crate) fn set_dormancy(
377
12
        &self,
378
12
        dormancy: Dormancy,
379
12
        netparams: Arc<dyn AsRef<NetParameters>>,
380
12
    ) -> StdResult<(), tor_error::Bug> {
381
12
        self.channels
382
12
            .reconfigure_general(None, Some(dormancy), netparams)
383
12
    }
384

            
385
    /// Reconfigure all channels
386
6
    pub(crate) fn reconfigure(
387
6
        &self,
388
6
        config: &ChannelConfig,
389
6
        netparams: Arc<dyn AsRef<NetParameters>>,
390
6
    ) -> StdResult<(), tor_error::Bug> {
391
6
        self.channels
392
6
            .reconfigure_general(Some(config), None, netparams)
393
6
    }
394

            
395
    /// Expire any channels that have been unused longer than
396
    /// their maximum unused duration assigned during creation.
397
    ///
398
    /// Return a duration from now until next channel expires.
399
    ///
400
    /// If all channels are in use or there are no open channels,
401
    /// return 180 seconds which is the minimum value of
402
    /// max_unused_duration.
403
16
    pub(crate) fn expire_channels(&self) -> Duration {
404
16
        self.channels.expire_channels()
405
16
    }
406

            
407
    /// Test only: return the open usable channels with a given `ident`.
408
    #[cfg(test)]
409
10
    pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
410
10
    where
411
10
        T: Into<tor_linkspec::RelayIdRef<'a>>,
412
10
    {
413
        use state::ChannelState::*;
414
10
        self.channels
415
10
            .with_channels(|channel_map| {
416
10
                channel_map
417
10
                    .by_id(ident)
418
10
                    .filter_map(|entry| match entry {
419
6
                        Open(ref ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
420
                        _ => None,
421
10
                    })
422
10
                    .collect()
423
10
            })
424
10
            .expect("Poisoned lock")
425
10
    }
426
}
427

            
428
/// Possible actions that we'll decide to take when asked for a channel.
429
#[allow(clippy::large_enum_variant)]
430
enum Action<C: AbstractChannel> {
431
    /// We found no channel.  We're going to launch a new one,
432
    /// then tell everybody about it.
433
    Launch((PendingChannelHandle, Sending)),
434
    /// We found an in-progress attempt at making a channel.
435
    /// We're going to wait for it to finish.
436
    Wait(Pending),
437
    /// We found a usable channel.  We're going to return it.
438
    Return(Result<Arc<C>>),
439
}
440

            
441
#[cfg(test)]
442
mod test {
443
    // @@ begin test lint list maintained by maint/add_warning @@
444
    #![allow(clippy::bool_assert_comparison)]
445
    #![allow(clippy::clone_on_copy)]
446
    #![allow(clippy::dbg_macro)]
447
    #![allow(clippy::mixed_attributes_style)]
448
    #![allow(clippy::print_stderr)]
449
    #![allow(clippy::print_stdout)]
450
    #![allow(clippy::single_char_pattern)]
451
    #![allow(clippy::unwrap_used)]
452
    #![allow(clippy::unchecked_duration_subtraction)]
453
    #![allow(clippy::useless_vec)]
454
    #![allow(clippy::needless_pass_by_value)]
455
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
456
    use super::*;
457
    use crate::Error;
458

            
459
    use futures::join;
460
    use std::sync::atomic::{AtomicBool, Ordering};
461
    use std::sync::Arc;
462
    use std::time::Duration;
463
    use tor_error::bad_api_usage;
464
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
465
    use tor_memquota::ArcMemoryQuotaTrackerExt as _;
466

            
467
    use crate::ChannelUsage as CU;
468
    use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
469

            
470
    #[derive(Clone)]
471
    struct FakeChannelFactory<RT> {
472
        runtime: RT,
473
    }
474

            
475
    #[derive(Clone, Debug)]
476
    struct FakeChannel {
477
        ed_ident: Ed25519Identity,
478
        mood: char,
479
        closing: Arc<AtomicBool>,
480
        detect_reuse: Arc<char>,
481
        // last_params: Option<ChannelPaddingInstructionsUpdates>,
482
    }
483

            
484
    impl PartialEq for FakeChannel {
485
        fn eq(&self, other: &Self) -> bool {
486
            Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
487
        }
488
    }
489

            
490
    impl AbstractChannel for FakeChannel {
491
        fn is_usable(&self) -> bool {
492
            !self.closing.load(Ordering::SeqCst)
493
        }
494
        fn duration_unused(&self) -> Option<Duration> {
495
            None
496
        }
497
        fn reparameterize(
498
            &self,
499
            _updates: Arc<ChannelPaddingInstructionsUpdates>,
500
        ) -> tor_proto::Result<()> {
501
            // *self.last_params.lock().unwrap() = Some((*updates).clone());
502
            Ok(())
503
        }
504
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
505
            Ok(())
506
        }
507
        fn engage_padding_activities(&self) {}
508
    }
509

            
510
    impl HasRelayIds for FakeChannel {
511
        fn identity(
512
            &self,
513
            key_type: tor_linkspec::RelayIdType,
514
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
515
            match key_type {
516
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
517
                _ => None,
518
            }
519
        }
520
    }
521

            
522
    impl FakeChannel {
523
        fn start_closing(&self) {
524
            self.closing.store(true, Ordering::SeqCst);
525
        }
526
    }
527

            
528
    impl<RT: Runtime> FakeChannelFactory<RT> {
529
        fn new(runtime: RT) -> Self {
530
            FakeChannelFactory { runtime }
531
        }
532
    }
533

            
534
    fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
535
        let cf = FakeChannelFactory::new(runtime);
536
        AbstractChanMgr::new(
537
            cf,
538
            &ChannelConfig::default(),
539
            Default::default(),
540
            &Default::default(),
541
            BootstrapReporter::fake(),
542
            ToplevelAccount::new_noop(),
543
        )
544
    }
545

            
546
    #[derive(Clone, Debug)]
547
    struct FakeBuildSpec(u32, char, Ed25519Identity);
548

            
549
    impl HasRelayIds for FakeBuildSpec {
550
        fn identity(
551
            &self,
552
            key_type: tor_linkspec::RelayIdType,
553
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
554
            match key_type {
555
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
556
                _ => None,
557
            }
558
        }
559
    }
560

            
561
    /// Helper to make a fake Ed identity from a u32.
562
    fn u32_to_ed(n: u32) -> Ed25519Identity {
563
        let mut bytes = [0; 32];
564
        bytes[0..4].copy_from_slice(&n.to_be_bytes());
565
        bytes.into()
566
    }
567

            
568
    #[async_trait]
569
    impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
570
        type Channel = FakeChannel;
571
        type BuildSpec = FakeBuildSpec;
572
        type Stream = ();
573

            
574
        async fn build_channel(
575
            &self,
576
            target: &Self::BuildSpec,
577
            _reporter: BootstrapReporter,
578
            _memquota: ChannelAccount,
579
        ) -> Result<Arc<FakeChannel>> {
580
            yield_now().await;
581
            let FakeBuildSpec(ident, mood, id) = *target;
582
            let ed_ident = u32_to_ed(ident);
583
            assert_eq!(ed_ident, id);
584
            match mood {
585
                // "X" means never connect.
586
                '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
587
                // "zzz" means wait for 15 seconds then succeed.
588
                '💤' => {
589
                    self.runtime.sleep(Duration::new(15, 0)).await;
590
                }
591
                _ => {}
592
            }
593
            Ok(Arc::new(FakeChannel {
594
                ed_ident,
595
                mood,
596
                closing: Arc::new(AtomicBool::new(false)),
597
                detect_reuse: Default::default(),
598
                // last_params: None,
599
            }))
600
        }
601

            
602
        #[cfg(feature = "relay")]
603
        async fn build_channel_using_incoming(
604
            &self,
605
            _peer: std::net::SocketAddr,
606
            _stream: Self::Stream,
607
            _memquota: ChannelAccount,
608
        ) -> Result<Arc<Self::Channel>> {
609
            unimplemented!()
610
        }
611
    }
612

            
613
    #[test]
614
    fn connect_one_ok() {
615
        test_with_one_runtime!(|runtime| async {
616
            let mgr = new_test_abstract_chanmgr(runtime);
617
            let target = FakeBuildSpec(413, '!', u32_to_ed(413));
618
            let chan1 = mgr
619
                .get_or_launch(target.clone(), CU::UserTraffic)
620
                .await
621
                .unwrap()
622
                .0;
623
            let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
624

            
625
            assert_eq!(chan1, chan2);
626
            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
627
        });
628
    }
629

            
630
    #[test]
631
    fn connect_one_fail() {
632
        test_with_one_runtime!(|runtime| async {
633
            let mgr = new_test_abstract_chanmgr(runtime);
634

            
635
            // This is set up to always fail.
636
            let target = FakeBuildSpec(999, '❌', u32_to_ed(999));
637
            let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
638
            assert!(matches!(res1, Err(Error::UnusableTarget(_))));
639

            
640
            assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
641
        });
642
    }
643

            
644
    #[test]
645
    fn test_concurrent() {
646
        test_with_one_runtime!(|runtime| async {
647
            let mgr = new_test_abstract_chanmgr(runtime);
648

            
649
            // TODO(nickm): figure out how to make these actually run
650
            // concurrently. Right now it seems that they don't actually
651
            // interact.
652
            let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
653
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
654
                mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic),
655
                mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic),
656
                mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic),
657
                mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic),
658
                mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic),
659
            );
660
            let ch3a = ch3a.unwrap();
661
            let ch3b = ch3b.unwrap();
662
            let ch44a = ch44a.unwrap();
663
            let ch44b = ch44b.unwrap();
664
            let err_a = ch86a.unwrap_err();
665
            let err_b = ch86b.unwrap_err();
666

            
667
            assert_eq!(ch3a, ch3b);
668
            assert_eq!(ch44a, ch44b);
669
            assert_ne!(ch44a, ch3a);
670

            
671
            assert!(matches!(err_a, Error::UnusableTarget(_)));
672
            assert!(matches!(err_b, Error::UnusableTarget(_)));
673
        });
674
    }
675

            
676
    #[test]
677
    fn unusable_entries() {
678
        test_with_one_runtime!(|runtime| async {
679
            let mgr = new_test_abstract_chanmgr(runtime);
680

            
681
            let (ch3, ch4, ch5) = join!(
682
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
683
                mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
684
                mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
685
            );
686

            
687
            let ch3 = ch3.unwrap().0;
688
            let _ch4 = ch4.unwrap();
689
            let ch5 = ch5.unwrap().0;
690

            
691
            ch3.start_closing();
692
            ch5.start_closing();
693

            
694
            let ch3_new = mgr
695
                .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
696
                .await
697
                .unwrap()
698
                .0;
699
            assert_ne!(ch3, ch3_new);
700
            assert_eq!(ch3_new.mood, 'b');
701

            
702
            mgr.remove_unusable_entries().unwrap();
703

            
704
            assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
705
            assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
706
            assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
707
        });
708
    }
709
}