1#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![allow(renamed_and_removed_lints)] #![allow(unknown_lints)] #![warn(missing_docs)]
7#![warn(noop_method_call)]
8#![warn(unreachable_pub)]
9#![warn(clippy::all)]
10#![deny(clippy::await_holding_lock)]
11#![deny(clippy::cargo_common_metadata)]
12#![deny(clippy::cast_lossless)]
13#![deny(clippy::checked_conversions)]
14#![warn(clippy::cognitive_complexity)]
15#![deny(clippy::debug_assert_with_mut_call)]
16#![deny(clippy::exhaustive_enums)]
17#![deny(clippy::exhaustive_structs)]
18#![deny(clippy::expl_impl_clone_on_copy)]
19#![deny(clippy::fallible_impl_from)]
20#![deny(clippy::implicit_clone)]
21#![deny(clippy::large_stack_arrays)]
22#![warn(clippy::manual_ok_or)]
23#![deny(clippy::missing_docs_in_private_items)]
24#![warn(clippy::needless_borrow)]
25#![warn(clippy::needless_pass_by_value)]
26#![warn(clippy::option_option)]
27#![deny(clippy::print_stderr)]
28#![deny(clippy::print_stdout)]
29#![warn(clippy::rc_buffer)]
30#![deny(clippy::ref_option_ref)]
31#![warn(clippy::semicolon_if_nothing_returned)]
32#![warn(clippy::trait_duplication_in_bounds)]
33#![deny(clippy::unchecked_duration_subtraction)]
34#![deny(clippy::unnecessary_wraps)]
35#![warn(clippy::unseparated_literal_suffix)]
36#![deny(clippy::unwrap_used)]
37#![deny(clippy::mod_module_files)]
38#![allow(clippy::let_unit_value)] #![allow(clippy::uninlined_format_args)]
40#![allow(clippy::significant_drop_in_scrutinee)] #![allow(clippy::result_large_err)] #![allow(clippy::needless_raw_string_hashes)] #![allow(clippy::needless_lifetimes)] #![cfg_attr(not(all(feature = "full")), allow(unused))]
48
49#[cfg(all(
50 any(feature = "native-tls", feature = "rustls"),
51 any(feature = "async-std", feature = "tokio")
52))]
53pub(crate) mod impls;
54pub mod task;
55
56mod coarse_time;
57mod compound;
58mod dyn_time;
59pub mod general;
60mod opaque;
61pub mod scheduler;
62mod timer;
63mod traits;
64pub mod unimpl;
65pub mod unix;
66
67#[cfg(any(feature = "async-std", feature = "tokio"))]
68use std::io;
69pub use traits::{
70 Blocking, CertifiedConn, CoarseTimeProvider, NetStreamListener, NetStreamProvider,
71 NoOpStreamOpsHandle, Runtime, SleepProvider, StreamOps, TlsProvider, ToplevelBlockOn,
72 ToplevelRuntime, UdpProvider, UdpSocket, UnsupportedStreamOp,
73};
74
75pub use coarse_time::{CoarseDuration, CoarseInstant, RealCoarseTimeProvider};
76pub use dyn_time::DynTimeProvider;
77pub use timer::{SleepProviderExt, Timeout, TimeoutError};
78
79pub mod tls {
82 pub use crate::traits::{CertifiedConn, TlsConnector};
83
84 #[cfg(all(feature = "native-tls", any(feature = "tokio", feature = "async-std")))]
85 pub use crate::impls::native_tls::NativeTlsProvider;
86 #[cfg(all(feature = "rustls", any(feature = "tokio", feature = "async-std")))]
87 pub use crate::impls::rustls::RustlsProvider;
88}
89
90#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
91pub mod tokio;
92
93#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
94pub mod async_std;
95
96pub use compound::{CompoundRuntime, RuntimeSubstExt};
97
98#[cfg(all(
99 any(feature = "native-tls", feature = "rustls"),
100 feature = "async-std",
101 not(feature = "tokio")
102))]
103use async_std as preferred_backend_mod;
104#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
105use tokio as preferred_backend_mod;
106
/// The runtime type to use when the caller has no particular backend in mind.
///
/// This is an opaque wrapper around the `PreferredRuntime` type of
/// `preferred_backend_mod`: the `tokio` module when the `tokio` feature is
/// enabled, and the `async_std` module otherwise.
#[cfg(all(
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls")
))]
#[derive(Clone)]
pub struct PreferredRuntime {
    /// The wrapped backend runtime.
    inner: preferred_backend_mod::PreferredRuntime,
}
127
// Generate the trait implementations for `PreferredRuntime` from its `inner`
// field. NOTE(review): presumably each implementation delegates to `inner`;
// confirm against the macro definition in `crate::opaque`.
#[cfg(all(
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls")
))]
crate::opaque::implement_opaque_runtime! {
    PreferredRuntime { inner: preferred_backend_mod::PreferredRuntime }
}
135
#[cfg(all(
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls")
))]
impl PreferredRuntime {
    /// Wrap the value returned by the preferred backend's
    /// `PreferredRuntime::current()`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the backend's `current()`.
    pub fn current() -> io::Result<Self> {
        Ok(Self {
            inner: preferred_backend_mod::PreferredRuntime::current()?,
        })
    }

