1
//! Re-exports of the async_std runtime for use with arti.
2
//!
3
//! This crate helps define a slim API around our async runtime so that we
4
//! can easily swap it out.
5
//!
6
//! We'll probably want to support tokio as well in the future.
7

            
8
/// Types used for networking (async_std implementation)
9
mod net {
10
    use crate::{impls, traits};
11

            
12
    use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13
    #[cfg(unix)]
14
    use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15
    use async_trait::async_trait;
16
    use futures::future::Future;
17
    use futures::stream::Stream;
18
    use paste::paste;
19
    use std::io::Result as IoResult;
20
    use std::net::SocketAddr;
21
    use std::pin::Pin;
22
    use std::task::{Context, Poll};
23
    #[cfg(unix)]
24
    use tor_general_addr::unix;
25

            
26
    /// Implement NetStreamProvider-related functionality for a single address type.
27
    macro_rules! impl_stream {
28
        { $kind:ident, $addr:ty } => {paste!{
29
            /// A `Stream` of incoming streams.
30
            ///
31
            /// Differs from the output of `*Listener::incoming` in that this
32
            /// struct is a real type, and that it returns a stream and an address
33
            /// for each input.
34
            pub struct [<Incoming $kind Streams>] {
35
                /// A state object, stored in an Option so we can take ownership of it
36
                /// while poll is being called.
37
                // TODO(nickm): I hate using this trick.  At some point in the
38
                // future, once Rust has nice support for async traits, maybe
39
                // we can refactor it.
40
                state: Option<[<Incoming $kind StreamsState>]>,
41
            }
42
            /// The result type returned by `take_and_poll_*`.
43
            ///
44
            /// It has to include the Listener, since take_and_poll() has
45
            /// ownership of the listener.
46
            type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
47
            /// Helper to implement `Incoming*Streams`
48
            ///
49
            /// This function calls `Listener::accept` while owning the
50
            /// listener.  Thus, it returns a future that itself owns the listener,
51
            /// and we don't have lifetime troubles.
52
21
            async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
53
14
                let result = lis.accept().await;
54
14
                (result, lis)
55
14
            }
56
            /// The possible states for an `Incoming*Streams`.
57
            enum [<Incoming $kind StreamsState>] {
58
                /// We're ready to call `accept` on the listener again.
59
                Ready([<$kind Listener>]),
60
                /// We've called `accept` on the listener, and we're waiting
61
                /// for a future to complete.
62
                Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
63
            }
64
            impl [<Incoming $kind Streams>] {
65
                /// Create a new IncomingStreams from a Listener.
66
4
                pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
67
4
                    Self {
68
4
                        state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
69
4
                    }
70
4
                }
71
            }
72
            impl Stream for [< Incoming $kind Streams >] {
73
                type Item = IoResult<([<$kind Stream>], $addr)>;
74

            
75
18
                fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
76
                    use [<Incoming $kind StreamsState>] as St;
77
18
                    let state = self.state.take().expect("No valid state!");
78
18
                    let mut future = match state {
79
14
                        St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
80
4
                        St::Accepting(fut) => fut,
81
                    };
82
18
                    match future.as_mut().poll(cx) {
83
14
                        Poll::Ready((val, lis)) => {
84
14
                            self.state = Some(St::Ready(lis));
85
14
                            Poll::Ready(Some(val))
86
                        }
87
                        Poll::Pending => {
88
4
                            self.state = Some(St::Accepting(future));
89
4
                            Poll::Pending
90
                        }
91
                    }
92
18
                }
93
            }
94
            impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
95
                type Stream = [<$kind Stream>];
96
                type Incoming = [<Incoming $kind Streams>];
97
4
                fn incoming(self) -> [<Incoming $kind Streams>] {
98
4
                    [<Incoming $kind Streams>]::from_listener(self)
99
4
                }
100
4
                fn local_addr(&self) -> IoResult<$addr> {
101
4
                    [<$kind Listener>]::local_addr(self)
102
4
                }
103
            }
