1use std::sync::{Arc, Mutex};
5
6use crate::event::ChanMgrEventSender;
7use async_trait::async_trait;
8#[cfg(feature = "relay")]
9use safelog::Sensitive;
10use tor_error::{HasKind, HasRetryTime, internal};
11use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
12use tor_proto::channel::Channel;
13use tor_proto::memquota::ChannelAccount;
14use tracing::{debug, instrument};
15
16#[derive(Clone)]
21pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
22
23impl BootstrapReporter {
24 #[cfg(test)]
25 pub(crate) fn fake() -> Self {
27 let (snd, _rcv) = crate::event::channel();
28 Self(Arc::new(Mutex::new(snd)))
29 }
30}
31
32#[async_trait]
48pub trait ChannelFactory: Send + Sync {
49 async fn connect_via_transport(
59 &self,
60 target: &OwnedChanTarget,
61 reporter: BootstrapReporter,
62 memquota: ChannelAccount,
63 ) -> crate::Result<Arc<Channel>>;
64}
65
66#[async_trait]
71pub trait IncomingChannelFactory: Send + Sync {
72 type Stream: Send + Sync + 'static;
74
75 #[cfg(feature = "relay")]
78 async fn accept_from_transport(
79 &self,
80 peer: Sensitive<std::net::SocketAddr>,
81 stream: Self::Stream,
82 memquota: ChannelAccount,
83 ) -> crate::Result<Arc<Channel>>;
84}
85
86#[async_trait]
87impl<CF> crate::mgr::AbstractChannelFactory for CF
88where
89 CF: ChannelFactory + IncomingChannelFactory + Sync,
90{
91 type Channel = tor_proto::channel::Channel;
92 type BuildSpec = OwnedChanTarget;
93 type Stream = CF::Stream;
94
95 #[instrument(skip_all, level = "trace")]
96 async fn build_channel(
97 &self,
98 target: &Self::BuildSpec,
99 reporter: BootstrapReporter,
100 memquota: ChannelAccount,
101 ) -> crate::Result<Arc<Self::Channel>> {
102 debug!("Attempting to open a new channel to {target}");
103 self.connect_via_transport(target, reporter, memquota).await
104 }
105
106 #[cfg(feature = "relay")]
107 #[instrument(skip_all, level = "trace")]
108 async fn build_channel_using_incoming(
109 &self,
110 peer: Sensitive<std::net::SocketAddr>,
111 stream: Self::Stream,
112 memquota: ChannelAccount,
113 ) -> crate::Result<Arc<tor_proto::channel::Channel>> {
114 debug!("Attempting to open a new channel from {peer}");
115 self.accept_from_transport(peer, stream, memquota).await
116 }
117}
118
119pub trait AbstractPtError:
121 std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
122{
123}
124
125#[async_trait]
130pub trait AbstractPtMgr: Send + Sync {
131 async fn factory_for_transport(
133 &self,
134 transport: &PtTransportName,
135 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>>;
136}
137
138#[async_trait]
139impl<P> AbstractPtMgr for Option<P>
140where
141 P: AbstractPtMgr,
142{
143 async fn factory_for_transport(
144 &self,
145 transport: &PtTransportName,
146 ) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
147 match self {
148 Some(mgr) => mgr.factory_for_transport(transport).await,
149 None => Ok(None),
150 }
151 }
152}
153
/// A channel factory that combines a default factory with an optional
/// pluggable-transport manager.
///
/// Targets with a direct channel method are served by `default_factory`.
/// With the `pt-client` feature, pluggable targets are served by whatever
/// factory `ptmgr` supplies.
pub(crate) struct CompoundFactory<CF> {
    /// Manager consulted for pluggable-transport targets, if one is set.
    ///
    /// Only present with the `pt-client` feature.
    #[cfg(feature = "pt-client")]
    ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
    /// Factory used for direct targets and for all incoming connections.
    default_factory: Arc<CF>,
}
163
164impl<CF> Clone for CompoundFactory<CF> {
165 fn clone(&self) -> Self {
166 Self {
167 #[cfg(feature = "pt-client")]
168 ptmgr: self.ptmgr.as_ref().map(Arc::clone),
169 default_factory: Arc::clone(&self.default_factory),
170 }
171 }
172}
173
174#[async_trait]
175impl<CF: ChannelFactory> ChannelFactory for CompoundFactory<CF> {
176 #[instrument(skip_all, level = "trace")]
177 async fn connect_via_transport(
178 &self,
179 target: &OwnedChanTarget,
180 reporter: BootstrapReporter,
181 memquota: ChannelAccount,
182 ) -> crate::Result<Arc<Channel>> {
183 use tor_linkspec::ChannelMethod::*;
184 let factory = match target.chan_method() {
185 Direct(_) => self.default_factory.clone(),
186 #[cfg(feature = "pt-client")]
187 Pluggable(a) => match self.ptmgr.as_ref() {
188 Some(mgr) => mgr
189 .factory_for_transport(a.transport())
190 .await
191 .map_err(crate::Error::Pt)?
192 .ok_or_else(|| crate::Error::NoSuchTransport(a.transport().clone().into()))?,
193 None => return Err(crate::Error::NoSuchTransport(a.transport().clone().into())),
194 },
195 #[allow(unreachable_patterns)]
196 _ => {
197 return Err(crate::Error::Internal(internal!(
198 "No support for channel method"
199 )));
200 }
201 };
202
203 factory
204 .connect_via_transport(target, reporter, memquota)
205 .await
206 }
207}
208
209#[async_trait]
210impl<CF: IncomingChannelFactory> IncomingChannelFactory for CompoundFactory<CF> {
211 type Stream = CF::Stream;
212
213 #[cfg(feature = "relay")]
214 async fn accept_from_transport(
215 &self,
216 peer: Sensitive<std::net::SocketAddr>,
217 stream: Self::Stream,
218 memquota: ChannelAccount,
219 ) -> crate::Result<Arc<Channel>> {
220 self.default_factory
221 .accept_from_transport(peer, stream, memquota)
222 .await
223 }
224}
225
226impl<CF: ChannelFactory + 'static> CompoundFactory<CF> {
227 pub(crate) fn new(
230 default_factory: Arc<CF>,
231 #[cfg(feature = "pt-client")] ptmgr: Option<Arc<dyn AbstractPtMgr + 'static>>,
232 ) -> Self {
233 Self {
234 default_factory,
235 #[cfg(feature = "pt-client")]
236 ptmgr,
237 }
238 }
239
240 #[cfg(feature = "pt-client")]
241 pub(crate) fn replace_ptmgr(&mut self, ptmgr: Arc<dyn AbstractPtMgr + 'static>) {
243 self.ptmgr = Some(ptmgr);
244 }
245}