1
//! Helper types for framing Json objects into async read/writes
2

            
3
use std::marker::PhantomData;
4

            
5
use bytes::BytesMut;
6
use serde::Serialize;
7

            
8
/// As JsonCodec, but only supports encoding, and places a newline after every
9
/// object.
10
#[derive(Clone)]
11
pub(crate) struct JsonLinesEncoder<T> {
12
    /// We consume objects of type T.
13
    _phantom: PhantomData<fn(T) -> ()>,
14
}
15

            
16
impl<T> Default for JsonLinesEncoder<T> {
17
2
    fn default() -> Self {
18
2
        Self {
19
2
            _phantom: PhantomData,
20
2
        }
21
2
    }
22
}
23

            
24
impl<T> asynchronous_codec::Encoder for JsonLinesEncoder<T>
25
where
26
    T: Serialize + 'static,
27
{
28
    type Item<'a> = T;
29

            
30
    type Error = asynchronous_codec::JsonCodecError;
31

            
32
6
    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
33
        use std::fmt::Write as _;
34
6
        let j = serde_json::to_string(&item)?;
35
        // The jsonlines format won't work if serde_json starts adding newlines in the middle.
36
6
        debug_assert!(!j.contains('\n'));
37
6
        writeln!(dst, "{}", j).expect("write! of string on BytesMut failed");
38
6
        Ok(())
39
6
    }
40
}
41

            
42
#[cfg(test)]
43
mod test {
44
    // @@ begin test lint list maintained by maint/add_warning @@
45
    #![allow(clippy::bool_assert_comparison)]
46
    #![allow(clippy::clone_on_copy)]
47
    #![allow(clippy::dbg_macro)]
48
    #![allow(clippy::mixed_attributes_style)]
49
    #![allow(clippy::print_stderr)]
50
    #![allow(clippy::print_stdout)]
51
    #![allow(clippy::single_char_pattern)]
52
    #![allow(clippy::unwrap_used)]
53
    #![allow(clippy::unchecked_duration_subtraction)]
54
    #![allow(clippy::useless_vec)]
55
    #![allow(clippy::needless_pass_by_value)]
56
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
57

            
58
    use super::*;
59
    use crate::msgs::*;
60
    use futures::sink::SinkExt as _;
61
    use futures_await_test::async_test;
62
    use tor_rpcbase as rpc;
63

            
64
    #[derive(serde::Serialize)]
65
    struct Empty {}
66

            
67
    #[async_test]
68
    async fn check_sink_basics() {
69
        // Sanity-checking for our sink type.
70
        let mut buf = Vec::new();
71
        let r1 = BoxedResponse {
72
            id: Some(RequestId::Int(7)),
73
            body: ResponseBody::Update(Box::new(Empty {})),
74
        };
75
        let r2 = BoxedResponse {
76
            id: Some(RequestId::Int(8)),
77
            body: ResponseBody::Error(Box::new(rpc::RpcError::from(
78
                crate::connection::RequestCancelled,
79
            ))),
80
        };
81
        let r3 = BoxedResponse {
82
            id: Some(RequestId::Int(9)),
83
            body: ResponseBody::Success(Box::new(Empty {})),
84
        };
85

            
86
        // These should get serialized as follows.
87
        let mut expect = String::new();
88
        expect.extend(serde_json::to_string(&r1));
89
        expect.push('\n');
90
        expect.extend(serde_json::to_string(&r2));
91
        expect.push('\n');
92
        expect.extend(serde_json::to_string(&r3));
93
        expect.push('\n');
94

            
95
        {
96
            let mut sink =
97
                asynchronous_codec::FramedWrite::new(&mut buf, JsonLinesEncoder::default());
98
            sink.send(r1).await.unwrap();
99
            sink.send(r2).await.unwrap();
100
            sink.send(r3).await.unwrap();
101
        }
102
        // Exactly 3 messages means exactly 3 newlines.
103
        assert_eq!(buf.iter().filter(|c| **c == b'\n').count(), 3);
104
        // Make sure that the output is what we expected.
105
        assert_eq!(std::str::from_utf8(&buf).unwrap(), &expect);
106
    }
107
}