tor_ptmgr/
ipc.rs

1//! Launching pluggable transport binaries and communicating with them.
2//!
3//! This module contains utilities to launch pluggable transports supporting pt-spec.txt
4//! version 1, and communicate with them in order to specify configuration parameters and
5//! receive updates as to the current state of the PT.
6
7use crate::err;
8use crate::err::PtError;
9use crate::PtClientMethod;
10use futures::channel::mpsc::Receiver;
11use futures::StreamExt;
12use itertools::Itertools;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::ffi::OsString;
16use std::io::{BufRead, BufReader};
17use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
18use std::path::PathBuf;
19use std::process::{Child, Command, Stdio};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{io, thread};
24use tor_basic_utils::PathExt as _;
25use tor_error::{internal, warn_report};
26use tor_linkspec::PtTransportName;
27use tor_rtcompat::{Runtime, SleepProviderExt};
28use tor_socksproto::SocksVersion;
29use tracing::{debug, error, info, trace, warn};
30
31/// Amount of time we give a pluggable transport child process to exit gracefully.
32const GRACEFUL_EXIT_TIME: Duration = Duration::from_secs(5);
33/// Default timeout for PT binary startup.
34const PT_START_TIMEOUT: Duration = Duration::from_secs(30);
35/// Size for the buffer storing pluggable transport stdout lines.
36const PT_STDIO_BUFFER: usize = 64;
37
38/// An arbitrary key/value status update from a pluggable transport.
39#[derive(PartialEq, Eq, Debug, Clone)]
40pub struct PtStatus {
41    /// Arbitrary key-value data about the state of this transport, from the binary running
42    /// said transport.
43    // NOTE(eta): This is assumed to not have duplicate keys.
44    data: HashMap<String, String>,
45}
46
47/// A message sent from a pluggable transport child process.
48///
49/// For more in-depth information about these messages, consult pt-spec.txt.
50#[derive(PartialEq, Eq, Debug, Clone)]
51#[non_exhaustive]
52#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
53pub enum PtMessage {
54    /// `VERSION-ERROR`: No compatible pluggable transport specification version was provided.
55    VersionError(String),
56    /// `VERSION`: Specifies the version the binary is using for the IPC protocol.
57    Version(String),
58    /// `ENV-ERROR`: Reports an error with the provided environment variables.
59    EnvError(String),
60    /// `PROXY DONE`: The configured proxy was correctly initialised.
61    ProxyDone,
62    /// `PROXY-ERROR`: An error was encountered setting up the configured proxy.
63    ProxyError(String),
64    /// `CMETHOD`: A client transport has been launched.
65    ClientTransportLaunched {
66        /// The name of the launched transport.
67        transport: PtTransportName,
68        /// The protocol used ('socks4' or 'socks5').
69        protocol: String,
70        /// An address to connect via this transport.
71        /// (This should be localhost.)
72        endpoint: SocketAddr,
73    },
74    /// `CMETHOD-ERROR`: An error was encountered setting up a client transport.
75    ClientTransportFailed {
76        /// The name of the transport.
77        transport: PtTransportName,
78        /// The error message.
79        message: String,
80    },
81    /// `CMETHODS DONE`: All client transports that are supported have been launched.
82    ClientTransportsDone,
83    /// `SMETHOD`: A server transport has been launched.
84    ServerTransportLaunched {
85        /// The name of the launched transport.
86        transport: PtTransportName,
87        /// The endpoint clients should use the reach the transport.
88        endpoint: SocketAddr,
89        /// Additional per-transport information.
90        // NOTE(eta): This assumes it actually is k/v and repeated keys aren't allowed...
91        options: HashMap<String, String>,
92    },
93    /// `SMETHOD-ERROR`: An error was encountered setting up a server transport.
94    ServerTransportFailed {
95        /// The name of the transport.
96        transport: PtTransportName,
97        /// The error message.
98        message: String,
99    },
100    /// `SMETHODS DONE`: All server transports that are supported have been launched.
101    ServerTransportsDone,
102    /// `LOG`: A log message.
103    Log {
104        /// The severity (one of 'error', 'warning', 'notice', 'info', 'debug').
105        severity: String,
106        /// The log message.
107        message: String,
108    },
109    /// `STATUS`: Arbitrary key/value status messages.
110    Status(PtStatus),
111    /// A line containing an unknown command.
112    Unknown(String),
113}
114
115/// Parse a value (something on the RHS of an =), which could be a CString as defined by
116/// control-spec.txt §2. Returns (value, unparsed rest of string).
117fn parse_one_value(from: &str) -> Result<(String, &str), &'static str> {
118    let first_char = from.chars().next();
119    Ok(if first_char.is_none() {
120        (String::new(), "")
121    } else if let Some('"') = first_char {
122        // This is a CString, so we're going to need to parse it char-by-char.
123        // FIXME(eta): This currently doesn't parse octal escape codes, even though the spec says
124        //             we should. That's finicky, though, and probably not used.
125        let mut ret = String::new();
126        let mut chars = from.chars();
127        assert_eq!(chars.next(), Some('"')); // discard "
128        loop {
129            let ch = chars.next().ok_or("ran out of input parsing CString")?;
130            match ch {
131                '\\' => match chars
132                    .next()
133                    .ok_or("encountered trailing backslash in CString")?
134                {
135                    'n' => ret.push('\n'),
136                    'r' => ret.push('\r'),
137                    't' => ret.push('\t'),
138                    '0'..='8' => return Err("attempted unsupported octal escape code"),
139                    ch2 => ret.push(ch2),
140                },
141                '"' => break,
142                _ => ret.push(ch),
143            }
144        }
145        (ret, chars.as_str())
146    } else {
147        // Simple: just find the space
148        let space = from.find(' ').unwrap_or(from.len());
149        (from[0..space].into(), &from[space..])
150    })
151}
152
153/// Chomp one key/value pair off a list of smethod args.
154/// Returns (k, v, unparsed rest of string).
155/// Will also chomp the comma at the end, if there is one.
156fn parse_one_smethod_arg(args: &str) -> Result<(String, String, &str), &'static str> {
157    // NOTE(eta): Apologies for this looking a bit gnarly. Ideally, this is what you'd use
158    //            something like `nom` for, but I didn't want to bring in a dep just for this.
159
160    let mut key = String::new();
161    let mut val = String::new();
162    // If true, we're reading the value, not the key.
163    let mut reading_val = false;
164    let mut chars = args.chars();
165    while let Some(c) = chars.next() {
166        let target = if reading_val { &mut val } else { &mut key };
167        match c {
168            '\\' => {
169                let c = chars
170                    .next()
171                    .ok_or("smethod arg terminates with backslash")?;
172                target.push(c);
173            }
174            '=' => {
175                if reading_val {
176                    return Err("encountered = while parsing value");
177                }
178                reading_val = true;
179            }
180            ',' => break,
181            c => target.push(c),
182        }
183    }
184    if !reading_val {
185        return Err("ran out of chars parsing smethod arg");
186    }
187    Ok((key, val, chars.as_str()))
188}
189
190impl FromStr for PtMessage {
191    type Err = Cow<'static, str>;
192
193    // NOTE(eta): This, of course, implies that the PT IPC communications are valid UTF-8.
194    //            This assumption might turn out to be false.
195    #[allow(clippy::cognitive_complexity)]
196    fn from_str(s: &str) -> Result<Self, Self::Err> {
197        // TODO(eta): Maybe tolerate additional whitespace (using `split_whitespace`)?.
198        //            This requires modified words.join() logic, though.
199        let mut words = s.split(' ');
200        let first_word = words.next().ok_or_else(|| Cow::from("empty line"))?;
201        Ok(match first_word {
202            "VERSION-ERROR" => {
203                let rest = words.join(" ");
204                Self::VersionError(rest)
205            }
206            "VERSION" => {
207                let vers = words.next().ok_or_else(|| Cow::from("no version"))?;
208                Self::Version(vers.into())
209            }
210            "ENV-ERROR" => {
211                let rest = words.join(" ");
212                Self::EnvError(rest)
213            }
214            "PROXY" => match words.next() {
215                Some("DONE") => Self::ProxyDone,
216                _ => Self::Unknown(s.into()),
217            },
218            "PROXY-ERROR" => {
219                let rest = words.join(" ");
220                Self::ProxyError(rest)
221            }
222            "CMETHOD" => {
223                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
224                let protocol = words.next().ok_or_else(|| Cow::from("no protocol"))?;
225                let endpoint = words
226                    .next()
227                    .ok_or_else(|| Cow::from("no endpoint"))?
228                    .parse::<SocketAddr>()
229                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
230                if !endpoint.ip().is_loopback() {
231                    return Err(Cow::from(format!(
232                        "CMETHOD endpoint {endpoint} was not localhost"
233                    )));
234                }
235                Self::ClientTransportLaunched {
236                    transport: transport
237                        .parse()
238                        .map_err(|_| Cow::from("bad transport ID"))?,
239                    protocol: protocol.to_string(),
240                    endpoint,
241                }
242            }
243            "CMETHOD-ERROR" => {
244                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
245                let rest = words.join(" ");
246                Self::ClientTransportFailed {
247                    transport: transport
248                        .parse()
249                        .map_err(|_| Cow::from("bad transport ID"))?,
250                    message: rest,
251                }
252            }
253            "CMETHODS" => match words.next() {
254                Some("DONE") => Self::ClientTransportsDone,
255                _ => Self::Unknown(s.into()),
256            },
257            "SMETHOD" => {
258                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
259                let endpoint = words
260                    .next()
261                    .ok_or_else(|| Cow::from("no endpoint"))?
262                    .parse::<SocketAddr>()
263                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
264                // The SMETHOD endpoint is the place where _clients_ connect, and it shouldn't be localhost.
265                let mut parsed_args = HashMap::new();
266
267                // NOTE(eta): pt-spec.txt seems to imply these options can't contain spaces, so
268                //            we work under that assumption.
269                //            It also doesn't actually parse them out -- but seeing as the API to
270                //            feed these back in will want them as separated k/v pairs, I think
271                //            it makes sense to here.
272                for option in words {
273                    if let Some(mut args) = option.strip_prefix("ARGS:") {
274                        while !args.is_empty() {
275                            let (k, v, rest) = parse_one_smethod_arg(args).map_err(|e| {
276                                Cow::from(format!("failed to parse SMETHOD ARGS: {}", e))
277                            })?;
278                            if parsed_args.contains_key(&k) {
279                                // At least check our assumption that this is actually k/v
280                                // and not Vec<(String, String)>.
281                                warn!("PT SMETHOD arguments contain repeated key {}!", k);
282                            }
283                            parsed_args.insert(k, v);
284                            args = rest;
285                        }
286                    }
287                }
288                Self::ServerTransportLaunched {
289                    transport: transport
290                        .parse()
291                        .map_err(|_| Cow::from("bad transport ID"))?,
292                    endpoint,
293                    options: parsed_args,
294                }
295            }
296            "SMETHOD-ERROR" => {
297                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
298                let rest = words.join(" ");
299                Self::ServerTransportFailed {
300                    transport: transport
301                        .parse()
302                        .map_err(|_| Cow::from("bad transport ID"))?,
303                    message: rest,
304                }
305            }
306            "SMETHODS" => match words.next() {
307                Some("DONE") => Self::ServerTransportsDone,
308                _ => Self::Unknown(s.into()),
309            },
310            "LOG" => {
311                let severity = words
312                    .next()
313                    .ok_or_else(|| Cow::from("no severity"))?
314                    .strip_prefix("SEVERITY=")
315                    .ok_or_else(|| Cow::from("badly formatted severity"))?;
316                let message = words.join(" ");
317                let message = parse_one_value(
318                    message
319                        .strip_prefix("MESSAGE=")
320                        .ok_or_else(|| Cow::from("no or badly formatted message"))?,
321                )
322                .map_err(Cow::from)?
323                .0;
324                Self::Log {
325                    severity: severity.into(),
326                    message,
327                }
328            }
329            "STATUS" => {
330                let mut ret = HashMap::new();
331                let message = words.join(" ");
332                let mut message = &message as &str;
333                while !message.is_empty() {
334                    let equals = message
335                        .find('=')
336                        .ok_or_else(|| Cow::from(format!("failed to find = in '{}'", message)))?;
337                    let k = &message[..equals];
338                    if equals + 1 == message.len() {
339                        return Err(Cow::from("key with no value"));
340                    }
341                    let (v, rest) = parse_one_value(&message[(equals + 1)..]).map_err(Cow::from)?;
342                    if ret.contains_key(k) {
343                        // At least check our assumption that this is actually k/v
344                        // and not Vec<(String, String)>.
345                        warn!("STATUS contains repeated key {}!", k);
346                    }
347                    ret.insert(k.to_owned(), v);
348                    message = rest;
349                    if message.starts_with(' ') {
350                        message = &message[1..];
351                    }
352                }
353                Self::Status(PtStatus { data: ret })
354            }
355            _ => Self::Unknown(s.into()),
356        })
357    }
358}
359
360use sealed::*;
361/// Sealed trait to protect private types and default trait implementations
362pub(crate) mod sealed {
363    use super::*;
364
365    /// A handle to receive lines from a pluggable transport process' stdout asynchronously.
366    //
367    // FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without
368    //             being async-runtime dependent (or adding process spawning to tor-rtcompat).
369    #[derive(Debug)]
370    pub struct AsyncPtChild {
371        /// Channel to receive lines from the child process stdout.
372        stdout: Receiver<io::Result<String>>,
373        /// Identifier to put in logging messages.
374        pub identifier: String,
375    }
376
377    impl AsyncPtChild {
378        /// Wrap an OS child process by spawning a worker thread to forward output from the child
379        /// to the asynchronous runtime via use of a channel.
380        pub fn new(mut child: Child, identifier: String) -> Result<Self, PtError> {
381            let (stdin, stdout) = (
382                child.stdin.take().ok_or_else(|| {
383                    PtError::Internal(internal!("Created child process without stdin pipe"))
384                })?,
385                child.stdout.take().ok_or_else(|| {
386                    PtError::Internal(internal!("Created child process without stdout pipe"))
387                })?,
388            );
389            // TODO RELAY #1649 We don't use a tor_memquota::mq_queue here yet
390            let (mut tx, rx) = tor_async_utils::mpsc_channel_no_memquota(PT_STDIO_BUFFER);
391            let ident = identifier.clone();
392            #[allow(clippy::cognitive_complexity)]
393            thread::spawn(move || {
394                let reader = BufReader::new(stdout);
395                let _stdin = stdin;
396                let mut noted_full = false;
397                // Forward lines from the blocking reader to the async channel.
398                for line in reader.lines() {
399                    let err = line.is_err();
400                    match &line {
401                        Ok(l) => trace!("<-- PT {}: {:?}", ident, l),
402                        Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e),
403                    }
404                    if let Err(e) = tx.try_send(line) {
405                        if e.is_disconnected() {
406                            debug!("PT {} is disconnected; shutting it down.", ident);
407                            // Channel dropped, so shut down the pluggable transport process.
408                            break;
409                        }
410                        // The other kind of error is "full", which we can't do anything about.
411                        // Just throw the line away.
412                        if !noted_full {
413                            noted_full = true; // warn only once per PT.
414                            warn!(
415                                "Bug: Message queue for PT {} became full; dropping message",
416                                ident
417                            );
418                        }
419                    }
420                    if err {
421                        // Encountered an error reading, so ensure the process is shut down (it's
422                        // probably "broken pipe" anyway, so this is slightly redundant, but the
423                        // rest of the code assumes errors are nonrecoverable).
424                        break;
425                    }
426                }
427                // Has it already quit? If so, just exit now.
428                if let Ok(Some(_)) = child.try_wait() {
429                    // FIXME(eta): We currently throw away the exit code, which might be useful
430                    //             for debugging purposes!
431                    debug!("PT {} has exited.", ident);
432                    return;
433                }
434                // Otherwise, tell it to exit.
435                // Dropping stdin should tell the PT to exit, since we set the correct environment
436                // variable for that to happen.
437                trace!("Asking PT {} to exit, nicely.", ident);
438                drop(_stdin);
439                // Give it some time to exit.
440                thread::sleep(GRACEFUL_EXIT_TIME);
441                match child.try_wait() {
442                    Ok(None) => {
443                        // Kill it.
444                        debug!("Sending kill signal to PT {}", ident);
445                        if let Err(e) = child.kill() {
446                            warn_report!(e, "Failed to kill() spawned PT {}", ident);
447                        }
448                    }
449                    Ok(Some(_)) => {
450                        debug!("PT {} shut down successfully.", ident);
451                    } // It exited.
452                    Err(e) => {
453                        warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident);
454                    }
455                }
456            });
457            Ok(AsyncPtChild {
458                stdout: rx,
459                identifier,
460            })
461        }
462
463        /// Receive a message from the pluggable transport binary asynchronously.
464        ///
465        /// Note: This will convert `PtMessage::Log` into a tracing log call automatically.
466        pub async fn recv(&mut self) -> err::Result<PtMessage> {
467            loop {
468                match self.stdout.next().await {
469                    None => return Err(PtError::ChildGone),
470                    Some(Ok(line)) => {
471                        let line =
472                            line.parse::<PtMessage>()
473                                .map_err(|e| PtError::IpcParseFailed {
474                                    line,
475                                    error: e.into(),
476                                })?;
477                        if let PtMessage::Log { severity, message } = line {
478                            // FIXME(eta): I wanted to make this integrate with `tracing` more nicely,
479                            //             but gave up after 15 minutes of clicking through spaghetti.
480                            match &severity as &str {
481                                "error" => error!("[pt {}] {}", self.identifier, message),
482                                "warning" => warn!("[pt {}] {}", self.identifier, message),
483                                "notice" => info!("[pt {}] {}", self.identifier, message),
484                                "info" => debug!("[pt {}] {}", self.identifier, message),
485                                "debug" => trace!("[pt {}] {}", self.identifier, message),
486                                x => warn!("[pt] {} {} {}", self.identifier, x, message),
487                            }
488                        } else {
489                            return Ok(line);
490                        }
491                    }
492                    Some(Err(e)) => {
493                        return Err(PtError::ChildReadFailed(Arc::new(e)));
494                    }
495                }
496            }
497        }
498    }
499
500    /// Defines some helper methods that are required later on
501    #[async_trait::async_trait]
502    pub trait PluggableTransportPrivate {
503        /// Return the [`AsyncPtChild`] if it exists
504        fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>;
505
506        /// Set the [`AsyncPtChild`]
507        fn set_inner(&mut self, newval: Option<AsyncPtChild>);
508
509        /// Return a loggable identifier for this transport.
510        fn identifier(&self) -> &str;
511
512        /// Checks whether a transport is specified in our specific parameters
513        fn specific_params_contains(&self, transport: &PtTransportName) -> bool;
514
515        /// Common handler for `ClientTransportLaunched` and `ServerTransportLaunched`
516        fn common_transport_launched_handler(
517            &self,
518            protocol: Option<String>,
519            transport: PtTransportName,
520            endpoint: SocketAddr,
521            methods: &mut HashMap<PtTransportName, PtClientMethod>,
522        ) -> Result<(), PtError> {
523            if !self.specific_params_contains(&transport) {
524                return Err(PtError::ProtocolViolation(format!(
525                    "binary launched unwanted transport '{}'",
526                    transport
527                )));
528            }
529            let protocol = match protocol {
530                Some(protocol_str) => match &protocol_str as &str {
531                    "socks4" => SocksVersion::V4,
532                    "socks5" => SocksVersion::V5,
533                    x => {
534                        return Err(PtError::ProtocolViolation(format!(
535                            "unknown CMETHOD protocol '{}'",
536                            x
537                        )))
538                    }
539                },
540                None => SocksVersion::V5,
541            };
542            let method = PtClientMethod {
543                kind: protocol,
544                endpoint,
545            };
546            info!("Transport '{}' uses method {:?}", transport, method);
547            methods.insert(transport, method);
548            Ok(())
549        }
550
551        /// Attempt to launch the PT and return the corresponding `[AsyncPtChild]`
552        fn get_child_from_pt_launch(
553            inner: &Option<AsyncPtChild>,
554            transports: &Vec<PtTransportName>,
555            binary_path: &PathBuf,
556            arguments: &[String],
557            all_env_vars: HashMap<OsString, OsString>,
558        ) -> Result<AsyncPtChild, PtError> {
559            if inner.is_some() {
560                let warning_msg =
561                    format!("Attempted to launch PT binary for {:?} twice.", transports);
562                warn!("{warning_msg}");
563                // WARN: this may not be the correct error to throw here
564                return Err(PtError::ChildProtocolViolation(warning_msg));
565            }
566            info!(
567                "Launching pluggable transport at {} for {:?}",
568                binary_path.display_lossy(),
569                transports
570            );
571            let child = Command::new(binary_path)
572                .args(arguments.iter())
573                .envs(all_env_vars)
574                .stdout(Stdio::piped())
575                .stdin(Stdio::piped())
576                .spawn()
577                .map_err(|e| PtError::ChildSpawnFailed {
578                    path: binary_path.clone(),
579                    error: Arc::new(e),
580                })?;
581
582            let identifier = crate::managed::pt_identifier(binary_path)?;
583            AsyncPtChild::new(child, identifier)
584        }
585
586        /// Consolidates some of the [`PtMessage`] potential matches to
587        /// deduplicate code
588        ///
589        /// Note that getting a [`PtMessage`] from this method implies that
590        /// the method was unable to match it and thus you should continue handling
591        /// the message. Getting [`None`] after error handling means that a match
592        /// was found and the appropriate action was successfully taken, and you don't
593        /// need to worry about it.
594        async fn try_match_common_messages<R: Runtime>(
595            &self,
596            rt: &R,
597            deadline: Instant,
598            async_child: &mut AsyncPtChild,
599        ) -> Result<Option<PtMessage>, PtError> {
600            match rt
601                .timeout(
602                    // FIXME(eta): It'd be nice if SleepProviderExt took an `Instant` natively.
603                    deadline.saturating_duration_since(Instant::now()),
604                    async_child.recv(),
605                )
606                .await
607                .map_err(|_| PtError::Timeout)??
608            {
609                PtMessage::ClientTransportFailed { transport, message }
610                | PtMessage::ServerTransportFailed { transport, message } => {
611                    warn!(
612                        "PT {} unable to launch {}. It said: {:?}",
613                        async_child.identifier, transport, message
614                    );
615                    return Err(PtError::TransportGaveError {
616                        transport: transport.to_string(),
617                        message,
618                    });
619                }
620                PtMessage::VersionError(e) => {
621                    if e != "no-version" {
622                        warn!("weird VERSION-ERROR: {}", e);
623                    }
624                    return Err(PtError::UnsupportedVersion);
625                }
626                PtMessage::Version(vers) => {
627                    if vers != "1" {
628                        return Err(PtError::ProtocolViolation(format!(
629                            "stated version is {}, asked for 1",
630                            vers
631                        )));
632                    }
633                    Ok(None)
634                }
635                PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)),
636                PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)),
637                // TODO(eta): We don't do anything with these right now!
638                PtMessage::Status(_) => Ok(None),
639                PtMessage::Unknown(x) => {
640                    warn!("unknown PT line: {}", x);
641                    Ok(None)
642                }
643                // Return the PtMessage as it is for further processing
644                // TODO: handle [`PtError::ProtocolViolation`] here somehow
645                x => {
646                    return Ok(Some(x));
647                }
648            }
649        }
650    }
651}
652
653/// Common parameters passed to a pluggable transport.
654#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
655pub struct PtCommonParameters {
656    /// A path where the launched PT can store state.
657    state_location: PathBuf,
658    /// An IPv4 address to bind outgoing connections to (if specified).
659    ///
660    /// Leaving this out will mean the PT uses a sane default.
661    #[builder(default)]
662    outbound_bind_v4: Option<Ipv4Addr>,
663    /// An IPv6 address to bind outgoing connections to (if specified).
664    ///
665    /// Leaving this out will mean the PT uses a sane default.
666    #[builder(default)]
667    outbound_bind_v6: Option<Ipv6Addr>,
668    /// The maximum time we should wait for a pluggable transport binary to report successful
669    /// initialization. If `None`, a default value is used.
670    #[builder(default)]
671    timeout: Option<Duration>,
672}
673
674impl PtCommonParameters {
675    /// Return a new `PtCommonParametersBuilder` for constructing a set of parameters.
676    pub fn builder() -> PtCommonParametersBuilder {
677        PtCommonParametersBuilder::default()
678    }
679
680    /// Convert these parameters into a set of environment variables to be passed to the PT binary
681    /// in accordance with the specification.
682    fn common_environment_variables(&self) -> HashMap<OsString, OsString> {
683        let mut ret = HashMap::new();
684        ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into());
685        ret.insert(
686            "TOR_PT_STATE_LOCATION".into(),
687            self.state_location.clone().into_os_string(),
688        );
689        ret.insert("TOR_PT_EXIT_ON_STDIN_CLOSE".into(), "1".into());
690        if let Some(v4) = self.outbound_bind_v4 {
691            ret.insert(
692                "TOR_PT_OUTBOUND_BIND_ADDRESS_V4".into(),
693                v4.to_string().into(),
694            );
695        }
696        if let Some(v6) = self.outbound_bind_v6 {
697            // pt-spec.txt: "IPv6 addresses MUST always be wrapped in square brackets."
698            ret.insert(
699                "TOR_PT_OUTBOUND_BIND_ADDRESS_V6".into(),
700                format!("[{}]", v6).into(),
701            );
702        }
703        ret
704    }
705}
706
707/// Parameters passed only to a pluggable transport client.
708#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
709pub struct PtClientParameters {
710    /// A SOCKS URI specifying a proxy to use.
711    #[builder(default)]
712    proxy_uri: Option<String>,
713    /// A list of transports to initialise.
714    ///
715    /// The PT launch will fail if all transports are not successfully initialised.
716    transports: Vec<PtTransportName>,
717}
718
719impl PtClientParameters {
720    /// Return a new `PtClientParametersBuilder` for constructing a set of parameters.
721    pub fn builder() -> PtClientParametersBuilder {
722        PtClientParametersBuilder::default()
723    }
724
725    /// Convert these parameters into a set of environment variables to be passed to the PT binary
726    /// in accordance with the specification.
727    fn environment_variables(
728        &self,
729        common_params: &PtCommonParameters,
730    ) -> HashMap<OsString, OsString> {
731        let mut ret = common_params.common_environment_variables();
732        if let Some(ref proxy_uri) = self.proxy_uri {
733            ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into());
734        }
735        ret.insert(
736            "TOR_PT_CLIENT_TRANSPORTS".into(),
737            self.transports.iter().join(",").into(),
738        );
739        ret
740    }
741}
742
743/// Parameters passed only to a pluggable transport server.
744#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
745pub struct PtServerParameters {
746    /// A list of transports to initialise.
747    ///
748    /// The PT launch will fail if all transports are not successfully initialised.
749    transports: Vec<PtTransportName>,
750    /// Transport options for each server transport
751    #[builder(default)]
752    server_transport_options: String,
753    /// Set host:port on which the server transport should listen for connections
754    #[builder(default)]
755    server_bindaddr: String,
756    /// Set host:port on which the server transport should forward requests
757    #[builder(default)]
758    server_orport: Option<String>,
759    /// Set host:port on which the server transport should forward requests (extended ORPORT)
760    #[builder(default)]
761    server_extended_orport: Option<String>,
762}
763
764impl PtServerParameters {
765    /// Return a new `PtServerParametersBuilder` for constructing a set of parameters.
766    pub fn builder() -> PtServerParametersBuilder {
767        PtServerParametersBuilder::default()
768    }
769
770    /// Convert these parameters into a set of environment variables to be passed to the PT binary
771    /// in accordance with the specification.
772    fn environment_variables(
773        &self,
774        common_params: &PtCommonParameters,
775    ) -> HashMap<OsString, OsString> {
776        let mut ret = common_params.common_environment_variables();
777        ret.insert(
778            "TOR_PT_SERVER_TRANSPORTS".into(),
779            self.transports.iter().join(",").into(),
780        );
781        ret.insert(
782            "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(),
783            self.server_transport_options.clone().into(),
784        );
785        ret.insert(
786            "TOR_PT_SERVER_BINDADDR".into(),
787            self.server_bindaddr.clone().into(),
788        );
789        if let Some(ref server_orport) = self.server_orport {
790            ret.insert("TOR_PT_ORPORT".into(), server_orport.into());
791        }
792        if let Some(ref server_extended_orport) = self.server_extended_orport {
793            ret.insert(
794                "TOR_PT_EXTENDED_SERVER_PORT".into(),
795                server_extended_orport.into(),
796            );
797        }
798        ret
799    }
800}
801
802/// Common functionality implemented to allow code reuse
803#[async_trait::async_trait]
804#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
805pub trait PluggableTransport: PluggableTransportPrivate {
806    /// Get all client methods returned by the binary, if it has been launched.
807    ///
808    /// If it hasn't been launched, the returned map will be empty.
809    // TODO(eta): Actually figure out a way to expose this more stably.
810    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod>;
811
812    /// Get the next [`PtMessage`] from the running transport. It is recommended to call this
813    /// in a loop once a PT has been launched, in order to forward log messages and find out about
814    /// status updates.
815    //
816    // FIXME(eta): This API will probably go away and get replaced with something better.
817    //             In particular, we'd want to cache `Status` messages from before this method
818    //             was called.
819    async fn next_message(&mut self) -> err::Result<PtMessage> {
820        let inner = self.inner()?;
821        let ret = inner.recv().await;
822        if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret {
823            // FIXME(eta): Currently this lets the caller still think the methods work by calling
824            //             transport_methods.
825            debug!(
826                "PT {}: Received {:?}; shutting down.",
827                self.identifier(),
828                ret
829            );
830            self.set_inner(None);
831        }
832        ret
833    }
834}
835/// A pluggable transport binary in a child process.
836///
837/// These start out inert, and must be launched with [`PluggableClientTransport::launch`] in order
838/// to be useful.
839#[derive(Debug)]
840pub struct PluggableClientTransport {
841    /// The currently running child, if there is one.
842    inner: Option<AsyncPtChild>,
843    /// The path to the binary to run.
844    pub(crate) binary_path: PathBuf,
845    /// Arguments to pass to the binary.
846    arguments: Vec<String>,
847    /// Configured parameters.
848    common_params: PtCommonParameters,
849    /// Configured client-only parameters.
850    client_params: PtClientParameters,
851    /// Information about client methods obtained from the PT.
852    cmethods: HashMap<PtTransportName, PtClientMethod>,
853}
854
855impl PluggableTransport for PluggableClientTransport {
856    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
857        &self.cmethods
858    }
859}
860
861impl PluggableTransportPrivate for PluggableClientTransport {
862    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
863        self.inner.as_mut().ok_or(PtError::ChildGone)
864    }
865    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
866        self.inner = newval;
867    }
868    fn identifier(&self) -> &str {
869        match &self.inner {
870            Some(child) => &child.identifier,
871            None => "<not yet launched>",
872        }
873    }
874    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
875        self.client_params.transports.contains(transport)
876    }
877}
878
879impl PluggableClientTransport {
880    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
881    /// the `params` to it.
882    ///
883    /// You must call [`PluggableClientTransport::launch`] to actually run the PT.
884    pub fn new(
885        binary_path: PathBuf,
886        arguments: Vec<String>,
887        common_params: PtCommonParameters,
888        client_params: PtClientParameters,
889    ) -> Self {
890        Self {
891            common_params,
892            client_params,
893            arguments,
894            binary_path,
895            inner: None,
896            cmethods: Default::default(),
897        }
898    }
899
900    /// Launch the pluggable transport, executing the binary.
901    ///
902    /// Will return an error if the launch fails, one of the transports fail, not all transports
903    /// were launched, or the launch times out.
904    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
905        let all_env_vars = self
906            .client_params
907            .environment_variables(&self.common_params);
908
909        let mut async_child =
910            <PluggableClientTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
911                &self.inner,
912                &self.client_params.transports,
913                &self.binary_path,
914                &self.arguments,
915                all_env_vars,
916            )?;
917
918        let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
919        let mut cmethods = HashMap::new();
920        let mut proxy_done = self.client_params.proxy_uri.is_none();
921
922        loop {
923            match self
924                .try_match_common_messages(&rt, deadline, &mut async_child)
925                .await
926            {
927                Ok(maybe_message) => {
928                    if let Some(message) = maybe_message {
929                        match message {
930                            PtMessage::ClientTransportLaunched {
931                                transport,
932                                protocol,
933                                endpoint,
934                            } => {
935                                self.common_transport_launched_handler(
936                                    Some(protocol),
937                                    transport,
938                                    endpoint,
939                                    &mut cmethods,
940                                )?;
941                            }
942                            PtMessage::ProxyDone => {
943                                if proxy_done {
944                                    return Err(PtError::ProtocolViolation(
945                                        "binary initiated proxy when not asked (or twice)".into(),
946                                    ));
947                                }
948                                info!("PT binary now proxying connections via supplied URI");
949                                proxy_done = true;
950                            }
951                            // TODO: unify most of the handling of ClientTransportsDone with ServerTransportsDone
952                            PtMessage::ClientTransportsDone => {
953                                let unsupported = self
954                                    .client_params
955                                    .transports
956                                    .iter()
957                                    .filter(|&x| !cmethods.contains_key(x))
958                                    .map(|x| x.to_string())
959                                    .collect::<Vec<_>>();
960                                if !unsupported.is_empty() {
961                                    warn!(
962                                        "PT binary failed to initialise transports: {:?}",
963                                        unsupported
964                                    );
965                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
966                                }
967                                info!("PT binary initialisation done");
968                                break;
969                            }
970                            x => {
971                                return Err(PtError::ProtocolViolation(format!(
972                                    "received unexpected {:?}",
973                                    x
974                                )));
975                            }
976                        }
977                    }
978                }
979                Err(e) => return Err(e),
980            }
981        }
982        self.cmethods = cmethods;
983        self.inner = Some(async_child);
984        // TODO(eta): We need to expose the log and status messages after this function exits!
985        Ok(())
986    }
987}
988
989/// A pluggable transport server binary in a child process.
990///
991/// These start out inert, and must be launched with [`PluggableServerTransport::launch`] in order
992/// to be useful.
993#[derive(Debug)]
994pub struct PluggableServerTransport {
995    /// The currently running child, if there is one.
996    inner: Option<AsyncPtChild>,
997    /// The path to the binary to run.
998    pub(crate) binary_path: PathBuf,
999    /// Arguments to pass to the binary.
1000    arguments: Vec<String>,
1001    /// Configured parameters.
1002    common_params: PtCommonParameters,
1003    /// Configured server-only parameters.
1004    server_params: PtServerParameters,
1005    /// Information about server methods obtained from the PT.
1006    smethods: HashMap<PtTransportName, PtClientMethod>,
1007}
1008
1009impl PluggableTransportPrivate for PluggableServerTransport {
1010    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
1011        self.inner.as_mut().ok_or(PtError::ChildGone)
1012    }
1013    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
1014        self.inner = newval;
1015    }
1016    fn identifier(&self) -> &str {
1017        match &self.inner {
1018            Some(child) => &child.identifier,
1019            None => "<not yet launched>",
1020        }
1021    }
1022    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
1023        self.server_params.transports.contains(transport)
1024    }
1025}
1026
1027impl PluggableTransport for PluggableServerTransport {
1028    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
1029        &self.smethods
1030    }
1031}
1032
1033impl PluggableServerTransport {
1034    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
1035    /// the `params` to it.
1036    ///
1037    /// You must call [`PluggableServerTransport::launch`] to actually run the PT.
1038    pub fn new(
1039        binary_path: PathBuf,
1040        arguments: Vec<String>,
1041        common_params: PtCommonParameters,
1042        server_params: PtServerParameters,
1043    ) -> Self {
1044        Self {
1045            common_params,
1046            server_params,
1047            arguments,
1048            binary_path,
1049            inner: None,
1050            smethods: Default::default(),
1051        }
1052    }
1053
1054    /// Launch the pluggable transport, executing the binary.
1055    ///
1056    /// Will return an error if the launch fails, one of the transports fail, not all transports
1057    /// were launched, or the launch times out.
1058    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
1059        let all_env_vars = self
1060            .server_params
1061            .environment_variables(&self.common_params);
1062
1063        let mut async_child =
1064            <PluggableServerTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
1065                &self.inner,
1066                &self.server_params.transports,
1067                &self.binary_path,
1068                &self.arguments,
1069                all_env_vars,
1070            )?;
1071
1072        let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
1073        let mut smethods = HashMap::new();
1074
1075        loop {
1076            match self
1077                .try_match_common_messages(&rt, deadline, &mut async_child)
1078                .await
1079            {
1080                Ok(maybe_message) => {
1081                    if let Some(message) = maybe_message {
1082                        match message {
1083                            PtMessage::ServerTransportLaunched {
1084                                transport,
1085                                endpoint,
1086                                options: _,
1087                            } => {
1088                                self.common_transport_launched_handler(
1089                                    None,
1090                                    transport,
1091                                    endpoint,
1092                                    &mut smethods,
1093                                )?;
1094                            }
1095                            PtMessage::ServerTransportsDone => {
1096                                let unsupported = self
1097                                    .server_params
1098                                    .transports
1099                                    .iter()
1100                                    .filter(|&x| !smethods.contains_key(x))
1101                                    .map(|x| x.to_string())
1102                                    .collect::<Vec<_>>();
1103                                if !unsupported.is_empty() {
1104                                    warn!(
1105                                        "PT binary failed to initialise transports: {:?}",
1106                                        unsupported
1107                                    );
1108                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
1109                                }
1110                                info!("PT binary initialisation done");
1111                                break;
1112                            }
1113                            x => {
1114                                return Err(PtError::ProtocolViolation(format!(
1115                                    "received unexpected {:?}",
1116                                    x
1117                                )));
1118                            }
1119                        }
1120                    }
1121                }
1122                Err(e) => return Err(e),
1123            }
1124        }
1125        self.smethods = smethods;
1126        self.inner = Some(async_child);
1127        // TODO(eta): We need to expose the log and status messages after this function exits!
1128        Ok(())
1129    }
1130}
1131
1132#[cfg(test)]
1133mod test {
1134    // @@ begin test lint list maintained by maint/add_warning @@
1135    #![allow(clippy::bool_assert_comparison)]
1136    #![allow(clippy::clone_on_copy)]
1137    #![allow(clippy::dbg_macro)]
1138    #![allow(clippy::mixed_attributes_style)]
1139    #![allow(clippy::print_stderr)]
1140    #![allow(clippy::print_stdout)]
1141    #![allow(clippy::single_char_pattern)]
1142    #![allow(clippy::unwrap_used)]
1143    #![allow(clippy::unchecked_duration_subtraction)]
1144    #![allow(clippy::useless_vec)]
1145    #![allow(clippy::needless_pass_by_value)]
1146    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1147
1148    use crate::ipc::{PtMessage, PtStatus};
1149    use std::borrow::Cow;
1150    use std::collections::HashMap;
1151    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1152
1153    #[test]
1154    fn it_parses_spec_examples() {
1155        assert_eq!(
1156            "VERSION-ERROR no-version".parse(),
1157            Ok(PtMessage::VersionError("no-version".into()))
1158        );
1159        assert_eq!("VERSION 1".parse(), Ok(PtMessage::Version("1".into())));
1160        assert_eq!(
1161            "ENV-ERROR No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".parse(),
1162            Ok(PtMessage::EnvError(
1163                "No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".into()
1164            ))
1165        );
1166        assert_eq!("PROXY DONE".parse(), Ok(PtMessage::ProxyDone));
1167        assert_eq!(
1168            "PROXY-ERROR SOCKS 4 upstream proxies unsupported".parse(),
1169            Ok(PtMessage::ProxyError(
1170                "SOCKS 4 upstream proxies unsupported".into()
1171            ))
1172        );
1173        assert_eq!(
1174            "CMETHOD trebuchet socks5 127.0.0.1:19999".parse(),
1175            Ok(PtMessage::ClientTransportLaunched {
1176                transport: "trebuchet".parse().unwrap(),
1177                protocol: "socks5".to_string(),
1178                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19999)
1179            })
1180        );
1181        assert_eq!(
1182            "CMETHOD-ERROR trebuchet no rocks available".parse(),
1183            Ok(PtMessage::ClientTransportFailed {
1184                transport: "trebuchet".parse().unwrap(),
1185                message: "no rocks available".to_string()
1186            })
1187        );
1188        assert_eq!("CMETHODS DONE".parse(), Ok(PtMessage::ClientTransportsDone));
1189        assert_eq!(
1190            "SMETHOD trebuchet 198.51.100.1:19999".parse(),
1191            Ok(PtMessage::ServerTransportLaunched {
1192                transport: "trebuchet".parse().unwrap(),
1193                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 19999),
1194                options: Default::default()
1195            })
1196        );
1197        let mut map = HashMap::new();
1198        map.insert("N".to_string(), "13".to_string());
1199        assert_eq!(
1200            "SMETHOD rot_by_N 198.51.100.1:2323 ARGS:N=13".parse(),
1201            Ok(PtMessage::ServerTransportLaunched {
1202                transport: "rot_by_N".parse().unwrap(),
1203                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 2323),
1204                options: map
1205            })
1206        );
1207        let mut map = HashMap::new();
1208        map.insert(
1209            "cert".to_string(),
1210            "HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw".to_string(),
1211        );
1212        map.insert("iat-mode".to_string(), "0".to_string());
1213        assert_eq!(
1214            "SMETHOD obfs4 198.51.100.1:43734 ARGS:cert=HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw,iat-mode=0".parse(),
1215            Ok(PtMessage::ServerTransportLaunched {
1216                transport: "obfs4".parse().unwrap(),
1217                endpoint: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 43734),
1218                options: map
1219            })
1220        );
1221        assert_eq!(
1222            "SMETHOD-ERROR trebuchet no cows available".parse(),
1223            Ok(PtMessage::ServerTransportFailed {
1224                transport: "trebuchet".parse().unwrap(),
1225                message: "no cows available".to_string()
1226            })
1227        );
1228        assert_eq!(
1229            "LOG SEVERITY=debug MESSAGE=\"Connected to bridge A\"".parse(),
1230            Ok(PtMessage::Log {
1231                severity: "debug".to_string(),
1232                message: "Connected to bridge A".to_string()
1233            })
1234        );
1235        assert_eq!(
1236            "LOG SEVERITY=debug MESSAGE=\"\\r\\n\\t\"".parse(),
1237            Ok(PtMessage::Log {
1238                severity: "debug".to_string(),
1239                message: "\r\n\t".to_string()
1240            })
1241        );
1242        assert_eq!(
1243            "LOG SEVERITY=debug MESSAGE=".parse(),
1244            Ok(PtMessage::Log {
1245                severity: "debug".to_string(),
1246                message: "".to_string()
1247            })
1248        );
1249        assert_eq!(
1250            "LOG SEVERITY=debug MESSAGE=\"\\a\"".parse::<PtMessage>(),
1251            Ok(PtMessage::Log {
1252                severity: "debug".to_string(),
1253                message: "a".to_string()
1254            })
1255        );
1256
1257        for i in 0..9 {
1258            let msg = format!("LOG SEVERITY=debug MESSAGE=\"\\{i}\"");
1259            assert_eq!(
1260                msg.parse::<PtMessage>(),
1261                Err(Cow::from("attempted unsupported octal escape code"))
1262            );
1263        }
1264        assert_eq!(
1265            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=0\\".parse::<PtMessage>(),
1266            Err(Cow::from(
1267                "failed to parse SMETHOD ARGS: smethod arg terminates with backslash"
1268            ))
1269        );
1270        assert_eq!(
1271            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=fo=o".parse::<PtMessage>(),
1272            Err(Cow::from(
1273                "failed to parse SMETHOD ARGS: encountered = while parsing value"
1274            ))
1275        );
1276        assert_eq!(
1277            "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode".parse::<PtMessage>(),
1278            Err(Cow::from(
1279                "failed to parse SMETHOD ARGS: ran out of chars parsing smethod arg"
1280            ))
1281        );
1282
1283        let mut map = HashMap::new();
1284        map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
1285        map.insert("CONNECT".to_string(), "Success".to_string());
1286        assert_eq!(
1287            "STATUS ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
1288            Ok(PtMessage::Status(PtStatus { data: map }))
1289        );
1290
1291        let mut map = HashMap::new();
1292        map.insert("ADDRESS".to_string(), "198.51.100.123:1234".to_string());
1293        map.insert("CONNECT".to_string(), "Success".to_string());
1294        map.insert("TRANSPORT".to_string(), "obfs4".to_string());
1295        assert_eq!(
1296            "STATUS TRANSPORT=obfs4 ADDRESS=198.51.100.123:1234 CONNECT=Success".parse(),
1297            Ok(PtMessage::Status(PtStatus { data: map }))
1298        );
1299
1300        let mut map = HashMap::new();
1301        map.insert("ADDRESS".to_string(), "198.51.100.222:2222".to_string());
1302        map.insert("CONNECT".to_string(), "Failed".to_string());
1303        map.insert("FINGERPRINT".to_string(), "<Fingerprint>".to_string());
1304        map.insert("ERRSTR".to_string(), "Connection refused".to_string());
1305        assert_eq!(
1306            "STATUS ADDRESS=198.51.100.222:2222 CONNECT=Failed FINGERPRINT=<Fingerprint> ERRSTR=\"Connection refused\"".parse(),
1307            Ok(PtMessage::Status(PtStatus {
1308                data: map
1309            }))
1310        );
1311    }
1312}