tor_rtcompat/impls/
async_std.rs
1mod net {
10 use crate::{impls, traits};
11
12 use async_std_crate::net::{TcpListener, TcpStream, UdpSocket as StdUdpSocket};
13 #[cfg(unix)]
14 use async_std_crate::os::unix::net::{UnixListener, UnixStream};
15 use async_trait::async_trait;
16 use futures::future::Future;
17 use futures::stream::Stream;
18 use paste::paste;
19 use std::io::Result as IoResult;
20 use std::net::SocketAddr;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23 #[cfg(unix)]
24 use tor_general_addr::unix;
25
26 macro_rules! impl_stream {
28 { $kind:ident, $addr:ty } => {paste!{
29 pub struct [<Incoming $kind Streams>] {
35 state: Option<[<Incoming $kind StreamsState>]>,
41 }
42 type [<$kind FResult>] = (IoResult<([<$kind Stream>], $addr)>, [<$kind Listener>]);
47 async fn [<take_and_poll_ $kind:lower>](lis: [<$kind Listener>]) -> [<$kind FResult>] {
53 let result = lis.accept().await;
54 (result, lis)
55 }
56 enum [<Incoming $kind StreamsState>] {
58 Ready([<$kind Listener>]),
60 Accepting(Pin<Box<dyn Future<Output = [<$kind FResult>]> + Send + Sync>>),
63 }
64 impl [<Incoming $kind Streams>] {
65 pub fn from_listener(lis: [<$kind Listener>]) -> [<Incoming $kind Streams>] {
67 Self {
68 state: Some([<Incoming $kind StreamsState>]::Ready(lis)),
69 }
70 }
71 }
72 impl Stream for [< Incoming $kind Streams >] {
73 type Item = IoResult<([<$kind Stream>], $addr)>;
74
75 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
76 use [<Incoming $kind StreamsState>] as St;
77 let state = self.state.take().expect("No valid state!");
78 let mut future = match state {
79 St::Ready(lis) => Box::pin([<take_and_poll_ $kind:lower>](lis)),
80 St::Accepting(fut) => fut,
81 };
82 match future.as_mut().poll(cx) {
83 Poll::Ready((val, lis)) => {
84 self.state = Some(St::Ready(lis));
85 Poll::Ready(Some(val))
86 }
87 Poll::Pending => {
88 self.state = Some(St::Accepting(future));
89 Poll::Pending
90 }
91 }
92 }
93 }
94 impl traits::NetStreamListener<$addr> for [<$kind Listener>] {
95 type Stream = [<$kind Stream>];
96 type Incoming = [<Incoming $kind Streams>];
97 fn incoming(self) -> [<Incoming $kind Streams>] {
98 [<Incoming $kind Streams>]::from_listener(self)
99 }
100 fn local_addr(&self) -> IoResult<$addr> {
101 [<$kind Listener>]::local_addr(self)
102 }
103 }
104 }}
105 }
106
107 impl_stream! { Tcp, std::net::SocketAddr }
108 #[cfg(unix)]
109 impl_stream! { Unix, unix::SocketAddr}
110
111 #[async_trait]
112 impl traits::NetStreamProvider<std::net::SocketAddr> for async_executors::AsyncStd {
113 type Stream = TcpStream;
114 type Listener = TcpListener;
115 async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::Stream> {
116 TcpStream::connect(addr).await
117 }
118 async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::Listener> {
119 TcpListener::bind(*addr).await
120 }
121 }
122
123 #[cfg(unix)]
124 #[async_trait]
125 impl traits::NetStreamProvider<unix::SocketAddr> for async_executors::AsyncStd {
126 type Stream = UnixStream;
127 type Listener = UnixListener;
128 async fn connect(&self, addr: &unix::SocketAddr) -> IoResult<Self::Stream> {
129 let path = addr
130 .as_pathname()
131 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
132 UnixStream::connect(path).await
133 }
134 async fn listen(&self, addr: &unix::SocketAddr) -> IoResult<Self::Listener> {
135 let path = addr
136 .as_pathname()
137 .ok_or(crate::unix::UnsupportedAfUnixAddressType)?;
138 UnixListener::bind(path).await
139 }
140 }
141
142 #[cfg(not(unix))]
143 crate::impls::impl_unix_non_provider! { async_executors::AsyncStd }
144
145 #[async_trait]
146 impl traits::UdpProvider for async_executors::AsyncStd {
147 type UdpSocket = UdpSocket;
148
149 async fn bind(&self, addr: &std::net::SocketAddr) -> IoResult<Self::UdpSocket> {
150 StdUdpSocket::bind(*addr)
151 .await
152 .map(|socket| UdpSocket { socket })
153 }
154 }
155
156 pub struct UdpSocket {
158 socket: StdUdpSocket,
160 }
161
162 #[async_trait]
163 impl traits::UdpSocket for UdpSocket {
164 async fn recv(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> {
165 self.socket.recv_from(buf).await
166 }
167
168 async fn send(&self, buf: &[u8], target: &SocketAddr) -> IoResult<usize> {
169 self.socket.send_to(buf, target).await
170 }
171
172 fn local_addr(&self) -> IoResult<SocketAddr> {
173 self.socket.local_addr()
174 }
175 }
176
177 impl traits::StreamOps for TcpStream {
178 fn set_tcp_notsent_lowat(&self, notsent_lowat: u32) -> IoResult<()> {
179 impls::streamops::set_tcp_notsent_lowat(self, notsent_lowat)
180 }
181
182 #[cfg(target_os = "linux")]
183 fn new_handle(&self) -> Box<dyn traits::StreamOps + Send + Unpin> {
184 Box::new(impls::streamops::TcpSockFd::from_fd(self))
185 }
186 }
187
188 #[cfg(unix)]
189 impl traits::StreamOps for UnixStream {
190 fn set_tcp_notsent_lowat(&self, _notsent_lowat: u32) -> IoResult<()> {
191 Err(traits::UnsupportedStreamOp::new(
192 "set_tcp_notsent_lowat",
193 "unsupported on Unix streams",
194 )
195 .into())
196 }
197 }
198}
199
200use futures::{Future, FutureExt};
203use std::pin::Pin;
204use std::time::Duration;
205
206use crate::traits::*;
207
208pub fn create_runtime() -> async_executors::AsyncStd {
210 async_executors::AsyncStd::new()
211}
212
213impl SleepProvider for async_executors::AsyncStd {
214 type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
215 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
216 Box::pin(async_io::Timer::after(duration).map(|_| ()))
217 }
218}
219
220impl ToplevelBlockOn for async_executors::AsyncStd {
221 fn block_on<F: Future>(&self, f: F) -> F::Output {
222 async_executors::AsyncStd::block_on(f)
223 }
224}
225
226impl Blocking for async_executors::AsyncStd {
227 type ThreadHandle<T: Send + 'static> = async_executors::BlockingHandle<T>;
228
229 fn spawn_blocking<F, T>(&self, f: F) -> async_executors::BlockingHandle<T>
230 where
231 F: FnOnce() -> T + Send + 'static,
232 T: Send + 'static,
233 {
234 async_executors::SpawnBlocking::spawn_blocking(&self, f)
235 }
236
237 fn reenter_block_on<F: Future>(&self, f: F) -> F::Output {
238 async_executors::AsyncStd::block_on(f)
239 }
240}