erpc_metrics/
runner.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
//! Downloads the OnionPerf analysis file for a date given in the format **YYYY-MM-DD**
//! (e.g. `2023-10-21`).
//!
//! For a given host and date it generates the URLs
//! <https://op-de7a.onionperf.torproject.net:8443/htdocs/2023-08-05.onionperf.analysis.json.xz>
//! **or**
//! <https://op-de7a.onionperf.torproject.net:8443/2023-08-05.onionperf.analysis.json.xz>
//!
//! Only one of the two URLs works.

use super::model::OnionPerfAnalysis;
use super::utils::decompress_xz;
use chrono::{DateTime, Datelike, Days, Utc};
use log::error;
use reqwest::{StatusCode, Url};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{oneshot, Mutex};
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;

/// URL path prefixes under which the OnionPerf analysis file is looked up.
/// Index 0 is used for the first candidate URL, index 1 for the second.
const ONION_PERF_ANALYAIS_PATHS: [&str; 2] = ["/htdocs/", "/"];

/// Interval, in hours, to wait before re-checking for the existence of a file.
/// It is multiplied by `60 * 60` where it is turned into a sleep duration.
const RECHECK_INTERVAL: u64 = 2;

/// Client that connects to a single OnionPerf host, repeatedly looks for its OnionPerf
/// analysis files and forwards every successfully parsed file over an unbounded channel.
pub struct OnionPerfRunner {
    /// Name of the host, e.g. `"op-de7a"`. It is used as the sub-domain of the download URL.
    pub host_name: String,

    /// UTC timestamps (only the day is relevant) whose OnionPerf analysis file has already
    /// been fetched and parsed, and therefore need not be checked again.
    already_checked: Arc<Mutex<Vec<DateTime<Utc>>>>,

    /// The sender half used to publish parsed `OnionPerfAnalysis` files.
    onionperf_analysis_sender: UnboundedSender<OnionPerfAnalysis>,

    /// The receiver stream from which consumers read the published `OnionPerfAnalysis` files.
    pub onionperf_analysis_receiver_stream:
        Arc<Mutex<UnboundedReceiverStream<OnionPerfAnalysis>>>,

    /// The sender used to terminate a running `OnionPerfRunner`.
    /// It is wrapped in an `Option` because `stop()` takes it out when sending the signal.
    onionperf_terminate_sender: Mutex<Option<oneshot::Sender<()>>>,

    /// The receiver on which `start()` waits for the signal to terminate.
    onionperf_terminate_receiver: Mutex<oneshot::Receiver<()>>,
}

impl OnionPerfRunner {
    /// Create a new client to get data from OnionPerfRunner Host
    pub fn new<T: AsRef<str>>(host_name: T) -> Self {
        let already_checked = Arc::default();
        let (sd, rv) =
            tokio::sync::mpsc::unbounded_channel::<OnionPerfAnalysis>();
        let onionperf_analysis_receiver_stream =
            Arc::new(Mutex::new(UnboundedReceiverStream::new(rv)));
        let (onionperf_terminate_sender, onionperf_terminate_receiver) =
            oneshot::channel();
        let onionperf_terminate_receiver =
            Mutex::new(onionperf_terminate_receiver);
        let onionperf_terminate_sender =
            Mutex::new(Some(onionperf_terminate_sender));

        OnionPerfRunner {
            host_name: String::from(host_name.as_ref()),
            onionperf_analysis_sender: sd,
            onionperf_analysis_receiver_stream,
            already_checked,
            onionperf_terminate_sender,
            onionperf_terminate_receiver,
        }
    }

