1
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2
#![doc = include_str!("../README.md")]
3
// @@ begin lint list maintained by maint/add_warning @@
4
#![allow(renamed_and_removed_lints)] // @@REMOVE_WHEN(ci_arti_stable)
5
#![allow(unknown_lints)] // @@REMOVE_WHEN(ci_arti_nightly)
6
#![warn(missing_docs)]
7
#![warn(noop_method_call)]
8
#![warn(unreachable_pub)]
9
#![warn(clippy::all)]
10
#![deny(clippy::await_holding_lock)]
11
#![deny(clippy::cargo_common_metadata)]
12
#![deny(clippy::cast_lossless)]
13
#![deny(clippy::checked_conversions)]
14
#![warn(clippy::cognitive_complexity)]
15
#![deny(clippy::debug_assert_with_mut_call)]
16
#![deny(clippy::exhaustive_enums)]
17
#![deny(clippy::exhaustive_structs)]
18
#![deny(clippy::expl_impl_clone_on_copy)]
19
#![deny(clippy::fallible_impl_from)]
20
#![deny(clippy::implicit_clone)]
21
#![deny(clippy::large_stack_arrays)]
22
#![warn(clippy::manual_ok_or)]
23
#![deny(clippy::missing_docs_in_private_items)]
24
#![warn(clippy::needless_borrow)]
25
#![warn(clippy::needless_pass_by_value)]
26
#![warn(clippy::option_option)]
27
#![deny(clippy::print_stderr)]
28
#![deny(clippy::print_stdout)]
29
#![warn(clippy::rc_buffer)]
30
#![deny(clippy::ref_option_ref)]
31
#![warn(clippy::semicolon_if_nothing_returned)]
32
#![warn(clippy::trait_duplication_in_bounds)]
33
#![deny(clippy::unchecked_duration_subtraction)]
34
#![deny(clippy::unnecessary_wraps)]
35
#![warn(clippy::unseparated_literal_suffix)]
36
#![deny(clippy::unwrap_used)]
37
#![deny(clippy::mod_module_files)]
38
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
39
#![allow(clippy::uninlined_format_args)]
40
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
41
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
42
#![allow(clippy::needless_raw_string_hashes)] // complained-about code is fine, often best
43
#![allow(clippy::needless_lifetimes)] // See arti#1765
44
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
45

            
46
// TODO #1645 (either remove this, or decide to have it everywhere)
47
#![cfg_attr(not(all(feature = "full")), allow(unused))]
48

            
49
#[cfg(all(
50
    any(feature = "native-tls", feature = "rustls"),
51
    any(feature = "async-std", feature = "tokio")
52
))]
53
pub(crate) mod impls;
54
pub mod task;
55

            
56
mod coarse_time;
57
mod compound;
58
mod dyn_time;
59
pub mod general;
60
mod opaque;
61
pub mod scheduler;
62
mod timer;
63
mod traits;
64
pub mod unimpl;
65
pub mod unix;
66

            
67
#[cfg(any(feature = "async-std", feature = "tokio"))]
68
use std::io;
69
pub use traits::{
70
    Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
71
    NoOpStreamOpsHandle, Runtime, SleepProvider, StreamOps, TlsProvider, ToplevelBlockOn,
72
    ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
73
};
74

            
75
pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
76
pub use dyn_time::DynTimeProvider;
77
pub use timer::{SleepProviderExt, Timeout, TimeoutError};
78

            
79
/// Traits used to describe TLS connections and objects that can
80
/// create them.
81
pub mod tls {
82
    pub use crate::traits::{CertifiedConn, TlsConnector};
83

            
84
    #[cfg(all(feature = "native-tls", any(feature = "tokio", feature = "async-std")))]
85
    pub use crate::impls::native_tls::NativeTlsProvider;
86
    #[cfg(all(feature = "rustls", any(feature = "tokio", feature = "async-std")))]
87
    pub use crate::impls::rustls::RustlsProvider;
88
}
89

            
90
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
91
pub mod tokio;
92

            
93
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
94
pub mod async_std;
95

            
96
pub use compound::{CompoundRuntime, RuntimeSubstExt};
97

            
98
#[cfg(all(
99
    any(feature = "native-tls", feature = "rustls"),
