erpc_scanner/
scanner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! The module for [Scanner], necessary to attempt circuit builds
//!
//! This module only contains one type i.e [Scanner] and it's to be used where you need to
//! test where two hop circuit of given two relays gets built or not
//!
//! **X ---(hop 1)--- RELAY1 ---(hop 2)--- RELAY2**
//!
//! The [Scanner] is built to check if two relays are partitioned or not, on a high level, you push something called [IncompleteWork] using
//! the ```push_incomplete_work```, which is basically the Fingerprint of Relay1 and Relay2, whose two hop circuits we want to build, if this
//! two hop circuit get's built then **(AS OF RIGHT NOW)** we consider that relays is not partitioned and if it fails then (as of right now)
//! we consider that relays are partitioned (Of course direction is important here, that's why ```IncompleteWork``` has a source_relay and
//! destination_relay field)
//!
//! We initiate a [Scanner], with the ```no of clients``` we want, it basically means that those no of clients will attempt to create a circuit through the
//! ```CircuitBuilder``` at once, if they are all loaded with an ```IncompleteWork``` what happens underneath is that we spawn a single
//! ```arti_client::TorClient``` and we clone it's ```CircMgr``` and ```DirMgr```, which is used to build a type called ```NetDirProvider```, that has a broadcast
//! channel to give out a fresh ```Arc<NetDir>``` to it's subscribers, the ```Client```
//!
use crate::relay::NetDirProvider;

use super::{
    pool::ClientPool,
    work::{CompletedWork, IncompleteWork},
};
use anyhow::{anyhow, Result};
use std::sync::Arc;
use tor_circmgr::CircMgr;
use tor_rtcompat::PreferredRuntime;

/// A utility to test for two hop circuits
///
/// It's mainly used for completing the [IncompleteWork] into [CompletedWork] by attempting to create a circuit,
/// it distributes the [IncompleteWork] provided to it through the ```push_incomplete_work```
/// method, among all the ```Client``` it has spawned internally. These ```Client``` basically hold clones of ```CircMgr```
/// and underneath they call the method to build circuit through the ```CircuitBuilder``` abstracted by
/// ```CircMgr```
///
/// If we were to push 1000 [IncompleteWork] into internal channel of the [Scanner], then the speed
/// of completion of those work whould depend mainly on no of clients we spawned within the
/// [Scanner], which can be controlled by passing a ```u8``` value inside of ```Scanner::new()```
pub struct Scanner {
    /// Pool of all tor [Client]s(including the parent tor client)
    pool: ClientPool<PreferredRuntime>,
}

impl Scanner {
    /// Create a [Scanner], behind the scenes it creates a single arti ```TorClient``` and
    /// it's ```DirMgr```, ```CircMgr``` is used for all the so called ```Client```
    ///
    /// We only have to provide no of ```Client``` we wanna spin in the pool, just make sure it's not 0
    pub async fn new(
        no_of_parallel_circuit_builds: u32,
        netdir_provider: Arc<NetDirProvider>,
        circmgr: Arc<CircMgr<PreferredRuntime>>,
    ) -> Result<Self> {
        let pool = if no_of_parallel_circuit_builds < 1 {
            return Err(anyhow!(
                "You should provide atleast have one client to build"
            ));
        } else {
            ClientPool::new(
                no_of_parallel_circuit_builds,
                netdir_provider,
                circmgr,
            )
            .await?
        };

        let scanner = Scanner { pool };
        Ok(scanner)
    }

    /// Push [IncompleteWork] into the internal buffer of the [Scanner]
    pub fn push_incomplete_work(
        &self,
        work: IncompleteWork,
    ) -> anyhow::Result<()> {
        self.pool.push_incomplete_work(work)
    }

