tor_ptmgr/
ipc.rs

1//! Launching pluggable transport binaries and communicating with them.
2//!
3//! This module contains utilities to launch pluggable transports supporting pt-spec.txt
4//! version 1, and communicate with them in order to specify configuration parameters and
5//! receive updates as to the current state of the PT.
6
7use crate::err;
8use crate::err::PtError;
9use crate::PtClientMethod;
10use futures::channel::mpsc::Receiver;
11use futures::StreamExt;
12use itertools::Itertools;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::ffi::OsString;
16use std::io::{BufRead, BufReader};
17use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
18use std::path::PathBuf;
19use std::process::{Child, Command, Stdio};
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use std::{io, thread};
24use tor_basic_utils::PathExt as _;
25use tor_error::{internal, warn_report};
26use tor_linkspec::PtTransportName;
27use tor_rtcompat::{Runtime, SleepProviderExt};
28use tor_socksproto::SocksVersion;
29use tracing::{debug, error, info, trace, warn};
30
/// Amount of time we give a pluggable transport child process to exit gracefully.
///
/// The stdout-forwarding worker thread sleeps for this long after closing the child's stdin,
/// and only then checks whether the process still needs to be killed.
const GRACEFUL_EXIT_TIME: Duration = Duration::from_secs(5);
/// Default timeout for PT binary startup.
const PT_START_TIMEOUT: Duration = Duration::from_secs(30);
/// Size for the buffer storing pluggable transport stdout lines.
///
/// This is the capacity of the channel between the stdout reader thread and the async side;
/// lines that arrive while the channel is full are dropped.
const PT_STDIO_BUFFER: usize = 64;
37
/// An arbitrary key/value status update from a pluggable transport.
///
/// Produced when parsing a `STATUS` line; each `key=value` pair on the line becomes one
/// entry in the map.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct PtStatus {
    /// Arbitrary key-value data about the state of this transport, from the binary running
    /// said transport.
    // NOTE(eta): This is assumed to not have duplicate keys.
    //            (If a key is repeated, the parser logs a warning and the later value wins.)
    data: HashMap<String, String>,
}
46
/// A message sent from a pluggable transport child process.
///
/// Each variant corresponds to one line keyword of the PT IPC protocol; lines whose keyword
/// is not recognised are preserved verbatim in [`PtMessage::Unknown`].
///
/// For more in-depth information about these messages, consult pt-spec.txt.
#[derive(PartialEq, Eq, Debug, Clone)]
#[non_exhaustive]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub enum PtMessage {
    /// `VERSION-ERROR`: No compatible pluggable transport specification version was provided.
    VersionError(String),
    /// `VERSION`: Specifies the version the binary is using for the IPC protocol.
    Version(String),
    /// `ENV-ERROR`: Reports an error with the provided environment variables.
    EnvError(String),
    /// `PROXY DONE`: The configured proxy was correctly initialised.
    ProxyDone,
    /// `PROXY-ERROR`: An error was encountered setting up the configured proxy.
    ProxyError(String),
    /// `CMETHOD`: A client transport has been launched.
    ClientTransportLaunched {
        /// The name of the launched transport.
        transport: PtTransportName,
        /// The protocol used ('socks4' or 'socks5').
        protocol: String,
        /// An address to connect via this transport.
        /// (This should be localhost; the parser rejects non-loopback addresses.)
        endpoint: SocketAddr,
    },
    /// `CMETHOD-ERROR`: An error was encountered setting up a client transport.
    ClientTransportFailed {
        /// The name of the transport.
        transport: PtTransportName,
        /// The error message.
        message: String,
    },
    /// `CMETHODS DONE`: All client transports that are supported have been launched.
    ClientTransportsDone,
    /// `SMETHOD`: A server transport has been launched.
    ServerTransportLaunched {
        /// The name of the launched transport.
        transport: PtTransportName,
        /// The endpoint clients should use to reach the transport.
        endpoint: SocketAddr,
        /// Additional per-transport information.
        // NOTE(eta): This assumes it actually is k/v and repeated keys aren't allowed...
        options: HashMap<String, String>,
    },
    /// `SMETHOD-ERROR`: An error was encountered setting up a server transport.
    ServerTransportFailed {
        /// The name of the transport.
        transport: PtTransportName,
        /// The error message.
        message: String,
    },
    /// `SMETHODS DONE`: All server transports that are supported have been launched.
    ServerTransportsDone,
    /// `LOG`: A log message.
    Log {
        /// The severity (one of 'error', 'warning', 'notice', 'info', 'debug').
        severity: String,
        /// The log message.
        message: String,
    },
    /// `STATUS`: Arbitrary key/value status messages.
    Status(PtStatus),
    /// A line containing an unknown command.
    Unknown(String),
}
114
/// Parse a value (something on the RHS of an =), which could be a CString as defined by
/// control-spec.txt §2. Returns (value, unparsed rest of string).
///
/// Two forms are accepted:
///
/// * A quoted CString (`"..."`): the escapes `\n`, `\r` and `\t` are translated, any other
///   escaped character (including `\"` and `\\`) stands for itself, and the rest of the
///   input after the closing quote is returned unparsed.
/// * An unquoted value: everything up to (but not including) the first space. The
///   returned rest starts at that space, or is empty if there is none.
///
/// An empty input yields an empty value and an empty rest.
///
/// # Errors
///
/// Returns a static description if a CString is unterminated, ends in a lone backslash,
/// or contains an octal escape (`\0`..`\7`), which is not supported.
fn parse_one_value(from: &str) -> Result<(String, &str), &'static str> {
    let quoted = match from.strip_prefix('"') {
        Some(quoted) => quoted,
        None => {
            // Simple: just find the space. (Also covers the empty string.)
            let space = from.find(' ').unwrap_or(from.len());
            return Ok((from[..space].to_owned(), &from[space..]));
        }
    };

    // This is a CString, so we're going to need to parse it char-by-char.
    // FIXME(eta): This currently doesn't parse octal escape codes, even though the spec says
    //             we should. That's finicky, though, and probably not used.
    let mut ret = String::new();
    let mut chars = quoted.chars();
    loop {
        match chars.next().ok_or("ran out of input parsing CString")? {
            '\\' => match chars
                .next()
                .ok_or("encountered trailing backslash in CString")?
            {
                'n' => ret.push('\n'),
                'r' => ret.push('\r'),
                't' => ret.push('\t'),
                // Only 0-7 are octal digits; '8' and '9' are ordinary escaped characters.
                '0'..='7' => return Err("attempted unsupported octal escape code"),
                other => ret.push(other),
            },
            '"' => break,
            ch => ret.push(ch),
        }
    }
    Ok((ret, chars.as_str()))
}
152
/// Split the first `key=value` pair off the front of an SMETHOD `ARGS:` list.
///
/// On success returns `(key, value, rest)`, where `rest` is whatever follows the comma
/// that terminated the pair (the comma itself is consumed), or the empty string if the
/// pair ran to the end of the input.
///
/// A backslash makes the following character literal, so `\,` and `\=` can appear inside
/// keys and values.
///
/// # Errors
///
/// Fails if the input ends right after a backslash, if a second unescaped `=` appears
/// while reading the value, or if the pair ends before any `=` was seen.
fn parse_one_smethod_arg(args: &str) -> Result<(String, String, &str), &'static str> {
    // NOTE(eta): Apologies for this looking a bit gnarly. Ideally, this is what you'd use
    //            something like `nom` for, but I didn't want to bring in a dep just for this.

    let mut name = String::new();
    let mut value = String::new();
    // Becomes true once the separating `=` has been consumed.
    let mut seen_equals = false;
    let mut remaining = args.chars();
    loop {
        // Work out which literal character (if any) this step contributes.
        let literal = match remaining.next() {
            None | Some(',') => break,
            Some('=') => {
                if seen_equals {
                    return Err("encountered = while parsing value");
                }
                seen_equals = true;
                continue;
            }
            Some('\\') => remaining
                .next()
                .ok_or("smethod arg terminates with backslash")?,
            Some(plain) => plain,
        };
        if seen_equals {
            value.push(literal);
        } else {
            name.push(literal);
        }
    }
    if seen_equals {
        Ok((name, value, remaining.as_str()))
    } else {
        Err("ran out of chars parsing smethod arg")
    }
}
189
190impl FromStr for PtMessage {
191    type Err = Cow<'static, str>;
192
193    // NOTE(eta): This, of course, implies that the PT IPC communications are valid UTF-8.
194    //            This assumption might turn out to be false.
195    #[allow(clippy::cognitive_complexity)]
196    fn from_str(s: &str) -> Result<Self, Self::Err> {
197        // TODO(eta): Maybe tolerate additional whitespace (using `split_whitespace`)?.
198        //            This requires modified words.join() logic, though.
199        let mut words = s.split(' ');
200        let first_word = words.next().ok_or_else(|| Cow::from("empty line"))?;
201        Ok(match first_word {
202            "VERSION-ERROR" => {
203                let rest = words.join(" ");
204                Self::VersionError(rest)
205            }
206            "VERSION" => {
207                let vers = words.next().ok_or_else(|| Cow::from("no version"))?;
208                Self::Version(vers.into())
209            }
210            "ENV-ERROR" => {
211                let rest = words.join(" ");
212                Self::EnvError(rest)
213            }
214            "PROXY" => match words.next() {
215                Some("DONE") => Self::ProxyDone,
216                _ => Self::Unknown(s.into()),
217            },
218            "PROXY-ERROR" => {
219                let rest = words.join(" ");
220                Self::ProxyError(rest)
221            }
222            "CMETHOD" => {
223                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
224                let protocol = words.next().ok_or_else(|| Cow::from("no protocol"))?;
225                let endpoint = words
226                    .next()
227                    .ok_or_else(|| Cow::from("no endpoint"))?
228                    .parse::<SocketAddr>()
229                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
230                if !endpoint.ip().is_loopback() {
231                    return Err(Cow::from(format!(
232                        "CMETHOD endpoint {endpoint} was not localhost"
233                    )));
234                }
235                Self::ClientTransportLaunched {
236                    transport: transport
237                        .parse()
238                        .map_err(|_| Cow::from("bad transport ID"))?,
239                    protocol: protocol.to_string(),
240                    endpoint,
241                }
242            }
243            "CMETHOD-ERROR" => {
244                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
245                let rest = words.join(" ");
246                Self::ClientTransportFailed {
247                    transport: transport
248                        .parse()
249                        .map_err(|_| Cow::from("bad transport ID"))?,
250                    message: rest,
251                }
252            }
253            "CMETHODS" => match words.next() {
254                Some("DONE") => Self::ClientTransportsDone,
255                _ => Self::Unknown(s.into()),
256            },
257            "SMETHOD" => {
258                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
259                let endpoint = words
260                    .next()
261                    .ok_or_else(|| Cow::from("no endpoint"))?
262                    .parse::<SocketAddr>()
263                    .map_err(|e| Cow::from(format!("failed to parse endpoint: {}", e)))?;
264                // The SMETHOD endpoint is the place where _clients_ connect, and it shouldn't be localhost.
265                let mut parsed_args = HashMap::new();
266
267                // NOTE(eta): pt-spec.txt seems to imply these options can't contain spaces, so
268                //            we work under that assumption.
269                //            It also doesn't actually parse them out -- but seeing as the API to
270                //            feed these back in will want them as separated k/v pairs, I think
271                //            it makes sense to here.
272                for option in words {
273                    if let Some(mut args) = option.strip_prefix("ARGS:") {
274                        while !args.is_empty() {
275                            let (k, v, rest) = parse_one_smethod_arg(args).map_err(|e| {
276                                Cow::from(format!("failed to parse SMETHOD ARGS: {}", e))
277                            })?;
278                            if parsed_args.contains_key(&k) {
279                                // At least check our assumption that this is actually k/v
280                                // and not Vec<(String, String)>.
281                                warn!("PT SMETHOD arguments contain repeated key {}!", k);
282                            }
283                            parsed_args.insert(k, v);
284                            args = rest;
285                        }
286                    }
287                }
288                Self::ServerTransportLaunched {
289                    transport: transport
290                        .parse()
291                        .map_err(|_| Cow::from("bad transport ID"))?,
292                    endpoint,
293                    options: parsed_args,
294                }
295            }
296            "SMETHOD-ERROR" => {
297                let transport = words.next().ok_or_else(|| Cow::from("no transport"))?;
298                let rest = words.join(" ");
299                Self::ServerTransportFailed {
300                    transport: transport
301                        .parse()
302                        .map_err(|_| Cow::from("bad transport ID"))?,
303                    message: rest,
304                }
305            }
306            "SMETHODS" => match words.next() {
307                Some("DONE") => Self::ServerTransportsDone,
308                _ => Self::Unknown(s.into()),
309            },
310            "LOG" => {
311                let severity = words
312                    .next()
313                    .ok_or_else(|| Cow::from("no severity"))?
314                    .strip_prefix("SEVERITY=")
315                    .ok_or_else(|| Cow::from("badly formatted severity"))?;
316                let message = words.join(" ");
317                let message = parse_one_value(
318                    message
319                        .strip_prefix("MESSAGE=")
320                        .ok_or_else(|| Cow::from("no or badly formatted message"))?,
321                )
322                .map_err(Cow::from)?
323                .0;
324                Self::Log {
325                    severity: severity.into(),
326                    message,
327                }
328            }
329            "STATUS" => {
330                let mut ret = HashMap::new();
331                let message = words.join(" ");
332                let mut message = &message as &str;
333                while !message.is_empty() {
334                    let equals = message
335                        .find('=')
336                        .ok_or_else(|| Cow::from(format!("failed to find = in '{}'", message)))?;
337                    let k = &message[..equals];
338                    if equals + 1 == message.len() {
339                        return Err(Cow::from("key with no value"));
340                    }
341                    let (v, rest) = parse_one_value(&message[(equals + 1)..]).map_err(Cow::from)?;
342                    if ret.contains_key(k) {
343                        // At least check our assumption that this is actually k/v
344                        // and not Vec<(String, String)>.
345                        warn!("STATUS contains repeated key {}!", k);
346                    }
347                    ret.insert(k.to_owned(), v);
348                    message = rest;
349                    if message.starts_with(' ') {
350                        message = &message[1..];
351                    }
352                }
353                Self::Status(PtStatus { data: ret })
354            }
355            _ => Self::Unknown(s.into()),
356        })
357    }
358}
359
360use sealed::*;
361/// Sealed trait to protect private types and default trait implementations
362pub(crate) mod sealed {
363    use super::*;
364
    /// A handle to receive lines from a pluggable transport process' stdout asynchronously.
    ///
    /// Lines are produced by a background OS thread that reads the child's stdout and
    /// forwards each line (or read error) over a bounded channel. Dropping this handle
    /// drops the receiver; the thread notices this the next time it tries to send a line
    /// and then shuts the child process down.
    //
    // FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without
    //             being async-runtime dependent (or adding process spawning to tor-rtcompat).
    #[derive(Debug)]
    pub struct AsyncPtChild {
        /// Channel to receive lines from the child process stdout.
        stdout: Receiver<io::Result<String>>,
        /// Identifier to put in logging messages.
        pub identifier: String,
    }
376
377    impl AsyncPtChild {
378        /// Wrap an OS child process by spawning a worker thread to forward output from the child
379        /// to the asynchronous runtime via use of a channel.
380        pub fn new(mut child: Child, identifier: String) -> Result<Self, PtError> {
381            let (stdin, stdout) = (
382                child.stdin.take().ok_or_else(|| {
383                    PtError::Internal(internal!("Created child process without stdin pipe"))
384                })?,
385                child.stdout.take().ok_or_else(|| {
386                    PtError::Internal(internal!("Created child process without stdout pipe"))
387                })?,
388            );
389            // TODO RELAY #1649 We don't use a tor_memquota::mq_queue here yet
390            let (mut tx, rx) = tor_async_utils::mpsc_channel_no_memquota(PT_STDIO_BUFFER);
391            let ident = identifier.clone();
392            #[allow(clippy::cognitive_complexity)]
393            thread::spawn(move || {
394                let reader = BufReader::new(stdout);
395                let _stdin = stdin;
396                let mut noted_full = false;
397                // Forward lines from the blocking reader to the async channel.
398                for line in reader.lines() {
399                    let err = line.is_err();
400                    match &line {
401                        Ok(l) => trace!("<-- PT {}: {:?}", ident, l),
402                        Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e),
403                    }
404                    if let Err(e) = tx.try_send(line) {
405                        if e.is_disconnected() {
406                            debug!("PT {} is disconnected; shutting it down.", ident);
407                            // Channel dropped, so shut down the pluggable transport process.
408                            break;
409                        }
410                        // The other kind of error is "full", which we can't do anything about.
411                        // Just throw the line away.
412                        if !noted_full {
413                            noted_full = true; // warn only once per PT.
414                            warn!(
415                                "Bug: Message queue for PT {} became full; dropping message",
416                                ident
417                            );
418                        }
419                    }
420                    if err {
421                        // Encountered an error reading, so ensure the process is shut down (it's
422                        // probably "broken pipe" anyway, so this is slightly redundant, but the
423                        // rest of the code assumes errors are nonrecoverable).
424                        break;
425                    }
426                }
427                // Has it already quit? If so, just exit now.
428                if let Ok(Some(_)) = child.try_wait() {
429                    // FIXME(eta): We currently throw away the exit code, which might be useful
430                    //             for debugging purposes!
431                    debug!("PT {} has exited.", ident);
432                    return;
433                }
434                // Otherwise, tell it to exit.
435                // Dropping stdin should tell the PT to exit, since we set the correct environment
436                // variable for that to happen.
437                trace!("Asking PT {} to exit, nicely.", ident);
438                drop(_stdin);
439                // Give it some time to exit.
440                thread::sleep(GRACEFUL_EXIT_TIME);
441                match child.try_wait() {
442                    Ok(None) => {
443                        // Kill it.
444                        debug!("Sending kill signal to PT {}", ident);
445                        if let Err(e) = child.kill() {
446                            warn_report!(e, "Failed to kill() spawned PT {}", ident);
447                        }
448                    }
449                    Ok(Some(_)) => {
450                        debug!("PT {} shut down successfully.", ident);
451                    } // It exited.
452                    Err(e) => {
453                        warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident);
454                    }
455                }
456            });
457            Ok(AsyncPtChild {
458                stdout: rx,
459                identifier,
460            })
461        }
462
463        /// Receive a message from the pluggable transport binary asynchronously.
464        ///
465        /// Note: This will convert `PtMessage::Log` into a tracing log call automatically.
466        #[allow(clippy::cognitive_complexity)] // due to tracing
467        pub async fn recv(&mut self) -> err::Result<PtMessage> {
468            loop {
469                match self.stdout.next().await {
470                    None => return Err(PtError::ChildGone),
471                    Some(Ok(line)) => {
472                        let line =
473                            line.parse::<PtMessage>()
474                                .map_err(|e| PtError::IpcParseFailed {
475                                    line,
476                                    error: e.into(),
477                                })?;
478                        if let PtMessage::Log { severity, message } = line {
479                            // FIXME(eta): I wanted to make this integrate with `tracing` more nicely,
480                            //             but gave up after 15 minutes of clicking through spaghetti.
481                            match &severity as &str {
482                                "error" => error!("[pt {}] {}", self.identifier, message),
483                                "warning" => warn!("[pt {}] {}", self.identifier, message),
484                                "notice" => info!("[pt {}] {}", self.identifier, message),
485                                "info" => debug!("[pt {}] {}", self.identifier, message),
486                                "debug" => trace!("[pt {}] {}", self.identifier, message),
487                                x => warn!("[pt] {} {} {}", self.identifier, x, message),
488                            }
489                        } else {
490                            return Ok(line);
491                        }
492                    }
493                    Some(Err(e)) => {
494                        return Err(PtError::ChildReadFailed(Arc::new(e)));
495                    }
496                }
497            }
498        }
499    }
500
501    /// Defines some helper methods that are required later on
502    #[async_trait::async_trait]
503    pub trait PluggableTransportPrivate {
        /// Return the [`AsyncPtChild`] if it exists
        ///
        /// NOTE(review): presumably returns an error when no child has been set -- confirm
        /// against the implementors.
        fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>;

        /// Set the [`AsyncPtChild`]
        ///
        /// NOTE(review): passing `None` presumably clears the stored child -- confirm
        /// against the implementors.
        fn set_inner(&mut self, newval: Option<AsyncPtChild>);

        /// Return a loggable identifier for this transport.
        fn identifier(&self) -> &str;

        /// Checks whether a transport is specified in our specific parameters
        ///
        /// `common_transport_launched_handler` uses this to reject transports the binary
        /// launched without being asked to.
        fn specific_params_contains(&self, transport: &PtTransportName) -> bool;
515
516        /// Common handler for `ClientTransportLaunched` and `ServerTransportLaunched`
517        fn common_transport_launched_handler(
518            &self,
519            protocol: Option<String>,
520            transport: PtTransportName,
521            endpoint: SocketAddr,
522            methods: &mut HashMap<PtTransportName, PtClientMethod>,
523        ) -> Result<(), PtError> {
524            if !self.specific_params_contains(&transport) {
525                return Err(PtError::ProtocolViolation(format!(
526                    "binary launched unwanted transport '{}'",
527                    transport
528                )));
529            }
530            let protocol = match protocol {
531                Some(protocol_str) => match &protocol_str as &str {
532                    "socks4" => SocksVersion::V4,
533                    "socks5" => SocksVersion::V5,
534                    x => {
535                        return Err(PtError::ProtocolViolation(format!(
536                            "unknown CMETHOD protocol '{}'",
537                            x
538                        )))
539                    }
540                },
541                None => SocksVersion::V5,
542            };
543            let method = PtClientMethod {
544                kind: protocol,
545                endpoint,
546            };
547            info!("Transport '{}' uses method {:?}", transport, method);
548            methods.insert(transport, method);
549            Ok(())
550        }
551
552        /// Attempt to launch the PT and return the corresponding `[AsyncPtChild]`
553        fn get_child_from_pt_launch(
554            inner: &Option<AsyncPtChild>,
555            transports: &Vec<PtTransportName>,
556            binary_path: &PathBuf,
557            arguments: &[String],
558            all_env_vars: HashMap<OsString, OsString>,
559        ) -> Result<AsyncPtChild, PtError> {
560            if inner.is_some() {
561                let warning_msg =
562                    format!("Attempted to launch PT binary for {:?} twice.", transports);
563                warn!("{warning_msg}");
564                // WARN: this may not be the correct error to throw here
565                return Err(PtError::ChildProtocolViolation(warning_msg));
566            }
567            info!(
568                "Launching pluggable transport at {} for {:?}",
569                binary_path.display_lossy(),
570                transports
571            );
572            let child = Command::new(binary_path)
573                .args(arguments.iter())
574                .envs(all_env_vars)
575                .stdout(Stdio::piped())
576                .stdin(Stdio::piped())
577                .spawn()
578                .map_err(|e| PtError::ChildSpawnFailed {
579                    path: binary_path.clone(),
580                    error: Arc::new(e),
581                })?;
582
583            let identifier = crate::managed::pt_identifier(binary_path)?;
584            AsyncPtChild::new(child, identifier)
585        }
586
587        /// Consolidates some of the [`PtMessage`] potential matches to
588        /// deduplicate code
589        ///
590        /// Note that getting a [`PtMessage`] from this method implies that
591        /// the method was unable to match it and thus you should continue handling
592        /// the message. Getting [`None`] after error handling means that a match
593        /// was found and the appropriate action was successfully taken, and you don't
594        /// need to worry about it.
595        async fn try_match_common_messages<R: Runtime>(
596            &self,
597            rt: &R,
598            deadline: Instant,
599            async_child: &mut AsyncPtChild,
600        ) -> Result<Option<PtMessage>, PtError> {
601            match rt
602                .timeout(
603                    // FIXME(eta): It'd be nice if SleepProviderExt took an `Instant` natively.
604                    deadline.saturating_duration_since(Instant::now()),
605                    async_child.recv(),
606                )
607                .await
608                .map_err(|_| PtError::Timeout)??
609            {
610                PtMessage::ClientTransportFailed { transport, message }
611                | PtMessage::ServerTransportFailed { transport, message } => {
612                    warn!(
613                        "PT {} unable to launch {}. It said: {:?}",
614                        async_child.identifier, transport, message
615                    );
616                    return Err(PtError::TransportGaveError {
617                        transport: transport.to_string(),
618                        message,
619                    });
620                }
621                PtMessage::VersionError(e) => {
622                    if e != "no-version" {
623                        warn!("weird VERSION-ERROR: {}", e);
624                    }
625                    return Err(PtError::UnsupportedVersion);
626                }
627                PtMessage::Version(vers) => {
628                    if vers != "1" {
629                        return Err(PtError::ProtocolViolation(format!(
630                            "stated version is {}, asked for 1",
631                            vers
632                        )));
633                    }
634                    Ok(None)
635                }
636                PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)),
637                PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)),
638                // TODO(eta): We don't do anything with these right now!
639                PtMessage::Status(_) => Ok(None),
640                PtMessage::Unknown(x) => {
641                    warn!("unknown PT line: {}", x);
642                    Ok(None)
643                }
644                // Return the PtMessage as it is for further processing
645                // TODO: handle [`PtError::ProtocolViolation`] here somehow
646                x => {
647                    return Ok(Some(x));
648                }
649            }
650        }
651    }
652}
653
/// Common parameters passed to a pluggable transport.
///
/// Apart from `timeout`, these are handed to the PT binary as `TOR_PT_*` environment
/// variables.
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
pub struct PtCommonParameters {
    /// A path where the launched PT can store state.
    ///
    /// Exported as `TOR_PT_STATE_LOCATION`.
    state_location: PathBuf,
    /// An IPv4 address to bind outgoing connections to (if specified).
    ///
    /// Leaving this out will mean the PT uses a sane default.
    ///
    /// Exported as `TOR_PT_OUTBOUND_BIND_ADDRESS_V4` when set.
    #[builder(default)]
    outbound_bind_v4: Option<Ipv4Addr>,
    /// An IPv6 address to bind outgoing connections to (if specified).
    ///
    /// Leaving this out will mean the PT uses a sane default.
    ///
    /// Exported, wrapped in square brackets, as `TOR_PT_OUTBOUND_BIND_ADDRESS_V6` when set.
    #[builder(default)]
    outbound_bind_v6: Option<Ipv6Addr>,
    /// The maximum time we should wait for a pluggable transport binary to report successful
    /// initialization. If `None`, a default value is used.
    #[builder(default)]
    timeout: Option<Duration>,
}
674
675impl PtCommonParameters {
676    /// Return a new `PtCommonParametersBuilder` for constructing a set of parameters.
677    pub fn builder() -> PtCommonParametersBuilder {
678        PtCommonParametersBuilder::default()
679    }
680
681    /// Convert these parameters into a set of environment variables to be passed to the PT binary
682    /// in accordance with the specification.
683    fn common_environment_variables(&self) -> HashMap<OsString, OsString> {
684        let mut ret = HashMap::new();
685        ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into());
686        ret.insert(
687            "TOR_PT_STATE_LOCATION".into(),
688            self.state_location.clone().into_os_string(),
689        );
690        ret.insert("TOR_PT_EXIT_ON_STDIN_CLOSE".into(), "1".into());
691        if let Some(v4) = self.outbound_bind_v4 {
692            ret.insert(
693                "TOR_PT_OUTBOUND_BIND_ADDRESS_V4".into(),
694                v4.to_string().into(),
695            );
696        }
697        if let Some(v6) = self.outbound_bind_v6 {
698            // pt-spec.txt: "IPv6 addresses MUST always be wrapped in square brackets."
699            ret.insert(
700                "TOR_PT_OUTBOUND_BIND_ADDRESS_V6".into(),
701                format!("[{}]", v6).into(),
702            );
703        }
704        ret
705    }
706}
707
708/// Parameters passed only to a pluggable transport client.
709#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
710pub struct PtClientParameters {
711    /// A SOCKS URI specifying a proxy to use.
712    #[builder(default)]
713    proxy_uri: Option<String>,
714    /// A list of transports to initialise.
715    ///
716    /// The PT launch will fail if all transports are not successfully initialised.
717    transports: Vec<PtTransportName>,
718}
719
720impl PtClientParameters {
721    /// Return a new `PtClientParametersBuilder` for constructing a set of parameters.
722    pub fn builder() -> PtClientParametersBuilder {
723        PtClientParametersBuilder::default()
724    }
725
726    /// Convert these parameters into a set of environment variables to be passed to the PT binary
727    /// in accordance with the specification.
728    fn environment_variables(
729        &self,
730        common_params: &PtCommonParameters,
731    ) -> HashMap<OsString, OsString> {
732        let mut ret = common_params.common_environment_variables();
733        if let Some(ref proxy_uri) = self.proxy_uri {
734            ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into());
735        }
736        ret.insert(
737            "TOR_PT_CLIENT_TRANSPORTS".into(),
738            self.transports.iter().join(",").into(),
739        );
740        ret
741    }
742}
743
744/// Parameters passed only to a pluggable transport server.
745#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
746pub struct PtServerParameters {
747    /// A list of transports to initialise.
748    ///
749    /// The PT launch will fail if all transports are not successfully initialised.
750    transports: Vec<PtTransportName>,
751    /// Transport options for each server transport
752    #[builder(default)]
753    server_transport_options: String,
754    /// Set host:port on which the server transport should listen for connections
755    #[builder(default)]
756    server_bindaddr: String,
757    /// Set host:port on which the server transport should forward requests
758    #[builder(default)]
759    server_orport: Option<String>,
760    /// Set host:port on which the server transport should forward requests (extended ORPORT)
761    #[builder(default)]
762    server_extended_orport: Option<String>,
763}
764
765impl PtServerParameters {
766    /// Return a new `PtServerParametersBuilder` for constructing a set of parameters.
767    pub fn builder() -> PtServerParametersBuilder {
768        PtServerParametersBuilder::default()
769    }
770
771    /// Convert these parameters into a set of environment variables to be passed to the PT binary
772    /// in accordance with the specification.
773    fn environment_variables(
774        &self,
775        common_params: &PtCommonParameters,
776    ) -> HashMap<OsString, OsString> {
777        let mut ret = common_params.common_environment_variables();
778        ret.insert(
779            "TOR_PT_SERVER_TRANSPORTS".into(),
780            self.transports.iter().join(",").into(),
781        );
782        ret.insert(
783            "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(),
784            self.server_transport_options.clone().into(),
785        );
786        ret.insert(
787            "TOR_PT_SERVER_BINDADDR".into(),
788            self.server_bindaddr.clone().into(),
789        );
790        if let Some(ref server_orport) = self.server_orport {
791            ret.insert("TOR_PT_ORPORT".into(), server_orport.into());
792        }
793        if let Some(ref server_extended_orport) = self.server_extended_orport {
794            ret.insert(
795                "TOR_PT_EXTENDED_SERVER_PORT".into(),
796                server_extended_orport.into(),
797            );
798        }
799        ret
800    }
801}
802
803/// Common functionality implemented to allow code reuse
804#[async_trait::async_trait]
805#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
806pub trait PluggableTransport: PluggableTransportPrivate {
807    /// Get all client methods returned by the binary, if it has been launched.
808    ///
809    /// If it hasn't been launched, the returned map will be empty.
810    // TODO(eta): Actually figure out a way to expose this more stably.
811    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod>;
812
813    /// Get the next [`PtMessage`] from the running transport. It is recommended to call this
814    /// in a loop once a PT has been launched, in order to forward log messages and find out about
815    /// status updates.
816    //
817    // FIXME(eta): This API will probably go away and get replaced with something better.
818    //             In particular, we'd want to cache `Status` messages from before this method
819    //             was called.
820    async fn next_message(&mut self) -> err::Result<PtMessage> {
821        let inner = self.inner()?;
822        let ret = inner.recv().await;
823        if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret {
824            // FIXME(eta): Currently this lets the caller still think the methods work by calling
825            //             transport_methods.
826            debug!(
827                "PT {}: Received {:?}; shutting down.",
828                self.identifier(),
829                ret
830            );
831            self.set_inner(None);
832        }
833        ret
834    }
835}
836/// A pluggable transport binary in a child process.
837///
838/// These start out inert, and must be launched with [`PluggableClientTransport::launch`] in order
839/// to be useful.
840#[derive(Debug)]
841pub struct PluggableClientTransport {
842    /// The currently running child, if there is one.
843    inner: Option<AsyncPtChild>,
844    /// The path to the binary to run.
845    pub(crate) binary_path: PathBuf,
846    /// Arguments to pass to the binary.
847    arguments: Vec<String>,
848    /// Configured parameters.
849    common_params: PtCommonParameters,
850    /// Configured client-only parameters.
851    client_params: PtClientParameters,
852    /// Information about client methods obtained from the PT.
853    cmethods: HashMap<PtTransportName, PtClientMethod>,
854}
855
856impl PluggableTransport for PluggableClientTransport {
857    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
858        &self.cmethods
859    }
860}
861
862impl PluggableTransportPrivate for PluggableClientTransport {
863    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
864        self.inner.as_mut().ok_or(PtError::ChildGone)
865    }
866    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
867        self.inner = newval;
868    }
869    fn identifier(&self) -> &str {
870        match &self.inner {
871            Some(child) => &child.identifier,
872            None => "<not yet launched>",
873        }
874    }
875    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
876        self.client_params.transports.contains(transport)
877    }
878}
879
880impl PluggableClientTransport {
881    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
882    /// the `params` to it.
883    ///
884    /// You must call [`PluggableClientTransport::launch`] to actually run the PT.
885    pub fn new(
886        binary_path: PathBuf,
887        arguments: Vec<String>,
888        common_params: PtCommonParameters,
889        client_params: PtClientParameters,
890    ) -> Self {
891        Self {
892            common_params,
893            client_params,
894            arguments,
895            binary_path,
896            inner: None,
897            cmethods: Default::default(),
898        }
899    }
900
901    /// Launch the pluggable transport, executing the binary.
902    ///
903    /// Will return an error if the launch fails, one of the transports fail, not all transports
904    /// were launched, or the launch times out.
905    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
906        let all_env_vars = self
907            .client_params
908            .environment_variables(&self.common_params);
909
910        let mut async_child =
911            <PluggableClientTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
912                &self.inner,
913                &self.client_params.transports,
914                &self.binary_path,
915                &self.arguments,
916                all_env_vars,
917            )?;
918
919        let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
920        let mut cmethods = HashMap::new();
921        let mut proxy_done = self.client_params.proxy_uri.is_none();
922
923        loop {
924            match self
925                .try_match_common_messages(&rt, deadline, &mut async_child)
926                .await
927            {
928                Ok(maybe_message) => {
929                    if let Some(message) = maybe_message {
930                        match message {
931                            PtMessage::ClientTransportLaunched {
932                                transport,
933                                protocol,
934                                endpoint,
935                            } => {
936                                self.common_transport_launched_handler(
937                                    Some(protocol),
938                                    transport,
939                                    endpoint,
940                                    &mut cmethods,
941                                )?;
942                            }
943                            PtMessage::ProxyDone => {
944                                if proxy_done {
945                                    return Err(PtError::ProtocolViolation(
946                                        "binary initiated proxy when not asked (or twice)".into(),
947                                    ));
948                                }
949                                info!("PT binary now proxying connections via supplied URI");
950                                proxy_done = true;
951                            }
952                            // TODO: unify most of the handling of ClientTransportsDone with ServerTransportsDone
953                            PtMessage::ClientTransportsDone => {
954                                let unsupported = self
955                                    .client_params
956                                    .transports
957                                    .iter()
958                                    .filter(|&x| !cmethods.contains_key(x))
959                                    .map(|x| x.to_string())
960                                    .collect::<Vec<_>>();
961                                if !unsupported.is_empty() {
962                                    warn!(
963                                        "PT binary failed to initialise transports: {:?}",
964                                        unsupported
965                                    );
966                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
967                                }
968                                info!("PT binary initialisation done");
969                                break;
970                            }
971                            x => {
972                                return Err(PtError::ProtocolViolation(format!(
973                                    "received unexpected {:?}",
974                                    x
975                                )));
976                            }
977                        }
978                    }
979                }
980                Err(e) => return Err(e),
981            }
982        }
983        self.cmethods = cmethods;
984        self.inner = Some(async_child);
985        // TODO(eta): We need to expose the log and status messages after this function exits!
986        Ok(())
987    }
988}
989
990/// A pluggable transport server binary in a child process.
991///
992/// These start out inert, and must be launched with [`PluggableServerTransport::launch`] in order
993/// to be useful.
994#[derive(Debug)]
995pub struct PluggableServerTransport {
996    /// The currently running child, if there is one.
997    inner: Option<AsyncPtChild>,
998    /// The path to the binary to run.
999    pub(crate) binary_path: PathBuf,
1000    /// Arguments to pass to the binary.
1001    arguments: Vec<String>,
1002    /// Configured parameters.
1003    common_params: PtCommonParameters,
1004    /// Configured server-only parameters.
1005    server_params: PtServerParameters,
1006    /// Information about server methods obtained from the PT.
1007    smethods: HashMap<PtTransportName, PtClientMethod>,
1008}
1009
1010impl PluggableTransportPrivate for PluggableServerTransport {
1011    fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError> {
1012        self.inner.as_mut().ok_or(PtError::ChildGone)
1013    }
1014    fn set_inner(&mut self, newval: Option<AsyncPtChild>) {
1015        self.inner = newval;
1016    }
1017    fn identifier(&self) -> &str {
1018        match &self.inner {
1019            Some(child) => &child.identifier,
1020            None => "<not yet launched>",
1021        }
1022    }
1023    fn specific_params_contains(&self, transport: &PtTransportName) -> bool {
1024        self.server_params.transports.contains(transport)
1025    }
1026}
1027
1028impl PluggableTransport for PluggableServerTransport {
1029    fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
1030        &self.smethods
1031    }
1032}
1033
1034impl PluggableServerTransport {
1035    /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing
1036    /// the `params` to it.
1037    ///
1038    /// You must call [`PluggableServerTransport::launch`] to actually run the PT.
1039    pub fn new(
1040        binary_path: PathBuf,
1041        arguments: Vec<String>,
1042        common_params: PtCommonParameters,
1043        server_params: PtServerParameters,
1044    ) -> Self {
1045        Self {
1046            common_params,
1047            server_params,
1048            arguments,
1049            binary_path,
1050            inner: None,
1051            smethods: Default::default(),
1052        }
1053    }
1054
1055    /// Launch the pluggable transport, executing the binary.
1056    ///
1057    /// Will return an error if the launch fails, one of the transports fail, not all transports
1058    /// were launched, or the launch times out.
1059    pub async fn launch<R: Runtime>(&mut self, rt: R) -> err::Result<()> {
1060        let all_env_vars = self
1061            .server_params
1062            .environment_variables(&self.common_params);
1063
1064        let mut async_child =
1065            <PluggableServerTransport as PluggableTransportPrivate>::get_child_from_pt_launch(
1066                &self.inner,
1067                &self.server_params.transports,
1068                &self.binary_path,
1069                &self.arguments,
1070                all_env_vars,
1071            )?;
1072
1073        let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT);
1074        let mut smethods = HashMap::new();
1075
1076        loop {
1077            match self
1078                .try_match_common_messages(&rt, deadline, &mut async_child)
1079                .await
1080            {
1081                Ok(maybe_message) => {
1082                    if let Some(message) = maybe_message {
1083                        match message {
1084                            PtMessage::ServerTransportLaunched {
1085                                transport,
1086                                endpoint,
1087                                options: _,
1088                            } => {
1089                                self.common_transport_launched_handler(
1090                                    None,
1091                                    transport,
1092                                    endpoint,
1093                                    &mut smethods,
1094                                )?;
1095                            }
1096                            PtMessage::ServerTransportsDone => {
1097                                let unsupported = self
1098                                    .server_params
1099                                    .transports
1100                                    .iter()
1101                                    .filter(|&x| !smethods.contains_key(x))
1102                                    .map(|x| x.to_string())
1103                                    .collect::<Vec<_>>();
1104                                if !unsupported.is_empty() {
1105                                    warn!(
1106                                        "PT binary failed to initialise transports: {:?}",
1107                                        unsupported
1108                                    );
1109                                    return Err(PtError::ClientTransportsUnsupported(unsupported));
1110                                }
1111                                info!("PT binary initialisation done");
1112                                break;
1113                            }
1114                            x => {
1115                                return Err(PtError::ProtocolViolation(format!(
1116                                    "received unexpected {:?}",
1117                                    x
1118                                )));
1119                            }
1120                        }
1121                    }
1122                }
1123                Err(e) => return Err(e),
1124            }
1125        }
1126        self.smethods = smethods;
1127        self.inner = Some(async_child);
1128        // TODO(eta): We need to expose the log and status messages after this function exits!
1129        Ok(())
1130    }
1131}
1132
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use crate::ipc::{PtMessage, PtStatus};
    use std::borrow::Cow;
    use std::collections::HashMap;
    use std::net::SocketAddr;

    /// Build an IPv4 socket address from its octets and port.
    fn v4(octets: [u8; 4], port: u16) -> SocketAddr {
        SocketAddr::from((octets, port))
    }

    /// Build an owned string map from borrowed key/value pairs.
    fn kv(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Build the `LOG` message with severity `debug` that the examples below expect.
    fn debug_log(message: &str) -> PtMessage {
        PtMessage::Log {
            severity: "debug".to_string(),
            message: message.to_string(),
        }
    }

    /// Check the parser against lines that must be accepted (paired with the message they
    /// must produce) and lines that must be rejected (paired with the exact error text).
    #[test]
    fn it_parses_spec_examples() {
        // Lines that must parse, together with the message each one must yield.
        let accepted: Vec<(&str, PtMessage)> = vec![
            (
                "VERSION-ERROR no-version",
                PtMessage::VersionError("no-version".into()),
            ),
            ("VERSION 1", PtMessage::Version("1".into())),
            (
                "ENV-ERROR No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set",
                PtMessage::EnvError(
                    "No TOR_PT_AUTH_COOKIE_FILE when TOR_PT_EXTENDED_SERVER_PORT set".into(),
                ),
            ),
            ("PROXY DONE", PtMessage::ProxyDone),
            (
                "PROXY-ERROR SOCKS 4 upstream proxies unsupported",
                PtMessage::ProxyError("SOCKS 4 upstream proxies unsupported".into()),
            ),
            (
                "CMETHOD trebuchet socks5 127.0.0.1:19999",
                PtMessage::ClientTransportLaunched {
                    transport: "trebuchet".parse().unwrap(),
                    protocol: "socks5".to_string(),
                    endpoint: v4([127, 0, 0, 1], 19999),
                },
            ),
            (
                "CMETHOD-ERROR trebuchet no rocks available",
                PtMessage::ClientTransportFailed {
                    transport: "trebuchet".parse().unwrap(),
                    message: "no rocks available".to_string(),
                },
            ),
            ("CMETHODS DONE", PtMessage::ClientTransportsDone),
            (
                "SMETHOD trebuchet 198.51.100.1:19999",
                PtMessage::ServerTransportLaunched {
                    transport: "trebuchet".parse().unwrap(),
                    endpoint: v4([198, 51, 100, 1], 19999),
                    options: Default::default(),
                },
            ),
            (
                "SMETHOD rot_by_N 198.51.100.1:2323 ARGS:N=13",
                PtMessage::ServerTransportLaunched {
                    transport: "rot_by_N".parse().unwrap(),
                    endpoint: v4([198, 51, 100, 1], 2323),
                    options: kv(&[("N", "13")]),
                },
            ),
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:cert=HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw,iat-mode=0",
                PtMessage::ServerTransportLaunched {
                    transport: "obfs4".parse().unwrap(),
                    endpoint: v4([198, 51, 100, 1], 43734),
                    options: kv(&[
                        (
                            "cert",
                            "HszPy3vWfjsESCEOo9ZBkRv6zQ/1mGHzc8arF0y2SpwFr3WhsMu8rK0zyaoyERfbz3ddFw",
                        ),
                        ("iat-mode", "0"),
                    ]),
                },
            ),
            (
                "SMETHOD-ERROR trebuchet no cows available",
                PtMessage::ServerTransportFailed {
                    transport: "trebuchet".parse().unwrap(),
                    message: "no cows available".to_string(),
                },
            ),
            (
                "LOG SEVERITY=debug MESSAGE=\"Connected to bridge A\"",
                debug_log("Connected to bridge A"),
            ),
            (
                "LOG SEVERITY=debug MESSAGE=\"\\r\\n\\t\"",
                debug_log("\r\n\t"),
            ),
            ("LOG SEVERITY=debug MESSAGE=", debug_log("")),
            ("LOG SEVERITY=debug MESSAGE=\"\\a\"", debug_log("a")),
            (
                "STATUS ADDRESS=198.51.100.123:1234 CONNECT=Success",
                PtMessage::Status(PtStatus {
                    data: kv(&[("ADDRESS", "198.51.100.123:1234"), ("CONNECT", "Success")]),
                }),
            ),
            (
                "STATUS TRANSPORT=obfs4 ADDRESS=198.51.100.123:1234 CONNECT=Success",
                PtMessage::Status(PtStatus {
                    data: kv(&[
                        ("ADDRESS", "198.51.100.123:1234"),
                        ("CONNECT", "Success"),
                        ("TRANSPORT", "obfs4"),
                    ]),
                }),
            ),
            (
                "STATUS ADDRESS=198.51.100.222:2222 CONNECT=Failed FINGERPRINT=<Fingerprint> ERRSTR=\"Connection refused\"",
                PtMessage::Status(PtStatus {
                    data: kv(&[
                        ("ADDRESS", "198.51.100.222:2222"),
                        ("CONNECT", "Failed"),
                        ("FINGERPRINT", "<Fingerprint>"),
                        ("ERRSTR", "Connection refused"),
                    ]),
                }),
            ),
        ];
        for (line, expected) in accepted {
            assert_eq!(line.parse::<PtMessage>(), Ok(expected));
        }

        // Numeric escapes inside a quoted LOG message must be refused.
        for digit in 0..9 {
            let line = format!("LOG SEVERITY=debug MESSAGE=\"\\{digit}\"");
            assert_eq!(
                line.parse::<PtMessage>(),
                Err(Cow::from("attempted unsupported octal escape code"))
            );
        }

        // Malformed SMETHOD arguments, together with the exact error each must produce.
        let rejected: Vec<(&str, &str)> = vec![
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=0\\",
                "failed to parse SMETHOD ARGS: smethod arg terminates with backslash",
            ),
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode=fo=o",
                "failed to parse SMETHOD ARGS: encountered = while parsing value",
            ),
            (
                "SMETHOD obfs4 198.51.100.1:43734 ARGS:iat-mode",
                "failed to parse SMETHOD ARGS: ran out of chars parsing smethod arg",
            ),
        ];
        for (line, reason) in rejected {
            assert_eq!(line.parse::<PtMessage>(), Err(Cow::from(reason)));
        }
    }
}