    async fn process_one_try(
        &self,
        current_utc_time: &mut DateTime<Utc>,
        current_utc_date: &str,
        tries: &mut i32,
        onionperf_analysis_file: Result<Vec<u8>, OnionPerfRunnerError>,
    ) -> OneTryResult {
        match onionperf_analysis_file {
            Ok(onionperf_analysis_file_compressed) => {
                match decompress_xz(&onionperf_analysis_file_compressed).await
                {
                    Ok(onionperf_analysis_file_decompressed) => {
                        // Parsing the OnionPerf analysis file
                        let onionperf_analysis =
                            serde_json::from_slice::<OnionPerfAnalysis>(
                                &onionperf_analysis_file_decompressed[..],
                            );

                        match onionperf_analysis {
                            Ok(mut v) => {
                                v.time = Some(current_utc_date.to_owned());
                                let _ = self.onionperf_analysis_sender.send(v);

                                let mut already_checked: tokio::sync::MutexGuard<'_, Vec<DateTime<Utc>>> = self.already_checked.lock().await;
                                already_checked.push(*current_utc_time);
                                return OneTryResult::ReceivedAndParsed;
                            }
                            Err(e) => {
                                // If there was parsing issue we'll download again
                                log::error!("There was error on parsing OnionPerfAnalysis JSON file at tries no {tries} {:?}", e);
                                *tries += 1;
                            }
                        }
                    }
                    Err(e) => {
                        log::error!(
                        "There was error on decompressing the xz compressed OnionPerfAnalysis JSON file aat tries no {tries} {:?}",
                        e
                    );
                        *tries += 1;
                        // If there was decompression issue then we'll download the file
                        // with same date
                    }
                }
            }
            Err(onionperfrunner_error) => {
                match onionperfrunner_error {
                    // Reasons :
                    // - We're checking the wrong date(usually too ahead of time), for that we can reduce the day by 1 (TODO : add a guard on how far back we can go by days)
                    // - The file has (actually not been published, how do we determine that?) and we need to wait
                    // for the files by the onionperf server to publish(TODO: Imp!!!!)
                    OnionPerfRunnerError::OnionPerfAnalysisFileNotFound => {
                        log::warn!(
                            "The OnionPerfAnalysis file was not found, it mostly due to file not being available at the link at this time 
                                    and server throwing 404 errors"
                        );

                        *current_utc_time = current_utc_time
                            .checked_sub_days(Days::new(1))
                            .unwrap();
                        let checked = {
                            let already_checked =
                                self.already_checked.lock().await;

                            // We don't try to download again if we have already downloaded the ones whose "day" matches
                            already_checked.iter().any(
                                |already_checked_utc_time| {
                                    already_checked_utc_time.day()
                                        == current_utc_time.day()
                                },
                            )
                        };

                        // If the date has been checked then we go back to where we were
                        // left at before
                        if checked {
                            *current_utc_time = current_utc_time
                                .checked_add_days(Days::new(1))
                                .unwrap();
                        }
                        return OneTryResult::NotFound;
                    }
                    OnionPerfRunnerError::OnionPerfAnalysisNetworkError => {
                        log::error!("Network error has occured");
                    }
                }
            }
        };
        OneTryResult::Continue
    }

