1use crate::err;
8use crate::err::PtError;
9use crate::PtClientMethod;
10use futures::channel::mpsc::Receiver;
11use futures::StreamExt;
12use itertools::Itertools;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::ffi::OsString;
16use std::io::{BufRead, BufReader};
17use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
18use std::path::PathBuf;
19use std::process::{Child, Command, Stdio};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{io, thread};
24use tor_basic_utils::PathExt as _;
25use tor_error::{internal, warn_report};
26use tor_linkspec::PtTransportName;
27use tor_rtcompat::{Runtime, SleepProviderExt};
28use tor_socksproto::SocksVersion;
29use tracing::{debug, error, info, trace, warn};
30
31const GRACEFUL_EXIT_TIME: Duration = Duration::from_secs(5);
33const PT_START_TIMEOUT: Duration = Duration::from_secs(30);
35const PT_STDIO_BUFFER: usize = 64;
37
38#[derive(PartialEq, Eq, Debug, Clone)]
40pub struct PtStatus {
41 data: HashMap<String, String>,
45}
46
47#[derive(PartialEq, Eq, Debug, Clone)]
51#[non_exhaustive]
52#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
53pub enum PtMessage {
54 VersionError(String),
56 Version(String),
58 EnvError(String),
60 ProxyDone,
62 ProxyError(String),
64 ClientTransportLaunched {
66 transport: PtTransportName,
68 protocol: String,
70 endpoint: SocketAddr,
73 },
74 ClientTransportFailed {
76 transport: PtTransportName,
78 message: String,
80 },
81 ClientTransportsDone,
83 ServerTransportLaunched {
85 transport: PtTransportName,
87 endpoint: SocketAddr,
89 options: HashMap<String, String>,
92 },
93 ServerTransportFailed {
95 transport: PtTransportName,
97 message: String,
99 },
100 ServerTransportsDone,
102 Log {
104 severity: String,
106 message: String,
108 },
109 Status(PtStatus),
111 Unknown(String),
113}
114
115fn parse_one_value(from: &str) -> Result<(String, &str), &'static str> {
118 let first_char = from.chars().next();
119 Ok(if first_char.is_none() {
120 (String::new(), "")
121 } else if let Some('"') = first_char {
122 let mut ret = String::new();
126 let mut chars = from.chars();
127 assert_eq!(chars.next(), Some('"')); loop {
129 let ch = chars.next().ok_or("ran out of input parsing CString")?;
130 match ch {
131 '\\' => match chars
132 .next()
133 .ok_or("encountered trailing backslash in CString")?
134 {
135 'n' => ret.push('\n'),
136 'r' => ret.push('\r'),
137 't' => ret.push('\t'),
138 '0'..='8' => return Err("attempted unsupported octal escape code"),
139 ch2 => ret.push(ch2),
140 },
141 '"' => break,
142 _ => ret.push(ch),
143 }
144 }
145 (ret, chars.as_str())
146 } else {
147 let space = from.find(' ').unwrap_or(from.len());
149 (from[0..space].into(), &from[space..])
150 })
151}
152
153fn parse_one_smethod_arg(args: &str) -> Result<(String, String, &str), &'static str> {
157 let mut key = String::new();
161 let mut val = String::new();
162 let mut reading_val = false;
164 let mut chars = args.chars();
165 while let Some(c) = chars.next() {
166 let target = if reading_val { &mut val } else { &mut key };
167 match c {
168 '\\' => {
169 let c = chars
170 .next()
171 .ok_or("smethod arg terminates with backslash")?;
172 target.push(c);
173 }
174 '=' => {
175 if reading_val {
176 return Err("encountered = while parsing value");
177 }
178 reading_val = true;
179 }
180 ',' => break,
181 c => target.push(c),
182 }
183 }
184 if !reading_val {
185 return Err("ran out of chars parsing smethod arg");
186 }
187 Ok((key, val, chars.as_str()))
188}
189
190impl FromStr for PtMessage {
191 type Err = Cow<'static, str>;
192
193 #[allow(clippy::cognitive_complexity)]
196 fn from_str(s: &str) -> Result<Self, Self::Err> {
197 let mut words = s.split(' ');
200 let first_word = words.next().ok_or_else(|| Cow::from("empty line"))?;
201 Ok(match first_word {
202 "VERSION-ERROR" => {
203 let rest = words.join(" ");
204 Self::VersionError(rest)
205 }
206 "VERSION" => {
207 let vers = words.next().ok_or_else(|| Cow::from("no version"))?;
208 Self::Version(vers.into())
209 }
210 "ENV-ERROR" => {
211 let rest = words.join(" ");
212 Self::EnvError(rest)
213 }
214 "PROXY" => match words.next() {
215 Some("DONE") => Self::ProxyDone,
216 _ => Self::Unknown(s.into()),
217 },
218 "PROXY-ERROR" => {
219 let rest = words.join(" ");
220 Self::ProxyError(rest)
221 }
222 "CMETHOD" => {
223 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
224 let protocol = words.next().ok_or_else(|| Cow::from("no protocol"))?;
225 let endpoint = words
226 .next()
227 .ok_or_else(|| Cow::from("no endpoint"))?
228 .parse::<SocketAddr>()
229 .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
230 if !endpoint.ip().is_loopback() {
231 return Err(Cow::from(format!(
232 "CMETHOD endpoint {endpoint} was not localhost"
233 )));
234 }
235 Self::ClientTransportLaunched {
236 transport: transport
237 .parse()
238 .map_err(|_| Cow::from("bad transport ID"))?,
239 protocol: protocol.to_string(),
240 endpoint,
241 }
242 }
243 "CMETHOD-ERROR" => {
244 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
245 let rest = words.join(" ");
246 Self::ClientTransportFailed {
247 transport: transport
248 .parse()
249 .map_err(|_| Cow::from("bad transport ID"))?,
250 message: rest,
251 }
252 }
253 "CMETHODS" => match words.next() {
254 Some("DONE") => Self::ClientTransportsDone,
255 _ => Self::Unknown(s.into()),
256 },
257 "SMETHOD" => {
258 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
259 let endpoint = words
260 .next()
261 .ok_or_else(|| Cow::from("no endpoint"))?
262 .parse::<SocketAddr>()
263 .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
264 let mut parsed_args = HashMap::new();
266
267 for option in words {
273 if let Some(mut args) = option.strip_prefix("ARGS:") {
274 while !args.is_empty() {
275 let (k, v, rest) = parse_one_smethod_arg(args).map_err(|e| {
276 Cow::from(format!("failed to parse SMETHOD ARGS: {}", e))
277 })?;
278 if parsed_args.contains_key(&k) {
279 warn!("PT SMETHOD arguments contain repeated key {}!", k);
282 }
283 parsed_args.insert(k, v);
284 args = rest;
285 }
286 }
287 }
288 Self::ServerTransportLaunched {
289 transport: transport
290 .parse()
291 .map_err(|_| Cow::from("bad transport ID"))?,
292 endpoint,
293 options: parsed_args,
294 }
295 }
296 "SMETHOD-ERROR" => {
297 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
298 let rest = words.join(" ");
299 Self::ServerTransportFailed {
300 transport: transport
301 .parse()
302 .map_err(|_| Cow::from("bad transport ID"))?,
303 message: rest,
304 }
305 }
306 "SMETHODS" => match words.next() {
307 Some("DONE") => Self::ServerTransportsDone,
308 _ => Self::Unknown(s.into()),
309 },
310 "LOG" => {
311 let severity = words
312 .next()
313 .ok_or_else(|| Cow::from("no severity"))?
314 .strip_prefix("SEVERITY=")
315 .ok_or_else(|| Cow::from("badly formatted severity"))?;
316 let message = words.join(" ");
317 let message = parse_one_value(
318 message
319 .strip_prefix("MESSAGE=")
320 .ok_or_else(|| Cow::from("no or badly formatted message"))?,
321 )
322 .map_err(Cow::from)?
323 .0;
324 Self::Log {
325 severity: severity.into(),
326 message,
327 }
328 }
329 "STATUS" => {
330 let mut ret = HashMap::new();
331 let message = words.join(" ");
332 let mut message = &message as &str;
333 while !message.is_empty() {
334 let equals = message
335 .find('=')
336 .ok_or_else(|| Cow::from(format!("failed to find = in '{}'", message)))?;
337 let k = &message[..equals];
338 if equals + 1 == message.len() {
339 return Err(Cow::from("key with no value"));
340 }
341 let (v, rest) = parse_one_value(&message[(equals + 1)..]).map_err(Cow::from)?;
342 if ret.contains_key(k) {
343 warn!("STATUS contains repeated key {}!", k);
346 }
347 ret.insert(k.to_owned(), v);
348 message = rest;
349 if message.starts_with(' ') {
350 message = &message[1..];
351 }
352 }
353 Self::Status(PtStatus { data: ret })
354 }
355 _ => Self::Unknown(s.into()),
356 })
357 }
358}
359
360use sealed::*;
361pub(crate) mod sealed {
363 use super::*;
364
365 #[derive(Debug)]
370 pub struct AsyncPtChild {
371 stdout: Receiver<io::Result<String>>,
373 pub identifier: String,
375 }
376
377 impl AsyncPtChild {
378 pub fn new(mut child: Child, identifier: String) -> Result<Self, PtError> {
381 let (stdin, stdout) = (
382 child.stdin.take().ok_or_else(|| {
383 PtError::Internal(internal!("Created child process without stdin pipe"))
384 })?,
385 child.stdout.take().ok_or_else(|| {
386 PtError::Internal(internal!("Created child process without stdout pipe"))
387 })?,
388 );
389 let (mut tx, rx) = tor_async_utils::mpsc_channel_no_memquota(PT_STDIO_BUFFER);
391 let ident = identifier.clone();
392 #[allow(clippy::cognitive_complexity)]
393 thread::spawn(move || {
394 let reader = BufReader::new(stdout);
395 let _stdin = stdin;
396 let mut noted_full = false;
397 for line in reader.lines() {
399 let err = line.is_err();
400 match &line {
401 Ok(l) => trace!("<-- PT {}: {:?}", ident, l),
402 Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e),
403 }
404 if let Err(e) = tx.try_send(line) {
405 if e.is_disconnected() {
406 debug!("PT {} is disconnected; shutting it down.", ident);
407 break;
409 }
410 if !noted_full {
413 noted_full = true; warn!(
415 "Bug: Message queue for PT {} became full; dropping message",
416 ident
417 );
418 }
419 }
420 if err {
421 break;
425 }
426 }
427 if let Ok(Some(_)) = child.try_wait() {
429 debug!("PT {} has exited.", ident);
432 return;
433 }
434 trace!("Asking PT {} to exit, nicely.", ident);
438 drop(_stdin);
439 thread::sleep(GRACEFUL_EXIT_TIME);
441 match child.try_wait() {
442 Ok(None) => {
443 debug!("Sending kill signal to PT {}", ident);
445 if let Err(e) = child.kill() {
446 warn_report!(e, "Failed to kill() spawned PT {}", ident);
447 }
448 }
449 Ok(Some(_)) => {
450 debug!("PT {} shut down successfully.", ident);
451 } Err(e) => {
453 warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident);
454 }
455 }
456 });
457 Ok(AsyncPtChild {
458 stdout: rx,
459 identifier,
460 })
461 }
462
463 #[allow(clippy::cognitive_complexity)] pub async fn recv(&mut self) -> err::Result<PtMessage> {
468 loop {
469 match self.stdout.next().await {
470 None => return Err(PtError::ChildGone),
471 Some(Ok(line)) => {
472 let line =
473 line.parse::<PtMessage>()
474 .map_err(|e| PtError::IpcParseFailed {
475 line,
476 error: e.into(),
477 })?;
478 if let PtMessage::Log { severity, message } = line {
479 match &severity as &str {
482 "error" => error!("[pt {}] {}", self.identifier, message),
483 "warning" => warn!("[pt {}] {}", self.identifier, message),
484 "notice" => info!("[pt {}] {}", self.identifier, message),
485 "info" => debug!("[pt {}] {}", self.identifier, message),
486 "debug" => trace!("[pt {}] {}", self.identifier, message),
487 x => warn!("[pt] {} {} {}", self.identifier, x, message),
488 }
489 } else {
490 return Ok(line);
491 }
492 }
493 Some(Err(e)) => {
494 return Err(PtError::ChildReadFailed(Arc::new(e)));
495 }
496 }
497 }
498 }
499 }
500
501 #[async_trait::async_trait]
503 pub trait PluggableTransportPrivate {
504 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>;
506
507 fn set_inner(&mut self, newval: Option<AsyncPtChild>);
509
510 fn identifier(&self) -> &str;
512
513 fn specific_params_contains(&self, transport: &PtTransportName) -> bool;
515
516 fn common_transport_launched_handler(
518 &self,
519 protocol: Option<String>,
520 transport: PtTransportName,
521 endpoint: SocketAddr,
522 methods: &mut HashMap<PtTransportName, PtClientMethod>,
523 ) -> Result<(), PtError> {
524 if !self.specific_params_contains(&transport) {
525 return Err(PtError::ProtocolViolation(format!(
526 "binary launched unwanted transport '{}'",
527 transport
528 )));
529 }
530 let protocol = match protocol {
531 Some(protocol_str) => match &protocol_str as &str {
532 "socks4" => SocksVersion::V4,
533 "socks5" => SocksVersion::V5,
534 x => {
535 return Err(PtError::ProtocolViolation(format!(
536 "unknown CMETHOD protocol '{}'",
537 x
538 )))
539 }
540 },
541 None => SocksVersion::V5,
542 };
543 let method = PtClientMethod {
544 kind: protocol,
545 endpoint,
546 };
547 info!("Transport '{}' uses method {:?}", transport, method);
548 methods.insert(transport, method);
549 Ok(())
550 }
551
552 fn get_child_from_pt_launch(
554 inner: &Option<AsyncPtChild>,
555 transports: &Vec<PtTransportName>,
556 binary_path: &PathBuf,
557 arguments: &[String],
558 all_env_vars: HashMap<OsString, OsString>,
559 ) -> Result<AsyncPtChild, PtError> {
560 if inner.is_some() {
561 let warning_msg =
562 format!("Attempted to launch PT binary for {:?} twice.", transports);
563 warn!("{warning_msg}");
564 return Err(PtError::ChildProtocolViolation(warning_msg));
566 }
567 info!(
568 "Launching pluggable transport at {} for {:?}",
569 binary_path.display_lossy(),
570 transports
571 );
572 let child = Command::new(binary_path)
573 .args(arguments.iter())
574 .envs(all_env_vars)
575 .stdout(Stdio::piped())
576 .stdin(Stdio::piped())
577 .spawn()
578 .map_err(|e| PtError::ChildSpawnFailed {
579 path: binary_path.clone(),
580 error: Arc::new(e),
581 })?;
582
583 let identifier = crate::managed::pt_identifier(binary_path)?;
584 AsyncPtChild::new(child, identifier)
585 }
586
587 async fn try_match_common_messages<R: Runtime>(
596 &self,
597 rt: &R,
598 deadline: Instant,
599 async_child: &mut AsyncPtChild,
600 ) -> Result<Option<PtMessage>, PtError> {
601 match rt
602 .timeout(
603 deadline.saturating_duration_since(Instant::now()),
605 async_child.recv(),
606 )
607 .await
608 .map_err(|_| PtError::Timeout)??
609 {
610 PtMessage::ClientTransportFailed { transport, message }
611 | PtMessage::ServerTransportFailed { transport, message } => {
612 warn!(
613 "PT {} unable to launch {}. It said: {:?}",
614 async_child.identifier, transport, message
615 );
616 return Err(PtError::TransportGaveError {
617 transport: transport.to_string(),
618 message,
619 });
620 }
621 PtMessage::VersionError(e) => {
622 if e != "no-version" {
623 warn!("weird VERSION-ERROR: {}", e);
624 }
625 return Err(PtError::UnsupportedVersion);
626 }
627 PtMessage::Version(vers) => {
628 if vers != "1" {
629 return Err(PtError::ProtocolViolation(format!(
630 "stated version is {}, asked for 1",
631 vers
632 )));
633 }
634 Ok(None)
635 }
636 PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)),
637 PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)),
638 PtMessage::Status(_) => Ok(None),
640 PtMessage::Unknown(x) => {
641 warn!("unknown PT line: {}", x);
642 Ok(None)
643 }
644 x => {
647 return Ok(Some(x));
648 }
649 }
650 }
651 }
652}
653
654#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
656pub struct PtCommonParameters {
657 state_location: PathBuf,
659 #[builder(default)]
663 outbound_bind_v4: Option<Ipv4Addr>,
664 #[builder(default)]
668 outbound_bind_v6: Option<Ipv6Addr>,
669 #[builder(default)]
672 timeout: Option<Duration>,
673}
674
675impl PtCommonParameters {
676 pub fn builder() -> PtCommonParametersBuilder {
678 PtCommonParametersBuilder::default()
679 }
680
681 fn common_environment_variables(&self) -> HashMap<OsString, OsString> {
684 let mut ret = HashMap::new();
685 ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into());
686 ret.insert(
687 "TOR_PT_STATE_LOCATION".into(),
688 self.state_location.clone().into_os_string(),
689 );
690 ret.insert("TOR_PT_EXIT_ON_STDIN_CLOSE".into(), "1".into());
691 if let Some(v4) = self.outbound_bind_v4 {
692 ret.insert(
693 "TOR_PT_OUTBOUND_BIND_ADDRESS_V4".into(),
694 v4.to_string().into(),
695 );
696 }
697 if let Some(v6) = self.outbound_bind_v6 {
698 ret.insert(
700 "TOR_PT_OUTBOUND_BIND_ADDRESS_V6".into(),
701 format!("[{}]", v6).into(),
702 );
703 }
704 ret
705 }
706}
707
708#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
710pub struct PtClientParameters {
711 #[builder(default)]
713 proxy_uri: Option<String>,
714 transports: Vec<PtTransportName>,
718}
719
720impl PtClientParameters {
721 pub fn builder() -> PtClientParametersBuilder {
723 PtClientParametersBuilder::default()
724 }
725
726 fn environment_variables(
729 &self,
730 common_params: &PtCommonParameters,
731 ) -> HashMap<OsString, OsString> {
732 let mut ret = common_params.common_environment_variables();
733 if let Some(ref proxy_uri) = self.proxy_uri {
734 ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into());
735 }
736 ret.insert(
737 "TOR_PT_CLIENT_TRANSPORTS".into(),
738 self.transports.iter().join(",").into(),
739 );
740 ret
741 }
742}
743
744#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
746pub struct PtServerParameters {
747 transports: Vec<PtTransportName>,
751 #[builder(default)]
753 server_transport_options: String,
754 #[builder(default)]
756 server_bindaddr: String,
757 #[builder(default)]
759 server_orport: Option<String>,
760 #[builder(default)]
762 server_extended_orport: Option<String>,
763}
764
765impl PtServerParameters {
766 pub fn builder() -> PtServerParametersBuilder {
768 PtServerParametersBuilder::default()
769 }
770
771 fn environment_variables(
774 &self,
775 common_params: &PtCommonParameters,
776 ) -> HashMap<OsString, OsString> {
777 let mut ret = common_params.common_environment_variables();
778 ret.insert(
779 "TOR_PT_SERVER_TRANSPORTS".into(),
780 self.transports.iter().join(",").into(),
781 );
782 ret.insert(
783 "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(),
784 self.server_transport_options.clone().into(),
785 );
786 ret.insert(
787 "TOR_PT_SERVER_BINDADDR".into(),
788 self.server_bindaddr.clone().into(),
789 );
790 if let Some(ref server_orport) = self.server_orport {
791 ret.insert("TOR_PT_ORPORT".into(), server_orport.into());
792 }
793 if let Some(ref server_extended_orport) = self.server_extended_orport {
794 ret.insert(
795 "TOR_PT_EXTENDED_SERVER_PORT".into(),
796 server_extended_orport.into(),
797 );
798 }
799 ret
800 }
801}
802
803#[async_trait::async_trait]
805#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
806pub trait PluggableTransport: PluggableTransportPrivate {
807 fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod>;
812
813 async fn next_message(&mut self) -> err::Result<PtMessage> {
821 let inner = self.inner()?;
822 let ret = inner.recv().await;
823 if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret {
824 debug!(
827 "PT {}: Received {:?}; shutting down.",
828 self.identifier(),
829 ret
830 );
831 self.set_inner(None);
832 }
833 ret
834 }
835}
836#[derive(Debug)]
841pub struct PluggableClientTransport {
842 inner: Option<AsyncPtChild>,
844 pub(crate) binary_path: PathBuf,
846 arguments: Vec<String>,
848 common_params: PtCommonParameters,
850 client_params: PtClientParameters,
852 cmethods: HashMap<PtTransportName, PtClientMethod>,
854}
855
856impl PluggableTransport for PluggableClientTransport {
857 fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
858 &self.cmethods
859 }
860}
861
862impl PluggableTransportPrivate for PluggableClientTransport {
863 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
864 self.inner.as_mut().ok_or(PtError::ChildGone)
865 }
866 fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
867 self.inner = newval;
868 }
869 fn identifier(&self) -> &str {
870 match &self.inner {
871 Some(child) => &child.identifier,
872 None => "<not yet launched>",
873 }
874 }
875 fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
876 self.client_params.transports.contains(transport)
877 }
878}
879
880impl PluggableClientTransport {
881 pub fn new(
886 binary_path: PathBuf,
887 arguments: Vec<String>,
888 common_params: PtCommonParameters,
889 client_params: PtClientParameters,
890 ) -> Self {
891 Self {
892 common_params,
893 client_params,
894 arguments,
895 binary_path,
896 inner: None,
897 cmethods: Default::default(),
898 }
899 }
900
901 pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
906 let all_env_vars = self
907 .client_params
908 .environment_variables(&self.common_params);
909
910 let mut async_child =
911 <PluggableClientTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
912 &self.inner,
913 &self.client_params.transports,
914 &self.binary_path,
915 &self.arguments,
916 all_env_vars,
917 )?;
918
919 let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
920 let mut cmethods = HashMap::new();
921 let mut proxy_done = self.client_params.proxy_uri.is_none();
922
923 loop {
924 match self
925 .try_match_common_messages(&rt, deadline, &mut async_child)
926 .await
927 {
928 Ok(maybe_message) => {
929 if let Some(message) = maybe_message {
930 match message {
931 PtMessage::ClientTransportLaunched {
932 transport,
933 protocol,
934 endpoint,
935 } => {
936 self.common_transport_launched_handler(
937 Some(protocol),
938 transport,
939 endpoint,
940 &mut cmethods,
941 )?;
942 }
943 PtMessage::ProxyDone => {
944 if proxy_done {
945 return Err(PtError::ProtocolViolation(
946 "binary initiated proxy when not asked (or twice)".into(),
947 ));
948 }
949 info!("PT binary now proxying connections via supplied URI");
950 proxy_done = true;
951 }
952 PtMessage::ClientTransportsDone => {
954 let unsupported = self
955 .client_params
956 .transports
957 .iter()
958 .filter(|&x| !cmethods.contains_key(x))
959 .map(|x| x.to_string())
960 .collect::<Vec<_>>();
961 if !unsupported.is_empty() {
962 warn!(
963 "PT binary failed to initialise transports: {:?}",
964 unsupported
965 );
966 return Err(PtError::ClientTransportsUnsupported(unsupported));
967 }
968 info!("PT binary initialisation done");
969 break;
970 }
971 x => {
972 return Err(PtError::ProtocolViolation(format!(
973 "received unexpected {:?}",
974 x
975 )));
976 }
977 }
978 }
979 }
980 Err(e) => return Err(e),
981 }
982 }
983 self.cmethods = cmethods;
984 self.inner = Some(async_child);
985 Ok(())
987 }
988}
989
990#[derive(Debug)]
995pub struct PluggableServerTransport {
996 inner: Option<AsyncPtChild>,
998 pub(crate) binary_path: PathBuf,
1000 arguments: Vec<String>,
1002 common_params: PtCommonParameters,
1004 server_params: PtServerParameters,
1006 smethods: HashMap<PtTransportName, PtClientMethod>,
1008}
1009
1010impl PluggableTransportPrivate for PluggableServerTransport {
1011 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
1012 self.inner.as_mut().ok_or(PtError::ChildGone)
1013 }
1014 fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
1015 self.inner = newval;
1016 }
1017 fn identifier(&self) -> &str {
1018 match &self.inner {
1019 Some(child) => &child.identifier,
1020 None => "<not yet launched>",
1021 }
1022 }
1023 fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
1024 self.server_params.transports.contains(transport)
1025 }
1026}
1027
1028impl PluggableTransport for PluggableServerTransport {
1029 fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
1030 &self.smethods
1031 }
1032}
1033
1034impl PluggableServerTransport {
1035 pub fn new(
1040 binary_path: PathBuf,
1041 arguments: Vec<String>,
1042 common_params: PtCommonParameters,
1043 server_params: PtServerParameters,
1044 ) -> Self {
1045 Self {
1046 common_params,
1047 server_params,
1048 arguments,
1049 binary_path,
1050 inner: None,
1051 smethods: Default::default(),
1052 }
1053 }
1054
1055 pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
1060 let all_env_vars = self
1061 .server_params
1062 .environment_variables(&self.common_params);
1063
1064 let mut async_child =
1065 <PluggableServerTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
1066 &self.inner,
1067 &self.server_params.transports,
1068 &self.binary_path,
1069 &self.arguments,
1070 all_env_vars,
1071 )?;
1072
1073 let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
1074 let mut smethods = HashMap::new();
1075
1076 loop {
1077 match self
1078 .try_match_common_messages(&rt, deadline, &mut async_child)
1079 .await
1080 {
1081 Ok(maybe_message) => {
1082 if let Some(message) = maybe_message {
1083 match message {
1084 PtMessage::ServerTransportLaunched {
1085 transport,
1086 endpoint,
1087 options: _,
1088 } => {
1089 self.common_transport_launched_handler(
1090 None,
1091 transport,
1092 endpoint,
1093 &mut smethods,
1094 )?;
1095 }
1096 PtMessage::ServerTransportsDone => {
1097 let unsupported = self
1098 .server_params
1099 .transports
1100 .iter()
1101 .filter(|&x| !smethods.contains_key(x))
1102 .map(|x| x.to_string())
1103 .collect::<Vec<_>>();
1104 if !unsupported.is_empty() {
1105 warn!(
1106 "PT binary failed to initialise transports: {:?}",
1107 unsupported
1108 );
1109 return Err(PtError::ClientTransportsUnsupported(unsupported));
1110 }
1111 info!("PT binary initialisation done");
1112 break;
1113 }
1114 x => {
1115 return Err(PtError::ProtocolViolation(format!(
1116 "received unexpected {:?}",
1117 x
1118 )));
1119 }
1120 }
1121 }
1122 }
1123 Err(e) => return Err(e),
1124 }
1125 }
1126 self.smethods = smethods;
1127 self.inner = Some(async_child);
1128 Ok(())
1130 }
1131}
1132
1133#[cfg(test)]
1134mod test {
1135 #![allow(clippy::bool_assert_comparison)]
1137 #![allow(clippy::clone_on_copy)]
1138 #![allow(clippy::dbg_macro)]
1139 #![allow(clippy::mixed_attributes_style)]
1140 #![allow(clippy::print_stderr)]
1141 #![allow(clippy::print_stdout)]
1142 #![allow(clippy::single_char_pattern)]
1143 #![allow(clippy::unwrap_used)]
1144 #![allow(clippy::unchecked_duration_subtraction)]
1145 #![allow(clippy::useless_vec)]
1146 #![allow(clippy::needless_pass_by_value)]
1147 use crate::ipc::{PtMessage, PtStatus};
1150 use std::borrow::Cow;
1151 use std::collections::HashMap;
1152 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1153
1154 #[test]
1155 fn it_parses_spec_examples() {
1156 assert_eq!(
1157 "VERSION-ERROR no-version".parse(),
1158 Ok(PtMessage::VersionError("no-version".into()))
1159 );
1160 assert_eq!("VERSION 1".parse(), Ok(PtMessage::Version("1".into())));
1161 assert_eq!(
1162 "ENV-ERROR No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".parse(),
1163 Ok(PtMessage::EnvError(
1164 "No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".into()
1165 ))
1166 );
1167 assert_eq!("PROXY DONE".parse(), Ok(PtMessage::ProxyDone));
1168 assert_eq!(
1169 "PROXY-ERROR SOCKS 4 upstream proxies unsupported".parse(),
1170 Ok(PtMessage::ProxyError(
1171 "SOCKS 4 upstream proxies unsupported".into()
1172 ))
1173 );
1174 assert_eq!(
1175 "CMETHOD trebuchet socks5 127.0.0.1:19999".parse(),
1176 Ok(PtMessage::ClientTransportLaunched {
1177 transport: "trebuchet".parse().unwrap(),
1178 protocol: "socks5".to_string(),
1179 endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19999)
1180 })
1181 );
1182 assert_eq!(
1183 "CMETHOD-ERROR trebuchet no rocks available".parse(),
1184 Ok(PtMessage::ClientTransportFailed {
1185 transport: "trebuchet".parse().unwrap(),
1186 message: "no rocks available".to_string()
1187 })
1188 );
1189 assert_eq!("CMETHODS DONE".parse(), Ok(PtMessage::ClientTransportsDone));
1190 assert_eq!(
1191 "SMETHOD trebuchet 198.51.100.1:19999".parse(),
1192 Ok(PtMessage::ServerTransportLaunched {
1193 transport: "trebuchet".parse().unwrap(),
1194 endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 19999),
1195 options: Default::default()
1196 })
1197 );
1198 let mut map = HashMap::new();
1199 map.insert("N".to_string(), "13".to_string());
1200 assert_eq!(
1201 "SMETHOD rot_by_N 198.51.100.1:2323 ARGS:N=13".parse(),
1202 Ok(PtMessage::ServerTransportLaunched {
1203 transport: "rot_by_N".parse().unwrap(),
1204 endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 2323),
1205 options: map
1206 })
1207 );
1208 let mut map = HashMap::new();
1209 map.insert(
1210 "cert".to_string(),
1211 "HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw".to_string(),
1212 );
1213 map.insert("iat-mode".to_string(), "0".to_string());
1214 assert_eq!(
1215 "SMETHOD obfs4 198.51.100.1:43734 ARGS:cert=HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw,iat-mode=0".parse(),
1216 Ok(PtMessage::ServerTransportLaunched {
1217 transport: "obfs4".parse().unwrap(),
1218 endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 43734),
1219 options: map
1220 })
1221 );
1222 assert_eq!(
1223 "SMETHOD-ERROR trebuchet no cows available".parse(),
1224 Ok(PtMessage::ServerTransportFailed {
1225 transport: "trebuchet".parse().unwrap(),
1226 message: "no cows available".to_string()
1227 })
1228 );
1229 assert_eq!(
1230 "LOG SEVERITY=debug MESSAGE=\"Connected to bridge A\"".parse(),
1231 Ok(PtMessage::Log {
1232 severity: "debug".to_string(),
1233 message: "Connected to bridge A".to_string()
1234 })
1235 );
1236 assert_eq!(
1237 "LOG SEVERITY=debug MESSAGE=\"\\r\\n\\t\"".parse(),
1238 Ok(PtMessage::Log {
1239 severity: "debug".to_string(),
1240 message: "\r\n\t".to_string()
1241 })
1242 );
1243 assert_eq!(
1244 "LOG SEVERITY=debug MESSAGE=".parse(),
1245 Ok(PtMessage::Log {
1246 severity: "debug".to_string(),
1247 message: "".to_string()
1248 })
1249 );
1250 assert_eq!(
1251 "LOG SEVERITY=debug MESSAGE=\"\\a\"".parse::<PtMessage>(),
1252 Ok(PtMessage::Log {
1253 severity: "debug".to_string(),
1254 message: "a".to_string()
1255 })
1256 );
1257
1258 for i in 0..9 {
1259 let msg = format!("LOG SEVERITY=debug MESSAGE=\"\\{i}\"");
1260 assert_eq!(
1261 msg.parse::<PtMessage>(),
1262 Err(Cow::from("attempted unsupported octal escape code"))
1263 );
1264 }
1265 assert_eq!(
1266 "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=0\\".parse::<PtMessage>(),
1267 Err(Cow::from(
1268 "failed to parse SMETHOD ARGS: smethod arg terminates with backslash"
1269 ))
1270 );
1271 assert_eq!(
1272 "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=fo=o".parse::<PtMessage>(),
1273 Err(Cow::from(
1274 "failed to parse SMETHOD ARGS: encountered = while parsing value"
1275 ))
1276 );
1277 assert_eq!(
1278 "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode".parse::<PtMessage>(),
1279 Err(Cow::from(
1280 "failed to parse SMETHOD ARGS: ran out of chars parsing smethod arg"
1281 ))
1282 );
1283
1284 let mut map = HashMap::new();
1285 map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
1286 map.insert("CONNECT".to_string(), "Success".to_string());
1287 assert_eq!(
1288 "STATUS ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
1289 Ok(PtMessage::Status(PtStatus { data: map }))
1290 );
1291
1292 let mut map = HashMap::new();
1293 map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
1294 map.insert("CONNECT".to_string(), "Success".to_string());
1295 map.insert("TRANSPORT".to_string(), "obfs4".to_string());
1296 assert_eq!(
1297 "STATUS TRANSPORT=obfs4 ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
1298 Ok(PtMessage::Status(PtStatus { data: map }))
1299 );
1300
1301 let mut map = HashMap::new();
1302 map.insert("ADDRESS".to_string(), "198.51.100.222:2222".to_string());
1303 map.insert("CONNECT".to_string(), "Failed".to_string());
1304 map.insert("FINGERPRINT".to_string(), "<Fingerprint>".to_string());
1305 map.insert("ERRSTR".to_string(), "Connection refused".to_string());
1306 assert_eq!(
1307 "STATUS ADDRESS=198.51.100.222:2222 CONNECT=Failed FINGERPRINT=<Fingerprint> ERRSTR=\"Connection refused\"".parse(),
1308 Ok(PtMessage::Status(PtStatus {
1309 data: map
1310 }))
1311 );
1312 }
1313}