//! Re-exports of the async_std runtime for use with arti.
//!
//! This crate helps define a slim API around our async runtime so that we
//! can easily swap it out.
//!
//! We'll probably want to support tokio as well in the future.

            
8
/// Types used for networking (async_std implementation)
9
mod net {
10
    use crate::{impls, traits};
11

            
12
    use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13
    #[cfg(unix)]
14
    use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15
    use async_trait::async_trait;
16
    use futures::future::Future;
17
    use futures::stream::Stream;
18
    use paste::paste;
19
    use std::io::Result as IoResult;
20
    use std::net::SocketAddr;
21
    use std::pin::Pin;
22
    use std::task::{Context, Poll};
23
    use tor_general_addr::unix;
24

            
25
    /// Implement NetStreamProvider-related functionality for a single address type.
26
    macro_rules! impl_stream {
27
        { $kind:ident, $addr:ty } => {paste!{
28
            /// A `Stream` of incoming streams.
29
            ///
30
            /// Differs from the output of `*Listener::incoming` in that this
31
            /// struct is a real type, and that it returns a stream and an address
32
            /// for each input.
33
            pub struct [<Incoming $kind Streams>] {
34
                /// A state object, stored in an Option so we can take ownership of it
35
                /// while poll is being called.
36
                // TODO(nickm): I hate using this trick.  At some point in the
37
                // future, once Rust has nice support for async traits, maybe
38
                // we can refactor it.
39
                state: Option<[<Incoming $kind StreamsState>]>,
40
            }
41
            /// The result type returned by `take_and_poll_*`.
42
            ///
43
            /// It has to include the Listener, since take_and_poll() has
44
            /// ownership of the listener.
45
            type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
46
            /// Helper to implement `Incoming*Streams`
47
            ///
48
            /// This function calls `Listener::accept` while owning the
49
            /// listener.  Thus, it returns a future that itself owns the listener,
50
            /// and we don't have lifetime troubles.
51
21
            async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
52
14
                let result = lis.accept().await;
53
14
                (result, lis)
54
14
            }
55
            /// The possible states for an `Incoming*Streams`.
56
            enum [<Incoming $kind StreamsState>] {
57
                /// We're ready to call `accept` on the listener again.
58
                Ready([<$kind Listener>]),
59
                /// We've called `accept` on the listener, and we're waiting
60
                /// for a future to complete.
61
                Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
62
            }
63
            impl [<Incoming $kind Streams>] {
64
                /// Create a new IncomingStreams from a Listener.
65
4
                pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
66
4
                    Self {
67
4
                        state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
68
4
                    }
69
4
                }
70
            }
71
            impl Stream for [< Incoming $kind Streams >] {
72
                type Item = IoResult<([<$kind Stream>], $addr)>;
73

            
74
18
                fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
75
                    use [<Incoming $kind StreamsState>] as St;
76
18
                    let state = self.state.take().expect("No valid state!");
77
18
                    let mut future = match state {
78
14
                        St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
79
4
                        St::Accepting(fut) => fut,
80
                    };
81
18
                    match future.as_mut().poll(cx) {
82
14
                        Poll::Ready((val, lis)) => {
83
14
                            self.state = Some(St::Ready(lis));
84
14
                            Poll::Ready(Some(val))
85
                        }
86
                        Poll::Pending => {
87
4
                            self.state = Some(St::Accepting(future));
88
4
                            Poll::Pending
89
                        }
90
                    }
91
18
                }
92
            }
93
            impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
94
                type Stream = [<$kind Stream>];
95
                type Incoming = [<Incoming $kind Streams>];
96
4
                fn incoming(self) -> [<Incoming $kind Streams>] {
97
4
                    [<Incoming $kind Streams>]::from_listener(self)
98
4
                }
99
4
                fn local_addr(&self) -> IoResult<$addr> {
100
4
                    [<$kind Listener>]::local_addr(self)
101
4
                }
102
            }