    // Start downloading data from the OnionPerfRunner hosts
    pub async fn start(&self) {
        let mut current_utc_time = Utc::now();
        let mut terminate_receiver_guard =
            self.onionperf_terminate_receiver.lock().await;
        'onionperf_running_main_loop: loop {
            let checked = {
                let already_checked = self.already_checked.lock().await;

                // We don't try to download again if we have already downloaded the ones whose "day" matches
                already_checked.iter().any(|already_checked_utc_time| {
                    already_checked_utc_time.day() == current_utc_time.day()
                })
            };

            if !checked {
                let current_utc_date = self.generate_date(current_utc_time);
                let mut tries = 0;

                // Loop for retrying the failed item due to decompresssion issue or parsing issue
                'decompression_parsing_error_try_iteration: loop {
                    let onionpef_analysis_file_future = self
                        .download_onionperf_analysis_file(
                            current_utc_date.clone(),
                        );
                    let onionperf_analysis_file;

                    // Terminate the main loop if received stop signal
                    select! {
                        file = onionpef_analysis_file_future => {
                            onionperf_analysis_file = file;
                        }
                        _ = &mut *terminate_receiver_guard => {
                            break 'onionperf_running_main_loop;
                        }
                    }

                    let res = self
                        .process_one_try(
                            &mut current_utc_time,
                            &current_utc_date,
                            &mut tries,
                            onionperf_analysis_file,
                        )
                        .await;

                    if let OneTryResult::NotFound
                    | OneTryResult::ReceivedAndParsed = res
                    {
                        break 'decompression_parsing_error_try_iteration;
                    }

                    // We'll make max of 5 iterations
                    if tries >= 5 {
                        // Terminate the main loop if received stop signal while wating
                        tokio::select! {
                            _ = &mut *terminate_receiver_guard => {
                                break 'onionperf_running_main_loop;
                            }
                            _ = sleep(Duration::from_secs(RECHECK_INTERVAL * 60 * 60)) => {
                                break 'decompression_parsing_error_try_iteration;
                            }
                        }
                    }
                }
            } else {
                // Sleep for 2 hours if the latest UTC Time data was also already fetched
                // Terminate the main loop if received stop signal while wating
                tokio::select! {
                    _ = &mut *terminate_receiver_guard => {
                        break 'onionperf_running_main_loop;
                    }
                    _ = sleep(Duration::from_secs(RECHECK_INTERVAL * 60 * 60)) => ()
                }

                // We'll increase the time by 1 day, because the current date has been already
                // checked
                current_utc_time =
                    current_utc_time.checked_add_days(Days::new(1)).unwrap();
            }
        }
    }

    /// Sends the stop signal that makes the loop in `start()` terminate.
    ///
    /// The one-shot sender is taken out of its slot, so only the first call actually sends
    /// the signal; later calls find the slot empty and do nothing. A failed send is logged.
    ///
    /// NOTE(review): how `start()` behaves when it is called again after the signal has been
    /// consumed depends on the one-shot receiver's behaviour after completion — confirm
    /// before relying on a restart.
    pub async fn stop(&self) {
        let mut sender_slot = self.onionperf_terminate_sender.lock().await;

        if let Some(Err(e)) =
            sender_slot.take().map(|sender| sender.send(()))
        {
            error!("Error occurred when sending OnionPerfRunner stop signal. {:?}", e);
        }
    }

    /// Downloads and decompresses the onionperf analysis file that's in ".xz" form
    async fn download_onionperf_analysis_file(
        &self,
        date: String,
    ) -> Result<Vec<u8>, OnionPerfRunnerError> {
        let (url_1, url_2) = self.generate_onionperf_analysis_urls(date);

        let reqwest_client = reqwest::Client::new();

        let resp_1 = reqwest_client.get(url_1).send().await;
        let resp_2 = reqwest_client.get(url_2).send().await;

        if let Ok(resp) = resp_1 {
            if resp.status() == StatusCode::OK {
                let data = resp.bytes().await.unwrap().into();
                return Ok(data);
            }
        }

        match resp_2 {
            Ok(resp) => {
                if resp.status() == StatusCode::OK {
                    let data = resp.bytes().await.unwrap().into();
                    Ok(data)
                } else {
                    Err(OnionPerfRunnerError::OnionPerfAnalysisFileNotFound)
                }
            }
            Err(_) => {
                // TODO : LOg error
                Err(OnionPerfRunnerError::OnionPerfAnalysisNetworkError)
            }
        }
    }

    /// Renders the given UTC time as a `YYYY-MM-DD` date string, e.g. `2023-10-21`.
    /// Month and day are zero-padded to two digits.
    fn generate_date(&self, time: DateTime<Utc>) -> String {
        format!("{}-{:02}-{:02}", time.year(), time.month(), time.day())
    }

    /// Builds the two candidate download URLs of the OnionPerf analysis file for `date`,
    /// one per entry of `ONION_PERF_ANALYAIS_PATHS`, in that order.
    ///
    /// # Panics
    /// Panics if either assembled string is not a valid URL.
    fn generate_onionperf_analysis_urls(&self, date: String) -> (Url, Url) {
        let file_name = format!("{}.onionperf.analysis.json.xz", date);
        let build_url = |path: &str| {
            format!(
                "https://{}.onionperf.torproject.net:8443{}{}",
                self.host_name, path, file_name
            )
        };

        let url_1 = build_url(ONION_PERF_ANALYAIS_PATHS[0]);
        let url_2 = build_url(ONION_PERF_ANALYAIS_PATHS[1]);
        log::debug!("Trying to download onionperf analysis either from {url_1} or {url_2}");

        // Parse both before unwrapping either, so the log line above is always emitted first
        let parsed_1 = Url::parse(&url_1);
        let parsed_2 = Url::parse(&url_2);

        (parsed_1.unwrap(), parsed_2.unwrap())
    }
}

/// Reasons why downloading an OnionPerf analysis file can fail.
#[derive(Debug, thiserror::Error)]
enum OnionPerfRunnerError {
    /// The server answered, but not with `200 OK`, so the file is treated as not available.
    #[error("The onion perf analysis file could not found at both links")]
    OnionPerfAnalysisFileNotFound,

    /// The HTTP request itself failed.
    #[error("Can't connect to both links")]
    OnionPerfAnalysisNetworkError,
}

/// Outcome of processing a single download attempt in `process_one_try`.
#[derive(Debug)]
enum OneTryResult {
    /// The attempt failed (decompression, parsing or network error); the retry loop in
    /// `start()` keeps going.
    Continue,
    /// The file was downloaded, decompressed, parsed and sent on the analysis channel.
    ReceivedAndParsed,
    /// The download reported that the file was not found.
    NotFound,
}

// Tests for `OnionPerfRunner`.
//
// NOTE: `test_received_and_parsed` and `test_not_found` call
// `download_onionperf_analysis_file`, i.e. they perform real HTTP requests against the
// "op-de8a" host and therefore need network access.
#[cfg(test)]
mod tests {
    use super::OnionPerfRunner;
    use crate::runner::OneTryResult;
    use chrono::{TimeZone, Utc};
    use serial_test::serial;
    use std::{sync::Arc, time::Duration};
    use tokio::{
        sync::oneshot,
        time::{self, timeout},
    };
    use tokio_stream::StreamExt;

