Lines
63.28 %
Functions
82.13 %
Branches
100 %
//! Define a [`CompoundRuntime`] part that can be built from several component
//! pieces.
use std::{net, sync::Arc, time::Duration};
use crate::traits::*;
use crate::{CoarseInstant, CoarseTimeProvider};
use async_trait::async_trait;
use educe::Educe;
use futures::{future::FutureObj, task::Spawn};
use std::future::Future;
use std::io::Result as IoResult;
use std::time::{Instant, SystemTime};
use tor_general_addr::unix;
/// A runtime made of several parts, each of which implements one trait-group.
///
/// The `TaskR` component should implement [`Spawn`], [`Blocking`] and maybe [`ToplevelBlockOn`];
/// the `SleepR` component should implement [`SleepProvider`];
/// the `CoarseTimeR` component should implement [`CoarseTimeProvider`];
/// the `TcpR` component should implement [`NetStreamProvider`] for [`net::SocketAddr`];
/// the `UnixR` component should implement [`NetStreamProvider`] for [`unix::SocketAddr`];
/// the `TlsR` component should implement [`TlsProvider`];
/// and
/// the `UdpR` component should implement [`UdpProvider`].
///
/// You can use this structure to create new runtimes in two ways: either by
/// overriding a single part of an existing runtime, or by building an entirely
/// new runtime from pieces.
#[derive(Educe)]
#[educe(Clone)] // #[derive(Clone)] wrongly infers Clone bounds on the generic parameters
pub struct CompoundRuntime<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> {
/// The actual collection of Runtime objects.
///
/// We wrap this in an Arc rather than requiring that each item implement
/// Clone, though we could change our minds later on.
inner: Arc<Inner<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR>>,
}
/// A collection of objects implementing the traits that make up a [`Runtime`].
struct Inner<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> {
/// A `Spawn`, `Blocking` and (optionally) `ToplevelBlockOn` implementation.
spawn: TaskR,
/// A `SleepProvider` implementation.
sleep: SleepR,
/// A `CoarseTimeProvider` implementation.
coarse_time: CoarseTimeR,
/// A `NetStreamProvider<net::SocketAddr>` implementation.
tcp: TcpR,
/// A `NetStreamProvider<unix::SocketAddr>` implementation.
unix: UnixR,
/// A `TlsProvider<S>` implementation, for whatever stream type `S` it supports.
tls: TlsR,
/// A `UdpProvider` implementation.
udp: UdpR,
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR>
CompoundRuntime<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR>
{
/// Construct a new CompoundRuntime from its components.
///
/// All of the components are stored together in a single `Arc`-wrapped
/// `Inner`.
pub fn new(
) -> Self {
// This impl places no `Send`/`Sync` bounds on the component types, so the
// lint is silenced here instead of constraining the type parameters.
#[allow(clippy::arc_with_non_send_sync)]
CompoundRuntime {
inner: Arc::new(Inner {
spawn,
sleep,
coarse_time,
tcp,
unix,
tls,
udp,
}),
// Task spawning is forwarded to the task component (`inner.spawn`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> Spawn
for CompoundRuntime<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR>
where
TaskR: Spawn,
/// Hand `future` to the task component's `spawn_obj`, returning its result
/// unchanged.
#[inline]
#[track_caller]
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), futures::task::SpawnError> {
self.inner.spawn.spawn_obj(future)
// Every `Blocking` operation is forwarded to the task component (`inner.spawn`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> Blocking
TaskR: Blocking,
SleepR: Clone + Send + Sync + 'static,
CoarseTimeR: Clone + Send + Sync + 'static,
TcpR: Clone + Send + Sync + 'static,
UnixR: Clone + Send + Sync + 'static,
TlsR: Clone + Send + Sync + 'static,
UdpR: Clone + Send + Sync + 'static,
/// The thread-handle type is the one defined by the task component.
type ThreadHandle<T: Send + 'static> = TaskR::ThreadHandle<T>;
/// Forward `f` to the task component's `spawn_blocking`.
fn spawn_blocking<F, T>(&self, f: F) -> TaskR::ThreadHandle<T>
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
self.inner.spawn.spawn_blocking(f)
/// Forward `future` to the task component's `reenter_block_on`.
fn reenter_block_on<F>(&self, future: F) -> F::Output
F: Future,
F::Output: Send + 'static,
self.inner.spawn.reenter_block_on(future)
/// Forward `f` to the task component's `blocking_io`.
fn blocking_io<F, T>(&self, f: F) -> impl futures::Future<Output = T>
self.inner.spawn.blocking_io(f)
// Only available when the task component itself implements `ToplevelBlockOn`.
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> ToplevelBlockOn
TaskR: ToplevelBlockOn,
/// Forward `future` to the task component's `block_on` and return its output.
fn block_on<F: futures::Future>(&self, future: F) -> F::Output {
self.inner.spawn.block_on(future)
// Sleeping and clock queries are forwarded to the sleep component (`inner.sleep`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> SleepProvider
SleepR: SleepProvider,
TaskR: Clone + Send + Sync + 'static,
/// The sleep future type is the one defined by the sleep component.
type SleepFuture = SleepR::SleepFuture;
/// Forward to the sleep component's `sleep`.
fn sleep(&self, duration: Duration) -> Self::SleepFuture {
self.inner.sleep.sleep(duration)
/// Return the sleep component's notion of the current `Instant`.
fn now(&self) -> Instant {
self.inner.sleep.now()
/// Return the sleep component's notion of the current `SystemTime`.
fn wallclock(&self) -> SystemTime {
self.inner.sleep.wallclock()
// Coarse time queries are forwarded to the coarse-time component (`inner.coarse_time`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> CoarseTimeProvider
CoarseTimeR: CoarseTimeProvider,
/// Return the coarse-time component's current `CoarseInstant`.
fn now_coarse(&self) -> CoarseInstant {
self.inner.coarse_time.now_coarse()
// TCP (`net::SocketAddr`) streams are provided by the TCP component (`inner.tcp`).
#[async_trait]
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> NetStreamProvider<net::SocketAddr>
TcpR: NetStreamProvider<net::SocketAddr>,
TaskR: Send + Sync + 'static,
SleepR: Send + Sync + 'static,
CoarseTimeR: Send + Sync + 'static,
TcpR: Send + Sync + 'static,
TlsR: Send + Sync + 'static,
UdpR: Send + Sync + 'static,
/// Stream type, as defined by the TCP component.
type Stream = TcpR::Stream;
/// Listener type, as defined by the TCP component.
type Listener = TcpR::Listener;
/// Forward to the TCP component's `connect`.
async fn connect(&self, addr: &net::SocketAddr) -> IoResult<Self::Stream> {
self.inner.tcp.connect(addr).await
/// Forward to the TCP component's `listen`.
async fn listen(&self, addr: &net::SocketAddr) -> IoResult<Self::Listener> {
self.inner.tcp.listen(addr).await
// Unix-socket (`unix::SocketAddr`) streams are provided by the Unix component (`inner.unix`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> NetStreamProvider<unix::SocketAddr>
UnixR: NetStreamProvider<unix::SocketAddr>,
/// Stream type, as defined by the Unix component.
type Stream = UnixR::Stream;
/// Listener type, as defined by the Unix component.
type Listener = UnixR::Listener;
/// Forward to the Unix component's `connect`.
async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
self.inner.unix.connect(addr).await
/// Forward to the Unix component's `listen`.
async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
self.inner.unix.listen(addr).await
// TLS is provided by the TLS component (`inner.tls`), for any stream type `S`
// that component supports.
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR, S> TlsProvider<S>
TcpR: NetStreamProvider,
TlsR: TlsProvider<S>,
S: StreamOps,
/// Connector type, as defined by the TLS component.
type Connector = TlsR::Connector;
/// TLS stream type, as defined by the TLS component.
type TlsStream = TlsR::TlsStream;
/// Return a connector obtained from the TLS component.
fn tls_connector(&self) -> Self::Connector {
self.inner.tls.tls_connector()
/// Report whatever the TLS component reports for keying-material export support.
fn supports_keying_material_export(&self) -> bool {
self.inner.tls.supports_keying_material_export()
// Hand-written `Debug`: no fields are printed, only the type name.
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> std::fmt::Debug
/// Format as a `CompoundRuntime` struct with its contents elided.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.udp.bind(addr).await
// UDP sockets are provided by the UDP component (`inner.udp`).
impl<TaskR, SleepR, CoarseTimeR, TcpR, UnixR, TlsR, UdpR> UdpProvider
UdpR: UdpProvider,
/// Socket type, as defined by the UDP component.
type UdpSocket = UdpR::UdpSocket;
/// Forward to the UDP component's `bind`.
async fn bind(&self, addr: &net::SocketAddr) -> IoResult<Self::UdpSocket> {
self.inner.udp.bind(addr).await
/// Module to seal RuntimeSubstExt
mod sealed {
/// Helper for sealing RuntimeSubstExt
// `pub` is needed so the trait can appear as a supertrait of the public
// `RuntimeSubstExt`, even though this module itself is private.
#[allow(unreachable_pub)]
pub trait Sealed {}
/// Extension trait on Runtime:
/// Construct new Runtimes that replace part of an original runtime.
///
/// (If you need to do more complicated versions of this, you should likely construct
/// CompoundRuntime directly.)
///
/// This trait requires `sealed::Sealed`, which lives in a private module;
/// it is implemented for every `Runtime` by a blanket impl in this file.
pub trait RuntimeSubstExt: sealed::Sealed + Sized {
/// Return a new runtime wrapping this runtime, but replacing its TCP NetStreamProvider.
fn with_tcp_provider<T>(
&self,
new_tcp: T,
) -> CompoundRuntime<Self, Self, Self, T, Self, Self, Self>;
/// Return a new runtime wrapping this runtime, but replacing its SleepProvider.
fn with_sleep_provider<T>(
new_sleep: T,
) -> CompoundRuntime<Self, T, Self, Self, Self, Self, Self>;
/// Return a new runtime wrapping this runtime, but replacing its CoarseTimeProvider.
fn with_coarse_time_provider<T>(
new_coarse_time: T,
) -> CompoundRuntime<Self, Self, T, Self, Self, Self, Self>;
// Every `Runtime` is `Sealed`, which is what lets the blanket
// `RuntimeSubstExt` impl below apply to it.
impl<R: Runtime> sealed::Sealed for R {}
// Blanket impl: each method builds a `CompoundRuntime` via `CompoundRuntime::new`,
// passing the replacement component in the slot named by the return type.
// NOTE(review): the other slots appear to be filled with `self.clone()` — confirm.
impl<R: Runtime + Sized> RuntimeSubstExt for R {
) -> CompoundRuntime<Self, Self, Self, T, Self, Self, Self> {
CompoundRuntime::new(
self.clone(),
new_tcp,
)
) -> CompoundRuntime<Self, T, Self, Self, Self, Self, Self> {
new_sleep,
) -> CompoundRuntime<Self, Self, T, Self, Self, Self, Self> {
new_coarse_time,