#![allow(dead_code)]
use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use futures::{AsyncReadExt, AsyncWriteExt};
use tor_error::internal;
use tor_linkspec::PtTargetAddr;
use tor_rtcompat::TcpProvider;
use tor_socksproto::{
SocksAddr, SocksAuth, SocksClientHandshake, SocksCmd, SocksRequest, SocksStatus, SocksVersion,
};
use tracing::trace;
#[cfg(feature = "pt-client")]
use super::TransportImplHelper;
#[cfg(feature = "pt-client")]
use async_trait::async_trait;
#[cfg(feature = "pt-client")]
use tor_error::bad_api_usage;
#[cfg(feature = "pt-client")]
use tor_linkspec::{ChannelMethod, HasChanMethod, OwnedChanTarget};
/// The protocol spoken to a proxy, together with the parameters needed to
/// start a handshake with it.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum Protocol {
/// SOCKS, using the given protocol version and authentication data.
Socks(SocksVersion, SocksAuth),
}
/// Placeholder address (0.0.0.1) placed in the SOCKS request when the target
/// is `PtTargetAddr::None`, i.e. when there is no real address to ask for.
const NO_ADDR: IpAddr = IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 1));
/// Open a connection to `target` by way of the proxy listening at `proxy`.
///
/// This connects to `proxy` using `runtime`, then runs the client side of the
/// handshake described by `protocol`, asking the proxy to `CONNECT` to
/// `target`.  On success, the returned stream has had the whole handshake
/// consumed from it and is ready to carry application data.
///
/// # Errors
///
/// * [`ProxyError::ProxyConnect`] if the TCP connection to the proxy fails.
/// * [`ProxyError::ProxyIo`] if reading from or writing to the proxy fails,
///   including the proxy closing the connection before the handshake is done.
/// * [`ProxyError::InvalidSocksAddr`], [`ProxyError::UnrecognizedAddr`] or
///   [`ProxyError::InvalidSocksRequest`] if `target` cannot be expressed as a
///   SOCKS request.
/// * [`ProxyError::SocksProto`] if the proxy violates the protocol, or if the
///   handshake does not fit in the 1 KiB handshake buffer.
/// * [`ProxyError::SocksError`] if the proxy answers with any status other
///   than `SUCCEEDED`.
/// * [`ProxyError::UnexpectedData`] if the proxy sent bytes beyond the end of
///   its handshake reply.
pub(crate) async fn connect_via_proxy<R: TcpProvider + Send + Sync>(
    runtime: &R,
    proxy: &SocketAddr,
    protocol: &Protocol,
    target: &PtTargetAddr,
) -> Result<R::TcpStream, ProxyError> {
    trace!(
        "Launching a proxied connection to {} via proxy at {} using {:?}",
        target,
        proxy,
        protocol
    );
    let mut stream = runtime
        .connect(proxy)
        .await
        .map_err(|e| ProxyError::ProxyConnect(Arc::new(e)))?;

    let Protocol::Socks(version, auth) = protocol;

    // Translate the pluggable-transport target into something SOCKS can carry.
    let (target_addr, target_port): (tor_socksproto::SocksAddr, u16) = match target {
        PtTargetAddr::IpPort(a) => (SocksAddr::Ip(a.ip()), a.port()),
        #[cfg(feature = "pt-client")]
        PtTargetAddr::HostPort(host, port) => (
            SocksAddr::Hostname(
                host.clone()
                    .try_into()
                    .map_err(ProxyError::InvalidSocksAddr)?,
            ),
            *port,
        ),
        #[cfg(feature = "pt-client")]
        PtTargetAddr::None => (SocksAddr::Ip(NO_ADDR), 1),
        _ => return Err(ProxyError::UnrecognizedAddr),
    };

    let request = SocksRequest::new(
        *version,
        SocksCmd::CONNECT,
        target_addr,
        target_port,
        auth.clone(),
    )
    .map_err(ProxyError::InvalidSocksRequest)?;
    let mut handshake = SocksClientHandshake::new(request);

    // `inbuf[..n_read]` always holds the bytes received from the proxy that
    // the handshake has not consumed yet.
    let mut inbuf = [0_u8; 1024];
    let mut n_read = 0;
    let reply = loop {
        // Try to advance the handshake to its next state.
        let action = match handshake.handshake(&inbuf[..n_read]) {
            Err(_) => {
                // The message so far is truncated: we need more bytes.
                if n_read == inbuf.len() {
                    // There is nowhere left to put them.
                    return Err(ProxyError::Bug(internal!(
                        "SOCKS parser wanted excessively many bytes! {:?} {:?}",
                        handshake,
                        inbuf
                    )));
                }
                // Actually fetch more input before retrying; re-parsing the
                // same bytes would spin forever without ever yielding.
                let n = stream.read(&mut inbuf[n_read..]).await?;
                if n == 0 {
                    // The proxy hung up in the middle of the handshake.
                    return Err(
                        std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into(),
                    );
                }
                n_read += n;
                continue;
            }
            Ok(Err(e)) => return Err(ProxyError::SocksProto(e)),
            Ok(Ok(action)) => action,
        };

        // Discard the bytes the handshake consumed, shifting only the
        // still-valid remainder (`drain..n_read`) to the front of the buffer.
        if action.drain > 0 {
            inbuf.copy_within(action.drain..n_read, 0);
            n_read -= action.drain;
        }
        // Send whatever the handshake wants the proxy to see next.
        if !action.reply.is_empty() {
            stream.write_all(&action.reply[..]).await?;
            stream.flush().await?;
        }
        if action.finished {
            break handshake.into_reply();
        }

        if n_read == inbuf.len() {
            // No room to receive the proxy's next message.
            return Err(ProxyError::SocksProto(
                tor_socksproto::Error::NotImplemented(
                    "Socks handshake did not fit in 1KiB buffer".into(),
                ),
            ));
        }
        // Wait for the proxy's answer to what we just sent.
        let n = stream.read(&mut inbuf[n_read..]).await?;
        if n == 0 {
            // EOF before the handshake finished: report it instead of looping.
            return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into());
        }
        n_read += n;
    };

    let status = reply
        .ok_or_else(|| internal!("SOCKS protocol finished, but gave no status!"))?
        .status();
    trace!(
        "SOCKS handshake with {} succeeded, with status {:?}",
        proxy,
        status
    );

    if status != SocksStatus::SUCCEEDED {
        return Err(ProxyError::SocksError(status));
    }

    // Anything left over was sent before we asked for it; refuse to guess
    // what it means.
    if n_read != 0 {
        return Err(ProxyError::UnexpectedData);
    }

    Ok(stream)
}
/// An error that occurred while connecting to a target through a proxy, or
/// while preparing the request to do so.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ProxyError {
/// The initial connection to the proxy itself could not be made.
#[error("Problem while connecting to proxy")]
ProxyConnect(#[source] Arc<std::io::Error>),
/// An I/O error occurred on the already-established proxy connection.
///
/// This is the variant produced by converting a `std::io::Error`.
#[error("Problem while communicating with proxy")]
ProxyIo(#[source] Arc<std::io::Error>),
/// The target hostname could not be converted into a SOCKS address.
#[error("SOCKS proxy does not support target address")]
InvalidSocksAddr(#[source] tor_socksproto::Error),
/// The target address was of a kind this code does not handle.
#[error("Got an address type we don't recognize")]
UnrecognizedAddr,
/// The SOCKS request could not be constructed, or the transport settings
/// could not be encoded as SOCKS authentication data.
#[error("Tried to make an invalid SOCKS request")]
InvalidSocksRequest(#[source] tor_socksproto::Error),
/// The SOCKS handshake failed with a protocol-level error, or did not fit
/// in the handshake buffer.
#[error("Protocol error while communicating with SOCKS proxy")]
SocksProto(#[source] tor_socksproto::Error),
/// An internal error (a bug in this code or a dependency).
#[error("Internal error")]
Bug(#[from] tor_error::Bug),
/// Bytes were left in the buffer once the handshake had finished.
#[error("Received unexpected early data from peer")]
UnexpectedData,
/// The proxy completed the handshake but reported a non-success status.
#[error("SOCKS proxy reported an error: {0}")]
SocksError(SocksStatus),
}
impl From<std::io::Error> for ProxyError {
fn from(e: std::io::Error) -> Self {
ProxyError::ProxyIo(Arc::new(e))
}
}
impl tor_error::HasKind for ProxyError {
    /// Classify this error into one of the crate-wide error kinds.
    fn kind(&self) -> tor_error::ErrorKind {
        use tor_error::ErrorKind;
        match self {
            // Internal errors carry their own classification.
            Self::Bug(bug) => bug.kind(),
            // Trouble reaching or talking to the (local) proxy.
            Self::ProxyConnect(_) | Self::ProxyIo(_) => ErrorKind::LocalNetworkError,
            // The caller asked for something SOCKS cannot express.
            Self::InvalidSocksAddr(_) | Self::InvalidSocksRequest(_) => ErrorKind::BadApiUsage,
            // Situations this code does not know how to handle.
            Self::UnrecognizedAddr | Self::UnexpectedData => ErrorKind::NotImplemented,
            // The proxy misbehaved or reported a failure.
            Self::SocksProto(_) | Self::SocksError(_) => ErrorKind::LocalProtocolViolation,
        }
    }
}
impl tor_error::HasRetryTime for ProxyError {
    /// Report whether (and when) it makes sense to retry after this error.
    fn retry_time(&self) -> tor_error::RetryTime {
        use tor_error::RetryTime;
        use SocksStatus as S;
        match self {
            // Network-level and protocol-level hiccups may clear up later.
            Self::ProxyConnect(_) | Self::ProxyIo(_) | Self::SocksProto(_) => {
                RetryTime::AfterWaiting
            }
            // Retrying cannot change the outcome of any of these.
            Self::InvalidSocksAddr(_)
            | Self::UnrecognizedAddr
            | Self::InvalidSocksRequest(_)
            | Self::Bug(_)
            | Self::UnexpectedData => RetryTime::Never,
            // For a status reported by the proxy, only the listed failures
            // are treated as worth another attempt.
            Self::SocksError(status) => {
                let transient = matches!(
                    *status,
                    S::CONNECTION_REFUSED
                        | S::GENERAL_FAILURE
                        | S::HOST_UNREACHABLE
                        | S::NETWORK_UNREACHABLE
                        | S::TTL_EXPIRED
                );
                if transient {
                    RetryTime::AfterWaiting
                } else {
                    RetryTime::Never
                }
            }
        }
    }
}
/// A transport implementation that reaches its targets through an external
/// SOCKS proxy at a known address.
#[cfg(feature = "pt-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "pt-client")))]
#[derive(Clone, Debug)]
pub struct ExternalProxyPlugin<R> {
/// The runtime used to open the connection to the proxy.
runtime: R,
/// The address at which the proxy is listening.
proxy_addr: SocketAddr,
/// The SOCKS version to use when talking to the proxy.
proxy_version: SocksVersion,
}
#[cfg(feature = "pt-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "pt-client")))]
impl<R: TcpProvider + Send + Sync> ExternalProxyPlugin<R> {
    /// Build a plugin that uses `rt` to reach the proxy at `proxy_addr`,
    /// speaking SOCKS version `proxy_version` to it.
    pub fn new(rt: R, proxy_addr: SocketAddr, proxy_version: SocksVersion) -> Self {
        let runtime = rt;
        ExternalProxyPlugin {
            runtime,
            proxy_addr,
            proxy_version,
        }
    }
}
#[cfg(feature = "pt-client")]
#[async_trait]
impl<R: TcpProvider + Send + Sync> TransportImplHelper for ExternalProxyPlugin<R> {
    type Stream = R::TcpStream;

    /// Connect to `target` through the configured proxy.
    ///
    /// Only targets whose channel method is a pluggable transport are
    /// accepted; every other method is rejected as an unusable target.  The
    /// transport's settings are encoded and handed to the proxy as SOCKS
    /// authentication data.
    async fn connect(
        &self,
        target: &OwnedChanTarget,
    ) -> crate::Result<(OwnedChanTarget, R::TcpStream)> {
        let method = target.chan_method();
        let pt_target = match method {
            ChannelMethod::Pluggable(pt) => pt,
            ChannelMethod::Direct(_) => {
                return Err(crate::Error::UnusableTarget(bad_api_usage!(
                    "Used pluggable transport for a TCP connection."
                )));
            }
            other => {
                return Err(crate::Error::UnusableTarget(bad_api_usage!(
                    "Used unknown, unsupported, transport {:?} for a TCP connection.",
                    other,
                )));
            }
        };

        // The per-target settings travel to the proxy as SOCKS auth data.
        let encoded = encode_settings(pt_target.settings());
        let protocol = settings_to_protocol(self.proxy_version, encoded)?;

        let stream =
            connect_via_proxy(&self.runtime, &self.proxy_addr, &protocol, pt_target.addr())
                .await?;
        Ok((target.clone(), stream))
    }
}
#[cfg(feature = "pt-client")]
/// Encode a sequence of `(key, value)` settings as a single string.
///
/// Each pair is written as `key=value`, and pairs are separated by `;`.
/// Backslashes and semicolons are escaped with a backslash everywhere; an
/// equals sign is escaped only when it occurs inside a key.  An empty
/// sequence produces an empty string.
fn encode_settings<'a, IT>(settings: IT) -> String
where
    IT: Iterator<Item = (&'a str, &'a str)>,
{
    /// Append `text` to `out`, escaping every character that needs it.
    fn push_escaped(out: &mut String, text: &str, in_key: bool) {
        for ch in text.chars() {
            let needs_escape = matches!(ch, '\\' | ';') || (in_key && ch == '=');
            if needs_escape {
                out.push('\\');
            }
            out.push(ch);
        }
    }

    let mut encoded = String::new();
    for (index, (key, value)) in settings.enumerate() {
        // Separators go between pairs, never after the last one.
        if index > 0 {
            encoded.push(';');
        }
        push_escaped(&mut encoded, key, true);
        encoded.push('=');
        push_escaped(&mut encoded, value, false);
    }
    encoded
}
/// Turn an encoded settings string into the [`Protocol`] used to deliver it
/// to a SOCKS proxy of version `vers`.
///
/// * An empty string needs no authentication at all.
/// * For SOCKS 4 the whole string becomes the SOCKS 4 authentication field,
///   which cannot hold a NUL byte.
/// * Otherwise the string is sent as a SOCKS 5 username; anything beyond 255
///   bytes spills into the password, and a single NUL byte is used as the
///   password when nothing spills over.
///
/// # Errors
///
/// Returns [`ProxyError::InvalidSocksRequest`] if the string contains a NUL
/// byte and `vers` is SOCKS 4, or if it is longer than 510 bytes otherwise.
#[cfg(feature = "pt-client")]
pub fn settings_to_protocol(vers: SocksVersion, s: String) -> Result<Protocol, ProxyError> {
    /// Longest value that fits in one SOCKS 5 username or password field.
    const MAX_FIELD_LEN: usize = 255;

    let mut username = s.into_bytes();

    if username.is_empty() {
        return Ok(Protocol::Socks(vers, SocksAuth::NoAuth));
    }

    if vers == SocksVersion::V4 {
        if username.contains(&0) {
            return Err(ProxyError::InvalidSocksRequest(
                tor_socksproto::Error::NotImplemented(
                    "SOCKS 4 doesn't support internal NUL bytes (for PT settings list)".into(),
                ),
            ));
        }
        return Ok(Protocol::Socks(
            SocksVersion::V4,
            SocksAuth::Socks4(username),
        ));
    }

    if username.len() > MAX_FIELD_LEN * 2 {
        return Err(ProxyError::InvalidSocksRequest(
            tor_socksproto::Error::NotImplemented("PT settings list too long for SOCKS 5".into()),
        ));
    }

    // Whatever does not fit in the username goes in the password; when
    // everything fits, the password is a lone NUL byte.
    let password = if username.len() > MAX_FIELD_LEN {
        username.split_off(MAX_FIELD_LEN)
    } else {
        vec![0]
    };
    Ok(Protocol::Socks(
        SocksVersion::V5,
        SocksAuth::Username(username, password),
    ))
}
// Unit tests for the settings encoding and its mapping onto SOCKS
// authentication data.
#[cfg(test)]
mod test {
// Lints relaxed for test code only.
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
// The import is unused when the `pt-client` feature (and so every test
// below) is disabled.
#[allow(unused_imports)]
use super::*;
// Checks the `key=value;key=value` encoding and its escaping rules.
#[cfg(feature = "pt-client")]
#[test]
fn setting_encoding() {
// Encode `settings` and compare the result against `expected`.
fn check(settings: Vec<(&str, &str)>, expected: &str) {
assert_eq!(encode_settings(settings.into_iter()), expected);
}
// No settings at all encode to the empty string.
check(vec![], "");
// Pairs are joined with ';' and have no trailing separator.
check(vec![("hello", "world")], "hello=world");
check(
vec![("hey", "verden"), ("hello", "world")],
"hey=verden;hello=world",
);
check(
vec![("hey", "verden"), ("hello", "world"), ("selv", "tak")],
"hey=verden;hello=world;selv=tak",
);
// ';' is escaped in a key, while '=' is left alone in a value.
check(
vec![("semi;colon", "equals=sign")],
r"semi\;colon=equals=sign",
);
// '=' is escaped in a key, and ';' is escaped in a value.
check(
vec![("equals=sign", "semi;colon")],
r"equals\=sign=semi\;colon",
);
// A backslash is itself escaped.
check(
vec![("semi;colon", "equals=sign"), ("also", "back\\slash")],
r"semi\;colon=equals=sign;also=back\\slash",
);
}
// Checks how an encoded settings string is split across the SOCKS
// authentication fields, and when it is rejected.
#[cfg(feature = "pt-client")]
#[test]
fn split_settings() {
use SocksVersion::*;
let long_string = "examplestrg".to_owned().repeat(50);
assert_eq!(long_string.len(), 550);
// `sv` converts the slice `long_string[a..b]` for SOCKS version `v`;
// `s` does the same for SOCKS 5; `v` yields the same slice as raw bytes.
let sv = |v, a, b| settings_to_protocol(v, long_string[a..b].to_owned()).unwrap();
let s = |a, b| sv(V5, a, b);
let v = |a, b| long_string.as_bytes()[a..b].to_vec();
// Empty settings need no authentication.
assert_eq!(s(0, 0), Protocol::Socks(V5, SocksAuth::NoAuth));
// Up to 255 bytes fit in the username; the password is a single NUL.
assert_eq!(
s(0, 50),
Protocol::Socks(V5, SocksAuth::Username(v(0, 50), vec![0]))
);
assert_eq!(
s(0, 255),
Protocol::Socks(V5, SocksAuth::Username(v(0, 255), vec![0]))
);
// Beyond 255 bytes, the remainder spills into the password.
assert_eq!(
s(0, 256),
Protocol::Socks(V5, SocksAuth::Username(v(0, 255), v(255, 256)))
);
assert_eq!(
s(0, 300),
Protocol::Socks(V5, SocksAuth::Username(v(0, 255), v(255, 300)))
);
assert_eq!(
s(0, 510),
Protocol::Socks(V5, SocksAuth::Username(v(0, 255), v(255, 510)))
);
// SOCKS 4 accepts a string longer than the SOCKS 5 limit of 510 bytes.
assert_eq!(
sv(V4, 0, 511),
Protocol::Socks(V4, SocksAuth::Socks4(v(0, 511)))
);
// NUL bytes are accepted for SOCKS 5.
assert_eq!(
settings_to_protocol(V5, "\0".to_owned()).unwrap(),
Protocol::Socks(V5, SocksAuth::Username(vec![0], vec![0]))
);
assert_eq!(
settings_to_protocol(V5, "\0".to_owned().repeat(510)).unwrap(),
Protocol::Socks(V5, SocksAuth::Username(vec![0; 255], vec![0; 255]))
);
// More than 510 bytes cannot be carried by SOCKS 5.
assert!(settings_to_protocol(V5, "\0".to_owned().repeat(511)).is_err());
assert!(settings_to_protocol(V5, long_string[0..512].to_owned()).is_err());
// SOCKS 4 rejects NUL bytes.
assert!(settings_to_protocol(V4, "\0".to_owned()).is_err());
}
}