tor_proto/stream/
resolve.rs
1use crate::memquota::StreamAccount;
4use crate::stream::StreamReceiver;
5use crate::{Error, Result};
6
7use futures::StreamExt;
8use tor_cell::relaycell::msg::Resolved;
9use tor_cell::relaycell::RelayCmd;
10use tor_cell::restricted_msg;
11
12use super::AnyCmdChecker;
13
/// The receiving half of a resolve stream: a pending name lookup whose
/// answer is delivered as a relay message.
///
/// Call [`ResolveStream::read_msg`] to wait for that answer.
pub struct ResolveStream {
    /// Source of the incoming relay messages for this stream.
    s: StreamReceiver,

    /// Memory-quota account associated with this stream.
    ///
    /// Not read by any code visible here; presumably it is held only so that
    /// the account lives as long as the stream does (TODO confirm).
    _memquota: StreamAccount,
}
25
// Declares `ResolveResponseMsg`, the set of relay messages this module is
// willing to decode on a resolve stream. Only `End` and `Resolved` are
// listed; presumably decoding any other message type yields an error
// (confirm against the `restricted_msg!` documentation).
restricted_msg! {
    enum ResolveResponseMsg : RelayMsg {
        End,
        Resolved,
    }
}
33
34impl ResolveStream {
35 pub(crate) fn new(s: StreamReceiver, memquota: StreamAccount) -> Self {
39 ResolveStream {
40 s,
41 _memquota: memquota,
42 }
43 }
44
45 pub async fn read_msg(&mut self) -> Result<Resolved> {
48 use ResolveResponseMsg::*;
49 let cell = match self.s.next().await {
50 Some(cell) => cell?,
51 None => return Err(Error::NotConnected),
52 };
53 let msg = match cell.decode::<ResolveResponseMsg>() {
54 Ok(cell) => cell.into_msg(),
55 Err(e) => {
56 self.s.protocol_error();
57 return Err(Error::from_bytes_err(e, "response on a resolve stream"));
58 }
59 };
60 match msg {
61 End(e) => Err(Error::EndReceived(e.reason())),
62 Resolved(r) => Ok(r),
63 }
64 }
65}
66
/// Command checker for resolve streams.
///
/// Its [`CmdChecker`](super::CmdChecker) implementation accepts only
/// `RESOLVED` and `END` messages, and reports the stream as closed after
/// either of them.
#[derive(Debug, Default)]
pub(crate) struct ResolveCmdChecker {}
71
72impl super::CmdChecker for ResolveCmdChecker {
73 fn check_msg(
74 &mut self,
75 msg: &tor_cell::relaycell::UnparsedRelayMsg,
76 ) -> Result<super::StreamStatus> {
77 use super::StreamStatus::Closed;
78 match msg.cmd() {
79 RelayCmd::RESOLVED => Ok(Closed),
80 RelayCmd::END => Ok(Closed),
81 _ => Err(Error::StreamProto(format!(
82 "Unexpected {} on resolve stream",
83 msg.cmd()
84 ))),
85 }
86 }
87
88 fn consume_checked_msg(&mut self, msg: tor_cell::relaycell::UnparsedRelayMsg) -> Result<()> {
89 let _ = msg
90 .decode::<ResolveResponseMsg>()
91 .map_err(|err| Error::from_bytes_err(err, "message on resolve stream."))?;
92 Ok(())
93 }
94}
95
96impl ResolveCmdChecker {
97 pub(crate) fn new_any() -> AnyCmdChecker {
100 Box::<Self>::default()
101 }
102}