tor_chanmgr/
factory.rs

1//! Traits and code to define different mechanisms for building Channels to
2//! different kinds of targets.
3
4use std::sync::{Arc, Mutex};
5
6use crate::event::ChanMgrEventSender;
7use async_trait::async_trait;
8#[cfg(feature = "relay")]
9use safelog::Sensitive;
10use tor_error::{HasKind, HasRetryTime, internal};
11use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
12use tor_proto::channel::Channel;
13use tor_proto::memquota::ChannelAccount;
14use tracing::{debug, instrument};
15
16/// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress.
17///
18/// A future release of this crate might make this type less opaque.
19// FIXME(eta): Do that.
20#[derive(Clone)]
21pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
22
23impl BootstrapReporter {
24    #[cfg(test)]
25    /// Create a useless version of this type to satisfy some test.
26    pub(crate) fn fake() -> Self {
27        let (snd, _rcv) = crate::event::channel();
28        Self(Arc::new(Mutex::new(snd)))
29    }
30}
31
32/// An object that knows how to build `Channels` to `ChanTarget`s.
33///
34/// This trait must be object-safe.
35///
36/// Every [`ChanMgr`](crate::ChanMgr) has a `ChannelFactory` that it uses to
37/// construct all of its channels.
38///
39/// A `ChannelFactory` can be implemented in terms of a
40/// [`TransportImplHelper`](crate::transport::TransportImplHelper), by wrapping it in a
41/// `ChanBuilder`.
42///
43// FIXME(eta): Rectify the below situation.
44/// (In fact, as of the time of writing, this is the *only* way to implement this trait
45/// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter`
46/// is an opaque type.)
47#[async_trait]
48pub trait ChannelFactory: Send + Sync {
49    /// Open an authenticated channel to `target`.
50    ///
51    /// This method does does not necessarily handle retries or timeouts,
52    /// although some of its implementations may.
53    ///
54    /// This method does not necessarily handle every kind of transport. If the
55    /// caller provides a target with an unsupported
56    /// [`TransportId`](tor_linkspec::TransportId), this method should return
57    /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport).
58    async fn connect_via_transport(
59        &self,
60        target: &OwnedChanTarget,
61        reporter: BootstrapReporter,
62        memquota: ChannelAccount,
63    ) -> crate::Result<Arc<Channel>>;
64}
65
66/// Similar to [`ChannelFactory`], but for building channels from incoming streams.
67// This is a separate trait since for some `ChannelFactory`s like the one returned from
68// `tor_ptmgr::PtMgr::factory_for_transport`, it doesn't make sense to deal with incoming streams
69// (all PT connections are outgoing).
70#[async_trait]
71pub trait IncomingChannelFactory: Send + Sync {
72    /// The type of byte stream that's required to build channels for incoming connections.
73    type Stream: Send + Sync + 'static;
74
75    /// Open a channel from `peer` with the given `stream`. The channel may or may not be
76    /// authenticated.
77    #[cfg(feature = "relay")]
78    async fn accept_from_transport(
79        &self,
80        peer: Sensitive<std::net::SocketAddr>,
81        stream: Self::Stream,
82        memquota: ChannelAccount,
83    ) -> crate::Result<Arc<Channel>>;
84}
85
86#[async_trait]
87impl<CF> crate::mgr::AbstractChannelFactory for CF
88where
89    CF: ChannelFactory + IncomingChannelFactory + Sync,
90{
91    type Channel = tor_proto::channel::Channel;
92    type BuildSpec = OwnedChanTarget;
93    type Stream = CF::Stream;
94
95    #[instrument(skip_all, level = "trace")]
96    async fn build_channel(
97        &self,
98        target: &Self::BuildSpec,
99        reporter: BootstrapReporter,
100        memquota: ChannelAccount,
101    ) -> crate::Result<Arc<Self::Channel>> {
102        debug!("Attempting to open a new channel to {target}");
103        self.connect_via_transport(target, reporter, memquota).await
104    }
105
106    #[cfg(feature = "relay")]
107    #[instrument(skip_all, level = "trace")]
108    async fn build_channel_using_incoming(
109        &self,
110        peer: Sensitive<std::net::SocketAddr>,
111        stream: Self::Stream,
112        memquota: ChannelAccount,
113    ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
114        debug!("Attempting to open a new channel from {peer}");
115        self.accept_from_transport(peer, stream, memquota).await
116    }
117}
118
119/// The error type returned by a pluggable transport manager.
120pub trait AbstractPtError:
121    std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
122{
123}
124
125/// A pluggable transport manager.
126///
127/// We can't directly reference the `PtMgr` type from `tor-ptmgr`, because of dependency resolution
128/// constraints, so this defines the interface for what one should look like.
129#[async_trait]
130pub trait AbstractPtMgr: Send + Sync {
131    /// Get a `ChannelFactory` for the provided `PtTransportName`.
132    async fn factory_for_transport(
133        &self,
134        transport: &PtTransportName,
135    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
136}
137
138#[async_trait]
139impl<P> AbstractPtMgr for Option<P>
140where
141    P: AbstractPtMgr,
142{
143    async fn factory_for_transport(
144        &self,
145        transport: &PtTransportName,
146    ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
147        match self {
148            Some(mgr) => mgr.factory_for_transport(transport).await,
149            None => Ok(None),
150        }
151    }
152}
153
154/// A ChannelFactory built from an optional PtMgr to use for pluggable transports, and a
155/// ChannelFactory to use for everything else.
156pub(crate) struct CompoundFactory<CF> {
157    #[cfg(feature = "pt-client")]
158    /// The PtMgr to use for pluggable transports
159    ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
160    /// The factory to use for everything else
161    default_factory: Arc<CF>,
162}
163
164impl<CF> Clone for CompoundFactory<CF> {
165    fn clone(&self) -> Self {
166        Self {
167            #[cfg(feature = "pt-client")]
168            ptmgr: self.ptmgr.as_ref().map(Arc::clone),
169            default_factory: Arc::clone(&self.default_factory),
170        }
171    }
172}
173
174#[async_trait]
175impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
176    #[instrument(skip_all, level = "trace")]
177    async fn connect_via_transport(
178        &self,
179        target: &OwnedChanTarget,
180        reporter: BootstrapReporter,
181        memquota: ChannelAccount,
182    ) -> crate::Result<Arc<Channel>> {
183        use tor_linkspec::ChannelMethod::*;
184        let factory = match target.chan_method() {
185            Direct(_) => self.default_factory.clone(),
186            #[cfg(feature = "pt-client")]
187            Pluggable(a) => match self.ptmgr.as_ref() {
188                Some(mgr) => mgr
189                    .factory_for_transport(a.transport())
190                    .await
191                    .map_err(crate::Error::Pt)?
192                    .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
193                None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
194            },
195            #[allow(unreachable_patterns)]
196            _ => {
197                return Err(crate::Error::Internal(internal!(
198                    "No support for channel method"
199                )));
200            }
201        };
202
203        factory
204            .connect_via_transport(target, reporter, memquota)
205            .await
206    }
207}
208
209#[async_trait]
210impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
211    type Stream = CF::Stream;
212
213    #[cfg(feature = "relay")]
214    async fn accept_from_transport(
215        &self,
216        peer: Sensitive<std::net::SocketAddr>,
217        stream: Self::Stream,
218        memquota: ChannelAccount,
219    ) -> crate::Result<Arc<Channel>> {
220        self.default_factory
221            .accept_from_transport(peer, stream, memquota)
222            .await
223    }
224}
225
226impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
227    /// Create a new `Factory` that will try to use `ptmgr` to handle pluggable
228    /// transports requests, and `default_factory` to handle everything else.
229    pub(crate) fn new(
230        default_factory: Arc<CF>,
231        #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
232    ) -> Self {
233        Self {
234            default_factory,
235            #[cfg(feature = "pt-client")]
236            ptmgr,
237        }
238    }
239
240    #[cfg(feature = "pt-client")]
241    /// Replace the PtMgr in this object.
242    pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
243        self.ptmgr = Some(ptmgr);
244    }
245}