1
//! Lowest-level API interface to an active RPC connection.
2
//!
3
//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
4

            
5
use crate::{
6
    msgs::{
7
        request::{InvalidRequestError, ValidatedRequest},
8
        response::UnparsedResponse,
9
    },
10
    util::define_from_for_arc,
11
};
12
use std::{io, sync::Arc};
13

            
14
/// A low-level reader type, wrapping a boxed [`BufRead`](io::BufRead).
///
/// Replies are read one line at a time, which is why a buffered reader
/// (and not just a plain [`Read`](io::Read)) is required.
///
/// (Currently it performs no additional validation; instead it assumes
/// that Arti is obeying its specification.)
pub struct Reader {
    /// The underlying buffered reader.
    backend: Box<dyn io::BufRead + Send>,
}
22

            
23
/// Low-level handle for the outbound side of a connection.
///
/// This is a thin wrapper around a boxed [`Write`](io::Write).  It enforces
/// the property that outbound requests are syntactically well-formed.
pub struct Writer {
    /// The destination that outbound bytes are written to.
    backend: Box<dyn io::Write + Send>,
}
30

            
31
impl Reader {
32
    /// Create a new Reader, wrapping an [`io::BufRead`].
33
18
    pub fn new<T>(backend: T) -> Self
34
18
    where
35
18
        T: io::BufRead + Send + 'static,
36
18
    {
37
18
        Self {
38
18
            backend: Box::new(backend),
39
18
        }
40
18
    }
41

            
42
    /// Receive an inbound reply.
43
    ///
44
    /// Blocks as needed until the reply is available.
45
    ///
46
    /// Returns `Ok(None)` on end-of-stream.
47
8094
    pub fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
48
8094
        let mut s = String::new();
49
8094

            
50
8094
        // TODO: possibly ensure that the value is legit?
51
8094
        match self.backend.read_line(&mut s) {
52
4
            Err(e) => Err(e),
53
2
            Ok(0) => Ok(None),
54
8088
            Ok(_) if s.ends_with('\n') => Ok(Some(UnparsedResponse::new(s))),
55
            // NOTE: This can happen if we hit EOF.
56
            //
57
            // We discard any truncated lines in this case.
58
2
            Ok(_) => Ok(None),
59
        }
60
8094
    }
61
}
62

            
63
impl Writer {
64
    /// Create a new writer, wrapping an [`io::Write`].
65
14
    pub fn new<T>(backend: T) -> Self
66
14
    where
67
14
        T: io::Write + Send + 'static,
68
14
    {
69
14
        Self {
70
14
            backend: Box::new(backend),
71
14
        }
72
14
    }
73

            
74
    /// Send an outbound request.
75
    ///
76
    /// Return an error if an IO problems occurred, or if the request was not well-formed.
77
8
    pub fn send_request(&mut self, request: &str) -> Result<(), SendRequestError> {
78
8
        let validated = ValidatedRequest::from_string_strict(request)?;
79
4
        self.send_valid(&validated)?;
80
2
        Ok(())
81
8
    }
82

            
83
    /// Crate-internal: Send a request that is known to be valid.
84
    ///
85
    /// (This is reliable since we never construct a `ValidRequest` except by encoding a
86
    /// known-correct object.)
87
4148
    pub(crate) fn send_valid(&mut self, request: &ValidatedRequest) -> io::Result<()> {
88
4148
        self.backend.write_all(request.as_ref().as_bytes())
89
4148
    }
90

            
91
    /// Flush any queued data in this writer.
92
2
    pub fn flush(&mut self) -> io::Result<()> {
93
2
        self.backend.flush()
94
2
    }
95
}
96

            
97
/// An error that has occurred while sending a request.
98
#[derive(Clone, Debug, thiserror::Error)]
99
#[non_exhaustive]
100
pub enum SendRequestError {
101
    /// An IO error occurred while sending a request.
102
    #[error("Unable to send request")]
103
    Io(#[source] Arc<io::Error>),
104
    /// We found a problem in the JSON while sending a request.
105
    #[error("Invalid Json request")]
106
    InvalidRequest(#[from] InvalidRequestError),
107
    /// Internal error while re-encoding request.  Should be impossible.
108
    #[error("Unable to re-encode request after parsing it‽")]
109
    ReEncode(#[source] Arc<serde_json::Error>),
110
}
111
define_from_for_arc!( io::Error => SendRequestError [Io] );
112

            
113
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::thread;

    use io::{BufRead, BufReader, Cursor};

    use crate::util::assert_same_json;

    use super::*;

    /// A stream on which every read, write, and flush fails with `NotConnected`.
    struct NeverConnected;
    impl io::Read for NeverConnected {
        fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
            Err(io::ErrorKind::NotConnected.into())
        }
    }
    impl io::Write for NeverConnected {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            Err(io::ErrorKind::NotConnected.into())
        }

        fn flush(&mut self) -> io::Result<()> {
            Err(io::ErrorKind::NotConnected.into())
        }
    }