103
        }}
104
    }
105

            
106
    impl_stream! { Tcp, std::net::SocketAddr }
107
    #[cfg(unix)]
108
    impl_stream! { Unix, unix::SocketAddr}
109

            
110
    #[async_trait]
111
    impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
112
        type Stream = TcpStream;
113
        type Listener = TcpListener;
114
18
        async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
115
18
            TcpStream::connect(addr).await
116
36
        }
117
4
        async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
118
4
            TcpListener::bind(*addr).await
119
8
        }
120
    }
121

            
122
    #[cfg(unix)]
123
    #[async_trait]
124
    impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
125
        type Stream = UnixStream;
126
        type Listener = UnixListener;
127
        async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
128
            let path = addr
129
                .as_pathname()
130
                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
131
            UnixStream::connect(path).await
132
        }
133
        async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
134
            let path = addr
135
                .as_pathname()
136
                .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
137
            UnixListener::bind(path).await
138
        }
139
    }
140

            
141
    #[cfg(not(unix))]
142
    crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
143

            
144
    #[async_trait]
145
    impl traits::UdpProvider for async_executors::AsyncStd {
146
        type UdpSocket = UdpSocket;
147

            
148
4
        async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
149
4
            StdUdpSocket::bind(*addr)
150
4
                .await
151
4
                .map(|socket| UdpSocket { socket })
152
8
        }
153
    }
154

            
155
    /// Wrap a AsyncStd UdpSocket
156
    pub struct UdpSocket {
157
        /// The underlying UdpSocket
158
        socket: StdUdpSocket,
159
    }
160

            
161
    #[async_trait]
162
    impl traits::UdpSocket for UdpSocket {
163
2
        async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
164
2
            self.socket.recv_from(buf).await
165
4
        }
166

            
167
2
        async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
168
2
            self.socket.send_to(buf, target).await
169
4
        }
170

            
171
4
        fn local_addr(&self) -> IoResult<SocketAddr> {
172
4
            self.socket.local_addr()
173
4
        }
174
    }
175

            
176
    impl traits::StreamOps for TcpStream {
177
        fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
178
            impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
179
        }
180

            
181
        #[cfg(target_os = "linux")]
182
        fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
183
            Box::new(impls::streamops::TcpSockFd::from_fd(self))
184
        }
185
    }
186

            
187
    #[cfg(unix)]
188
    impl traits::StreamOps for UnixStream {
189
        fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
190
            Err(traits::UnsupportedStreamOp::new(
191
                "set_tcp_notsent_lowat",
192
                "unsupported on Unix streams",
193
            )
194
            .into())
195
        }
196
    }
197
}
198

            
199
// ==============================
200

            
201
use futures::{Future, FutureExt};
202
use std::pin::Pin;
203
use std::time::Duration;
204

            
205
use crate::traits::*;
206

            
207
/// Create and return a new `async_std` runtime.
208
884
pub fn create_runtime() -> async_executors::AsyncStd {
209
884
    async_executors::AsyncStd::new()
210
884
}
211

            
212
impl SleepProvider for async_executors::AsyncStd {
213
    type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
214
200
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
215
2066
        Box::pin(async_io::Timer::after(duration).map(|_| ()))
216
200
    }
217
}
218

            
219
impl ToplevelBlockOn for async_executors::AsyncStd {
220
304
    fn block_on<F: Future>(&self, f: F) -> F::Output {
221
304
        async_executors::AsyncStd::block_on(f)
222
304
    }
223
}
224

            
225
impl Blocking for async_executors::AsyncStd {
226
    type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
227

            
228
    fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
229
    where
230
        F: FnOnce() -> T + Send + 'static,
231
        T: Send + 'static,
232
    {
233
        async_executors::SpawnBlocking::spawn_blocking(&self, f)
234
    }
235

            
236
    fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
237
        async_executors::AsyncStd::block_on(f)
238
    }
239
}