    /// Wrap the value returned by the preferred backend's
    /// `PreferredRuntime::create()`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the backend's `create()`.
    pub fn create() -> io::Result<Self> {
        Ok(Self {
            inner: preferred_backend_mod::PreferredRuntime::create()?,
        })
    }

    /// Create a runtime, hand it to `func`, and block on the future that
    /// `func` returns, yielding that future's output.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    #[doc(hidden)]
    pub fn run_test<P, F, O>(func: P) -> O
    where
        P: FnOnce(Self) -> F,
        F: futures::Future<Output = O>,
    {
        let rt = Self::create().expect("Failed to create runtime");
        // Keep one handle to drive the future; the other is given to `func`.
        let executor = rt.clone();
        let fut = func(rt);
        executor.block_on(fut)
    }
}
230
/// Support items used by this crate's exported test macros.
#[doc(hidden)]
pub mod testing__ {
    /// A value produced by a test body that can be checked for success.
    pub trait TestOutcome {
        /// Panic if this outcome represents a failed test.
        fn check_ok(&self);
    }

    /// A unit outcome always counts as success.
    impl TestOutcome for () {
        fn check_ok(&self) {}
    }

    /// A `Result` outcome succeeds on `Ok(())` and panics on `Err`, showing
    /// the error's `Debug` representation.
    impl<E> TestOutcome for Result<(), E>
    where
        E: std::fmt::Debug,
    {
        fn check_ok(&self) {
            let outcome: Result<&(), &E> = self.as_ref();
            outcome.expect("Test failure");
        }
    }
}
253
/// Declare a macro `$name` whose expansion depends on two cargo features.
///
/// When both features are enabled, `$name! { tt }` expands to `tt`; otherwise
/// it expands to nothing. Any attributes given before `macro` are applied to
/// both variants. The generated macro is `#[macro_export]`ed and is also
/// re-exported with `pub use` from the module where this macro is invoked.
macro_rules! declare_conditional_macro {
    ( $(#[$attr:meta])* macro $name:ident = ($feat_a:expr, $feat_b:expr) ) => {
        // Both features present: pass the body through unchanged.
        $( #[$attr] )*
        #[cfg(all(feature = $feat_a, feature = $feat_b))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {
                $body
            };
        }

        // At least one feature missing: discard the body.
        $( #[$attr] )*
        #[cfg(not(all(feature = $feat_a, feature = $feat_b)))]
        #[macro_export]
        macro_rules! $name {
            ($body:tt) => {};
        }

        // Make the macro reachable by path from the invoking module.
        pub use $name;
    };
}
279
280#[doc(hidden)]
282pub mod cond {
283 declare_conditional_macro! {
284 #[doc(hidden)]
286 macro if_tokio_native_tls_present = ("tokio", "native-tls")
287 }
288 declare_conditional_macro! {
289 #[doc(hidden)]
291 macro if_tokio_rustls_present = ("tokio", "rustls")
292 }
293 declare_conditional_macro! {
294 #[doc(hidden)]
296 macro if_async_std_native_tls_present = ("async-std", "native-tls")
297 }
298 declare_conditional_macro! {
299 #[doc(hidden)]
301 macro if_async_std_rustls_present = ("async-std", "rustls")
302 }
303}
304
/// Run a test closure once on every runtime type that is enabled.
///
/// For each enabled (backend, TLS) combination, the expression is passed to
/// that runtime's `run_test`, and `check_ok()` is called on the outcome.
/// The expression is expanded once per combination, so it should be something
/// cheap to repeat, such as a closure literal.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
#[macro_export]
macro_rules! test_with_all_runtimes {
    ( $test_fn:expr ) => {{
        use $crate::cond::*;
        use $crate::testing__::TestOutcome;

        // tokio + native-tls
        if_tokio_native_tls_present! {{
            $crate::tokio::TokioNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        // tokio + rustls
        if_tokio_rustls_present! {{
            $crate::tokio::TokioRustlsRuntime::run_test($test_fn).check_ok();
        }}
        // async-std + native-tls
        if_async_std_native_tls_present! {{
            $crate::async_std::AsyncStdNativeTlsRuntime::run_test($test_fn).check_ok();
        }}
        // async-std + rustls
        if_async_std_rustls_present! {{
            $crate::async_std::AsyncStdRustlsRuntime::run_test($test_fn).check_ok();
        }}
    }};
}
347
/// Run a test closure on a single runtime, the crate's `PreferredRuntime`,
/// and evaluate to whatever `PreferredRuntime::run_test` returns.
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
#[macro_export]
macro_rules! test_with_one_runtime {
    ( $test_fn:expr ) => {{
        $crate::PreferredRuntime::run_test($test_fn)
    }};
}
369
/// Smoke tests that exercise each enabled runtime.
///
/// NOTE(review): excluded under miri, presumably because these tests open
/// real sockets and read real clocks.
#[cfg(all(
    test,
    any(feature = "async-std", feature = "tokio"),
    any(feature = "native-tls", feature = "rustls"),
    not(miri),
))]
mod test {
    #![allow(clippy::unwrap_used, clippy::unnecessary_wraps)]
    use crate::traits::*;
    use crate::{SleepProviderExt, ToplevelRuntime};

    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use futures::stream::StreamExt;
    use native_tls_crate as native_tls;
    use std::io::Result as IoResult;
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    use std::time::{Duration, Instant};

