arti_rpcserver/
codecs.rs

//! Helper types for framing JSON objects into async reads/writes
2
3use std::marker::PhantomData;
4
5use bytes::BytesMut;
6use serde::Serialize;
7
/// As JsonCodec, but only supports encoding, and places a newline after every
/// object.
///
/// `Clone` is implemented by hand instead of derived: `#[derive(Clone)]`
/// would require `T: Clone`, even though no value of type `T` is ever stored
/// in this struct.
pub(crate) struct JsonLinesEncoder<T> {
    /// We consume objects of type T.
    ///
    /// This marker is the only field, so the encoder itself carries no data.
    _phantom: PhantomData<fn(T) -> ()>,
}

impl<T> Clone for JsonLinesEncoder<T> {
    fn clone(&self) -> Self {
        // There is no state to copy; a fresh marker is an exact duplicate.
        Self {
            _phantom: PhantomData,
        }
    }
}
15
16impl<T> Default for JsonLinesEncoder<T> {
17    fn default() -> Self {
18        Self {
19            _phantom: PhantomData,
20        }
21    }
22}
23
24impl<T> asynchronous_codec::Encoder for JsonLinesEncoder<T>
25where
26    T: Serialize + 'static,
27{
28    type Item<'a> = T;
29
30    type Error = asynchronous_codec::JsonCodecError;
31
32    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
33        use std::fmt::Write as _;
34        let j = serde_json::to_string(&item)?;
35        // The jsonlines format won't work if serde_json starts adding newlines in the middle.
36        debug_assert!(!j.contains('\n'));
37        writeln!(dst, "{}", j).expect("write! of string on BytesMut failed");
38        Ok(())
39    }
40}
41
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use super::*;
    use crate::msgs::*;
    use futures::sink::SinkExt as _;
    use futures_await_test::async_test;
    use tor_rpcbase as rpc;

    /// A serializable value with no fields, used as a placeholder body.
    #[derive(serde::Serialize)]
    struct Empty {}

    /// Send three responses through a `FramedWrite` using `JsonLinesEncoder`,
    /// and check that the output is each response's JSON followed by a newline.
    #[async_test]
    async fn check_sink_basics() {
        // Sanity-checking for our sink type.
        let mut buf = Vec::new();
        let r1 = BoxedResponse {
            id: Some(RequestId::Int(7)),
            body: ResponseBody::Update(Box::new(Empty {})),
        };
        let r2 = BoxedResponse {
            id: Some(RequestId::Int(8)),
            body: ResponseBody::Error(Box::new(rpc::RpcError::from(
                crate::connection::RequestCancelled,
            ))),
        };
        let r3 = BoxedResponse {
            id: Some(RequestId::Int(9)),
            body: ResponseBody::Success(Box::new(Empty {})),
        };

        // These should get serialized as follows.
        //
        // Unwrap each serialization result explicitly: feeding the `Result`
        // to `String::extend` would silently append nothing on error.
        let mut expect = String::new();
        for response in [&r1, &r2, &r3] {
            expect.push_str(&serde_json::to_string(response).unwrap());
            expect.push('\n');
        }

        {
            // Scope the sink so its mutable borrow of `buf` ends before we
            // inspect the bytes below.
            let mut sink =
                asynchronous_codec::FramedWrite::new(&mut buf, JsonLinesEncoder::default());
            sink.send(r1).await.unwrap();
            sink.send(r2).await.unwrap();
            sink.send(r3).await.unwrap();
        }
        // Exactly 3 messages means exactly 3 newlines.
        assert_eq!(buf.iter().filter(|c| **c == b'\n').count(), 3);
        // Make sure that the output is what we expected.
        assert_eq!(std::str::from_utf8(&buf).unwrap(), &expect);
    }
}