1use futures::lock::Mutex;
7use futures::stream::StreamExt;
8use futures::task::SpawnExt;
9use hickory_proto::op::{
10 header::MessageType, op_code::OpCode, response_code::ResponseCode, Message, Query,
11};
12use hickory_proto::rr::{rdata, DNSClass, Name, RData, Record, RecordType};
13use hickory_proto::serialize::binary::{BinDecodable, BinEncodable};
14use std::collections::HashMap;
15use std::net::{IpAddr, SocketAddr};
16use std::sync::Arc;
17use tracing::{debug, error, info, warn};
18
19use arti_client::{Error, HasKind, StreamPrefs, TorClient};
20use safelog::sensitive as sv;
21use tor_config::Listen;
22use tor_error::{error_report, warn_report};
23use tor_rtcompat::{Runtime, UdpSocket};
24
25use anyhow::{anyhow, Result};
26
27const MAX_DATAGRAM_SIZE: usize = 1536;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35struct DnsIsolationKey(usize, IpAddr);
36
37impl arti_client::isolation::IsolationHelper for DnsIsolationKey {
38 fn compatible_same_type(&self, other: &Self) -> bool {
39 self == other
40 }
41
42 fn join_same_type(&self, other: &Self) -> Option<Self> {
43 if self == other {
44 Some(self.clone())
45 } else {
46 None
47 }
48 }
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, Hash)]
53struct DnsCacheKey(DnsIsolationKey, Vec<Query>);
54
55#[derive(Debug, Clone)]
57struct DnsResponseTarget<U> {
58 id: u16,
60 addr: SocketAddr,
62 socket: Arc<U>,
64}
65
66async fn do_query<R>(
68 tor_client: TorClient<R>,
69 queries: &[Query],
70 prefs: &StreamPrefs,
71) -> Result<Vec<Record>, ResponseCode>
72where
73 R: Runtime,
74{
75 let mut answers = Vec::new();
76
77 let err_conv = |error: Error| {
78 if tor_error::ErrorKind::RemoteHostNotFound == error.kind() {
79 ResponseCode::NoError
81 } else {
82 ResponseCode::ServFail
83 }
84 };
85 for query in queries {
86 let mut a = Vec::new();
87 let mut ptr = Vec::new();
88
89 match query.query_class() {
92 DNSClass::IN => {
93 match query.query_type() {
94 typ @ RecordType::A | typ @ RecordType::AAAA => {
95 let mut name = query.name().clone();
96 name.set_fqdn(false);
98 let res = tor_client
99 .resolve_with_prefs(&name.to_utf8(), prefs)
100 .await
101 .map_err(err_conv)?;
102 for ip in res {
103 a.push((query.name().clone(), ip, typ));
104 }
105 }
106 RecordType::PTR => {
107 let addr = query
108 .name()
109 .parse_arpa_name()
110 .map_err(|_| ResponseCode::FormErr)?
111 .addr();
112 let res = tor_client
113 .resolve_ptr_with_prefs(addr, prefs)
114 .await
115 .map_err(err_conv)?;
116 for domain in res {
117 let domain =
118 Name::from_utf8(domain).map_err(|_| ResponseCode::ServFail)?;
119 ptr.push((query.name().clone(), domain));
120 }
121 }
122 _ => {
123 return Err(ResponseCode::NotImp);
124 }
125 }
126 }
127 _ => {
128 return Err(ResponseCode::NotImp);
129 }
130 }
131 for (name, ip, typ) in a {
132 match (ip, typ) {
133 (IpAddr::V4(v4), RecordType::A) => {
134 answers.push(Record::from_rdata(name, 3600, RData::A(rdata::A(v4))));
135 }
136 (IpAddr::V6(v6), RecordType::AAAA) => {
137 answers.push(Record::from_rdata(name, 3600, RData::AAAA(rdata::AAAA(v6))));
138 }
139 _ => (),
140 }
141 }
142 for (ptr, name) in ptr {
143 answers.push(Record::from_rdata(ptr, 3600, RData::PTR(rdata::PTR(name))));
144 }
145 }
146
147 Ok(answers)
148}
149
150#[allow(clippy::cognitive_complexity)] async fn handle_dns_req<R, U>(
154 tor_client: TorClient<R>,
155 socket_id: usize,
156 packet: &[u8],
157 addr: SocketAddr,
158 socket: Arc<U>,
159 current_requests: &Mutex<HashMap<DnsCacheKey, Vec<DnsResponseTarget<U>>>>,
160) -> Result<()>
161where
162 R: Runtime,
163 U: UdpSocket,
164{
165 let mut query = Message::from_bytes(packet)?;
167 let id = query.id();
168 let queries = query.queries();
169 let isolation = DnsIsolationKey(socket_id, addr.ip());
170
171 let request_id = {
172 let request_id = DnsCacheKey(isolation.clone(), queries.to_vec());
173
174 let response_target = DnsResponseTarget { id, addr, socket };
175
176 let mut current_requests = current_requests.lock().await;
177
178 let req = current_requests.entry(request_id.clone()).or_default();
179 req.push(response_target);
180
181 if req.len() > 1 {
182 debug!("Received a query already being served");
183 return Ok(());
184 }
185 debug!("Received a new query");
186
187 request_id
188 };
189
190 let mut prefs = StreamPrefs::new();
191 prefs.set_isolation(isolation);
192
193 let mut response = match do_query(tor_client, queries, &prefs).await {
194 Ok(answers) => {
195 let mut response = Message::new();
196 response
197 .set_message_type(MessageType::Response)
198 .set_op_code(OpCode::Query)
199 .set_recursion_desired(query.recursion_desired())
200 .set_recursion_available(true)
201 .add_queries(query.take_queries())
202 .add_answers(answers);
203 response
205 }
206 Err(error_type) => Message::error_msg(id, OpCode::Query, error_type),
207 };
208
209 let targets = current_requests
211 .lock()
212 .await
213 .remove(&request_id)
214 .unwrap_or_default();
215
216 for target in targets {
217 response.set_id(target.id);
218 let response = match response.to_bytes() {
220 Ok(r) => r,
221 Err(e) => {
222 error_report!(e, "Failed to serialize DNS packet: {:?}", sv(&response));
227 continue;
228 }
229 };
230 let _ = target.socket.send(&response, &target.addr).await;
231 }
232 Ok(())
233}
234
235#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
237#[allow(clippy::cognitive_complexity)] pub(crate) async fn run_dns_resolver<R: Runtime>(
239 runtime: R,
240 tor_client: TorClient<R>,
241 listen: Listen,
242) -> Result<()> {
243 if !listen.is_localhost_only() {
244 warn!("Configured to listen for DNS on non-local addresses. This is usually insecure! We recommend listening on localhost only.");
245 }
246
247 let mut listeners = Vec::new();
248
249 match listen.ip_addrs() {
251 Ok(addrgroups) => {
252 for addrgroup in addrgroups {
253 for addr in addrgroup {
254 match runtime.bind(&addr).await {
257 Ok(listener) => {
258 info!("Listening on {:?}.", addr);
259 listeners.push(listener);
260 }
261 #[cfg(unix)]
262 Err(ref e) if e.raw_os_error() == Some(libc::EAFNOSUPPORT) => {
263 warn_report!(e, "Address family not supported {}", addr);
264 }
265 Err(ref e) => {
266 return Err(anyhow!("Can't listen on {}: {e}", addr));
267 }
268 }
269 }
270 }
271 }
272 Err(e) => warn_report!(e, "Invalid listen spec"),
273 }
274 if listeners.is_empty() {
276 error!("Couldn't open any DNS listeners.");
277 return Err(anyhow!("Couldn't open any DNS listeners"));
278 }
279
280 let mut incoming = futures::stream::select_all(
281 listeners
282 .into_iter()
283 .map(|socket| {
284 futures::stream::unfold(Arc::new(socket), |socket| async {
285 let mut packet = [0; MAX_DATAGRAM_SIZE];
286 let packet = socket
287 .recv(&mut packet)
288 .await
289 .map(|(size, remote)| (packet, size, remote, socket.clone()));
290 Some((packet, socket))
291 })
292 })
293 .enumerate()
294 .map(|(listener_id, incoming_packet)| {
295 Box::pin(incoming_packet.map(move |packet| (packet, listener_id)))
296 }),
297 );
298
299 let pending_requests = Arc::new(Mutex::new(HashMap::new()));
300 while let Some((packet, id)) = incoming.next().await {
301 let (packet, size, addr, socket) = match packet {
302 Ok(packet) => packet,
303 Err(err) => {
304 warn_report!(err, "Incoming datagram failed");
306 continue;
307 }
308 };
309
310 let client_ref = tor_client.clone();
311 runtime.spawn({
312 let pending_requests = pending_requests.clone();
313 async move {
314 let res = handle_dns_req(
315 client_ref,
316 id,
317 &packet[..size],
318 addr,
319 socket,
320 &pending_requests,
321 )
322 .await;
323 if let Err(e) = res {
324 warn!("connection exited with error: {}", tor_error::Report(e));
326 }
327 }
328 })?;
329 }
330
331 Ok(())
332}