1use std::{
7 io::{self},
8 sync::{Arc, Mutex},
9};
10
11use crate::msgs::{
12 request::InvalidRequestError,
13 response::{ResponseKind, RpcError, ValidatedResponse},
14 AnyRequestId, ObjectId,
15};
16
17mod auth;
18mod builder;
19mod connimpl;
20mod stream;
21
22use crate::util::Utf8CString;
23pub use builder::{BuilderError, ConnPtDescription, RpcConnBuilder};
24pub use connimpl::RpcConn;
25use serde::{de::DeserializeOwned, Deserialize};
26pub use stream::StreamError;
27use tor_rpc_connect::{auth::cookie::CookieAccessError, HasClientErrorAction};
28
/// A handle to a request that has been sent on an [`RpcConn`].
///
/// The handle pairs the request's ID with a shared reference to the
/// connection's receiver; [`RequestHandle::wait`] and
/// [`RequestHandle::wait_with_updates`] use that receiver to obtain the
/// responses addressed to this request.
#[derive(educe::Educe)]
#[educe(Debug)]
pub struct RequestHandle {
    /// Shared receiver used to wait for responses to this request.
    ///
    /// Left out of the `Debug` output.  The mutex is held for the whole
    /// duration of each `wait_with_updates` call.
    #[educe(Debug(ignore))]
    conn: Mutex<Arc<connimpl::Receiver>>,
    /// The ID of the request that this handle refers to.
    id: AnyRequestId,
}
53
/// A response whose kind is "success", kept as the message text that was
/// received.
///
/// The text can be borrowed through `AsRef` (forwarded to the inner
/// [`Utf8CString`]) or taken by value with `Into<Utf8CString>`.
#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
#[as_ref(forward)]
pub struct SuccessResponse(Utf8CString);
76
77impl SuccessResponse {
78 fn decode<D: DeserializeOwned>(&self) -> Result<D, serde_json::Error> {
80 #[derive(Deserialize)]
82 struct Response<R> {
83 result: R,
85 }
86 let response: Response<D> = serde_json::from_str(self.as_ref())?;
87 Ok(response.result)
88 }
89}
90
/// A response whose kind is "update", kept as the message text that was
/// received.
///
/// Updates are the non-final responses to a request: the waiting loops in
/// this module keep waiting after receiving one.
///
/// The text can be borrowed through `AsRef` (forwarded to the inner
/// [`Utf8CString`]) or taken by value with `Into<Utf8CString>`.
#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
#[as_ref(forward)]
pub struct UpdateResponse(Utf8CString);
101
/// A response whose kind is "error", kept as the message text that was
/// received.
///
/// Use [`ErrorResponse::decode`] to get the structured [`RpcError`].
///
/// The text can be borrowed through `AsRef` (forwarded to the inner
/// [`Utf8CString`]) or taken by value with `Into<Utf8CString>`.
#[derive(Clone, Debug, derive_more::AsRef, derive_more::Into)]
#[as_ref(forward)]
pub struct ErrorResponse(Utf8CString);
119impl ErrorResponse {
120 pub(crate) fn from_validated_string(s: Utf8CString) -> Self {
124 ErrorResponse(s)
125 }
126
127 pub(crate) fn internal_error(&self, cmd: &str) -> ProtoError {
131 ProtoError::InternalRequestFailed(UnexpectedReply {
132 request: cmd.to_string(),
133 reply: self.to_string(),
134 problem: UnexpectedReplyProblem::ErrorNotExpected,
135 })
136 }
137
138 pub fn decode(&self) -> RpcError {
140 crate::msgs::response::try_decode_response_as_err(self.0.as_ref())
141 .expect("Could not decode response that was already decoded as an error?")
142 .expect("Could not extract error from response that was already decoded as an error?")
143 }
144}
145
146impl std::fmt::Display for ErrorResponse {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 let e = self.decode();
149 write!(f, "Peer said {:?}", e.message())
150 }
151}
152
/// The final outcome of a request: either a success or an error response.
///
/// Updates never appear here; they are delivered separately (or discarded)
/// by the functions that return this type.
type FinalResponse = Result<SuccessResponse, ErrorResponse>;
156
157#[derive(Clone, Debug)]
159#[allow(clippy::exhaustive_structs)]
160pub enum AnyResponse {
161 Success(SuccessResponse),
163 Error(ErrorResponse),
165 Update(UpdateResponse),
167}
168impl AnyResponse {
171 fn from_validated(v: ValidatedResponse) -> Self {
173 match v.meta.kind {
176 ResponseKind::Error => AnyResponse::Error(ErrorResponse::from_validated_string(v.msg)),
177 ResponseKind::Success => AnyResponse::Success(SuccessResponse(v.msg)),
178 ResponseKind::Update => AnyResponse::Update(UpdateResponse(v.msg)),
179 }
180 }
181
182 #[cfg(feature = "ffi")]
184 pub(crate) fn into_string(self) -> Utf8CString {
185 match self {
186 AnyResponse::Success(m) => m.into(),
187 AnyResponse::Error(m) => m.into(),
188 AnyResponse::Update(m) => m.into(),
189 }
190 }
191}
192
193impl RpcConn {
194 pub fn session(&self) -> Option<&ObjectId> {
202 self.session.as_ref()
203 }
204
205 pub fn execute(&self, cmd: &str) -> Result<FinalResponse, ProtoError> {
214 let hnd = self.execute_with_handle(cmd)?;
215 hnd.wait()
216 }
217
218 pub(crate) fn execute_internal<T: DeserializeOwned>(
229 &self,
230 cmd: &str,
231 ) -> Result<Result<T, ErrorResponse>, ProtoError> {
232 match self.execute(cmd)? {
233 Ok(success) => match success.decode::<T>() {
234 Ok(result) => Ok(Ok(result)),
235 Err(json_error) => Err(ProtoError::InternalRequestFailed(UnexpectedReply {
236 request: cmd.to_string(),
237 reply: Utf8CString::from(success).to_string(),
238 problem: UnexpectedReplyProblem::CannotDecode(Arc::new(json_error)),
239 })),
240 },
241 Err(error) => Ok(Err(error)),
242 }
243 }
244
245 pub(crate) fn execute_internal_ok<T: DeserializeOwned>(
253 &self,
254 cmd: &str,
255 ) -> Result<T, ProtoError> {
256 match self.execute_internal(cmd)? {
257 Ok(v) => Ok(v),
258 Err(err_response) => Err(err_response.internal_error(cmd)),
259 }
260 }
261
262 pub fn cancel(&self, request_id: &AnyRequestId) -> Result<(), ProtoError> {
264 #[derive(serde::Serialize, Debug)]
266 struct CancelParams<'a> {
267 request_id: &'a AnyRequestId,
269 }
270
271 let request = crate::msgs::request::Request::new(
272 ObjectId::connection_id(),
273 "rpc:cancel",
274 CancelParams { request_id },
275 );
276 match self.execute_internal::<EmptyReply>(&request.encode()?)? {
277 Ok(EmptyReply {}) => Ok(()),
278 Err(_) => Err(ProtoError::RequestCompleted),
279 }
280 }
281
282 pub fn execute_with_handle(&self, cmd: &str) -> Result<RequestHandle, ProtoError> {
285 self.send_request(cmd)
286 }
287 pub fn execute_with_updates<F>(
289 &self,
290 cmd: &str,
291 mut update_cb: F,
292 ) -> Result<FinalResponse, ProtoError>
293 where
294 F: FnMut(UpdateResponse) + Send + Sync,
295 {
296 let hnd = self.execute_with_handle(cmd)?;
297 loop {
298 match hnd.wait_with_updates()? {
299 AnyResponse::Success(s) => return Ok(Ok(s)),
300 AnyResponse::Error(e) => return Ok(Err(e)),
301 AnyResponse::Update(u) => update_cb(u),
302 }
303 }
304 }
305
306 pub(crate) fn release_obj(&self, obj: ObjectId) -> Result<(), ProtoError> {
311 let release_request = crate::msgs::request::Request::new(obj, "rpc:release", NoParams {});
312 let _empty_response: EmptyReply = self.execute_internal_ok(&release_request.encode()?)?;
313 Ok(())
314 }
315
316 }
318
319impl RequestHandle {
320 pub fn id(&self) -> &AnyRequestId {
322 &self.id
323 }
324 pub fn wait(self) -> Result<FinalResponse, ProtoError> {
332 loop {
333 match self.wait_with_updates()? {
334 AnyResponse::Success(s) => return Ok(Ok(s)),
335 AnyResponse::Error(e) => return Ok(Err(e)),
336 AnyResponse::Update(_) => {}
337 }
338 }
339 }
340 pub fn wait_with_updates(&self) -> Result<AnyResponse, ProtoError> {
353 let conn = self.conn.lock().expect("Poisoned lock");
354 let validated = conn.wait_on_message_for(&self.id)?;
355
356 Ok(AnyResponse::from_validated(validated))
357 }
358
359 }
362
/// An error that has caused an RPC connection to shut down.
///
/// Reported to callers wrapped in [`ProtoError::Shutdown`].
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ShutdownError {
    /// An I/O error occurred while reading a response.
    #[error("Unable to read response")]
    Read(#[source] Arc<io::Error>),
    /// An I/O error occurred while writing a request.
    #[error("Unable to write request")]
    Write(#[source] Arc<io::Error>),
    /// Arti sent a message that did not conform to the RPC protocol.
    ///
    /// The string is a rendering of the underlying decoding problem.
    #[error("Arti sent a message that didn't conform to the RPC protocol: {0:?}")]
    ProtocolViolated(String),
    /// Arti sent an error that decoding classified as fatal.
    #[error("Arti reported a fatal error: {0:?}")]
    ProtocolViolationReport(ErrorResponse),
    /// The connection was closed.
    #[error("Connection closed")]
    ConnectionClosed,
}
385
386impl From<crate::msgs::response::DecodeResponseError> for ShutdownError {
387 fn from(value: crate::msgs::response::DecodeResponseError) -> Self {
388 use crate::msgs::response::DecodeResponseError::*;
389 use ShutdownError as E;
390 match value {
391 JsonProtocolViolation(e) => E::ProtocolViolated(e.to_string()),
392 ProtocolViolation(s) => E::ProtocolViolated(s.to_string()),
393 Fatal(rpc_err) => E::ProtocolViolationReport(rpc_err),
394 }
395 }
396}
397
/// An error that can occur while sending a request on an RPC connection
/// or waiting for its reply.
///
/// Note that an error *response* from the peer is not a `ProtoError`: it is
/// delivered as an [`ErrorResponse`].
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ProtoError {
    /// The RPC connection is shut down; the source says why.
    #[error("RPC connection is shut down")]
    Shutdown(#[from] ShutdownError),

    /// The request was not valid.
    #[error("Invalid request")]
    InvalidRequest(#[from] InvalidRequestError),

    /// The request's ID is already in use.
    #[error("Request ID already in use.")]
    RequestIdInUse,

    /// The request has already completed or failed.
    ///
    /// [`RpcConn::cancel`] also returns this for any error response to a
    /// cancellation.
    #[error("Request has already completed (or failed)")]
    RequestCompleted,

    /// The same request was waited on more than once at the same time.
    #[error("Internal error: waiting on the same request more than once at a time.")]
    DuplicateWait,

    /// A request could not be encoded as JSON.
    #[error("Internal error while encoding request")]
    CouldNotEncode(#[source] Arc<serde_json::Error>),

    /// A request made internally by this crate received a reply that was
    /// an unexpected error, or that could not be decoded as the expected
    /// type.
    #[error("{0}")]
    InternalRequestFailed(#[source] UnexpectedReply),
}
434
/// The outcome of a failed attempt to connect.
///
/// It records every attempt that failed before the final error, plus the
/// final error itself.  `Display` gives a one-line summary; use
/// [`ConnectFailure::display_verbose`] for a listing of every attempt.
#[derive(Clone, Debug, thiserror::Error)]
pub struct ConnectFailure {
    /// The attempts that failed before the final error, each with a
    /// description of where it came from and the error it produced.
    declined: Vec<(builder::ConnPtDescription, ConnectError)>,
    /// A description of the origin of `final_error`, if it has one.
    final_desc: Option<builder::ConnPtDescription>,
    /// The error that ended the connection attempt.
    ///
    /// This is [`ConnectError::AllAttemptsDeclined`] when there was no
    /// fatal error and every attempt simply failed.
    #[source]
    pub(crate) final_error: ConnectError,
}
449
450impl std::fmt::Display for ConnectFailure {
451 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
452 write!(f, "Unable to connect")?;
453 if !self.declined.is_empty() {
454 write!(
455 f,
456 " ({} attempts failed{})",
457 self.declined.len(),
458 if matches!(self.final_error, ConnectError::AllAttemptsDeclined) {
459 ""
460 } else {
461 " before fatal error"
462 }
463 )?;
464 }
465 Ok(())
466 }
467}
468
469impl ConnectFailure {
470 pub fn fatal_error_origin(&self) -> Option<&builder::ConnPtDescription> {
473 self.final_desc.as_ref()
474 }
475
476 pub fn declined_attempt_outcomes(
479 &self,
480 ) -> impl Iterator<Item = (&builder::ConnPtDescription, &ConnectError)> {
481 self.declined.iter().map(|(a, b)| (a, b))
483 }
484
485 pub fn display_verbose(&self) -> ConnectFailureVerboseFmt<'_> {
490 ConnectFailureVerboseFmt(self)
491 }
492}
493
/// Helper returned by [`ConnectFailure::display_verbose`]: its `Display`
/// implementation prints a multi-line report of the borrowed
/// [`ConnectFailure`].
#[derive(Debug, Clone)]
pub struct ConnectFailureVerboseFmt<'a>(&'a ConnectFailure);
498
499impl<'a> std::fmt::Display for ConnectFailureVerboseFmt<'a> {
500 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
501 use tor_error::ErrorReport as _;
502 writeln!(f, "{}:", self.0)?;
503 for (idx, (origin, error)) in self.0.declined_attempt_outcomes().enumerate() {
504 writeln!(f, " {}. {}: {}", idx + 1, origin, error.report())?;
505 }
506 if let Some(origin) = self.0.fatal_error_origin() {
507 writeln!(
508 f,
509 " {}. [FATAL] {}: {}",
510 self.0.declined.len() + 1,
511 origin,
512 self.0.final_error.report()
513 )?;
514 } else {
515 writeln!(f, " - {}", self.0.final_error.report())?;
516 }
517 Ok(())
518 }
519}
520
/// An error that can occur while trying to connect to an RPC server.
///
/// The [`HasClientErrorAction`] implementation for this type says, for each
/// variant, whether the attempt is declined or the whole search aborts.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ConnectError {
    /// Connect points could not be parsed from an environment variable.
    #[error("Cannot parse connect points from environment variable")]
    BadEnvironment,
    /// A connect point could not be loaded and parsed.
    #[error("Unable to load and parse connect point")]
    CannotParse(#[from] tor_rpc_connect::load::LoadError),
    /// The path of a connect point could not be resolved.
    #[error("Unable to resolve connect point path")]
    CannotResolvePath(#[source] tor_config_path::CfgPathError),
    /// A connect point could not be resolved.
    #[error("Unable to resolve connect point")]
    CannotResolveConnectPoint(#[from] tor_rpc_connect::ResolveError),
    /// The connection itself could not be made.
    #[error("Unable to make a connection")]
    CannotConnect(#[from] tor_rpc_connect::ConnectError),
    /// The expected banner message was not received after connecting.
    #[error("Did not receive expected banner message upon connecting")]
    InvalidBanner,
    /// Every connect point was declined, or there were none to try.
    #[error("All connect points were declined (or there were none)")]
    AllAttemptsDeclined,
    /// A connect file was given as a relative path.
    #[error("Connect file was given as a relative path.")]
    RelativeConnectFile,
    /// The peer replied with an error while we were authenticating.
    #[error("Received an error while trying to authenticate: {0}")]
    AuthenticationFailed(ErrorResponse),
    /// The authentication type is not supported.
    #[error("Authentication type is not supported")]
    AuthenticationNotSupported,
    /// A message was not in the expected format.
    #[error("Message not in expected format")]
    BadMessage(#[source] Arc<serde_json::Error>),
    /// A protocol-level error occurred while negotiating with Arti.
    #[error("Error while negotiating with Arti")]
    ProtoError(#[from] ProtoError),
    /// The address we connected to is not the one the server believes it is
    /// listening on.
    #[error("We connected to the server at {ours}, but it thinks it's listening at {theirs}")]
    ServerAddressMismatch {
        /// The address we connected to.
        ours: String,
        /// The address the server thinks it is listening at.
        theirs: String,
    },
    /// The server's cookie MAC did not have the expected value.
    #[error("Server's cookie MAC was not as expected.")]
    CookieMismatch,
    /// The secret cookie value could not be loaded.
    #[error("Unable to load secret cookie value")]
    LoadCookie(#[from] CookieAccessError),
}
581
582impl HasClientErrorAction for ConnectError {
583 fn client_action(&self) -> tor_rpc_connect::ClientErrorAction {
584 use tor_rpc_connect::ClientErrorAction as A;
585 use ConnectError as E;
586 match self {
587 E::BadEnvironment => A::Abort,
588 E::CannotParse(e) => e.client_action(),
589 E::CannotResolvePath(_) => A::Abort,
590 E::CannotResolveConnectPoint(e) => e.client_action(),
591 E::CannotConnect(e) => e.client_action(),
592 E::InvalidBanner => A::Decline,
593 E::RelativeConnectFile => A::Abort,
594 E::AuthenticationFailed(_) => A::Decline,
595 E::BadMessage(_) => A::Abort,
598 E::ProtoError(e) => e.client_action(),
599 E::AllAttemptsDeclined => A::Abort,
600 E::AuthenticationNotSupported => A::Decline,
601 E::ServerAddressMismatch { .. } => A::Abort,
602 E::CookieMismatch => A::Abort,
603 E::LoadCookie(e) => e.client_action(),
604 }
605 }
606}
607
608impl HasClientErrorAction for ProtoError {
609 fn client_action(&self) -> tor_rpc_connect::ClientErrorAction {
610 use tor_rpc_connect::ClientErrorAction as A;
611 use ProtoError as E;
612 match self {
613 E::Shutdown(_) => A::Decline,
614 E::InternalRequestFailed(_) => A::Decline,
615 E::InvalidRequest(_)
618 | E::RequestIdInUse
619 | E::RequestCompleted
620 | E::DuplicateWait
621 | E::CouldNotEncode(_) => A::Abort,
622 }
623 }
624}
625
/// Details of an internal request whose reply was not what we expected.
///
/// Carried by [`ProtoError::InternalRequestFailed`].
#[derive(Clone, Debug, thiserror::Error)]
#[error("In response to our request {request:?}, Arti gave the unexpected reply {reply:?}")]
pub struct UnexpectedReply {
    /// The text of the request that we sent.
    request: String,
    /// A rendering of the reply that we received.
    reply: String,
    /// What was wrong with the reply.
    #[source]
    problem: UnexpectedReplyProblem,
}
642
/// The reason a reply to an internal request was considered unexpected.
#[derive(Clone, Debug, thiserror::Error)]
enum UnexpectedReplyProblem {
    /// The reply was a success, but its result could not be decoded as the
    /// type we wanted.
    #[error("Cannot decode as correct JSON type")]
    CannotDecode(Arc<serde_json::Error>),
    /// The reply was an error, and no error was expected.
    #[error("Unexpected error")]
    ErrorNotExpected,
}
654
/// Empty parameter set, for requests whose method takes no parameters
/// (used for `rpc:release`).
#[derive(serde::Serialize, Debug)]
struct NoParams {}
658
/// Empty result, for replies in which we expect no fields
/// (used for `rpc:cancel` and `rpc:release`).
#[derive(serde::Deserialize, Debug)]
struct EmptyReply {}
662
663#[cfg(test)]
664mod test {
665 #![allow(clippy::bool_assert_comparison)]
667 #![allow(clippy::clone_on_copy)]
668 #![allow(clippy::dbg_macro)]
669 #![allow(clippy::mixed_attributes_style)]
670 #![allow(clippy::print_stderr)]
671 #![allow(clippy::print_stdout)]
672 #![allow(clippy::single_char_pattern)]
673 #![allow(clippy::unwrap_used)]
674 #![allow(clippy::unchecked_duration_subtraction)]
675 #![allow(clippy::useless_vec)]
676 #![allow(clippy::needless_pass_by_value)]
677 use std::{sync::atomic::AtomicUsize, thread, time::Duration};
680
681 use io::{BufRead as _, BufReader, Write as _};
682 use rand::{seq::SliceRandom as _, Rng as _, SeedableRng as _};
683 use tor_basic_utils::{test_rng::testing_rng, RngExt as _};
684
685 use crate::{
686 llconn,
687 msgs::request::{JsonMap, Request, ValidatedRequest},
688 };
689
690 use super::*;
691
692 fn dummy_connected() -> (RpcConn, crate::testing::SocketpairStream) {
694 let (s1, s2) = crate::testing::construct_socketpair().unwrap();
695 let s1_w = s1.try_clone().unwrap();
696 let s1_r = io::BufReader::new(s1);
697 let conn = RpcConn::new(llconn::Reader::new(s1_r), llconn::Writer::new(s1_w));
698
699 (conn, s2)
700 }
701
702 fn write_val(w: &mut impl io::Write, v: &serde_json::Value) {
703 let mut enc = serde_json::to_string(v).unwrap();
704 enc.push('\n');
705 w.write_all(enc.as_bytes()).unwrap();
706 }
707
708 #[test]
709 fn simple() {
710 let (conn, sock) = dummy_connected();
711
712 let user_thread = thread::spawn(move || {
713 let response1 = conn
714 .execute_internal_ok::<JsonMap>(
715 r#"{"obj":"fred","method":"arti:x-frob","params":{}}"#,
716 )
717 .unwrap();
718 (response1, conn)
719 });
720
721 let fake_arti_thread = thread::spawn(move || {
722 let mut sock = BufReader::new(sock);
723 let mut s = String::new();
724 let _len = sock.read_line(&mut s).unwrap();
725 let request = ValidatedRequest::from_string_strict(s.as_ref()).unwrap();
726 let response = serde_json::json!({
727 "id": request.id().clone(),
728 "result": { "xyz" : 3 }
729 });
730 write_val(sock.get_mut(), &response);
731 sock });
733
734 let _sock = fake_arti_thread.join().unwrap();
735 let (map, _conn) = user_thread.join().unwrap();
736 assert_eq!(map.get("xyz"), Some(&serde_json::Value::Number(3.into())));
737 }
738
    /// Stress test: many user threads share one connection and send
    /// requests concurrently, while a fake Arti answers them out of order,
    /// with random updates and random failures.
    #[test]
    fn complex() {
        use std::sync::atomic::Ordering::SeqCst;
        let n_threads = 16;
        let n_commands_per_thread = 128;
        let n_commands_total = n_threads * n_commands_per_thread;
        // Counts the requests whose outcome has been fully checked.
        let n_completed = Arc::new(AtomicUsize::new(0));

        let (conn, sock) = dummy_connected();
        let conn = Arc::new(conn);
        let mut user_threads = Vec::new();
        let mut rng = testing_rng();

        for th_idx in 0..n_threads {
            let conn = Arc::clone(&conn);
            let n_completed = Arc::clone(&n_completed);
            // Each thread gets its own RNG, seeded from the test RNG.
            let mut rng = rand_chacha::ChaCha12Rng::from_seed(rng.gen());
            let th = thread::spawn(move || {
                for cmd_idx in 0..n_commands_per_thread {
                    // A value unique to this (thread, command) pair, so that
                    // we can tell whether we got the right reply.
                    let s = format!("{}:{}", th_idx, cmd_idx);
                    let want_updates: bool = rng.gen();
                    let want_failure: bool = rng.gen();
                    let req = serde_json::json!({
                        "obj":"fred",
                        "method":"arti:x-echo",
                        "meta": {
                            "updates": want_updates,
                        },
                        "params": {
                            "val": &s,
                            "fail": want_failure,
                        },
                    });
                    let req = serde_json::to_string(&req).unwrap();

                    let mut n_updates = 0;
                    let outcome = conn
                        .execute_with_updates(&req, |_update| {
                            n_updates += 1;
                        })
                        .unwrap();
                    // We get updates if and only if we asked for them.
                    assert_eq!(n_updates > 0, want_updates);

                    if want_failure {
                        // The error must be the one the fake Arti builds below.
                        let e = outcome.unwrap_err().decode();
                        assert_eq!(e.message(), "You asked me to fail");
                        assert_eq!(i32::from(e.code()), 33);
                        assert_eq!(
                            e.kinds_iter().collect::<Vec<_>>(),
                            vec!["Example".to_string()]
                        );
                    } else {
                        // The success must echo our own value back.
                        let success = outcome.unwrap();
                        let map = success.decode::<JsonMap>().unwrap();
                        assert_eq!(map.get("echo"), Some(&serde_json::Value::String(s)));
                    }
                    n_completed.fetch_add(1, SeqCst);
                    // Occasionally pause, to vary the interleaving.
                    if rng.gen::<f32>() < 0.02 {
                        thread::sleep(Duration::from_millis(3));
                    }
                }
            });
            user_threads.push(th);
        }

        /// Parameters of the `arti:x-echo` requests sent by the user threads.
        #[derive(serde::Deserialize, Debug)]
        struct Echo {
            /// The value to echo back.
            val: String,
            /// If true, reply with an error instead of a result.
            fail: bool,
        }

        let worker_rng = rand_chacha::ChaCha12Rng::from_seed(rng.gen());
        // The fake Arti: reads requests in batches, shuffles each batch, and
        // answers the requests in the shuffled order.
        let worker_thread = thread::spawn(move || {
            let mut rng = worker_rng;
            let mut sock = BufReader::new(sock);
            let mut pending: Vec<Request<Echo>> = Vec::new();
            let mut n_received = 0;

            // Number of requests to accumulate before answering any of them.
            let scramble_factor = 7;
            // Once this many requests have arrived, stop batching and answer
            // each request as soon as it is read.
            // NOTE(review): presumably this is because each user thread waits
            // for a reply before sending its next request, so near the end
            // too few threads may remain to fill a batch -- confirm.
            let scramble_threshold =
                n_commands_total - (n_commands_per_thread + 1) * scramble_factor;

            'outer: loop {
                let flush_pending_at = if n_received >= scramble_threshold {
                    1
                } else {
                    scramble_factor
                };

                while pending.len() < flush_pending_at {
                    let mut buf = String::new();
                    // A zero-length read means end-of-file: we are done.
                    if sock.read_line(&mut buf).unwrap() == 0 {
                        break 'outer;
                    }
                    n_received += 1;
                    let req: Request<Echo> = serde_json::from_str(&buf).unwrap();
                    pending.push(req);
                }

                // Take the whole batch, leaving `pending` empty for the next one.
                let mut handling = std::mem::take(&mut pending);
                handling.shuffle(&mut rng);

                for req in handling {
                    if req.meta.unwrap_or_default().updates {
                        // Send between 1 and 3 updates before the final reply.
                        let n_updates = rng.gen_range_checked(1..4).unwrap();
                        for _ in 0..n_updates {
                            let up = serde_json::json!({
                                "id": req.id.clone(),
                                "update": {
                                    "hello": req.params.val.clone(),
                                }
                            });
                            write_val(sock.get_mut(), &up);
                        }
                    }

                    let response = if req.params.fail {
                        serde_json::json!({
                            "id": req.id.clone(),
                            "error": { "message": "You asked me to fail", "code": 33, "kinds": ["Example"], "data": req.params.val },
                        })
                    } else {
                        serde_json::json!({
                            "id": req.id.clone(),
                            "result": {
                                "echo": req.params.val
                            }
                        })
                    };
                    write_val(sock.get_mut(), &response);
                }
            }
        });
        // Drop this thread's reference to the connection.
        // NOTE(review): presumably this lets the connection close once the
        // user threads finish, so that the worker sees end-of-file -- confirm.
        drop(conn);
        for t in user_threads {
            t.join().unwrap();
        }

        worker_thread.join().unwrap();

        assert_eq!(n_completed.load(SeqCst), n_commands_total);
    }
897
898 #[test]
899 fn arti_socket_closed() {
900 let n_threads = 16;
904
905 let (conn, sock) = dummy_connected();
906 let conn = Arc::new(conn);
907 let mut user_threads = Vec::new();
908 for _ in 0..n_threads {
909 let conn = Arc::clone(&conn);
910 let th = thread::spawn(move || {
911 let req = serde_json::json!({
914 "obj":"fred",
915 "method":"arti:x-echo",
916 "params":{}
917 });
918 let req = serde_json::to_string(&req).unwrap();
919 let outcome = conn.execute(&req);
920 if !matches!(
921 &outcome,
922 Err(ProtoError::Shutdown(ShutdownError::Write(_)))
923 | Err(ProtoError::Shutdown(ShutdownError::Read(_))),
924 ) {
925 dbg!(&outcome);
926 }
927
928 assert!(matches!(
929 outcome,
930 Err(ProtoError::Shutdown(ShutdownError::Write(_)))
931 | Err(ProtoError::Shutdown(ShutdownError::Read(_)))
932 | Err(ProtoError::Shutdown(ShutdownError::ConnectionClosed))
933 ));
934 });
935 user_threads.push(th);
936 }
937
938 drop(sock);
939
940 for t in user_threads {
941 t.join().unwrap();
942 }
943 }
944
945 fn proto_err_with_msg<F>(msg: &str, outcome_ok: F)
949 where
950 F: Fn(ProtoError) -> bool,
951 {
952 let n_threads = 16;
953
954 let (conn, mut sock) = dummy_connected();
955 let conn = Arc::new(conn);
956 let mut user_threads = Vec::new();
957 for _ in 0..n_threads {
958 let conn = Arc::clone(&conn);
959 let th = thread::spawn(move || {
960 let req = serde_json::json!({
963 "obj":"fred",
964 "method":"arti:x-echo",
965 "params":{}
966 });
967 let req = serde_json::to_string(&req).unwrap();
968 conn.execute(&req)
969 });
970 user_threads.push(th);
971 }
972
973 sock.write_all(msg.as_bytes()).unwrap();
974
975 for t in user_threads {
976 let outcome = t.join().unwrap();
977 assert!(outcome_ok(outcome.unwrap_err()));
978 }
979 }
980
981 #[test]
982 fn syntax_error() {
983 proto_err_with_msg("this is not json\n", |outcome| {
984 matches!(
985 outcome,
986 ProtoError::Shutdown(ShutdownError::ProtocolViolated(_))
987 )
988 });
989 }
990
991 #[test]
992 fn fatal_error() {
993 let j = serde_json::json!({
994 "error":{ "message": "This test is doomed", "code": 413, "kinds": ["Example"], "data": {} },
995 });
996 let mut s = serde_json::to_string(&j).unwrap();
997 s.push('\n');
998
999 proto_err_with_msg(&s, |outcome| {
1000 matches!(
1001 outcome,
1002 ProtoError::Shutdown(ShutdownError::ProtocolViolationReport(_))
1003 )
1004 });
1005 }
1006}