100
    feature = "async-std",
101
    not(feature = "tokio")
102
))]
103
use async_std as preferred_backend_mod;
104
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
105
use tokio as preferred_backend_mod;
106

            
107
/// The runtime that we prefer to use, out of all the runtimes compiled into the
/// tor-rtcompat crate.
///
/// When both `tokio` and `async-std` are available, `tokio` is preferred for
/// its performance.
/// When both `native_tls` and `rustls` are available, `native_tls` is
/// preferred since it has been used in Arti for longer.
///
/// The process [**may not fork**](crate#do-not-fork)
/// (except, very carefully, before exec)
/// after creating this or any other `Runtime`.
#[cfg(all(
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The underlying runtime object.
    inner: preferred_backend_mod::PreferredRuntime,
}

// Implement the runtime traits for the wrapper in terms of its `inner` field.
#[cfg(all(
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner : preferred_backend_mod::PreferredRuntime }
}
135

            
136
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "async-std", feature = "tokio")
))]
impl PreferredRuntime {
    /// Obtain a [`PreferredRuntime`] from the currently running asynchronous runtime.
    /// Generally, this is what you want.
    ///
    /// This tries to get a handle to a currently running asynchronous runtime, and
    /// wraps it; the returned [`PreferredRuntime`] isn't the same thing as the
    /// asynchronous runtime object itself (e.g. `tokio::runtime::Runtime`).
    ///
    /// # Panics
    ///
    /// When `tor-rtcompat` is compiled with the `tokio` feature enabled
    /// (regardless of whether the `async-std` feature is also enabled),
    /// panics if called outside of Tokio runtime context.
    /// See `tokio::runtime::Handle::current`.
    ///
    /// # Usage notes
    ///
    /// Once you have a runtime returned by this function, you should
    /// just create more handles to it via [`Clone`].
    ///
    /// # Limitations
    ///
    /// If the `tor-rtcompat` crate was compiled with `tokio` support,
    /// this function will never return a runtime based on `async_std`.
    ///
    /// The process [**may not fork**](crate#do-not-fork)
    /// (except, very carefully, before exec)
    /// after creating this or any other `Runtime`.
    //
    // ## Note to Arti developers
    //
    // We should never call this from inside other Arti crates, or from
    // library crates that want to support multiple runtimes!  This
    // function is for Arti _users_ who want to wrap some existing Tokio
    // or Async_std runtime as a [`Runtime`].  It is not for library
    // crates that want to work with multiple runtimes.
    pub fn current() -> io::Result<Self> {
        // Delegate to the selected backend, then wrap its handle.
        let inner = preferred_backend_mod::PreferredRuntime::current()?;

        Ok(Self { inner })
    }

    /// Create and return a new instance of the default [`Runtime`].
    ///
    /// Generally you should call this function at most once, and then use
    /// [`Clone::clone()`] to create additional references to that runtime.
    ///
    /// Tokio users may want to avoid this function and instead obtain a runtime using
    /// [`PreferredRuntime::current`]: this function always _builds_ a runtime,
    /// and if you already have a runtime, that isn't what you want with Tokio.
    ///
    /// If you need more fine-grained control over a runtime, you can create it
    /// using an appropriate builder type or function.
    ///
    /// The process [**may not fork**](crate#do-not-fork)
    /// (except, very carefully, before exec)
    /// after creating this or any other `Runtime`.
    //
    // ## Note to Arti developers
    //
    // We should never call this from inside other Arti crates, or from
    // library crates that want to support multiple runtimes!  This
    // function is for Arti _users_ who want to wrap some existing Tokio
    // or Async_std runtime as a [`Runtime`].  It is not for library
    // crates that want to work with multiple runtimes.
    pub fn create() -> io::Result<Self> {
        // Delegate to the selected backend, then wrap the new runtime.
        let inner = preferred_backend_mod::PreferredRuntime::create()?;

        Ok(Self { inner })
    }

