arti_rpc_client_core/
llconn.rs
1use crate::{
6 msgs::{
7 request::{InvalidRequestError, ValidatedRequest},
8 response::UnparsedResponse,
9 },
10 util::define_from_for_arc,
11};
12use std::{io, sync::Arc};
13
/// The reading half of a low-level RPC connection.
///
/// Incoming data is pulled from a type-erased, buffered byte source;
/// see [`Reader::read_msg`] for how individual messages are extracted.
pub struct Reader {
    /// The underlying buffered stream that messages are read from.
    ///
    /// It is boxed as a trait object so that `Reader` itself does not need
    /// to be generic over the concrete stream type.
    backend: Box<dyn io::BufRead + Send>,
}
22
/// The writing half of a low-level RPC connection.
///
/// Outgoing requests are pushed into a type-erased byte sink;
/// see [`Writer::send_request`] and [`Writer::flush`].
pub struct Writer {
    /// The underlying stream that requests are written to.
    ///
    /// It is boxed as a trait object so that `Writer` itself does not need
    /// to be generic over the concrete stream type.
    backend: Box<dyn io::Write + Send>,
}
30
31impl Reader {
32 pub fn new<T>(backend: T) -> Self
34 where
35 T: io::BufRead + Send + 'static,
36 {
37 Self {
38 backend: Box::new(backend),
39 }
40 }
41
42 pub fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
48 let mut s = String::new();
49
50 match self.backend.read_line(&mut s) {
52 Err(e) => Err(e),
53 Ok(0) => Ok(None),
54 Ok(_) if s.ends_with('\n') => Ok(Some(UnparsedResponse::new(s))),
55 Ok(_) => Ok(None),
59 }
60 }
61}
62
63impl Writer {
64 pub fn new<T>(backend: T) -> Self
66 where
67 T: io::Write + Send + 'static,
68 {
69 Self {
70 backend: Box::new(backend),
71 }
72 }
73
74 pub fn send_request(&mut self, request: &str) -> Result<(), SendRequestError> {
78 let validated = ValidatedRequest::from_string_strict(request)?;
79 self.send_valid(&validated)?;
80 Ok(())
81 }
82
83 pub(crate) fn send_valid(&mut self, request: &ValidatedRequest) -> io::Result<()> {
88 self.backend.write_all(request.as_ref().as_bytes())
89 }
90
91 pub fn flush(&mut self) -> io::Result<()> {
93 self.backend.flush()
94 }
95}
96
97#[derive(Clone, Debug, thiserror::Error)]
99#[non_exhaustive]
100pub enum SendRequestError {
101 #[error("Unable to send request")]
103 Io(#[source] Arc<io::Error>),
104 #[error("Invalid Json request")]
106 InvalidRequest(#[from] InvalidRequestError),
107 #[error("Unable to re-encode request after parsing it‽")]
109 ReEncode(#[source] Arc<serde_json::Error>),
110}
111define_from_for_arc!( io::Error => SendRequestError [Io] );
112
#[cfg(test)]
mod test {
    // Lints that are relaxed for test code.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use std::thread;

    use io::{BufRead, BufReader, Cursor};

    use crate::util::assert_same_json;

    use super::*;

    /// A fake stream on which every read, write, and flush fails with
    /// `io::ErrorKind::NotConnected`.
    struct NeverConnected;

    impl io::Read for NeverConnected {
        fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
            Err(io::Error::from(io::ErrorKind::NotConnected))
        }
    }

    impl io::Write for NeverConnected {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            Err(io::Error::from(io::ErrorKind::NotConnected))
        }

        fn flush(&mut self) -> io::Result<()> {
            Err(io::Error::from(io::ErrorKind::NotConnected))
        }
    }

    /// `Reader::read_msg` on complete, truncated, empty, and failing inputs.
    #[test]
    fn reading() {
        const RESPONSE: &str = r#"{"id":7,"result":{}}"#;

        // A complete, newline-terminated message is returned (newline included).
        let input = [RESPONSE.as_bytes(), b"\n"].concat();
        let mut reader = Reader::new(Cursor::new(input));
        let outcome = reader.read_msg();
        let response = outcome.unwrap().unwrap();
        assert_eq!(response.as_ref().strip_suffix('\n').unwrap(), RESPONSE);

        // A truncated message (no trailing newline) is dropped.
        let mut reader = Reader::new(Cursor::new(r#"{"id":7"#));
        let outcome = reader.read_msg();
        assert!(outcome.unwrap().is_none());

        // Empty input is an immediate EOF.
        let mut reader = Reader::new(Cursor::new(""));
        let outcome = reader.read_msg();
        assert!(outcome.unwrap().is_none());

        // Errors from the backend are passed through.
        let mut reader = Reader::new(BufReader::new(NeverConnected));
        let outcome = reader.read_msg();
        assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::NotConnected);
    }

    /// A valid request written through a `Writer` arrives at the other end
    /// of a socket pair as one line of equivalent JSON.
    #[test]
    fn write_success() {
        let (rx, tx) = crate::testing::construct_socketpair().unwrap();
        let mut writer = Writer::new(tx);
        let mut rx = io::BufReader::new(rx);

        let write_thread: thread::JoinHandle<Result<(), SendRequestError>> =
            thread::spawn(move || {
                let outcome = writer.send_request(
                    r#"{"id":7,
                 "obj":"foo",
                 "method":"arti:x-frob", "params":{},
                 "extra": "preserved"
            }"#,
                );
                writer.flush().unwrap();
                drop(writer);
                outcome
            });
        let read_thread = thread::spawn(move || -> io::Result<String> {
            let mut line = String::new();
            rx.read_line(&mut line)?;
            Ok(line)
        });

        let sent = write_thread.join().unwrap();
        assert!(sent.is_ok());
        let received = read_thread.join().unwrap().unwrap();
        assert_same_json!(
            received.strip_suffix('\n').unwrap(),
            r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{},"extra":"preserved"}"#
        );
    }

    /// Invalid requests are rejected before any I/O; valid requests surface
    /// the backend's I/O error.
    #[test]
    fn write_failure() {
        let mut writer = Writer::new(NeverConnected);

        // Not even syntactically valid JSON.
        assert!(matches!(
            writer.send_request("{"),
            Err(SendRequestError::InvalidRequest(_))
        ));

        // Valid JSON, but not accepted as a request.
        assert!(matches!(
            writer.send_request("{}"),
            Err(SendRequestError::InvalidRequest(_))
        ));

        // A well-formed request reaches the backend, which then fails.
        let outcome =
            writer.send_request(r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{}}"#);
        assert!(
            matches!(outcome, Err(SendRequestError::Io(e)) if e.kind() == io::ErrorKind::NotConnected)
        );
    }
}