    /// Sleep for one millisecond and check that at least that much monotonic
    /// time has passed.
    fn small_delay<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let sleeper = runtime.clone();
        runtime.block_on(async {
            let started = Instant::now();
            let delay = Duration::from_millis(1);
            sleeper.sleep(delay).await;
            let finished = Instant::now();
            assert!(started + delay <= finished);
        });
        Ok(())
    }

    /// Wrap an immediately-ready future in a one-day timeout and check that
    /// its value comes through.
    fn small_timeout_ok<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let timer = runtime.clone();
        runtime.block_on(async {
            let limit = Duration::from_secs(86400);
            let result = timer.timeout(limit, async { 413_u32 }).await;
            assert_eq!(result, Ok(413));
        });
        Ok(())
    }

    /// Wrap a never-ready future in a one-microsecond timeout and check that
    /// it yields `TimeoutError` with the expected display text.
    fn small_timeout_expire<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        use futures::future::pending;

        let timer = runtime.clone();
        runtime.block_on(async {
            let limit = Duration::from_micros(1);
            let result = timer.timeout(limit, pending::<()>()).await;
            assert_eq!(result, Err(crate::TimeoutError));
            let message = result.err().unwrap().to_string();
            assert_eq!(message, "Timeout expired".to_string());
        });
        Ok(())
    }

    /// Sleep until a wallclock time one millisecond ahead, then check that
    /// both the wallclock and the monotonic clock advanced by at least that
    /// much.
    fn tiny_wallclock<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let sleeper = runtime.clone();
        runtime.block_on(async {
            let mono_start = Instant::now();
            let wall_start = runtime.wallclock();
            let step = Duration::from_millis(1);
            let deadline = wall_start + step;

            sleeper.sleep_until_wallclock(deadline).await;

            let mono_end = Instant::now();
            let wall_end = runtime.wallclock();
            assert!(deadline <= wall_end);
            assert!(step <= mono_end - mono_start);
        });
        Ok(())
    }

    /// Listen on an ephemeral localhost TCP port, connect to it from the same
    /// runtime, and check that the sent bytes are received.
    fn self_connect_tcp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let any_port = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let net = runtime.clone();

        let bind_addr = SocketAddr::from(any_port);
        let listener = runtime.block_on(net.listen(&bind_addr))?;
        let addr = listener.local_addr()?;

        runtime.block_on(async {
            // Accepting side: take one connection and read exactly 11 bytes.
            let receiver = async {
                let mut received = vec![0_u8; 11];
                let (mut con, _peer) = listener.incoming().next().await.expect("closed?")?;
                con.read_exact(&mut received[..]).await?;
                IoResult::Ok(received)
            };
            // Connecting side: send the greeting and flush.
            let sender = async {
                let mut con = net.connect(&addr).await?;
                con.write_all(b"Hello world").await?;
                con.flush().await?;
                IoResult::Ok(())
            };

            let (recv_outcome, send_outcome) = futures::join!(receiver, sender);
            send_outcome?;

            let received = recv_outcome?;
            assert_eq!(&received[..], b"Hello world");

            Ok(())
        })
    }

    /// Bind two UDP sockets on ephemeral localhost ports, send a datagram
    /// from one to the other, and check both the payload and source address.
    fn self_connect_udp<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let any_port = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let net = runtime.clone();

        let receiver_sock = runtime.block_on(net.bind(&(any_port.into())))?;
        let receiver_addr = receiver_sock.local_addr()?;

        let sender_sock = runtime.block_on(net.bind(&(any_port.into())))?;
        let sender_addr = sender_sock.local_addr()?;

        runtime.block_on(async {
            let receiver = async {
                let mut datagram = [0_u8; 16];
                let (len, from) = receiver_sock.recv(&mut datagram[..]).await?;
                IoResult::Ok((datagram[..len].to_vec(), from))
            };
            let sender = async {
                sender_sock.send(b"Hello world", &receiver_addr).await?;
                IoResult::Ok(())
            };

            let (recv_outcome, send_outcome) = futures::join!(receiver, sender);
            send_outcome?;
            let (payload, from) = recv_outcome?;
            assert_eq!(sender_addr, from);
            assert_eq!(&payload, b"Hello world");

            Ok(())
        })
    }

    /// Accept connections from a listener's incoming stream until one sends
    /// the sentinel message, and check that six connections were seen.
    fn listener_stream<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        let any_port = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let net = runtime.clone();

        let bind_addr = SocketAddr::from(any_port);
        let listener = runtime.block_on(net.listen(&bind_addr)).unwrap();
        let addr = listener.local_addr().unwrap();
        let mut incoming = listener.incoming();

        runtime.block_on(async {
            // Accepting side: count connections until the sentinel arrives.
            let acceptor = async {
                let mut accepted = 0_u32;
                loop {
                    let (mut con, _peer) = incoming.next().await.unwrap()?;
                    let mut msg = [0_u8; 11];
                    con.read_exact(&mut msg[..]).await?;
                    accepted += 1;
                    if &msg == b"world done!" {
                        break IoResult::Ok(accepted);
                    }
                }
            };
            // Connecting side: five ordinary connections, then the sentinel.
            let connector = async {
                for _ in 0_u8..5 {
                    let mut con = net.connect(&addr).await?;
                    con.write_all(b"Hello world").await?;
                    con.flush().await?;
                }
                let mut last = net.connect(&addr).await?;
                last.write_all(b"world done!").await?;
                last.flush().await?;
                last.close().await?;
                IoResult::Ok(())
            };

            let (accepted, send_outcome) = futures::join!(acceptor, connector);
            send_outcome?;

            assert_eq!(accepted?, 6);

            Ok(())
        })
    }

    /// Start a blocking native-tls echo server on a thread, connect to it
    /// through the runtime's TLS connector, and check the echoed text.
    fn simple_tls<R: ToplevelRuntime>(runtime: &R) -> IoResult<()> {
        // PKCS#12 identity bundle for the echo server, and its password.
        static PFX_ID: &[u8] = include_bytes!("test.pfx");
        static PFX_PASSWORD: &str = "abc";

        let any_port = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
        let listener = std::net::TcpListener::bind(any_port)?;
        let addr = listener.local_addr()?;

        let identity = native_tls::Identity::from_pkcs12(PFX_ID, PFX_PASSWORD).unwrap();

        // Server: accept one TLS connection and echo until EOF.
        let server = std::thread::spawn(move || {
            use std::io::{Read, Write};
            let acceptor = native_tls::TlsAcceptor::new(identity).unwrap();
            let (tcp, _peer) = listener.accept()?;
            let mut tls = acceptor.accept(tcp).unwrap();
            let mut chunk = [0_u8; 16];
            loop {
                match tls.read(&mut chunk)? {
                    0 => break,
                    n => tls.write_all(&chunk[..n])?,
                }
            }
            IoResult::Ok(())
        });

        let connector = runtime.tls_connector();

        runtime.block_on(async {
            let text = b"I Suddenly Dont Understand Anything";
            let mut echoed = vec![0_u8; text.len()];
            let tcp = runtime.connect(&addr).await?;
            let mut tls = connector.negotiate_unvalidated(tcp, "Kan.Aya").await?;
            assert!(tls.peer_certificate()?.is_some());
            tls.write_all(text).await?;
            tls.flush().await?;
            tls.read_exact(&mut echoed[..]).await?;
            assert_eq!(&echoed[..], text);
            tls.close().await?;
            IoResult::Ok(())
        })?;

        server.join().unwrap()?;
        Ok(())
    }

    /// Emit one `#[test]` per listed name; each calls the same-named function
    /// in the parent module with the given runtime expression.
    macro_rules! tests_with_runtime {
        { $rt:expr => $($case:ident),* $(,)? } => {
            $(
                #[test]
                fn $case() -> std::io::Result<()> {
                    super::$case($rt)
                }
            )*
        }
    }

    /// Instantiate the listed tests for each enabled backend's
    /// `PreferredRuntime`, and for the crate-level `PreferredRuntime`.
    macro_rules! runtime_tests {
        { $($case:ident),* $(,)? } =>
        {
            #[cfg(feature = "tokio")]
            mod tokio_runtime_tests {
                tests_with_runtime! { &crate::tokio::PreferredRuntime::create()? => $($case),* }
            }
            #[cfg(feature = "async-std")]
            mod async_std_runtime_tests {
                tests_with_runtime! { &crate::async_std::PreferredRuntime::create()? => $($case),* }
            }
            mod default_runtime_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($case),* }
            }
        }
    }

    /// Instantiate the listed tests for every enabled (backend, TLS) runtime
    /// type, and for the crate-level `PreferredRuntime`.
    macro_rules! tls_runtime_tests {
        { $($case:ident),* $(,)? } =>
        {
            #[cfg(all(feature = "tokio", feature = "native-tls"))]
            mod tokio_native_tls_tests {
                tests_with_runtime! { &crate::tokio::TokioNativeTlsRuntime::create()? => $($case),* }
            }
            #[cfg(all(feature = "async-std", feature = "native-tls"))]
            mod async_std_native_tls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdNativeTlsRuntime::create()? => $($case),* }
            }
            #[cfg(all(feature = "tokio", feature = "rustls"))]
            mod tokio_rustls_tests {
                tests_with_runtime! { &crate::tokio::TokioRustlsRuntime::create()? => $($case),* }
            }
            #[cfg(all(feature = "async-std", feature = "rustls"))]
            mod async_std_rustls_tests {
                tests_with_runtime! { &crate::async_std::AsyncStdRustlsRuntime::create()? => $($case),* }
            }
            mod default_runtime_tls_tests {
                tests_with_runtime! { &crate::PreferredRuntime::create()? => $($case),* }
            }
        }
    }

    runtime_tests! {
        small_delay,
        small_timeout_ok,
        small_timeout_expire,
        tiny_wallclock,
        self_connect_tcp,
        self_connect_udp,
        listener_stream,
    }

    tls_runtime_tests! {
        simple_tls,
    }
}