    /// Helper to run a single test function in a freshly created runtime.
    ///
    /// The closure receives the new runtime, and the future it returns is
    /// driven to completion with `block_on`; that future's output is returned.
    ///
    /// # Panics
    ///
    /// Panics if we can't create this runtime.
    ///
    /// # Warning
    ///
    /// This API is **NOT** for consumption outside Arti. Semver guarantees are not provided.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let runtime = Self::create().expect("Failed to create runtime");
        // Keep one handle to drive the future; the other is given to the test.
        let driver = runtime.clone();
        driver.block_on(func(runtime))
    }
}
230

            
231
/// Helpers for test_with_all_runtimes
///
/// # Warning
///
/// This API is **NOT** for consumption outside Arti. Semver guarantees are not provided.
#[doc(hidden)]
pub mod testing__ {
    /// A trait for an object that might represent a test failure, or which
    /// might just be `()`.
    pub trait TestOutcome {
        /// Abort if the test has failed.
        fn check_ok(&self);
    }

    // A test returning `()` has nothing to report: it can never have failed.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }

    // A test returning `Result` has failed iff it returned `Err`; in that case
    // we panic, including the error's `Debug` output in the message.
    impl<E: std::fmt::Debug> TestOutcome for Result<(), E> {
        fn check_ok(&self) {
            let _unit: &() = self.as_ref().expect("Test failure");
        }
    }
}
253

            
254
/// Helper: define a macro that expands a token tree iff a pair of features are
/// both present.
///
/// Two definitions of the generated macro are emitted under complementary
/// `cfg`s, so exactly one of them exists in any given build.
macro_rules! declare_conditional_macro {
    ( $(#[$attr:meta])* macro $name:ident = ($first:expr, $second:expr) ) => {
        // Both features enabled: the generated macro passes its argument through.
        $( #[$attr] )*
        #[cfg(all(feature=$first, feature=$second))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {
                $body
            };
        }

        // At least one feature missing: the generated macro discards its argument.
        $( #[$attr] )*
        #[cfg(any(not(feature=$first), not(feature=$second)))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {};
        }

        // Needed so that we can access this macro at this path, both within the
        // crate and without.
        pub use $name;
    };
}
279

            
280
/// Defines macros that will expand when certain runtimes are available.
281
#[doc(hidden)]
282
pub mod cond {
283
    declare_conditional_macro! {
284
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
285
        #[doc(hidden)]
286
        macro if_tokio_native_tls_present = ("tokio", "native-tls")
287
    }
288
    declare_conditional_macro! {
289
        /// Expand a token tree if the TokioRustlsRuntime is available.
290
        #[doc(hidden)]
291
        macro if_tokio_rustls_present = ("tokio", "rustls")
292
    }
293
    declare_conditional_macro! {
294
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
295
        #[doc(hidden)]
296
        macro if_async_std_native_tls_present = ("async-std", "native-tls")
297
    }
298
    declare_conditional_macro! {
299
        /// Expand a token tree if the TokioNativeTlsRuntime is available.
300
        #[doc(hidden)]
301
        macro if_async_std_rustls_present = ("async-std", "rustls")
302
    }
303
}
304

            
305
/// Run a test closure, passing as argument every supported runtime.
///
/// Usually, prefer `tor_rtmock::MockRuntime::test_with_various` to this.
/// Use this macro only when you need to interact with things
/// that `MockRuntime` can't handle.
///
/// If everything in your test case is supported by `MockRuntime`,
/// you should use that instead:
/// that will give superior test coverage *and* a (more) deterministic test.
///
/// (This is a macro so that it can repeat the closure as multiple separate
/// expressions, so it can take on two different types, if needed.)
//
// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
//            this macro, like in scheduler.rs
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_all_runtimes {
    ( $fn:expr ) => {{
        use $crate::cond::*;
        use $crate::testing__::TestOutcome;
        // We have to do this outcome-checking business rather than just using
        // the ? operator or calling expect() because some of the closures that
        // we use this macro with return (), and some return Result.

        // Each branch below is expanded only if its runtime is compiled in.
        if_tokio_native_tls_present! {{
            $crate::tokio::TokioNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($fn).check_ok();
        }}
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($fn).check_ok();
        }}
    }};
}
347

            
348
/// Run a test closure, passing as argument one supported runtime.
///
/// Usually, prefer `tor_rtmock::MockRuntime::test_with_various` to this.
/// Use this macro only when you need to interact with things
/// that `MockRuntime` can't handle.
///
/// If everything in your test case is supported by `MockRuntime`,
/// you should use that instead:
/// that will give superior test coverage *and* a (more) deterministic test.
///
/// (Always prefers tokio if present.)
#[macro_export]
#[cfg(all(
    any(feature = "tokio", feature = "async-std"),
    any(feature = "native-tls", feature = "rustls"),
))]
macro_rules! test_with_one_runtime {
    ( $closure:expr ) => {{
        // Evaluates to whatever the closure's future produces.
        $crate::PreferredRuntime::run_test($closure)
    }};
}
369

            
370
#[cfg(all(
371
    test,