    // Downloads the file of a fixed past date and checks that processing it reports
    // `ReceivedAndParsed`, leaves `tries` untouched and publishes one item on the stream.
    #[tokio::test]
    #[serial]
    async fn test_received_and_parsed() {
        let runner = OnionPerfRunner::new("op-de8a");
        let mut utc_time = Utc.with_ymd_and_hms(2025, 3, 31, 0, 0, 0).unwrap();
        let utc_date = runner.generate_date(utc_time);
        let mut tries = 0;
        let onionperf_analysis_file = runner
            .download_onionperf_analysis_file(utc_date.clone())
            .await;
        assert!(
            onionperf_analysis_file.is_ok(),
            "Failed to download the analysis file."
        );
        let res = runner
            .process_one_try(
                &mut utc_time,
                &utc_date,
                &mut tries,
                onionperf_analysis_file,
            )
            .await;
        assert_eq!(
            tries, 0,
            "Unexpected tries count change. Expected: 0, Actual: {}",
            tries
        );
        assert!(
            matches!(res, OneTryResult::ReceivedAndParsed),
            "Failed to receive or parse. Result: {:?}",
            res
        );
        let mut rv = runner.onionperf_analysis_receiver_stream.lock().await;
        let mut has_result = false;
        // Give the stream at most one second to yield the parsed analysis.
        tokio::select! {
            _ = (*rv).next() => has_result = true,
            _ = time::sleep(Duration::from_secs(1)) => ()
        }
        assert!(has_result, "No parsed analysis in stream.");
    }

    // Requests the file of a date far in the future and checks that processing the result
    // reports `NotFound`, leaves `tries` untouched and moves the date back by one day.
    #[tokio::test]
    #[serial]
    async fn test_not_found() {
        let runner = OnionPerfRunner::new("op-de8a");
        let mut utc_time = Utc.with_ymd_and_hms(2077, 4, 1, 0, 0, 0).unwrap();
        let utc_date = runner.generate_date(utc_time);
        let mut tries = 0;
        let onionperf_analysis_file = runner
            .download_onionperf_analysis_file(utc_date.clone())
            .await;
        let res = runner
            .process_one_try(
                &mut utc_time,
                &utc_date,
                &mut tries,
                onionperf_analysis_file,
            )
            .await;
        assert_eq!(
            tries, 0,
            "Unexpected tries count change. Expected: 0, Actual: {}",
            tries
        );
        assert_eq!(
            utc_time,
            Utc.with_ymd_and_hms(2077, 3, 31, 0, 0, 0).unwrap(),
            "Time didn't sub a day. Expected: 2077-03-31, Actual: {}",
            utc_time.format("%Y-%m-%d")
        );
        assert!(
            matches!(res, OneTryResult::NotFound),
            "Expected NotFound result. Actual: {:?}",
            res
        );
    }

    // With the current time already marked as checked, `start()` goes to sleep. The
    // 30 second timeout therefore elapses and the `unwrap()` on its result panics, which
    // is what `#[should_panic]` expects.
    #[tokio::test]
    #[should_panic]
    async fn test_sleeping() {
        let runner = OnionPerfRunner::new("op-de8a");
        let utc_time = Utc::now();
        // Drop the MutexGuard after modified already_checked.
        {
            let mut checked_guard = runner.already_checked.lock().await;
            (*checked_guard).push(utc_time);
        }
        timeout(Duration::from_secs(30), async move {
            runner.start().await;
        })
        .await
        .unwrap();
    }

    // Same setup as `test_sleeping`, but `stop()` is called: the spawned `start()` task
    // must return (signalled through the `stopped` one-shot channel) within 30 seconds.
    #[tokio::test]
    async fn test_stop() {
        let runner = Arc::new(OnionPerfRunner::new("op-de8a"));
        let utc_time = Utc::now();
        // Drop the MutexGuard after modified already_checked.
        {
            let mut checked_guard = runner.already_checked.lock().await;
            (*checked_guard).push(utc_time);
        }
        let runner_cloned = runner.clone();
        let (stopped, rx) = oneshot::channel();
        let handle = tokio::spawn(async move {
            runner_cloned.start().await;
            stopped.send(()).unwrap();
        });
        runner.stop().await;
        timeout(Duration::from_secs(30), async move {
            match rx.await {
                Ok(_) => (),
                Err(_) => panic!(),
            }
            handle.await.unwrap();
        })
        .await
        .unwrap();
    }
}