    /// Receive the [CompletedWork] over time from the internal buffer of the [Scanner]
    pub async fn recv_completed_work(&self) -> Option<CompletedWork> {
        self.pool.recv_completed_work().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::work::CompletedWorkStatus;
    use arti_client::{TorClient, TorClientConfig};
    use serial_test::serial;
    use std::time::Duration;
    use tokio::time;

    #[tokio::test]
    #[should_panic]
    #[serial]
    async fn build_scanner_with_0_clients() {
        let tor_client_config = TorClientConfig::default();
        if let Ok(tor_client) =
            TorClient::create_bootstrapped(tor_client_config).await
        {
            let dirmgr = tor_client.dirmgr().clone();
            if let Ok(netdir_provider) =
                NetDirProvider::from_dirmgr(dirmgr).await
            {
                let circmgr = tor_client.circmgr().clone();
                let no_of_clients = 0;
                let _ = Scanner::new(no_of_clients, netdir_provider, circmgr)
                    .await
                    .unwrap();
            }
        }
    }

    #[tokio::test]
    #[serial]
    async fn build_scanner_with_high_number_of_clients() {
        let tor_client_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_client_config)
            .await
            .unwrap();

        let dirmgr = tor_client.dirmgr().clone();
        let circmgr = tor_client.circmgr().clone();
        let netdir_provider =
            NetDirProvider::from_dirmgr(dirmgr.clone()).await.unwrap();

        let no_of_clients = 10000;
        let _ = Scanner::new(no_of_clients, netdir_provider, circmgr)
            .await
            .unwrap();
    }

    #[tokio::test]
    #[serial]
    async fn scanner_completes_all_assigned_number_of_works() {
        let tor_client_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_client_config)
            .await
            .unwrap();

        let dirmgr = tor_client.dirmgr().clone();
        let circmgr = tor_client.circmgr().clone();
        let netdir_provider =
            NetDirProvider::from_dirmgr(dirmgr.clone()).await.unwrap();

        let no_of_clients = 10;
        let scanner = Scanner::new(no_of_clients, netdir_provider, circmgr)
            .await
            .unwrap();

        let x_y = IncompleteWork {
            source_relay: "$f956360aa5f1e61064e2671fe231c5064d2afead"
                .to_string(),
            destination_relay: "$ffb8295543e765fddcfe63ad63c10b307604f72c"
                .to_string(),
        };

        let y_x = IncompleteWork {
            source_relay: "$ffb8295543e765fddcfe63ad63c10b307604f72c"
                .to_string(),
            destination_relay: "$f956360aa5f1e61064e2671fe231c5064d2afead"
                .to_string(),
        };

        scanner.push_incomplete_work(x_y).unwrap();
        scanner.push_incomplete_work(y_x).unwrap();

        // Even waiting for upto 10 seconds, it should give us only 2 CompletedWork
        //
        // This proves that all the work assigned(only 2) was finished and no any extra
        // CompletedWork was given to us
        let mut completed_works = Vec::new();
        let _ = time::timeout(Duration::from_secs(20), async {
            while let Some(cw) = scanner.recv_completed_work().await {
                completed_works.push(cw);
            }
        })
        .await;

        assert_eq!(2, completed_works.len());
    }

    #[tokio::test]
    #[serial]
    async fn testing_for_circuit_which_doesnt_exist_in_netdir() {
        let tor_client_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_client_config)
            .await
            .unwrap();

        let dirmgr = tor_client.dirmgr().clone();
        let circmgr = tor_client.circmgr().clone();
        let netdir_provider =
            NetDirProvider::from_dirmgr(dirmgr.clone()).await.unwrap();

        let no_of_clients = 10;
        let scanner = Scanner::new(no_of_clients, netdir_provider, circmgr)
            .await
            .unwrap();

        let x_y = IncompleteWork {
            source_relay: "XXXXX".to_string(),
            destination_relay: "YYYYY".to_string(),
        };

        scanner.push_incomplete_work(x_y.clone()).unwrap();

        // Even waiting for upto 10 seconds, it should give us only one CompletedWork
        // This proves that all the work assigned(only 1) was finished and no any extra
        // CompletedWork was given to us
        let mut completed_works = Vec::new();
        let _ = time::timeout(Duration::from_secs(10), async {
            while let Some(cw) = scanner.recv_completed_work().await {
                completed_works.push(cw);
            }
        })
        .await;

        assert_eq!(1, completed_works.len());
        assert_ne!(completed_works[0].status, CompletedWorkStatus::Success);
        assert!(x_y.is_source_relay(completed_works[0].source_relay.as_str()));
        assert!(x_y.is_destination_relay(
            completed_works[0].destination_relay.as_str()
        ));
    }
}