372
    any(feature = "native-tls", feature = "rustls"),
373
    any(feature = "async-std", feature = "tokio"),
374
    not(miri), // Many of these tests use real sockets or SystemTime
375
))]
376
mod test {
377
    #![allow(clippy::unwrap_used, clippy::unnecessary_wraps)]
378
    use crate::SleepProviderExt;
379
    use crate::ToplevelRuntime;
380

            
381
    use crate::traits::*;
382

            
383
    use futures::io::{AsyncReadExt, AsyncWriteExt};
384
    use futures::stream::StreamExt;
385
    use native_tls_crate as native_tls;
386
    use std::io::Result as IoResult;
387
    use std::net::SocketAddr;
388
    use std::net::{Ipv4Addr, SocketAddrV4};
389
    use std::time::{Duration, Instant};
390

            
391
    // Test "sleep" with a tiny delay, and make sure that at least that
392
    // much delay happens.
393
    fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
394
        let rt = runtime.clone();
395
        runtime.block_on(async {
396
            let i1 = Instant::now();
397
            let one_msec = Duration::from_millis(1);
398
            rt.sleep(one_msec).await;
399
            let i2 = Instant::now();
400
            assert!(i2 >= i1 + one_msec);
401
        });
402
        Ok(())
403
    }
404

            
405
    // Try a timeout operation that will succeed.
406
    fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
407
        let rt = runtime.clone();
408
        runtime.block_on(async {
409
            let one_day = Duration::from_secs(86400);
410
            let outcome = rt.timeout(one_day, async { 413_u32 }).await;
411
            assert_eq!(outcome, Ok(413));
412
        });
413
        Ok(())
414
    }
415

            
416
    // Try a timeout operation that will time out.
417
    fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
418
        use futures::future::pending;
419

            
420
        let rt = runtime.clone();
421
        runtime.block_on(async {
422
            let one_micros = Duration::from_micros(1);
423
            let outcome = rt.timeout(one_micros, pending::<()>()).await;
424
            assert_eq!(outcome, Err(crate::TimeoutError));
425
            assert_eq!(
426
                outcome.err().unwrap().to_string(),
427
                "Timeout expired".to_string()
428
            );
429
        });
430
        Ok(())
431
    }
432
    // Try a little wallclock delay.
433
    //
434
    // NOTE: This test will fail if the clock jumps a lot while it's
435
    // running.  We should use simulated time instead.
436
    fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
437
        let rt = runtime.clone();
438
        runtime.block_on(async {
439
            let i1 = Instant::now();
440
            let now = runtime.wallclock();
441
            let one_millis = Duration::from_millis(1);
442
            let one_millis_later = now + one_millis;
443

            
444
            rt.sleep_until_wallclock(one_millis_later).await;
445

            
446
            let i2 = Instant::now();
447
            let newtime = runtime.wallclock();
448
            assert!(newtime >= one_millis_later);
449
            assert!(i2 - i1 >= one_millis);
450
        });
451
        Ok(())
452
    }
453

            
454
    // Try connecting to ourself and sending a little data.
455
    //
456
    // NOTE: requires Ipv4 localhost.
457
    fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
458
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
459
        let rt1 = runtime.clone();
460

            
461
        let listener = runtime.block_on(rt1.listen(&(SocketAddr::from(localhost))))?;
462
        let addr = listener.local_addr()?;
