arti_rpc_client_core/
llconn.rs

1//! Lowest-level API interface to an active RPC connection.
2//!
3//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
4
5use crate::{
6    msgs::{
7        request::{InvalidRequestError, ValidatedRequest},
8        response::UnparsedResponse,
9    },
10    util::define_from_for_arc,
11};
12use std::{io, sync::Arc};
13
14/// A low-level reader type, wrapping a boxed [`Read`](io::Read).
15///
16/// (Currently it performs no additional validation; instead it assumes
17/// that Arti is obeying its specification.)
18pub struct Reader {
19    /// The underlying reader.
20    backend: Box<dyn io::BufRead + Send>,
21}
22
23/// A low-level writer type, wrapping a boxed [`Write`](io::Write).
24///
25/// It enforces the property that outbound requests are syntactically well-formed.
26pub struct Writer {
27    /// The underlying writer.
28    backend: Box<dyn io::Write + Send>,
29}
30
31impl Reader {
32    /// Create a new Reader, wrapping an [`io::BufRead`].
33    pub fn new<T>(backend: T) -> Self
34    where
35        T: io::BufRead + Send + 'static,
36    {
37        Self {
38            backend: Box::new(backend),
39        }
40    }
41
42    /// Receive an inbound reply.
43    ///
44    /// Blocks as needed until the reply is available.
45    ///
46    /// Returns `Ok(None)` on end-of-stream.
47    pub fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
48        let mut s = String::new();
49
50        // TODO: possibly ensure that the value is legit?
51        match self.backend.read_line(&mut s) {
52            Err(e) => Err(e),
53            Ok(0) => Ok(None),
54            Ok(_) if s.ends_with('\n') => Ok(Some(UnparsedResponse::new(s))),
55            // NOTE: This can happen if we hit EOF.
56            //
57            // We discard any truncated lines in this case.
58            Ok(_) => Ok(None),
59        }
60    }
61}
62
63impl Writer {
64    /// Create a new writer, wrapping an [`io::Write`].
65    pub fn new<T>(backend: T) -> Self
66    where
67        T: io::Write + Send + 'static,
68    {
69        Self {
70            backend: Box::new(backend),
71        }
72    }
73
74    /// Send an outbound request.
75    ///
76    /// Return an error if an IO problems occurred, or if the request was not well-formed.
77    pub fn send_request(&mut self, request: &str) -> Result<(), SendRequestError> {
78        let validated = ValidatedRequest::from_string_strict(request)?;
79        self.send_valid(&validated)?;
80        Ok(())
81    }
82
83    /// Crate-internal: Send a request that is known to be valid.
84    ///
85    /// (This is reliable since we never construct a `ValidRequest` except by encoding a
86    /// known-correct object.)
87    pub(crate) fn send_valid(&mut self, request: &ValidatedRequest) -> io::Result<()> {
88        self.backend.write_all(request.as_ref().as_bytes())
89    }
90
91    /// Flush any queued data in this writer.
92    pub fn flush(&mut self) -> io::Result<()> {
93        self.backend.flush()
94    }
95}
96
97/// An error that has occurred while sending a request.
98#[derive(Clone, Debug, thiserror::Error)]
99#[non_exhaustive]
100pub enum SendRequestError {
101    /// An IO error occurred while sending a request.
102    #[error("Unable to send request")]
103    Io(#[source] Arc<io::Error>),
104    /// We found a problem in the JSON while sending a request.
105    #[error("Invalid Json request")]
106    InvalidRequest(#[from] InvalidRequestError),
107    /// Internal error while re-encoding request.  Should be impossible.
108    #[error("Unable to re-encode request after parsing it‽")]
109    ReEncode(#[source] Arc<serde_json::Error>),
110}
111define_from_for_arc!( io::Error => SendRequestError [Io] );
112
113#[cfg(test)]
114mod test {
115    // @@ begin test lint list maintained by maint/add_warning @@
116    #![allow(clippy::bool_assert_comparison)]
117    #![allow(clippy::clone_on_copy)]
118    #![allow(clippy::dbg_macro)]
119    #![allow(clippy::mixed_attributes_style)]
120    #![allow(clippy::print_stderr)]
121    #![allow(clippy::print_stdout)]
122    #![allow(clippy::single_char_pattern)]
123    #![allow(clippy::unwrap_used)]
124    #![allow(clippy::unchecked_duration_subtraction)]
125    #![allow(clippy::useless_vec)]
126    #![allow(clippy::needless_pass_by_value)]
127    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
128
129    use std::thread;
130
131    use io::{BufRead, BufReader, Cursor};
132
133    use crate::util::assert_same_json;
134
135    use super::*;
136
137    struct NeverConnected;
138    impl io::Read for NeverConnected {
139        fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
140            Err(io::ErrorKind::NotConnected.into())
141        }
142    }
143    impl io::Write for NeverConnected {
144        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
145            Err(io::ErrorKind::NotConnected.into())
146        }
147
148        fn flush(&mut self) -> io::Result<()> {
149            Err(io::ErrorKind::NotConnected.into())
150        }
151    }
152
153    #[test]
154    fn reading() {
155        // basic case: valid reply.
156        let mut v = r#"{"id":7,"result":{}}"#.as_bytes().to_vec();
157        v.push(b'\n');
158        let mut r = Reader::new(Cursor::new(v));
159        let m = r.read_msg();
160        let msg = m.unwrap().unwrap();
161        assert_eq!(
162            msg.as_ref().strip_suffix('\n').unwrap(),
163            r#"{"id":7,"result":{}}"#
164        );
165
166        // case 2: incomplete reply (gets treated as EOF)
167        let mut r = Reader::new(Cursor::new(r#"{"id":7"#));
168        let m = r.read_msg();
169        assert!(m.unwrap().is_none());
170
171        // Case 3: empty buffer (gets treated as EOF since there is no more to read.
172        let mut r = Reader::new(Cursor::new(""));
173        let m = r.read_msg();
174        assert!(m.unwrap().is_none());
175
176        // Case 4: reader gives an error
177        let mut r = Reader::new(BufReader::new(NeverConnected));
178        let m = r.read_msg();
179        assert_eq!(m.unwrap_err().kind(), io::ErrorKind::NotConnected);
180    }
181
182    #[test]
183    fn write_success() {
184        let (r, w) = crate::testing::construct_socketpair().unwrap();
185        let mut w = Writer::new(w);
186        let mut r = io::BufReader::new(r);
187
188        let wt: thread::JoinHandle<Result<(), SendRequestError>> = thread::spawn(move || {
189            let res = w.send_request(
190                r#"{"id":7,
191                 "obj":"foo",
192                 "method":"arti:x-frob", "params":{},
193                 "extra": "preserved"
194            }"#,
195            );
196            w.flush().unwrap();
197            drop(w);
198            res
199        });
200        let rt = thread::spawn(move || -> io::Result<String> {
201            let mut s = String::new();
202            r.read_line(&mut s)?;
203            Ok(s)
204        });
205        let write_result = wt.join().unwrap();
206        assert!(write_result.is_ok());
207        let read_result = rt.join().unwrap().unwrap();
208        assert_same_json!(
209            read_result.strip_suffix('\n').unwrap(),
210            r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{},"extra":"preserved"}"#
211        );
212    }
213
214    #[test]
215    fn write_failure() {
216        let mut w = Writer::new(NeverConnected);
217
218        // Write an incomplete request.
219        assert!(matches!(
220            w.send_request("{"),
221            Err(SendRequestError::InvalidRequest(_))
222        ));
223
224        // Write an invalid request.
225        assert!(matches!(
226            w.send_request("{}"),
227            Err(SendRequestError::InvalidRequest(_))
228        ));
229
230        // Valid request, but get an IO error.
231        let r = w.send_request(r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{}}"#);
232        assert!(
233            matches!(r, Err(SendRequestError::Io(e)) if e.kind() == io::ErrorKind::NotConnected)
234        );
235    }
236}