104
        }}
105
    }
106

            
107
    impl_stream! { Tcp, std::net::SocketAddr }
108
    #[cfg(unix)]
109
    impl_stream! { Unix, unix::SocketAddr}
110

            
111
    #[async_trait]
112
    impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
113
        type Stream = TcpStream;
114
        type Listener = TcpListener;
115
18
        async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
116
18
            TcpStream::connect(addr).await
117
36
        }
118
4
        async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
119
4
            TcpListener::bind(*addr).await
120
8
        }
121
    }
122

            
123
    #[cfg(unix)]
124
    #[async_trait]
125
    impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
126
        type Stream = UnixStream;
127
        type Listener = UnixListener;
128
        async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
129
            let path = addr
130
                .as_pathname()
131
                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
132
            UnixStream::connect(path).await
133
        }
134
        async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
135
            let path = addr
136
                .as_pathname()
137
                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
138
            UnixListener::bind(path).await
139
        }
140
    }
141

            
142
    #[cfg(not(unix))]
143
    crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
144

            
145
    #[async_trait]
146
    impl traits::UdpProvider for async_executors::AsyncStd {
147
        type UdpSocket = UdpSocket;
148

            
149
4
        async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
150
4
            StdUdpSocket::bind(*addr)
151
4
                .await
152
4
                .map(|socket| UdpSocket { socket })
153
8
        }
154
    }
155

            
156
    /// Wrap a AsyncStd UdpSocket
157
    pub struct UdpSocket {
158
        /// The underlying UdpSocket
159
        socket: StdUdpSocket,
160
    }
161

            
162
    #[async_trait]
163
    impl traits::UdpSocket for UdpSocket {
164
2
        async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
165
2
            self.socket.recv_from(buf).await
166
4
        }
167

            
168
2
        async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
169
2
            self.socket.send_to(buf, target).await
170
4
        }
171

            
172
4
        fn local_addr(&self) -> IoResult<SocketAddr> {
173
4
            self.socket.local_addr()
174
4
        }
175
    }
176

            
177
    impl traits::StreamOps for TcpStream {
178
        fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
179
            impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
180
        }
181

            
182
        #[cfg(target_os = "linux")]
183
        fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
184
            Box::new(impls::streamops::TcpSockFd::from_fd(self))
185
        }
186
    }
187

            
188
    #[cfg(unix)]
189
    impl traits::StreamOps for UnixStream {
190
        fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
191
            Err(traits::UnsupportedStreamOp::new(
192
                "set_tcp_notsent_lowat",
193
                "unsupported on Unix streams",
194
            )
195
            .into())
196
        }
197
    }
198
}
199

            
200
// ==============================
201

            
202
use futures::{Future, FutureExt};
203
use std::pin::Pin;
204
use std::time::Duration;
205

            
206
use crate::traits::*;
207

            
208
/// Create and return a new `async_std` runtime.
209
884
pub fn create_runtime() -> async_executors::AsyncStd {
210
884
    async_executors::AsyncStd::new()
211
884
}
212

            
213
impl SleepProvider for async_executors::AsyncStd {
214
    type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
215
216
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
216
2818
        Box::pin(async_io::Timer::after(duration).map(|_| ()))
217
216
    }
218
}
219

            
220
impl ToplevelBlockOn for async_executors::AsyncStd {
221
304
    fn block_on<F: Future>(&self, f: F) -> F::Output {
222
304
        async_executors::AsyncStd::block_on(f)
223
304
    }
224
}
225

            
226
impl Blocking for async_executors::AsyncStd {
227
    type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
228

            
229
    fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
230
    where
231
        F: FnOnce() -> T + Send + 'static,
232
        T: Send + 'static,
233
    {
234
        async_executors::SpawnBlocking::spawn_blocking(&self, f)
235
    }
236

            
237
    fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
238
        async_executors::AsyncStd::block_on(f)
239
    }
240
}