463

            
464
        runtime.block_on(async {
465
            let task1 = async {
466
                let mut buf = vec![0_u8; 11];
467
                let (mut con, _addr) = listener.incoming().next().await.expect("closed?")?;
468
                con.read_exact(&mut buf[..]).await?;
469
                IoResult::Ok(buf)
470
            };
471
            let task2 = async {
472
                let mut con = rt1.connect(&addr).await?;
473
                con.write_all(b"Hello world").await?;
474
                con.flush().await?;
475
                IoResult::Ok(())
476
            };
477

            
478
            let (data, send_r) = futures::join!(task1, task2);
479
            send_r?;
480

            
481
            assert_eq!(&data?[..], b"Hello world");
482

            
483
            Ok(())
484
        })
485
    }
486

            
487
    // Try connecting to ourself and sending a little data.
488
    //
489
    // NOTE: requires Ipv4 localhost.
490
    fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
491
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
492
        let rt1 = runtime.clone();
493

            
494
        let socket1 = runtime.block_on(rt1.bind(&(localhost.into())))?;
495
        let addr1 = socket1.local_addr()?;
496

            
497
        let socket2 = runtime.block_on(rt1.bind(&(localhost.into())))?;
498
        let addr2 = socket2.local_addr()?;
499

            
500
        runtime.block_on(async {
501
            let task1 = async {
502
                let mut buf = [0_u8; 16];
503
                let (len, addr) = socket1.recv(&mut buf[..]).await?;
504
                IoResult::Ok((buf[..len].to_vec(), addr))
505
            };
506
            let task2 = async {
507
                socket2.send(b"Hello world", &addr1).await?;
508
                IoResult::Ok(())
509
            };
510

            
511
            let (recv_r, send_r) = futures::join!(task1, task2);
512
            send_r?;
513
            let (buff, addr) = recv_r?;
514
            assert_eq!(addr2, addr);
515
            assert_eq!(&buff, b"Hello world");
516

            
517
            Ok(())
518
        })
519
    }
520

            
521
    // Try out our incoming connection stream code.
522
    //
523
    // We launch a few connections and make sure that we can read data on
524
    // them.
525
    fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
526
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
527
        let rt1 = runtime.clone();
528

            
529
        let listener = runtime
530
            .block_on(rt1.listen(&SocketAddr::from(localhost)))
531
            .unwrap();
532
        let addr = listener.local_addr().unwrap();
533
        let mut stream = listener.incoming();
534

            
535
        runtime.block_on(async {
536
            let task1 = async {
537
                let mut n = 0_u32;
538
                loop {
539
                    let (mut con, _addr) = stream.next().await.unwrap()?;
540
                    let mut buf = [0_u8; 11];
541
                    con.read_exact(&mut buf[..]).await?;
542
                    n += 1;
543
                    if &buf[..] == b"world done!" {
544
                        break IoResult::Ok(n);
545
                    }
546
                }
547
            };
548
            let task2 = async {
549
                for _ in 0_u8..5 {
550
                    let mut con = rt1.connect(&addr).await?;
551
                    con.write_all(b"Hello world").await?;
552
                    con.flush().await?;
553
                }
554
                let mut con = rt1.connect(&addr).await?;
555
                con.write_all(b"world done!").await?;
556
                con.flush().await?;
557
                con.close().await?;
558
                IoResult::Ok(())
559
            };
560

            
561
            let (n, send_r) = futures::join!(task1, task2);
562
            send_r?;
563

            
564
            assert_eq!(n?, 6);
565

            
566
            Ok(())
567
        })
568
    }
569

            
570
    // Try listening on an address and connecting there, except using TLS.
571
    //
572
    // Note that since we don't have async tls server support yet, I'm just
573
    // going to use a thread.
574
    fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
575
        /*
576
         A simple expired self-signed rsa-2048 certificate.
577

            
578
         Generated by running the make-cert.c program in tor-rtcompat/test-data-helper,
579
         and then making a PFX file using
580

            
581
         openssl pkcs12 -export -certpbe PBE-SHA1-3DES -out test.pfx -inkey test.key -in test.crt
582

            
583
         The password is "abc".
584
        */
585
        static PFX_ID: &[u8] = include_bytes!("test.pfx");
