erpc_scanner/client.rs

use crate::{
    relay::{NetDirProvider, NetDirProviderEvent, RelaysPool},
    utils::CombinedUnboundedChannel,
    work::{CompletedWork, CompletedWorkStatus, IncompleteWork},
};
use crossbeam::atomic::AtomicCell;
use futures::stream::StreamExt;
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    Mutex,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tor_chanmgr::ChannelUsage;
use tor_circmgr::{
    build::exit_circparams_from_netparams, path::TorPath, CircMgr,
};
use tor_netdir::{NetDir, Relay};
use tor_rtcompat::Runtime;

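/// A worker that builds two hop circuits between pairs of relays.
///
/// Each [Client] owns an unbounded channel of [IncompleteWork], a tokio task that
/// performs the work, and a stream of [CompletedWork] results.
///
/// A minimal usage sketch (the `circmgr`, `netdir_provider`, `netdir` and `work`
/// values here are placeholders, not part of this module):
///
/// ```ignore
/// let client = Client::new(circmgr, netdir_provider, netdir).await?;
/// client.push_incomplete_work(&work)?;
/// if let Some(completed) = client.recv_completed_work().await {
///     // `completed.status` indicates whether the two hop circuit could be built
/// }
/// ```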
pub(crate) struct Client<R: Runtime> {
    /// The [CircMgr] used to build circuits
    circmgr: Arc<CircMgr<R>>,

    /// The [NetDirProvider] to get the latest NetDir
    netdir_provider: Arc<NetDirProvider>,

    /// Channel for getting the [IncompleteWork] from the pool
    incomplete_work_channel: Arc<CombinedUnboundedChannel<IncompleteWork>>,

    /// The stream used to access the [CompletedWork] results produced by this [Client]
    output_stream: Mutex<UnboundedReceiverStream<CompletedWork>>,

    /// The current number of pending work items in the channel
    current_load: AtomicCell<u16>,
}

impl<R: Runtime> Client<R> {
    /// Creates a new [Client] from the provided [CircMgr], [NetDirProvider] and
    /// initial [NetDir], and starts its worker task.
    pub(crate) async fn new(
        circmgr: Arc<CircMgr<R>>,
        netdir_provider: Arc<NetDirProvider>,
        netdir: Arc<NetDir>,
    ) -> anyhow::Result<Arc<Self>> {
        // Channel for transferring the CompletedWork from client to the pool
        let (sd, rv) = mpsc::unbounded_channel();

        let client = Arc::new(Self {
            circmgr,
            netdir_provider,
            incomplete_work_channel: Arc::new(CombinedUnboundedChannel::<
                IncompleteWork,
            >::new()),
            output_stream: Mutex::new(UnboundedReceiverStream::new(rv)),
            current_load: AtomicCell::<u16>::new(0),
        });

        client.start_work(sd, netdir);
        Ok(client)
    }

    /// Pushes the [IncompleteWork] into the `incomplete_work_channel`
    pub(crate) fn push_incomplete_work(
        &self,
        work: &IncompleteWork,
    ) -> anyhow::Result<()> {
        self.incomplete_work_channel.sender.send(work.clone())?;

        // Increase the load counter by 1
        self.current_load.fetch_add(1);
        Ok(())
    }

    /// Receives the next [CompletedWork] produced by this [Client]
    pub(crate) async fn recv_completed_work(&self) -> Option<CompletedWork> {
        let mut output_stream = self.output_stream.lock().await;
        match output_stream.next().await {
            Some(completed_work) => {
                let current_load = self.current_load.load();
                if current_load > 0 {
                    // Decrease the load counter by 1
                    self.current_load.fetch_sub(1);
                }
                Some(completed_work)
            }
            None => None,
        }
    }

    /// Starts the work loop of this [Client] as a tokio task.
    ///
    /// The spawned task listens for [IncompleteWork] on the receiving half of
    /// `self.incomplete_work_channel`, performs the work, and sends the resulting
    /// [CompletedWork] back through the provided `UnboundedSender<CompletedWork>`,
    /// so that `self.output_stream` can be used to dequeue the results.
    fn start_work(
        &self,
        sd: UnboundedSender<CompletedWork>,
        netdir: Arc<NetDir>,
    ) {
        let circmgr = self.circmgr.clone();
        let netdir_provider = self.netdir_provider.clone();

        // The channel where we will get IncompleteWork to work on
        let incomplete_work_channel = self.incomplete_work_channel.clone();

        tokio::spawn(async move {
            let mut current_netdir = netdir;
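            // Build the initial RelaysPool from all the relays in the starting NetDir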
            let mut relays_pool = {
                let relays: Vec<(Relay<'_>, ())> =
                    current_netdir.relays().map(|relay| (relay, ())).collect();
                RelaysPool::from_relays(&relays)
            };
            let mut incomplete_work_receiver_guard =
                incomplete_work_channel.receiver.lock().await;
            let completed_work_sender = sd;

            let mut netdir_provider_event_receiver =
                netdir_provider.get_netdirprodiver_event_receiver();

            // Start receiving IncompleteWork provided by the pool
            while let Some(incomplete_work) =
                incomplete_work_receiver_guard.recv().await
            {
                // If a new NetDirProviderEvent arrives, this loop prevents the
                // IncompleteWork from being lost: it keeps running until try_recv()
                // returns an error (no pending event) and is broken only after the
                // work has been handed back to the pool.
                'work_loss_prevention: loop {
                    // Check if a new NetDirProviderEvent has arrived
                    match netdir_provider_event_receiver.try_recv() {
                        // If there's a new NetDir we update the NetDir and the RelaysPool
                        Ok(netdirprovider_event) => match netdirprovider_event
                        {
                            NetDirProviderEvent::NetDirChanged(netdir) => {
                                current_netdir = netdir;
                                relays_pool = {
                                    let relays: Vec<(Relay<'_>, ())> =
                                        current_netdir
                                            .relays()
                                            .map(|relay| (relay, ()))
                                            .collect();
                                    RelaysPool::from_relays(&relays)
                                };
                            }
                        },
                        Err(_) => {
                            let source_relay = {
                                relays_pool
                                    .get_relay(&incomplete_work.source_relay)
                            };
                            let destination_relay = {
                                relays_pool.get_relay(
                                    &incomplete_work.destination_relay,
                                )
                            };

                            // The IncompleteWork names the two relays between which a
                            // circuit should be built; check that both are still present
                            // in the current fresh NetDir by looking them up in the
                            // current RelaysPool.
                            //
                            // If either the source or the destination relay is not
                            // present, a Failure is reported below.
                            // (TODO: figure out a better solution)

                            let timestamp =
                                chrono::Utc::now().timestamp() as u64;
                            let completed_work = if let (
                                Some(source_relay),
                                Some(destination_relay),
                            ) =
                                (source_relay, destination_relay)
                            {
                                let source_relay = source_relay.clone();
                                let destination_relay =
                                    destination_relay.clone();

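                                // Build a two hop path from the source relay to the
                                // destination relay and derive the circuit parameters
                                // from the consensus parameters of the current NetDir.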
                                let two_hop_path = TorPath::new_multihop([
                                    source_relay.0.clone(),
                                    destination_relay.0.clone(),
                                ]);
                                let net_params = current_netdir.params();
                                let circ_params =
                                    exit_circparams_from_netparams(net_params)
                                        .unwrap();
                                let circ_usage = ChannelUsage::UselessCircuit;

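                                // Attempt to build the two hop circuit; the outcome
                                // becomes the status of the CompletedWork.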
                                let completed_work_status = match circmgr
                                    .builder()
                                    .build(
                                        &two_hop_path,
                                        &circ_params,
                                        circ_usage,
                                    )
                                    .await
                                {
                                    Ok(circ) => {
                                        // Ask the circuit to shut down right away; we
                                        // only needed to confirm that it could be built
                                        circ.terminate();
                                        CompletedWorkStatus::Success
                                    }
                                    Err(err) => CompletedWorkStatus::Failure(
                                        err.to_string(),
                                    ),
                                };
                                CompletedWork {
                                    source_relay: incomplete_work.source_relay,
                                    destination_relay: incomplete_work
                                        .destination_relay,
                                    timestamp,
                                    status: completed_work_status,
                                }
                            } else {
                                let failure = {
                                    let source_relay_presence =
                                        if source_relay.is_none() {
                                            "not present"
                                        } else {
                                            "present"
                                        };
                                    let destination_relay_presence =
                                        if destination_relay.is_none() {
                                            "not present"
                                        } else {
                                            "present"
                                        };
                                    // TODO: Specify some way to identify the NetDir,
                                    // either by giving info about the md consensus it
                                    // was built from or about its lifetime
                                    format!(
                                        "The source relay was {} and destination relay was {} in the NetDir",
                                        source_relay_presence, destination_relay_presence
                                    )
                                };
                                CompletedWork {
                                    source_relay: incomplete_work.source_relay,
                                    destination_relay: incomplete_work
                                        .destination_relay,
                                    timestamp,
                                    status: CompletedWorkStatus::Failure(
                                        failure,
                                    ),
                                }
                            };
                            completed_work_sender
                                .send(completed_work)
                                .unwrap();
                            break 'work_loss_prevention;
                        }
                    }
                }
            }
        });
    }
}

/// The implementation of PartialEq only compares the load value of the [Client]:
/// two [Client]s are considered equal if they carry the same number of pending
/// work items.
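///
/// Because of this ordering a pool holding several clients can pick the least
/// loaded one with the standard [Ord]-based helpers, e.g. (a sketch; `clients`,
/// here a `Vec<Arc<Client<R>>>`, and `work` are hypothetical, not part of this
/// module):
///
/// ```ignore
/// if let Some(least_loaded) = clients.iter().min() {
///     least_loaded.push_incomplete_work(&work)?;
/// }
/// ```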
impl<R: Runtime> PartialEq for Client<R> {
    fn eq(&self, other: &Self) -> bool {
        let current_load_left = self.current_load.load();
        let current_load_right = other.current_load.load();

        current_load_left == current_load_right
    }
}

impl<R: Runtime> Eq for Client<R> {}

/// The implementation of PartialOrd also only compares the load value of the [Client]
impl<R: Runtime> PartialOrd for Client<R> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<R: Runtime> Ord for Client<R> {
    fn cmp(&self, other: &Self) -> Ordering {
        let current_load_left = self.current_load.load();
        let current_load_right = other.current_load.load();

        current_load_left.cmp(&current_load_right)
    }
}