    /// Exercise every outcome of `Reader::read_msg`.
    #[test]
    fn reading() {
        // Case 1: a complete, newline-terminated reply is returned as-is.
        let expected = r#"{"id":7,"result":{}}"#;
        let input = format!("{}\n", expected).into_bytes();
        let mut reader = Reader::new(Cursor::new(input));
        let reply = reader.read_msg().unwrap().unwrap();
        assert_eq!(reply.as_ref().strip_suffix('\n').unwrap(), expected);

        // Case 2: incomplete reply (gets treated as EOF).
        let mut reader = Reader::new(Cursor::new(r#"{"id":7"#));
        assert!(reader.read_msg().unwrap().is_none());

        // Case 3: empty buffer (gets treated as EOF, since there is no more to read).
        let mut reader = Reader::new(Cursor::new(""));
        assert!(reader.read_msg().unwrap().is_none());

        // Case 4: the reader gives an error, which is passed through.
        let mut reader = Reader::new(BufReader::new(NeverConnected));
        let failure = reader.read_msg().unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::NotConnected);
    }

    /// A valid request written on one end of a socketpair arrives on the other end.
    #[test]
    fn write_success() {
        let (rx, tx) = crate::testing::construct_socketpair().unwrap();
        let mut writer = Writer::new(tx);
        let mut line_reader = io::BufReader::new(rx);

        let request = r#"{"id":7,
                 "obj":"foo",
                 "method":"arti:x-frob", "params":{},
                 "extra": "preserved"
            }"#;

        let writer_thread = thread::spawn(move || -> Result<(), SendRequestError> {
            let outcome = writer.send_request(request);
            writer.flush().unwrap();
            drop(writer);
            outcome
        });
        let reader_thread = thread::spawn(move || -> io::Result<String> {
            let mut received = String::new();
            line_reader.read_line(&mut received)?;
            Ok(received)
        });

        assert!(writer_thread.join().unwrap().is_ok());
        let received = reader_thread.join().unwrap().unwrap();
        assert_same_json!(
            received.strip_suffix('\n').unwrap(),
            r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{},"extra":"preserved"}"#
        );
    }

    /// Invalid requests and IO failures each map to their own error variant.
    #[test]
    fn write_failure() {
        let mut writer = Writer::new(NeverConnected);

        // First an incomplete request, then an invalid one: both must be rejected.
        for bad_request in ["{", "{}"] {
            assert!(matches!(
                writer.send_request(bad_request),
                Err(SendRequestError::InvalidRequest(_))
            ));
        }

        // Valid request, but get an IO error.
        let outcome =
            writer.send_request(r#"{"id":7,"obj":"foo","method":"arti:x-frob","params":{}}"#);
        match outcome {
            Err(SendRequestError::Io(e)) => assert_eq!(e.kind(), io::ErrorKind::NotConnected),
            other => panic!("unexpected result: {:?}", other),
        }
    }
}