arti_rpcserver/
codecs.rs
1use std::marker::PhantomData;
4
5use bytes::BytesMut;
6use serde::Serialize;
7
/// An encode-only codec marker for items of type `T`.
///
/// The `Encoder` implementation for this type serializes each item as JSON
/// and places a newline after every object.
pub(crate) struct JsonLinesEncoder<T> {
    /// Records the item type `T` that this encoder consumes, without storing
    /// a `T`.
    ///
    /// A function-pointer type is used so that the marker does not depend on
    /// any property of `T` itself.
    _phantom: PhantomData<fn(T)>,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would add
// a `T: Clone` bound, even though no `T` is ever stored here.  This mirrors
// the hand-written `Default` impl, which likewise avoids a `T: Default` bound.
impl<T> Clone for JsonLinesEncoder<T> {
    fn clone(&self) -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
15
16impl<T> Default for JsonLinesEncoder<T> {
17 fn default() -> Self {
18 Self {
19 _phantom: PhantomData,
20 }
21 }
22}
23
24impl<T> asynchronous_codec::Encoder for JsonLinesEncoder<T>
25where
26 T: Serialize + 'static,
27{
28 type Item<'a> = T;
29
30 type Error = asynchronous_codec::JsonCodecError;
31
32 fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
33 use std::fmt::Write as _;
34 let j = serde_json::to_string(&item)?;
35 debug_assert!(!j.contains('\n'));
37 writeln!(dst, "{}", j).expect("write! of string on BytesMut failed");
38 Ok(())
39 }
40}
41
#[cfg(test)]
mod test {
    // Lint relaxations that apply to test code only.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::msgs::*;
    use futures::sink::SinkExt as _;
    use futures_await_test::async_test;
    use tor_rpcbase as rpc;

    /// A serializable type with no fields, used as a placeholder payload.
    #[derive(serde::Serialize)]
    struct Empty {}

    /// Send three responses through a `FramedWrite` that wraps a
    /// `JsonLinesEncoder`, and check that the output is exactly the three
    /// serialized responses, each terminated by a newline.
    #[async_test]
    async fn check_sink_basics() {
        let mut buf = Vec::new();
        let r1 = BoxedResponse {
            id: Some(RequestId::Int(7)),
            body: ResponseBody::Update(Box::new(Empty {})),
        };
        let r2 = BoxedResponse {
            id: Some(RequestId::Int(8)),
            body: ResponseBody::Error(Box::new(rpc::RpcError::from(
                crate::connection::RequestCancelled,
            ))),
        };
        let r3 = BoxedResponse {
            id: Some(RequestId::Int(9)),
            body: ResponseBody::Success(Box::new(Empty {})),
        };

        // Build the expected output before the responses are moved into the
        // sink.  Unwrap explicitly: extending a `String` from a `Result`
        // would silently skip any response that failed to serialize.
        let mut expect = String::new();
        for response in [&r1, &r2, &r3] {
            expect.push_str(&serde_json::to_string(response).unwrap());
            expect.push('\n');
        }

        {
            // Scope the sink so its mutable borrow of `buf` ends before we
            // inspect the bytes below.
            let mut sink =
                asynchronous_codec::FramedWrite::new(&mut buf, JsonLinesEncoder::default());
            sink.send(r1).await.unwrap();
            sink.send(r2).await.unwrap();
            sink.send(r3).await.unwrap();
        }

        // Exactly one newline per response...
        assert_eq!(buf.iter().filter(|c| **c == b'\n').count(), 3);
        // ...and the bytes match the independently serialized form.
        assert_eq!(std::str::from_utf8(&buf).unwrap(), &expect);
    }
}