586
        // Note that we need to set a password on the pkcs12 file, since apparently
587
        // OSX doesn't support pkcs12 with empty passwords. (That was arti#111).
588
        static PFX_PASSWORD: &str = "abc";
589

            
590
        let localhost = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
591
        let listener = std::net::TcpListener::bind(localhost)?;
592
        let addr = listener.local_addr()?;
593

            
594
        let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();
595

            
596
        // See note on function for why we're using a thread here.
597
        let th = std::thread::spawn(move || {
598
            // Accept a single TLS connection and run an echo server
599
            use std::io::{Read, Write};
600
            let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
601
            let (con, _addr) = listener.accept()?;
602
            let mut con = acceptor.accept(con).unwrap();
603
            let mut buf = [0_u8; 16];
604
            loop {
605
                let n = con.read(&mut buf)?;
606
                if n == 0 {
607
                    break;
608
                }
609
                con.write_all(&buf[..n])?;
610
            }
611
            IoResult::Ok(())
612
        });
613

            
614
        let connector = runtime.tls_connector();
615

            
616
        runtime.block_on(async {
617
            let text = b"I Suddenly Dont Understand Anything";
618
            let mut buf = vec![0_u8; text.len()];
619
            let conn = runtime.connect(&addr).await?;
620
            let mut conn = connector.negotiate_unvalidated(conn, "Kan.Aya").await?;
621
            assert!(conn.peer_certificate()?.is_some());
622
            conn.write_all(text).await?;
623
            conn.flush().await?;
624
            conn.read_exact(&mut buf[..]).await?;
625
            assert_eq!(&buf[..], text);
626
            conn.close().await?;
627
            IoResult::Ok(())
628
        })?;
629

            
630
        th.join().unwrap()?;
631
        IoResult::Ok(())
632
    }
633

            
634
    // For each listed name, define a `#[test]` function of that name which
    // calls the same-named function in the parent module, passing it the
    // given runtime expression.
    macro_rules! tests_with_runtime {
        { $make_rt:expr => $($test_name:ident),* $(,)? } => {
            $(
                #[test]
                fn $test_name() -> std::io::Result<()> {
                    super::$test_name($make_rt)
                }
            )*
        }
    }
644

            
645
    // Instantiate the listed tests once per available backend's
    // `PreferredRuntime`, plus once for the crate-level `PreferredRuntime`.
    macro_rules! runtime_tests {
        { $($test_name:ident),* $(,)? } =>
        {
            #[cfg(feature = "tokio")]
            mod tokio_runtime_tests {
                tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($test_name),* }
            }
            #[cfg(feature = "async-std")]
            mod async_std_runtime_tests {
                tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($test_name),* }
            }
            mod default_runtime_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($test_name),* }
            }
        }
    }
661

            
662
    // Instantiate the listed tests once per compiled-in (async backend, TLS
    // backend) combination, plus once for the crate-level `PreferredRuntime`.
    macro_rules! tls_runtime_tests {
        { $($test_name:ident),* $(,)? } =>
        {
            #[cfg(all(feature = "tokio", feature = "native-tls"))]
            mod tokio_native_tls_tests {
                tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($test_name),* }
            }
            #[cfg(all(feature = "async-std", feature = "native-tls"))]
            mod async_std_native_tls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($test_name),* }
            }
            #[cfg(all(feature = "tokio", feature = "rustls"))]
            mod tokio_rustls_tests {
                tests_with_runtime! { &crate::tokio::TokioRustlsRuntime::create()? => $($test_name),* }
            }
            #[cfg(all(feature = "async-std", feature = "rustls"))]
            mod async_std_rustls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdRustlsRuntime::create()? => $($test_name),* }
            }
            mod default_runtime_tls_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($test_name),* }
            }
        }
    }
686

            
687
    runtime_tests! {
688
        small_delay,
689
        small_timeout_ok,
690
        small_timeout_expire,
691
        tiny_wallclock,
692
        self_connect_tcp,
693
        self_connect_udp,
694
        listener_stream,
695
    }
696

            
697
    tls_runtime_tests! {
698
        simple_tls,
699
    }
700
}