1use crate::err;
8use crate::err::PtError;
9use crate::PtClientMethod;
10use futures::channel::mpsc::Receiver;
11use futures::StreamExt;
12use itertools::Itertools;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::ffi::OsString;
16use std::io::{BufRead, BufReader};
17use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
18use std::path::PathBuf;
19use std::process::{Child, Command, Stdio};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{io, thread};
24use tor_basic_utils::PathExt as _;
25use tor_error::{internal, warn_report};
26use tor_linkspec::PtTransportName;
27use tor_rtcompat::{Runtime, SleepProviderExt};
28use tor_socksproto::SocksVersion;
29use tracing::{debug, error, info, trace, warn};
30
/// How long the reader thread sleeps after closing a PT's stdin before it
/// checks whether the process exited and, if not, kills it.
const GRACEFUL_EXIT_TIME: Duration = Duration::from_secs(5);
/// Startup deadline applied by `launch` when the common parameters carry no
/// explicit `timeout`.
const PT_START_TIMEOUT: Duration = Duration::from_secs(30);
/// Buffer size passed to the channel that carries the PT's stdout lines from
/// the reader thread to `AsyncPtChild::recv`.
const PT_STDIO_BUFFER: usize = 64;
37
/// Payload of a `STATUS` line emitted by a pluggable transport.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct PtStatus {
    /// The `KEY=value` pairs found on the line. When a key is repeated, the
    /// parser logs a warning and the later value replaces the earlier one.
    data: HashMap<String, String>,
}
46
/// One line of output from a pluggable transport binary, as produced by the
/// `FromStr` implementation for this type.
#[derive(PartialEq, Eq, Debug, Clone)]
#[non_exhaustive]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub enum PtMessage {
    /// `VERSION-ERROR <text>`: the rest of the line, verbatim.
    VersionError(String),
    /// `VERSION <version>`: the first word after the keyword.
    Version(String),
    /// `ENV-ERROR <text>`: the rest of the line, verbatim.
    EnvError(String),
    /// `PROXY DONE`.
    ProxyDone,
    /// `PROXY-ERROR <text>`: the rest of the line, verbatim.
    ProxyError(String),
    /// `CMETHOD <transport> <protocol> <endpoint>`.
    ClientTransportLaunched {
        /// The transport that was launched.
        transport: PtTransportName,
        /// The protocol word exactly as the PT wrote it (not validated by the parser).
        protocol: String,
        /// Where the transport listens; the parser rejects non-loopback addresses.
        endpoint: SocketAddr,
    },
    /// `CMETHOD-ERROR <transport> <text>`.
    ClientTransportFailed {
        /// The transport that failed.
        transport: PtTransportName,
        /// The rest of the line, verbatim.
        message: String,
    },
    /// `CMETHODS DONE`.
    ClientTransportsDone,
    /// `SMETHOD <transport> <endpoint> [ARGS:k=v,...]`.
    ServerTransportLaunched {
        /// The transport that was launched.
        transport: PtTransportName,
        /// Where the transport listens (no loopback restriction is applied here).
        endpoint: SocketAddr,
        /// Key/value pairs collected from every `ARGS:`-prefixed word.
        options: HashMap<String, String>,
    },
    /// `SMETHOD-ERROR <transport> <text>`.
    ServerTransportFailed {
        /// The transport that failed.
        transport: PtTransportName,
        /// The rest of the line, verbatim.
        message: String,
    },
    /// `SMETHODS DONE`.
    ServerTransportsDone,
    /// `LOG SEVERITY=<severity> MESSAGE=<value>`.
    Log {
        /// The text following `SEVERITY=`.
        severity: String,
        /// The decoded `MESSAGE=` value (quotes and escapes already processed).
        message: String,
    },
    /// `STATUS k=v ...`.
    Status(PtStatus),
    /// Any line whose keyword (or `DONE` suffix) was not recognised; holds the whole line.
    Unknown(String),
}
114
/// Parse a single value from the front of `from`, returning the decoded value
/// and the unparsed remainder.
///
/// * A value that starts with `"` is read up to the matching closing quote.
///   Inside it, `\n`, `\r` and `\t` decode to the usual control characters,
///   a backslash followed by `0`..=`8` is rejected, and a backslash followed
///   by anything else yields that character. The remainder starts right
///   after the closing quote.
/// * Any other value runs up to (not including) the first space; the
///   remainder starts *at* that space. Empty input yields an empty value and
///   an empty remainder.
///
/// # Errors
///
/// Returns a static description if a quoted value is unterminated, ends in a
/// lone backslash, or contains a digit escape.
fn parse_one_value(from: &str) -> Result<(String, &str), &'static str> {
    let quoted_body = match from.strip_prefix('"') {
        Some(body) => body,
        None => {
            // Bare word (possibly empty): split at the first space, if any.
            let cut = from.find(' ').unwrap_or(from.len());
            let (value, rest) = from.split_at(cut);
            return Ok((value.to_owned(), rest));
        }
    };

    let mut decoded = String::new();
    let mut cursor = quoted_body.chars();
    loop {
        match cursor.next() {
            None => return Err("ran out of input parsing CString"),
            Some('"') => return Ok((decoded, cursor.as_str())),
            Some('\\') => {
                let escaped = cursor
                    .next()
                    .ok_or("encountered trailing backslash in CString")?;
                decoded.push(match escaped {
                    'n' => '\n',
                    'r' => '\r',
                    't' => '\t',
                    '0'..='8' => return Err("attempted unsupported octal escape code"),
                    literal => literal,
                });
            }
            Some(plain) => decoded.push(plain),
        }
    }
}
152
/// Parse one `key=value` pair from the front of an `SMETHOD` `ARGS:` list.
///
/// Returns the key, the value, and whatever follows the terminating comma
/// (or the empty string if the input ended). A backslash makes the next
/// character literal, so `\,` and `\=` can appear inside keys and values.
///
/// # Errors
///
/// Returns a static description if the input ends right after a backslash,
/// if a second unescaped `=` appears, or if the pair ends before any `=`.
fn parse_one_smethod_arg(args: &str) -> Result<(String, String, &str), &'static str> {
    let mut key = String::new();
    // `None` while the key is still being read; `Some` once `=` has been seen.
    let mut value: Option<String> = None;
    let mut cursor = args.chars();

    loop {
        let literal = match cursor.next() {
            None | Some(',') => break,
            Some('\\') => cursor
                .next()
                .ok_or("smethod arg terminates with backslash")?,
            Some('=') => {
                if value.is_some() {
                    return Err("encountered = while parsing value");
                }
                value = Some(String::new());
                continue;
            }
            Some(other) => other,
        };
        value.as_mut().unwrap_or(&mut key).push(literal);
    }

    match value {
        Some(value) => Ok((key, value, cursor.as_str())),
        None => Err("ran out of chars parsing smethod arg"),
    }
}
189
190impl FromStr for PtMessage {
191 type Err = Cow<'static, str>;
192
193 #[allow(clippy::cognitive_complexity)]
196 fn from_str(s: &str) -> Result<Self, Self::Err> {
197 let mut words = s.split(' ');
200 let first_word = words.next().ok_or_else(|| Cow::from("empty line"))?;
201 Ok(match first_word {
202 "VERSION-ERROR" => {
203 let rest = words.join(" ");
204 Self::VersionError(rest)
205 }
206 "VERSION" => {
207 let vers = words.next().ok_or_else(|| Cow::from("no version"))?;
208 Self::Version(vers.into())
209 }
210 "ENV-ERROR" => {
211 let rest = words.join(" ");
212 Self::EnvError(rest)
213 }
214 "PROXY" => match words.next() {
215 Some("DONE") => Self::ProxyDone,
216 _ => Self::Unknown(s.into()),
217 },
218 "PROXY-ERROR" => {
219 let rest = words.join(" ");
220 Self::ProxyError(rest)
221 }
222 "CMETHOD" => {
223 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
224 let protocol = words.next().ok_or_else(|| Cow::from("no protocol"))?;
225 let endpoint = words
226 .next()
227 .ok_or_else(|| Cow::from("no endpoint"))?
228 .parse::<SocketAddr>()
229 .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
230 if !endpoint.ip().is_loopback() {
231 return Err(Cow::from(format!(
232 "CMETHOD endpoint {endpoint} was not localhost"
233 )));
234 }
235 Self::ClientTransportLaunched {
236 transport: transport
237 .parse()
238 .map_err(|_| Cow::from("bad transport ID"))?,
239 protocol: protocol.to_string(),
240 endpoint,
241 }
242 }
243 "CMETHOD-ERROR" => {
244 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
245 let rest = words.join(" ");
246 Self::ClientTransportFailed {
247 transport: transport
248 .parse()
249 .map_err(|_| Cow::from("bad transport ID"))?,
250 message: rest,
251 }
252 }
253 "CMETHODS" => match words.next() {
254 Some("DONE") => Self::ClientTransportsDone,
255 _ => Self::Unknown(s.into()),
256 },
257 "SMETHOD" => {
258 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
259 let endpoint = words
260 .next()
261 .ok_or_else(|| Cow::from("no endpoint"))?
262 .parse::<SocketAddr>()
263 .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
264 let mut parsed_args = HashMap::new();
266
267 for option in words {
273 if let Some(mut args) = option.strip_prefix("ARGS:") {
274 while !args.is_empty() {
275 let (k, v, rest) = parse_one_smethod_arg(args).map_err(|e| {
276 Cow::from(format!("failed to parse SMETHOD ARGS: {}", e))
277 })?;
278 if parsed_args.contains_key(&k) {
279 warn!("PT SMETHOD arguments contain repeated key {}!", k);
282 }
283 parsed_args.insert(k, v);
284 args = rest;
285 }
286 }
287 }
288 Self::ServerTransportLaunched {
289 transport: transport
290 .parse()
291 .map_err(|_| Cow::from("bad transport ID"))?,
292 endpoint,
293 options: parsed_args,
294 }
295 }
296 "SMETHOD-ERROR" => {
297 let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
298 let rest = words.join(" ");
299 Self::ServerTransportFailed {
300 transport: transport
301 .parse()
302 .map_err(|_| Cow::from("bad transport ID"))?,
303 message: rest,
304 }
305 }
306 "SMETHODS" => match words.next() {
307 Some("DONE") => Self::ServerTransportsDone,
308 _ => Self::Unknown(s.into()),
309 },
310 "LOG" => {
311 let severity = words
312 .next()
313 .ok_or_else(|| Cow::from("no severity"))?
314 .strip_prefix("SEVERITY=")
315 .ok_or_else(|| Cow::from("badly formatted severity"))?;
316 let message = words.join(" ");
317 let message = parse_one_value(
318 message
319 .strip_prefix("MESSAGE=")
320 .ok_or_else(|| Cow::from("no or badly formatted message"))?,
321 )
322 .map_err(Cow::from)?
323 .0;
324 Self::Log {
325 severity: severity.into(),
326 message,
327 }
328 }
329 "STATUS" => {
330 let mut ret = HashMap::new();
331 let message = words.join(" ");
332 let mut message = &message as &str;
333 while !message.is_empty() {
334 let equals = message
335 .find('=')
336 .ok_or_else(|| Cow::from(format!("failed to find = in '{}'", message)))?;
337 let k = &message[..equals];
338 if equals + 1 == message.len() {
339 return Err(Cow::from("key with no value"));
340 }
341 let (v, rest) = parse_one_value(&message[(equals + 1)..]).map_err(Cow::from)?;
342 if ret.contains_key(k) {
343 warn!("STATUS contains repeated key {}!", k);
346 }
347 ret.insert(k.to_owned(), v);
348 message = rest;
349 if message.starts_with(' ') {
350 message = &message[1..];
351 }
352 }
353 Self::Status(PtStatus { data: ret })
354 }
355 _ => Self::Unknown(s.into()),
356 })
357 }
358}
359
360use sealed::*;
361pub(crate) mod sealed {
363 use super::*;
364
    /// Handle to a spawned PT process whose stdout lines are delivered over a
    /// channel by a background reader thread (see `AsyncPtChild::new`).
    #[derive(Debug)]
    pub struct AsyncPtChild {
        /// Receiving end of the channel fed by the reader thread: one entry
        /// per stdout line, or the I/O error that ended reading.
        stdout: Receiver<io::Result<String>>,
        /// Label used to identify this PT in log messages.
        pub identifier: String,
    }
376
377 impl AsyncPtChild {
378 pub fn new(mut child: Child, identifier: String) -> Result<Self, PtError> {
381 let (stdin, stdout) = (
382 child.stdin.take().ok_or_else(|| {
383 PtError::Internal(internal!("Created child process without stdin pipe"))
384 })?,
385 child.stdout.take().ok_or_else(|| {
386 PtError::Internal(internal!("Created child process without stdout pipe"))
387 })?,
388 );
389 let (mut tx, rx) = tor_async_utils::mpsc_channel_no_memquota(PT_STDIO_BUFFER);
391 let ident = identifier.clone();
392 #[allow(clippy::cognitive_complexity)]
393 thread::spawn(move || {
394 let reader = BufReader::new(stdout);
395 let _stdin = stdin;
396 let mut noted_full = false;
397 for line in reader.lines() {
399 let err = line.is_err();
400 match &line {
401 Ok(l) => trace!("<-- PT {}: {:?}", ident, l),
402 Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e),
403 }
404 if let Err(e) = tx.try_send(line) {
405 if e.is_disconnected() {
406 debug!("PT {} is disconnected; shutting it down.", ident);
407 break;
409 }
410 if !noted_full {
413 noted_full = true; warn!(
415 "Bug: Message queue for PT {} became full; dropping message",
416 ident
417 );
418 }
419 }
420 if err {
421 break;
425 }
426 }
427 if let Ok(Some(_)) = child.try_wait() {
429 debug!("PT {} has exited.", ident);
432 return;
433 }
434 trace!("Asking PT {} to exit, nicely.", ident);
438 drop(_stdin);
439 thread::sleep(GRACEFUL_EXIT_TIME);
441 match child.try_wait() {
442 Ok(None) => {
443 debug!("Sending kill signal to PT {}", ident);
445 if let Err(e) = child.kill() {
446 warn_report!(e, "Failed to kill() spawned PT {}", ident);
447 }
448 }
449 Ok(Some(_)) => {
450 debug!("PT {} shut down successfully.", ident);
451 } Err(e) => {
453 warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident);
454 }
455 }
456 });
457 Ok(AsyncPtChild {
458 stdout: rx,
459 identifier,
460 })
461 }
462
463 pub async fn recv(&mut self) -> err::Result<PtMessage> {
467 loop {
468 match self.stdout.next().await {
469 None => return Err(PtError::ChildGone),
470 Some(Ok(line)) => {
471 let line =
472 line.parse::<PtMessage>()
473 .map_err(|e| PtError::IpcParseFailed {
474 line,
475 error: e.into(),
476 })?;
477 if let PtMessage::Log { severity, message } = line {
478 match &severity as &str {
481 "error" => error!("[pt {}] {}", self.identifier, message),
482 "warning" => warn!("[pt {}] {}", self.identifier, message),
483 "notice" => info!("[pt {}] {}", self.identifier, message),
484 "info" => debug!("[pt {}] {}", self.identifier, message),
485 "debug" => trace!("[pt {}] {}", self.identifier, message),
486 x => warn!("[pt] {} {} {}", self.identifier, x, message),
487 }
488 } else {
489 return Ok(line);
490 }
491 }
492 Some(Err(e)) => {
493 return Err(PtError::ChildReadFailed(Arc::new(e)));
494 }
495 }
496 }
497 }
498 }
499
500 #[async_trait::async_trait]
502 pub trait PluggableTransportPrivate {
503 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>;
505
506 fn set_inner(&mut self, newval: Option<AsyncPtChild>);
508
509 fn identifier(&self) -> &str;
511
512 fn specific_params_contains(&self, transport: &PtTransportName) -> bool;
514
515 fn common_transport_launched_handler(
517 &self,
518 protocol: Option<String>,
519 transport: PtTransportName,
520 endpoint: SocketAddr,
521 methods: &mut HashMap<PtTransportName, PtClientMethod>,
522 ) -> Result<(), PtError> {
523 if !self.specific_params_contains(&transport) {
524 return Err(PtError::ProtocolViolation(format!(
525 "binary launched unwanted transport '{}'",
526 transport
527 )));
528 }
529 let protocol = match protocol {
530 Some(protocol_str) => match &protocol_str as &str {
531 "socks4" => SocksVersion::V4,
532 "socks5" => SocksVersion::V5,
533 x => {
534 return Err(PtError::ProtocolViolation(format!(
535 "unknown CMETHOD protocol '{}'",
536 x
537 )))
538 }
539 },
540 None => SocksVersion::V5,
541 };
542 let method = PtClientMethod {
543 kind: protocol,
544 endpoint,
545 };
546 info!("Transport '{}' uses method {:?}", transport, method);
547 methods.insert(transport, method);
548 Ok(())
549 }
550
551 fn get_child_from_pt_launch(
553 inner: &Option<AsyncPtChild>,
554 transports: &Vec<PtTransportName>,
555 binary_path: &PathBuf,
556 arguments: &[String],
557 all_env_vars: HashMap<OsString, OsString>,
558 ) -> Result<AsyncPtChild, PtError> {
559 if inner.is_some() {
560 let warning_msg =
561 format!("Attempted to launch PT binary for {:?} twice.", transports);
562 warn!("{warning_msg}");
563 return Err(PtError::ChildProtocolViolation(warning_msg));
565 }
566 info!(
567 "Launching pluggable transport at {} for {:?}",
568 binary_path.display_lossy(),
569 transports
570 );
571 let child = Command::new(binary_path)
572 .args(arguments.iter())
573 .envs(all_env_vars)
574 .stdout(Stdio::piped())
575 .stdin(Stdio::piped())
576 .spawn()
577 .map_err(|e| PtError::ChildSpawnFailed {
578 path: binary_path.clone(),
579 error: Arc::new(e),
580 })?;
581
582 let identifier = crate::managed::pt_identifier(binary_path)?;
583 AsyncPtChild::new(child, identifier)
584 }
585
586 async fn try_match_common_messages<R: Runtime>(
595 &self,
596 rt: &R,
597 deadline: Instant,
598 async_child: &mut AsyncPtChild,
599 ) -> Result<Option<PtMessage>, PtError> {
600 match rt
601 .timeout(
602 deadline.saturating_duration_since(Instant::now()),
604 async_child.recv(),
605 )
606 .await
607 .map_err(|_| PtError::Timeout)??
608 {
609 PtMessage::ClientTransportFailed { transport, message }
610 | PtMessage::ServerTransportFailed { transport, message } => {
611 warn!(
612 "PT {} unable to launch {}. It said: {:?}",
613 async_child.identifier, transport, message
614 );
615 return Err(PtError::TransportGaveError {
616 transport: transport.to_string(),
617 message,
618 });
619 }
620 PtMessage::VersionError(e) => {
621 if e != "no-version" {
622 warn!("weird VERSION-ERROR: {}", e);
623 }
624 return Err(PtError::UnsupportedVersion);
625 }
626 PtMessage::Version(vers) => {
627 if vers != "1" {
628 return Err(PtError::ProtocolViolation(format!(
629 "stated version is {}, asked for 1",
630 vers
631 )));
632 }
633 Ok(None)
634 }
635 PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)),
636 PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)),
637 PtMessage::Status(_) => Ok(None),
639 PtMessage::Unknown(x) => {
640 warn!("unknown PT line: {}", x);
641 Ok(None)
642 }
643 x => {
646 return Ok(Some(x));
647 }
648 }
649 }
650 }
651}
652
/// Launch settings shared by client-side and server-side pluggable transports.
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
pub struct PtCommonParameters {
    /// Directory exported to the PT as `TOR_PT_STATE_LOCATION`.
    state_location: PathBuf,
    /// If set, exported as `TOR_PT_OUTBOUND_BIND_ADDRESS_V4`.
    #[builder(default)]
    outbound_bind_v4: Option<Ipv4Addr>,
    /// If set, exported in square brackets as `TOR_PT_OUTBOUND_BIND_ADDRESS_V6`.
    #[builder(default)]
    outbound_bind_v6: Option<Ipv6Addr>,
    /// How long `launch` waits for the PT's startup handshake; `None` means
    /// `PT_START_TIMEOUT`.
    #[builder(default)]
    timeout: Option<Duration>,
}
673
674impl PtCommonParameters {
675 pub fn builder() -> PtCommonParametersBuilder {
677 PtCommonParametersBuilder::default()
678 }
679
680 fn common_environment_variables(&self) -> HashMap<OsString, OsString> {
683 let mut ret = HashMap::new();
684 ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into());
685 ret.insert(
686 "TOR_PT_STATE_LOCATION".into(),
687 self.state_location.clone().into_os_string(),
688 );
689 ret.insert("TOR_PT_EXIT_ON_STDIN_CLOSE".into(), "1".into());
690 if let Some(v4) = self.outbound_bind_v4 {
691 ret.insert(
692 "TOR_PT_OUTBOUND_BIND_ADDRESS_V4".into(),
693 v4.to_string().into(),
694 );
695 }
696 if let Some(v6) = self.outbound_bind_v6 {
697 ret.insert(
699 "TOR_PT_OUTBOUND_BIND_ADDRESS_V6".into(),
700 format!("[{}]", v6).into(),
701 );
702 }
703 ret
704 }
705}
706
/// Launch settings specific to a client-side pluggable transport.
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
pub struct PtClientParameters {
    /// If set, exported to the PT as `TOR_PT_PROXY`; `launch` then expects the
    /// PT to answer with `PROXY DONE`.
    #[builder(default)]
    proxy_uri: Option<String>,
    /// Transports the PT is asked to provide, exported comma-separated as
    /// `TOR_PT_CLIENT_TRANSPORTS`.
    transports: Vec<PtTransportName>,
}
718
719impl PtClientParameters {
720 pub fn builder() -> PtClientParametersBuilder {
722 PtClientParametersBuilder::default()
723 }
724
725 fn environment_variables(
728 &self,
729 common_params: &PtCommonParameters,
730 ) -> HashMap<OsString, OsString> {
731 let mut ret = common_params.common_environment_variables();
732 if let Some(ref proxy_uri) = self.proxy_uri {
733 ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into());
734 }
735 ret.insert(
736 "TOR_PT_CLIENT_TRANSPORTS".into(),
737 self.transports.iter().join(",").into(),
738 );
739 ret
740 }
741}
742
/// Launch settings specific to a server-side pluggable transport.
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
pub struct PtServerParameters {
    /// Transports the PT is asked to provide, exported comma-separated as
    /// `TOR_PT_SERVER_TRANSPORTS`.
    transports: Vec<PtTransportName>,
    /// Exported verbatim as `TOR_PT_SERVER_TRANSPORT_OPTIONS` (always set,
    /// even when empty).
    #[builder(default)]
    server_transport_options: String,
    /// Exported verbatim as `TOR_PT_SERVER_BINDADDR` (always set, even when empty).
    #[builder(default)]
    server_bindaddr: String,
    /// If set, exported as `TOR_PT_ORPORT`.
    #[builder(default)]
    server_orport: Option<String>,
    /// If set, exported as `TOR_PT_EXTENDED_SERVER_PORT`.
    #[builder(default)]
    server_extended_orport: Option<String>,
}
763
764impl PtServerParameters {
765 pub fn builder() -> PtServerParametersBuilder {
767 PtServerParametersBuilder::default()
768 }
769
770 fn environment_variables(
773 &self,
774 common_params: &PtCommonParameters,
775 ) -> HashMap<OsString, OsString> {
776 let mut ret = common_params.common_environment_variables();
777 ret.insert(
778 "TOR_PT_SERVER_TRANSPORTS".into(),
779 self.transports.iter().join(",").into(),
780 );
781 ret.insert(
782 "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(),
783 self.server_transport_options.clone().into(),
784 );
785 ret.insert(
786 "TOR_PT_SERVER_BINDADDR".into(),
787 self.server_bindaddr.clone().into(),
788 );
789 if let Some(ref server_orport) = self.server_orport {
790 ret.insert("TOR_PT_ORPORT".into(), server_orport.into());
791 }
792 if let Some(ref server_extended_orport) = self.server_extended_orport {
793 ret.insert(
794 "TOR_PT_EXTENDED_SERVER_PORT".into(),
795 server_extended_orport.into(),
796 );
797 }
798 ret
799 }
800}
801
802#[async_trait::async_trait]
804#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
805pub trait PluggableTransport: PluggableTransportPrivate {
806 fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod>;
811
812 async fn next_message(&mut self) -> err::Result<PtMessage> {
820 let inner = self.inner()?;
821 let ret = inner.recv().await;
822 if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret {
823 debug!(
826 "PT {}: Received {:?}; shutting down.",
827 self.identifier(),
828 ret
829 );
830 self.set_inner(None);
831 }
832 ret
833 }
834}
/// A client-side pluggable transport binary together with the parameters used
/// to launch it and the methods it reported.
#[derive(Debug)]
pub struct PluggableClientTransport {
    /// The running child; `None` before `launch` succeeds and after the child is lost.
    inner: Option<AsyncPtChild>,
    /// Path of the PT executable.
    pub(crate) binary_path: PathBuf,
    /// Command-line arguments passed to the executable.
    arguments: Vec<String>,
    /// Settings shared with server-side transports.
    common_params: PtCommonParameters,
    /// Client-specific settings.
    client_params: PtClientParameters,
    /// Methods reported by the PT during the last successful `launch`.
    cmethods: HashMap<PtTransportName, PtClientMethod>,
}
854
impl PluggableTransport for PluggableClientTransport {
    /// Return the client methods stored by the last successful `launch`
    /// (empty until then).
    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
        &self.cmethods
    }
}
860
861impl PluggableTransportPrivate for PluggableClientTransport {
862 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
863 self.inner.as_mut().ok_or(PtError::ChildGone)
864 }
865 fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
866 self.inner = newval;
867 }
868 fn identifier(&self) -> &str {
869 match &self.inner {
870 Some(child) => &child.identifier,
871 None => "<not yet launched>",
872 }
873 }
874 fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
875 self.client_params.transports.contains(transport)
876 }
877}
878
879impl PluggableClientTransport {
880 pub fn new(
885 binary_path: PathBuf,
886 arguments: Vec<String>,
887 common_params: PtCommonParameters,
888 client_params: PtClientParameters,
889 ) -> Self {
890 Self {
891 common_params,
892 client_params,
893 arguments,
894 binary_path,
895 inner: None,
896 cmethods: Default::default(),
897 }
898 }
899
900 pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
905 let all_env_vars = self
906 .client_params
907 .environment_variables(&self.common_params);
908
909 let mut async_child =
910 <PluggableClientTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
911 &self.inner,
912 &self.client_params.transports,
913 &self.binary_path,
914 &self.arguments,
915 all_env_vars,
916 )?;
917
918 let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
919 let mut cmethods = HashMap::new();
920 let mut proxy_done = self.client_params.proxy_uri.is_none();
921
922 loop {
923 match self
924 .try_match_common_messages(&rt, deadline, &mut async_child)
925 .await
926 {
927 Ok(maybe_message) => {
928 if let Some(message) = maybe_message {
929 match message {
930 PtMessage::ClientTransportLaunched {
931 transport,
932 protocol,
933 endpoint,
934 } => {
935 self.common_transport_launched_handler(
936 Some(protocol),
937 transport,
938 endpoint,
939 &mut cmethods,
940 )?;
941 }
942 PtMessage::ProxyDone => {
943 if proxy_done {
944 return Err(PtError::ProtocolViolation(
945 "binary initiated proxy when not asked (or twice)".into(),
946 ));
947 }
948 info!("PT binary now proxying connections via supplied URI");
949 proxy_done = true;
950 }
951 PtMessage::ClientTransportsDone => {
953 let unsupported = self
954 .client_params
955 .transports
956 .iter()
957 .filter(|&x| !cmethods.contains_key(x))
958 .map(|x| x.to_string())
959 .collect::<Vec<_>>();
960 if !unsupported.is_empty() {
961 warn!(
962 "PT binary failed to initialise transports: {:?}",
963 unsupported
964 );
965 return Err(PtError::ClientTransportsUnsupported(unsupported));
966 }
967 info!("PT binary initialisation done");
968 break;
969 }
970 x => {
971 return Err(PtError::ProtocolViolation(format!(
972 "received unexpected {:?}",
973 x
974 )));
975 }
976 }
977 }
978 }
979 Err(e) => return Err(e),
980 }
981 }
982 self.cmethods = cmethods;
983 self.inner = Some(async_child);
984 Ok(())
986 }
987}
988
/// A server-side pluggable transport binary together with the parameters used
/// to launch it and the methods it reported.
#[derive(Debug)]
pub struct PluggableServerTransport {
    /// The running child; `None` before `launch` succeeds and after the child is lost.
    inner: Option<AsyncPtChild>,
    /// Path of the PT executable.
    pub(crate) binary_path: PathBuf,
    /// Command-line arguments passed to the executable.
    arguments: Vec<String>,
    /// Settings shared with client-side transports.
    common_params: PtCommonParameters,
    /// Server-specific settings.
    server_params: PtServerParameters,
    /// Methods reported by the PT during the last successful `launch`.
    // NOTE(review): stored as `PtClientMethod` even though this is the server side.
    smethods: HashMap<PtTransportName, PtClientMethod>,
}
1008
1009impl PluggableTransportPrivate for PluggableServerTransport {
1010 fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
1011 self.inner.as_mut().ok_or(PtError::ChildGone)
1012 }
1013 fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
1014 self.inner = newval;
1015 }
1016 fn identifier(&self) -> &str {
1017 match &self.inner {
1018 Some(child) => &child.identifier,
1019 None => "<not yet launched>",
1020 }
1021 }
1022 fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
1023 self.server_params.transports.contains(transport)
1024 }
1025}
1026
impl PluggableTransport for PluggableServerTransport {
    /// Return the server methods stored by the last successful `launch`
    /// (empty until then).
    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
        &self.smethods
    }
}
1032
1033impl PluggableServerTransport {
1034 pub fn new(
1039 binary_path: PathBuf,
1040 arguments: Vec<String>,
1041 common_params: PtCommonParameters,
1042 server_params: PtServerParameters,
1043 ) -> Self {
1044 Self {
1045 common_params,
1046 server_params,
1047 arguments,
1048 binary_path,
1049 inner: None,
1050 smethods: Default::default(),
1051 }
1052 }
1053
1054 pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
1059 let all_env_vars = self
1060 .server_params
1061 .environment_variables(&self.common_params);
1062
1063 let mut async_child =
1064 <PluggableServerTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
1065 &self.inner,
1066 &self.server_params.transports,
1067 &self.binary_path,
1068 &self.arguments,
1069 all_env_vars,
1070 )?;
1071
1072 let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
1073 let mut smethods = HashMap::new();
1074
1075 loop {
1076 match self
1077 .try_match_common_messages(&rt, deadline, &mut async_child)
1078 .await
1079 {
1080 Ok(maybe_message) => {
1081 if let Some(message) = maybe_message {
1082 match message {
1083 PtMessage::ServerTransportLaunched {
1084 transport,
1085 endpoint,
1086 options: _,
1087 } => {
1088 self.common_transport_launched_handler(
1089 None,
1090 transport,
1091 endpoint,
1092 &mut smethods,
1093 )?;
1094 }
1095 PtMessage::ServerTransportsDone => {
1096 let unsupported = self
1097 .server_params
1098 .transports
1099 .iter()
1100 .filter(|&x| !smethods.contains_key(x))
1101 .map(|x| x.to_string())
1102 .collect::<Vec<_>>();
1103 if !unsupported.is_empty() {
1104 warn!(
1105 "PT binary failed to initialise transports: {:?}",
1106 unsupported
1107 );
1108 return Err(PtError::ClientTransportsUnsupported(unsupported));
1109 }
1110 info!("PT binary initialisation done");
1111 break;
1112 }
1113 x => {
1114 return Err(PtError::ProtocolViolation(format!(
1115 "received unexpected {:?}",
1116 x
1117 )));
1118 }
1119 }
1120 }
1121 }
1122 Err(e) => return Err(e),
1123 }
1124 }
1125 self.smethods = smethods;
1126 self.inner = Some(async_child);
1127 Ok(())
1129 }
1130}
1131
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use crate::ipc::{PtMessage, PtStatus};
    use std::borrow::Cow;
    use std::collections::HashMap;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Parse `line` as a PT message, pinning the result type for `assert_eq!`.
    fn parsed(line: &str) -> Result<PtMessage, Cow<'static, str>> {
        line.parse()
    }

    /// Build an IPv4 socket address from its octets and port.
    fn v4(octets: [u8; 4], port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::from(octets)), port)
    }

    /// Build an owned string map from borrowed pairs.
    fn string_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Expected result for a `LOG SEVERITY=debug` line with the given message.
    fn debug_log(message: &str) -> Result<PtMessage, Cow<'static, str>> {
        Ok(PtMessage::Log {
            severity: "debug".to_string(),
            message: message.to_string(),
        })
    }

    #[test]
    fn it_parses_spec_examples() {
        // Version, environment and proxy negotiation.
        assert_eq!(
            parsed("VERSION-ERROR no-version"),
            Ok(PtMessage::VersionError("no-version".into()))
        );
        assert_eq!(parsed("VERSION 1"), Ok(PtMessage::Version("1".into())));
        assert_eq!(
            parsed("ENV-ERROR No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set"),
            Ok(PtMessage::EnvError(
                "No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".into()
            ))
        );
        assert_eq!(parsed("PROXY DONE"), Ok(PtMessage::ProxyDone));
        assert_eq!(
            parsed("PROXY-ERROR SOCKS 4 upstream proxies unsupported"),
            Ok(PtMessage::ProxyError(
                "SOCKS 4 upstream proxies unsupported".into()
            ))
        );

        // Client methods.
        assert_eq!(
            parsed("CMETHOD trebuchet socks5 127.0.0.1:19999"),
            Ok(PtMessage::ClientTransportLaunched {
                transport: "trebuchet".parse().unwrap(),
                protocol: "socks5".to_string(),
                endpoint: v4([127, 0, 0, 1], 19999),
            })
        );
        assert_eq!(
            parsed("CMETHOD-ERROR trebuchet no rocks available"),
            Ok(PtMessage::ClientTransportFailed {
                transport: "trebuchet".parse().unwrap(),
                message: "no rocks available".to_string(),
            })
        );
        assert_eq!(parsed("CMETHODS DONE"), Ok(PtMessage::ClientTransportsDone));

        // Server methods, with and without ARGS.
        assert_eq!(
            parsed("SMETHOD trebuchet 198.51.100.1:19999"),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "trebuchet".parse().unwrap(),
                endpoint: v4([198, 51, 100, 1], 19999),
                options: Default::default(),
            })
        );
        assert_eq!(
            parsed("SMETHOD rot_by_N 198.51.100.1:2323 ARGS:N=13"),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "rot_by_N".parse().unwrap(),
                endpoint: v4([198, 51, 100, 1], 2323),
                options: string_map(&[("N", "13")]),
            })
        );
        assert_eq!(
            parsed("SMETHOD obfs4 198.51.100.1:43734 ARGS:cert=HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw,iat-mode=0"),
            Ok(PtMessage::ServerTransportLaunched {
                transport: "obfs4".parse().unwrap(),
                endpoint: v4([198, 51, 100, 1], 43734),
                options: string_map(&[
                    (
                        "cert",
                        "HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw"
                    ),
                    ("iat-mode", "0"),
                ]),
            })
        );
        assert_eq!(
            parsed("SMETHOD-ERROR trebuchet no cows available"),
            Ok(PtMessage::ServerTransportFailed {
                transport: "trebuchet".parse().unwrap(),
                message: "no cows available".to_string(),
            })
        );

        // LOG lines: quoting and escapes.
        assert_eq!(
            parsed("LOG SEVERITY=debug MESSAGE=\"Connected to bridge A\""),
            debug_log("Connected to bridge A")
        );
        assert_eq!(
            parsed("LOG SEVERITY=debug MESSAGE=\"\\r\\n\\t\""),
            debug_log("\r\n\t")
        );
        assert_eq!(parsed("LOG SEVERITY=debug MESSAGE="), debug_log(""));
        assert_eq!(
            parsed("LOG SEVERITY=debug MESSAGE=\"\\a\""),
            debug_log("a")
        );
        for i in 0..9 {
            let msg = format!("LOG SEVERITY=debug MESSAGE=\"\\{i}\"");
            assert_eq!(
                parsed(&msg),
                Err(Cow::from("attempted unsupported octal escape code"))
            );
        }

        // Malformed SMETHOD ARGS.
        let bad_args = [
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=0\\",
                "failed to parse SMETHOD ARGS: smethod arg terminates with backslash",
            ),
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=fo=o",
                "failed to parse SMETHOD ARGS: encountered = while parsing value",
            ),
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode",
                "failed to parse SMETHOD ARGS: ran out of chars parsing smethod arg",
            ),
        ];
        for (line, expected) in bad_args {
            assert_eq!(parsed(line), Err(Cow::from(expected)));
        }

        // STATUS lines.
        assert_eq!(
            parsed("STATUS ADDRESS=198.51.100.123:1234 CONNECT=Success"),
            Ok(PtMessage::Status(PtStatus {
                data: string_map(&[
                    ("ADDRESS", "198.51.100.123:1234"),
                    ("CONNECT", "Success"),
                ]),
            }))
        );
        assert_eq!(
            parsed("STATUS TRANSPORT=obfs4 ADDRESS=198.51.100.123:1234 CONNECT=Success"),
            Ok(PtMessage::Status(PtStatus {
                data: string_map(&[
                    ("ADDRESS", "198.51.100.123:1234"),
                    ("CONNECT", "Success"),
                    ("TRANSPORT", "obfs4"),
                ]),
            }))
        );
        assert_eq!(
            parsed("STATUS ADDRESS=198.51.100.222:2222 CONNECT=Failed FINGERPRINT=<Fingerprint> ERRSTR=\"Connection refused\""),
            Ok(PtMessage::Status(PtStatus {
                data: string_map(&[
                    ("ADDRESS", "198.51.100.222:2222"),
                    ("CONNECT", "Failed"),
                    ("FINGERPRINT", "<Fingerprint>"),
                    ("ERRSTR", "Connection refused"),
                ]),
            }))
        );
    }
}