tor_rtcompat/impls/
async_std.rs
1mod net {
10 use crate::{impls, traits};
11
12 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13 #[cfg(unix)]
14 use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15 use async_trait::async_trait;
16 use futures::future::Future;
17 use futures::stream::Stream;
18 use paste::paste;
19 use std::io::Result as IoResult;
20 use std::net::SocketAddr;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23 use tor_general_addr::unix;
24
/// Generate the accept-loop plumbing for one family of stream listener.
///
/// `$kind` is an identifier prefix that is pasted in front of `Listener` and
/// `Stream` to name the socket types; `$addr` is the address type that is
/// returned alongside every accepted stream.
///
/// The expansion defines a public `Incoming…Streams` type implementing
/// [`Stream`], plus an implementation of [`traits::NetStreamListener`] for the
/// listener type.
macro_rules! impl_stream {
    { $kind:ident, $addr:ty } => {paste!{
        /// A [`Stream`] yielding the connections accepted by a listener.
        pub struct [<Incoming $kind Streams>] {
            /// Where we are in the accept cycle.
            ///
            /// `poll_next` takes this value out and always stores a fresh one
            /// before it returns, so it is `Some` between polls.
            state: Option<[<Incoming $kind StreamsState>]>,
        }

        /// Output of one accept attempt: the outcome of `accept()`, together
        /// with the listener so that it can be reused for the next attempt.
        type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);

        /// Take ownership of `lis`, accept a single connection on it, and
        /// hand the listener back along with the result.
        async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
            let accepted = lis.accept().await;
            (accepted, lis)
        }

        /// The two states an incoming-connection stream alternates between.
        enum [<Incoming $kind StreamsState>] {
            /// No accept is in flight; we hold the listener ourselves.
            Ready([<$kind Listener>]),
            /// An accept is in flight; the boxed future owns the listener and
            /// returns it on completion.
            Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
        }

        impl [<Incoming $kind Streams>] {
            /// Wrap `lis` in a stream that starts out with no accept in flight.
            pub fn from_listener(lis: [<$kind Listener>]) -> Self {
                let state = Some([<Incoming $kind StreamsState>]::Ready(lis));
                Self { state }
            }
        }

        impl Stream for [< Incoming $kind Streams >] {
            type Item = IoResult<([<$kind Stream>], $addr)>;

            /// Drive the current accept attempt, starting a new one if none is
            /// in flight.
            ///
            /// This never returns `Poll::Ready(None)`: every completed accept,
            /// successful or not, is yielded as `Some(..)`.
            ///
            /// # Panics
            ///
            /// Panics if the state slot is unexpectedly empty.
            fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
                use [<Incoming $kind StreamsState>] as St;

                // Either begin a new accept with the idle listener, or pick up
                // the one that was left pending by an earlier poll.
                let mut in_flight = match self.state.take().expect("No valid state!") {
                    St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
                    St::Accepting(pending) => pending,
                };

                match in_flight.as_mut().poll(cx) {
                    Poll::Pending => {
                        // Not done yet: keep the future for the next poll.
                        self.state = Some(St::Accepting(in_flight));
                        Poll::Pending
                    }
                    Poll::Ready((accepted, lis)) => {
                        // Done: reclaim the listener for the next accept.
                        self.state = Some(St::Ready(lis));
                        Poll::Ready(Some(accepted))
                    }
                }
            }
        }

        impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
            type Stream = [<$kind Stream>];
            type Incoming = [<Incoming $kind Streams>];

            /// Consume the listener and turn it into a stream of connections.
            fn incoming(self) -> Self::Incoming {
                [<Incoming $kind Streams>]::from_listener(self)
            }

            /// Report the address this listener is bound to.
            fn local_addr(&self) -> IoResult<$addr> {
                // Spelled with the concrete type path, as in the rest of this
                // macro, so the listener type's own `local_addr` is what runs.
                [<$kind Listener>]::local_addr(self)
            }
        }
    }}
}
105
106 impl_stream! { Tcp, std::net::SocketAddr }
107 #[cfg(unix)]
108 impl_stream! { Unix, unix::SocketAddr}
109
110 #[async_trait]
111 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
112 type Stream = TcpStream;
113 type Listener = TcpListener;
114 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
115 TcpStream::connect(addr).await
116 }
117 async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
118 TcpListener::bind(*addr).await
119 }
120 }
121
122 #[cfg(unix)]
123 #[async_trait]
124 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
125 type Stream = UnixStream;
126 type Listener = UnixListener;
127 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
128 let path = addr
129 .as_pathname()
130 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
131 UnixStream::connect(path).await
132 }
133 async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
134 let path = addr
135 .as_pathname()
136 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
137 UnixListener::bind(path).await
138 }
139 }
140
// On targets that are not Unix there is no AF_UNIX support above, so the
// crate-level macro supplies this executor's implementation instead.
#[cfg(not(unix))]
impls::impl_unix_non_provider! { async_executors::AsyncStd }
143
144 #[async_trait]
145 impl traits::UdpProvider for async_executors::AsyncStd {
146 type UdpSocket = UdpSocket;
147
148 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
149 StdUdpSocket::bind(*addr)
150 .await
151 .map(|socket| UdpSocket { socket })
152 }
153 }
154
155 pub struct UdpSocket {
157 socket: StdUdpSocket,
159 }
160
161 #[async_trait]
162 impl traits::UdpSocket for UdpSocket {
163 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
164 self.socket.recv_from(buf).await
165 }
166
167 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
168 self.socket.send_to(buf, target).await
169 }
170
171 fn local_addr(&self) -> IoResult<SocketAddr> {
172 self.socket.local_addr()
173 }
174 }
175
176 impl traits::StreamOps for TcpStream {
177 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
178 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
179 }
180
181 #[cfg(target_os = "linux")]
182 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
183 Box::new(impls::streamops::TcpSockFd::from_fd(self))
184 }
185 }
186
187 #[cfg(unix)]
188 impl traits::StreamOps for UnixStream {
189 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
190 Err(traits::UnsupportedStreamOp::new(
191 "set_tcp_notsent_lowat",
192 "unsupported on Unix streams",
193 )
194 .into())
195 }
196 }
197}
198
199use futures::{Future, FutureExt};
202use std::pin::Pin;
203use std::time::Duration;
204
205use crate::traits::*;
206
207pub fn create_runtime() -> async_executors::AsyncStd {
209 async_executors::AsyncStd::new()
210}
211
212impl SleepProvider for async_executors::AsyncStd {
213 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
214 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
215 Box::pin(async_io::Timer::after(duration).map(|_| ()))
216 }
217}
218
219impl ToplevelBlockOn for async_executors::AsyncStd {
220 fn block_on<F: Future>(&self, f: F) -> F::Output {
221 async_executors::AsyncStd::block_on(f)
222 }
223}
224
225impl Blocking for async_executors::AsyncStd {
226 type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
227
228 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
229 where
230 F: FnOnce() -> T + Send + 'static,
231 T: Send + 'static,
232 {
233 async_executors::SpawnBlocking::spawn_blocking(&self, f)
234 }
235
236 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
237 async_executors::AsyncStd::block_on(f)
238 }
239}