primary/primary/
tor_network.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
//! This module contains code necessary to
//! - Store the circuit data in a Graph Database
//! - Create the graph of the Tor Network
//! - Create ```IncompleteWork```
//!
//! The approach is that, we have a struct called [TorNetwork] that basically stores the graph of the
//! entire TorNetwork
//!
//! The Vertex of this graph represents a [Node] that basically stores the fingerprint of the Relay
// The following line avoids clippy error `mutable_key_type`
// (https://rust-lang.github.io/rust-clippy/master/index.html#/mutable_key_type)
// Check and change if needed `HashSet` for 'already_used_as_exit_hasmap` and
// `nodes_set`
use super::config::Sqlite3Config;
use super::db::neo4j::{Neo4jDbClient, RelayMetadata};
use super::db::sqlite3::Sqlite3DbClient;
use super::db::sqlite3::Sqlite3DbResumeClient;
use super::{
    config::PrimaryWorkerConfig,
    relay::{Node, NodeStatus},
};
use crossbeam::atomic::AtomicCell;
use erpc_scanner::work::CompletedWorkStatus;
use erpc_scanner::{
    relay::{NetDirProvider, NetDirProviderEvent, RelaysPool},
    work::{CompletedWork, IncompleteWork},
};
use humantime::format_rfc3339;
use log::{error, info, trace};
use petgraph::graph::DiGraph;
use petgraph::graph::NodeIndex;
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json;
use std::collections::HashSet;
use std::sync::OnceLock;
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::{
    mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot, Mutex, RwLock,
};
use tor_netdir::{Relay, SubnetConfig};
use tor_netdoc::doc::netstatus::RelayWeight;

type FingerprintNodeHashmap = HashMap<String, (Arc<Node>, NodeIndex)>;

/// Onionoo relay details structure matching the API response
///
/// Every field is `Option` because the Onionoo "details" document only
/// emits the fields it knows for a given relay; unknown fields are simply
/// absent from the JSON.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct OnionooRelay {
    // RSA fingerprint; Onionoo uses uppercase hex without a `$` prefix
    pub fingerprint: Option<String>,
    // Country code the relay geolocates to — TODO confirm exact format against the API
    pub country: Option<String>,
    // Autonomous-system string as reported by Onionoo, e.g. "AS13335";
    // renamed because `as` is a Rust keyword
    #[serde(rename = "as")]
    pub asn_string: Option<String>,
    // Consensus flags (e.g. "Guard", "Exit") — presumably from the consensus; verify against API docs
    pub flags: Option<Vec<String>>,
    pub consensus_weight: Option<u64>,
    // Relays this relay declares as belonging to its effective family
    pub effective_family: Option<Vec<String>>,
    pub observed_bandwidth: Option<u64>,
    pub guard_probability: Option<f64>,
    pub middle_probability: Option<f64>,
    pub exit_probability: Option<f64>,
}

/// Onionoo bulk response structure
///
/// Top-level envelope of the Onionoo "details" document; only the `relays`
/// array is deserialized here, any other top-level keys are ignored.
#[derive(Debug, Deserialize)]
pub struct OnionooResponse {
    pub relays: Vec<OnionooRelay>,
}

/// Cache for Onionoo relay data indexed by fingerprint
type OnionooCache = HashMap<String, OnionooRelay>;

/// Global Onionoo data cache - loaded once and reused
static ONIONOO_CACHE: OnceLock<Option<OnionooCache>> = OnceLock::new();

/// Download fresh Onionoo relay data
///
/// Thin wrapper around [`fetch_onionoo_data`] that announces the download in
/// the log before delegating the actual fetch-and-cache work.
async fn download_fresh_onionoo_data() -> anyhow::Result<()> {
    info!("Downloading fresh Onionoo relay data...");

    // Delegate to the fetcher and propagate its result unchanged.
    fetch_onionoo_data().await
}

/// Fetch Onionoo relay data from the official API
async fn fetch_onionoo_data() -> anyhow::Result<()> {
    info!("Fetching Onionoo relay data from official API...");

    let client = reqwest::Client::builder()
        .user_agent("eRPC/1.0 (Tor Network Analysis Tool)")
        // 2 minutes timeout for large response
        .timeout(std::time::Duration::from_secs(120))
        .build()?;

    // Fetch details data
    let onionoo_url = "https://onionoo.torproject.org/details";
    let details_response =
        client.get(onionoo_url).send().await.map_err(|e| {
            anyhow::anyhow!("Failed to fetch Onionoo details: {}", e)
        })?;

    if !details_response.status().is_success() {
        return Err(anyhow::anyhow!(
            "Failed to fetch Onionoo details: HTTP {}",
            details_response.status()
        ));
    }

    let details_text = details_response.text().await.map_err(|e| {
        anyhow::anyhow!("Failed to read Onionoo details response: {}", e)
    })?;

    // Parse JSON response
    let details_response: OnionooResponse =
        serde_json::from_str(&details_text).map_err(|e| {
            anyhow::anyhow!("Failed to parse Onionoo details JSON: {}", e)
        })?;

    info!(
        "Successfully parsed Onionoo data: {} relays",
        details_response.relays.len()
    );

    // Save raw data to cache file for debugging/backup
    tokio::fs::create_dir_all("./onionoo_cache")
        .await
        .map_err(|e| {
            anyhow::anyhow!("Failed to create cache directory: {}", e)
        })?;

    tokio::fs::write("./onionoo_cache/onionoo_data.json", &details_text)
        .await
        .map_err(|e| {
            anyhow::anyhow!("Failed to write details cache file: {}", e)
        })?;

    let cache: HashMap<String, OnionooRelay> = details_response
        .relays
        .into_iter()
        .filter_map(|relay| {
            if let Some(ref fingerprint) = relay.fingerprint {
                Some((fingerprint.clone(), relay))
            } else {
                None
            }
        })
        .collect();

    info!("Cached {} relay entries", cache.len());

    Ok(())
}

/// Initialize Onionoo cache from downloaded data
///
/// Reads `./onionoo_cache/onionoo_data.json` (written earlier by
/// [`fetch_onionoo_data`]) and builds the fingerprint -> relay map.
/// Returns `None` when the file is missing, unreadable, or unparsable;
/// every failure path logs an actionable error instead of propagating it.
async fn init_onionoo_cache() -> Option<OnionooCache> {
    info!("Initializing Onionoo data cache...");

    let cache_file = "./onionoo_cache/onionoo_data.json";

    // Try to load from cache file first
    if std::path::Path::new(cache_file).exists() {
        match tokio::fs::read_to_string(cache_file).await {
            Ok(content) => {
                match serde_json::from_str::<OnionooResponse>(&content) {
                    Ok(onionoo_response) => {
                        // Index relays by fingerprint; entries without a
                        // fingerprint cannot be looked up later, so skip them
                        let mut cache = HashMap::new();
                        for relay in onionoo_response.relays {
                            if let Some(fingerprint) = &relay.fingerprint {
                                cache.insert(fingerprint.clone(), relay);
                            }
                        }
                        info!(
                            "Onionoo cache initialized successfully with {} relays",
                            cache.len()
                        );
                        return Some(cache);
                    }
                    Err(e) => {
                        // Cache file exists but contains invalid JSON
                        error!("Failed to parse cached Onionoo data: {}", e);
                        error!(
                            "Please ensure relay_metadata_enrichment_enabled \
                            is set to true to download fresh data"
                        );
                    }
                }
            }
            Err(e) => {
                // Cache file exists but could not be read (permissions, I/O)
                error!("Failed to read cached Onionoo data: {}", e);
                error!(
                    "Please ensure relay_metadata_enrichment_enabled \
                    is set to true to download fresh data"
                );
            }
        }
    } else {
        // No cache on disk — the download step was likely never run
        error!("Onionoo cache file not found: {}", cache_file);
        error!(
            "Please ensure relay_metadata_enrichment_enabled \
            is set to true to download data"
        );
    }

    None
}

/// Get cached Onionoo data, initializing if needed
///
/// Lazily initializes the process-wide [`ONIONOO_CACHE`] on first call and
/// returns a reference into it. A failed initialization is memoized as
/// `None`, so later calls will not retry loading the cache.
///
/// NOTE(review): `block_in_place` panics on a current-thread Tokio runtime —
/// this assumes callers run on a multi-thread runtime; confirm.
fn get_onionoo_cache() -> Option<&'static OnionooCache> {
    ONIONOO_CACHE
        .get_or_init(|| {
            // Bridge sync -> async: drive the async initializer to completion
            // without starving other tasks on this worker thread
            tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current()
                    .block_on(init_onionoo_cache())
            })
        })
        .as_ref()
}

/// Parse ASN from Onionoo AS string (e.g., "AS13335" -> 13335)
///
/// Accepts either the "AS"-prefixed form or a bare number; returns `None`
/// when the remainder is not a valid `u32`.
fn parse_asn_from_string(asn_string: &str) -> Option<u32> {
    // Drop the optional "AS" prefix, then parse whatever is left.
    let digits = asn_string.strip_prefix("AS").unwrap_or(asn_string);
    digits.parse().ok()
}

/// Normalize fingerprint format for Onionoo lookup
/// Converts from Tor consensus format (e.g., "$AA51C355..." or "aa51c355...")
/// to Onionoo format ("AA51C355..." - uppercase, no $ prefix)
fn normalize_fingerprint_for_onionoo(fingerprint: &str) -> String {
    // Strip any leading '$' characters, then uppercase the hex digits.
    let without_prefix = fingerprint.trim_start_matches('$');
    without_prefix.to_uppercase()
}

/// Fast metadata lookup using Onionoo cached data
/// Returns relay metadata from Onionoo data using fingerprint lookup
fn lookup_relay_metadata_by_fingerprint(
    fingerprint: &str,
) -> Option<&'static OnionooRelay> {
    // Onionoo keys its documents by uppercase fingerprint without `$`
    let normalized_fingerprint =
        normalize_fingerprint_for_onionoo(fingerprint);

    // Guard clause: without a cache there is nothing to look up.
    let Some(cache) = get_onionoo_cache() else {
        trace!(
            "No Onionoo cache available for fingerprint: {}",
            fingerprint
        );
        return None;
    };

    // Perform the lookup once, trace the outcome, then return it.
    let relay_data = cache.get(&normalized_fingerprint);
    match relay_data {
        Some(relay_data) => {
            trace!(
                "Onionoo lookup for {} (normalized: {}): found relay \
                data with country={:?}, ASN={:?}",
                fingerprint,
                normalized_fingerprint,
                relay_data.country,
                relay_data.asn_string
            );
        }
        None => {
            trace!(
                "Onionoo lookup for {} (normalized: {}): no data found \
                in cache of {} entries",
                fingerprint,
                normalized_fingerprint,
                cache.len()
            );
        }
    }
    relay_data
}

/// A Datastructure to represent the Tor Network in a graph
pub struct TorNetwork {
    /// The graph to store the entire Tor Network and all the operations we are performing
    /// within the Tor Network
    ///
    /// Vertices are relays ([Node]); edges record circuit-creation attempts.
    ///
    /// NOTE: It's not useful(enough) right now, but in the FUTURE,
    /// it can be used to sync the database and the graph i.e if any data is missing in the
    /// database at the end of the scan it can be used to validate the database
    graph: Arc<RwLock<DiGraph<Arc<Node>, Edge>>>,

    /// The Receiver half to use when we want to receive a new NetDir
    netdir_provider: Arc<NetDirProvider>,

    /// The sending half to use when we want to send ```IncompleteWork```
    incomplete_work_sender: UnboundedSender<IncompleteWork>,

    /// The receiving half to use when we want to receive a ```IncompleteWork``` produced
    pub incomplete_work_receiver:
        Arc<Mutex<UnboundedReceiver<IncompleteWork>>>,

    /// The sending half to use when we have to send a ```CompletedWork``` to be used within
    /// [TorNetwork]
    pub completed_work_sender: UnboundedSender<CompletedWork>,

    /// The receiving half to use when we have to receive a ```CompletedWork```
    completed_work_receiver: Arc<Mutex<UnboundedReceiver<CompletedWork>>>,

    /// Client to the Neo4j Graph Database (None when no Neo4j is configured)
    neo4j_client: Option<Arc<Neo4jDbClient>>,

    /// A [Sqlite3DbClient], which is a wrapper around Sqlite3 connection pool and
    /// has abstractions to add circuit creation attempts
    /// (None when no sqlite3 database is configured)
    sqlite3_client: Option<Arc<Sqlite3DbClient>>,

    /// The [PrimaryWorker] config
    primary_worker_config: Arc<PrimaryWorkerConfig>,

    /// The current status of the [TorNetwork] i.e if it's either
    /// set to "NotStarted", "Running" or "Paused"
    ///
    /// It can be mutated during the runtime between "Running" and "Paused"
    /// in order to pause or continue producing [IncompleteWork]
    ///
    ///
    /// !FEATURE TODO: Add some kind of way to pause and resume through this
    tor_network_status: Arc<AtomicCell<TorNetworkStatus>>,

    /// A HashMap to get the (NodeIndex and Arc<Node>) just using the fingerprint of the items in
    /// the current NetDir
    fingerprint_node_hashmap: Arc<RwLock<FingerprintNodeHashmap>>,

    /// Data to resume from if we are provided any
    /// (loaded from a previous run's sqlite3 database via `--resume`)
    pub resume_data: Vec<CompletedWork>,

    /// Key value pair of OnionPerfHostName and OnionPerfAnalysisFileDate that have been
    /// already injested into the database
    pub checked_onionperf_dates: HashMap<String, Vec<String>>,

    /// Metrics sender for updating heartbeat metrics
    /// (None until a heartbeat metrics channel is attached)
    metrics_sender:
        Mutex<Option<mpsc::UnboundedSender<super::heartbeat::MetricEvent>>>,
}

impl TorNetwork {
    /// Create a empty [TorNetwork], that is not running
    ///
    /// Builds the database clients (Neo4j and/or sqlite3), optionally
    /// downloads fresh Onionoo metadata, loads resume state from a previous
    /// run's sqlite3 database when `--resume` was given, and wires up the
    /// internal work channels.
    ///
    /// # Errors
    ///
    /// Returns an error when a database client cannot be created, the
    /// Onionoo download fails, or the resume database cannot be read.
    ///
    /// # Panics
    ///
    /// Panics when neither a Neo4j nor a sqlite3 database is configured,
    /// since there would be nowhere to store results.
    pub async fn new(
        primary_worker_config: Arc<PrimaryWorkerConfig>,
        netdir_provider: Arc<NetDirProvider>,
    ) -> anyhow::Result<Arc<Self>> {
        let neo4j_client = match primary_worker_config.neo4j_config {
            Some(ref neo4j_config) => {
                Some(Arc::new(Neo4jDbClient::new(neo4j_config).await?))
            }
            None => None,
        };

        // Download fresh Onionoo data if metadata enrichment is enabled
        if primary_worker_config
            .primary
            .relay_metadata_enrichment_enabled
        {
            info!("Relay metadata enrichment enabled - downloading fresh Onionoo data");
            download_fresh_onionoo_data().await.map_err(|e| {
                anyhow::anyhow!("Failed to download fresh Onionoo data: {}", e)
            })?;

            info!("Successfully downloaded fresh Onionoo data");
        } else {
            info!("Relay metadata enrichment disabled - skipping Onionoo data download");
        }

        // Resume support: replay the circuit-creation attempts recorded in a
        // previous run's sqlite3 database (empty when not resuming)
        let resume_data = {
            let mut v = vec![];
            if let Some(ref args) = primary_worker_config.args {
                if let Some(ref path) = args.resume {
                    log::info!("Getting previously created circuits from the database {path}");
                    let sqlite3_config = Sqlite3Config { path: path.clone() };
                    let sqlite3_db_resume_client =
                        Sqlite3DbResumeClient::new(&sqlite3_config)?;

                    let completed_works = sqlite3_db_resume_client
                        .get_all_completed_works()
                        .await?;
                    log::info!(
                        "Loaded all previous created circuits from the database {path}. Total {} circuit creation attempts were stored",
                        completed_works.len()
                    );
                    v = completed_works;
                }
            }
            v
        };

        // NOTE(review): this opens a second Sqlite3DbResumeClient on the same
        // path as the block above — consider sharing one client.
        let checked_onionperf_dates = {
            let mut checked = HashMap::new();
            if let Some(ref args) = primary_worker_config.args {
                if let Some(ref path) = args.resume {
                    log::info!("Getting previous checked onionperf anlaysis file dates from the database {path}");
                    let sqlite3_config = Sqlite3Config { path: path.clone() };
                    let sqlite3_db_resume_client =
                        Sqlite3DbResumeClient::new(&sqlite3_config)?;

                    let checked_onionperf_analysis_file_dates =
                        sqlite3_db_resume_client
                            .get_all_checked_onionperf_analysis_file_date()
                            .await?;
                    log::info!(
                        "Loaded all checked onionperf anlaysis file dates"
                    );
                    checked = checked_onionperf_analysis_file_dates;
                }
            }
            checked
        };

        // If we need to store in sqlite3 or not and also if we would need then check if there's
        // the resume state sqlite3 database or not, if there is then we start resuming there
        // directly
        let sqlite3_client = match primary_worker_config.sqlite3_config {
            Some(ref sqlite3_config) => {
                if let Some(ref args) = primary_worker_config.args {
                    if let Some(ref path) = args.resume {
                        // Resuming: write into the resume database instead of
                        // the one from the config
                        let sqlite3_config =
                            Sqlite3Config { path: path.clone() };
                        Some(Arc::new(Sqlite3DbClient::new(&sqlite3_config)?))
                    } else {
                        Some(Arc::new(Sqlite3DbClient::new(sqlite3_config)?))
                    }
                } else {
                    Some(Arc::new(Sqlite3DbClient::new(sqlite3_config)?))
                }
            }
            None => None,
        };

        // Refuse to run without any result store — the scan would be wasted
        // NOTE(review): bare `panic!()` has no message; the preceding error!
        // log is the only explanation the operator sees.
        if neo4j_client.is_none() && sqlite3_client.is_none() {
            error!("You haven't set any external database to store the results, please add one and run again else running this tool will be of no use");
            panic!()
        }

        let tor_network_status =
            Arc::new(AtomicCell::new(TorNetworkStatus::NotStarted));
        let graph = Arc::default();
        let (incomplete_work_sender, incomplete_work_receiver) =
            unbounded_channel::<IncompleteWork>();
        let (completed_work_sender, completed_work_receiver) =
            unbounded_channel::<CompletedWork>();

        let incomplete_work_receiver =
            Arc::new(Mutex::new(incomplete_work_receiver));
        let completed_work_receiver =
            Arc::new(Mutex::new(completed_work_receiver));

        let fingerprint_node_hashmap = Arc::default();

        let tor_network = Arc::new(Self {
            graph,
            netdir_provider,
            incomplete_work_sender,
            incomplete_work_receiver,
            completed_work_sender,
            completed_work_receiver,
            sqlite3_client,
            neo4j_client,
            primary_worker_config,
            tor_network_status,
            fingerprint_node_hashmap,
            resume_data,
            checked_onionperf_dates,
            metrics_sender: Mutex::new(None),
        });

        Ok(tor_network)
    }

    /// Get the current [TorNetworkStatus]
    ///
    /// Lock-free read of the status cell; `async` is kept for interface
    /// consistency even though nothing is awaited.
    #[allow(dead_code)]
    pub async fn get_tor_network_status(&self) -> TorNetworkStatus {
        self.tor_network_status.load()
    }

    /// Set the [TorNetworkStatus] to either "Pause" or "Running"
    ///
    /// Lock-free write; takes effect for all holders of the shared status cell.
    pub fn set_tor_network_status(
        &self,
        tor_network_status: TorNetworkStatus,
    ) {
        self.tor_network_status.store(tor_network_status);
    }

    /// Start running the TorNetwork
    ///
    /// Never returns: builds [Node]s from the current NetDir, stores them in
    /// the graph and database, then loops forever alternating between
    /// handling a freshly arrived NetDir and draining completed works while
    /// waiting for the next NetDir check interval.
    pub async fn start(&self) {
        self.set_tor_network_status(TorNetworkStatus::Running);
        info!("TorNetworkStatus set to {:?}", TorNetworkStatus::Running);

        // Subscribe to the NetDirProviderEvent receiving handle
        let mut netdir_provider_event_receiver =
            self.netdir_provider.get_netdirprodiver_event_receiver();

        // Create a netdir and relays_pool variable that can be accessed by everyone under this scope
        let mut current_netdir = self.netdir_provider.current_netdir().await;
        let relays: Vec<Relay<'_>> = current_netdir.relays().collect();

        // A hashmap of relays currently in the NetDir and (tor_netdir::Relay, Arc<Node>)
        let mut relays_pool = RelaysPool::empty();

        //// A hashmap of relays currently in the graph and (Arc<Node>)
        //let mut hashmap_graph_nodes = HashMap::<String, (Arc<Node>, NodeIndex)>::new();

        info!(
            "TorNetwork started with NetDir of valid lifetime upto UTC Time : {}, and has {} Relays",
            format_rfc3339(current_netdir.lifetime().valid_until()),
            relays.len()
        );

        // Create new Nodes and RelaysPool
        for relay in &relays {
            let weight = match relay.rs().rs.weight {
                RelayWeight::Measured(measured_w) => measured_w,
                _ => 1, // Giving lowest priority to the unmeasured relay
            };
            let fingerprint = relay.rsa_id().to_string();
            let node = Node::new(
                fingerprint.clone(),
                self.incomplete_work_sender.clone(),
                weight,
            );
            relays_pool.add_relay(
                fingerprint.clone(),
                relay.clone(),
                node.clone(),
            );
        }

        // Stores any relay that isn't in the internal graph yet! from the RelaysPool(NetDir)
        // in the internal petgraph and database(neo4j or/and sqlite3)
        self.store_nodes(&relays_pool).await;

        // For each relay, check if they are in the same subnet or same family
        // with the other relay and then add those relay in to_be_used_as_exit
        // or in_same_subnet_or_family accordingly
        self.add_nodes_to_be_used_as_exit_for_each_node_in_the_graph(
            &relays_pool,
        )
        .await;

        let mut first_attempt = true;

        loop {
            match netdir_provider_event_receiver.try_recv() {
                Ok(NetDirProviderEvent::NetDirChanged(new_netdir)) => {
                    // If it's the first attempt, then it should directly go for Err(_) in this
                    // match statement, not here, it can come here because the arti has a cache and
                    // it produces DirEvent::NewConsensus in few seconds after we have already gotten the
                    // NetDir
                    if first_attempt {
                        first_attempt = false;
                    } else {
                        // New NetDir was received, handle influx of new Relay
                        // by rebuilding the relays pool from scratch
                        current_netdir = new_netdir;
                        relays_pool = RelaysPool::empty();

                        let relays: Vec<Relay<'_>> =
                            current_netdir.relays().collect();
                        for relay in &relays {
                            let fingerprint = relay.rsa_id().to_string();
                            let weight = match relay.rs().rs.weight {
                                RelayWeight::Measured(measured_w) => {
                                    measured_w
                                }
                                _ => 1, // Giving lowest priority to the unmeasured relay
                            };
                            let node = Node::new(
                                fingerprint.clone(),
                                self.incomplete_work_sender.clone(),
                                weight,
                            );
                            relays_pool.add_relay(
                                fingerprint.clone(),
                                relay.clone(),
                                node.clone(),
                            );
                        }
                        info!(
                            "Received a new NetDir of valid lifetime upto UTC Time : {}, and has {} Relays",
                            format_rfc3339(current_netdir.lifetime().valid_until()),
                            relays.len()
                        );

                        self.store_nodes(&relays_pool).await;

                        self.add_nodes_to_be_used_as_exit_for_each_node_in_the_graph(&relays_pool).await;
                    }
                }

                Err(_) => {
                    // If it arrives here on the very first attempt, we turn off the first_attempt
                    first_attempt = false;

                    // No new NetDir was received, let's continue with where we left our work
                    // A scope to drop the read guard on the graph
                    {
                        let graph = self.graph.read().await;

                        // Go through all the Nodes in the graph and start(producing IncompleteWork) those nodes if they
                        // haven't been or paused
                        for node in graph.node_weights() {
                            match node.get_status() {
                                NodeStatus::NotStarted => {
                                    let node = node.clone();
                                    // NOTE(review): the unwrap panics only this
                                    // spawned task if Node::start fails;
                                    // consider logging the error instead
                                    tokio::task::spawn(async move {
                                        node.set_status(NodeStatus::Running);
                                        Node::start(node).await.unwrap();
                                    });
                                }
                                NodeStatus::Paused => {
                                    // TODO: Add support for resume if pause is supported
                                }
                                _ => {
                                    // Do nothing because either the relay was stopped or it's
                                    // running
                                }
                            }
                        }
                    }

                    // One-shot timer that fires after the configured interval,
                    // signalling that we should go back and look for a fresh NetDir
                    let (sd, mut rv) = oneshot::channel();
                    let fresh_netdir_check_interval = self
                        .primary_worker_config
                        .primary
                        .fresh_netdir_check_interval;
                    tokio::spawn(async move {
                        tokio::time::sleep(Duration::from_secs(
                            fresh_netdir_check_interval,
                        ))
                        .await;
                        // Send result is the async block's value; an Err here
                        // just means the receiver side was already dropped
                        sd.send(())
                    });

                    // Start receiving the completed work here for the next 20 mins and then
                    // got to check if there's influx of new Relay
                    let mut completed_work_receiver =
                        self.completed_work_receiver.lock().await;
                    // NOTE(review): the timer below is only checked after a
                    // completed work arrives — if no work ever arrives this
                    // await blocks indefinitely and the fresh-NetDir check is
                    // never reached; confirm whether that is intended
                    while let Some(completed_work) =
                        completed_work_receiver.recv().await
                    {
                        // A scope to drop the read guard on the fingerprint_node_hashmap
                        let (guard_relay_node, exit_relay_node) = {
                            let fingerprint_node_hashmap =
                                self.fingerprint_node_hashmap.read().await;
                            let guard_relay = fingerprint_node_hashmap
                                .get(completed_work.source_relay.as_str());
                            let exit_relay = fingerprint_node_hashmap.get(
                                completed_work.destination_relay.as_str(),
                            );

                            match (guard_relay, exit_relay) {
                                (
                                    Some((guard_relay, _)),
                                    Some((exit_relay, _)),
                                ) => (
                                    Some(guard_relay.clone()),
                                    Some(exit_relay.clone()),
                                ),
                                _ => (None, None),
                            }
                        };

                        if let (
                            Some(guard_relay_node),
                            Some(exit_relay_node),
                        ) = (guard_relay_node, exit_relay_node)
                        {
                            // Feed the result back to both endpoint Nodes
                            let _completed_work = completed_work.clone();
                            tokio::spawn(async move {
                                guard_relay_node
                                    .submit_completed_work(
                                        _completed_work.clone(),
                                    )
                                    .await;
                                exit_relay_node
                                    .submit_completed_work(_completed_work)
                                    .await;
                            });

                            // Add in the petgraph and database
                            self.store_edge(completed_work).await;
                        }
                        // Move on after 20 mins
                        if rv.try_recv().is_ok() {
                            break;
                        }
                    }
                }
            }
        }
    }

    /// For every [Node] in the graph, decide which other relays it may use
    /// as the exit of a two-hop circuit.
    ///
    /// Each relay is compared against every relay in the ```RelaysPool```:
    /// relays in the same subnet or the same family are put on the relay's
    /// ignore list (controlled by the `circuit_with_relay_on_same_subnet` /
    /// `circuit_with_relay_of_same_family` config flags). The remaining
    /// relays are queued on each [Node] as exits still to be tried, minus
    /// the pairs already covered by `self.resume_data`.
    ///
    /// Also logs an estimate of how many circuits will be built and how
    /// long that should take.
    async fn add_nodes_to_be_used_as_exit_for_each_node_in_the_graph(
        &self,
        relays_pool: &RelaysPool<'_, Arc<Node>>,
    ) {
        info!("Started filtering the two hop circuit combinations that a Relay should make and ignore(if a relay should make circuit with relay that's in the same subnet/family). Please wait few seconds");

        // estimate how many circuits should be built to cover all the network
        // and estimate how long it would take
        let total_relays = relays_pool.relays_hashmap.len();
        // Same-subnet pairs, same-family pairs and a pair of a relay with
        // itself are all counted as invalid_pairs
        let mut invalid_pairs = HashSet::new();
        let no_of_parallel_circuit_primary =
            self.primary_worker_config.primary.no_of_parallel_circuits;
        // Secondary workers only contribute circuit capacity when
        // secondary_allowed is turned on
        let no_of_parallel_circuit_secondary =
            match self.primary_worker_config.primary.secondary_allowed {
                true => {
                    self.primary_worker_config
                        .secondary
                        .no_of_parallel_circuits
                }
                false => 0,
            };
        let start_time = Instant::now();
        let subnet_config = SubnetConfig::default();

        let circuit_with_relay_on_same_subnet = self
            .primary_worker_config
            .primary
            .circuit_with_relay_on_same_subnet;
        let circuit_with_relay_on_same_family = self
            .primary_worker_config
            .primary
            .circuit_with_relay_of_same_family;

        // Snapshot the graph's nodes so the read lock is released before
        // the long filtering loop below
        let nodes: Vec<Arc<Node>> = {
            let graph = self.graph.read().await;
            graph.node_weights().cloned().collect()
        };

        // NOTE : It should run only at the initial of program
        // TODO: Make sure it runs only once if it's allowed to run
        //
        // Rebuild, from the resume data, a map of
        // source fingerprint -> set of exit Nodes that were already attempted
        let mut already_used_as_exit_nodes: HashMap<
            String,
            HashSet<Arc<Node>>,
        > = HashMap::new();
        for completed_work in &self.resume_data {
            if let Some((_, source_node)) =
                relays_pool.relays_hashmap.get(&completed_work.source_relay)
            {
                if let Some((_, destination_node)) = relays_pool
                    .relays_hashmap
                    .get(&completed_work.destination_relay)
                {
                    #[allow(clippy::mutable_key_type)]
                    let node_set = already_used_as_exit_nodes
                        .entry(source_node.fingerprint.clone())
                        .or_default();
                    node_set.insert(destination_node.clone());
                }
            }
        }

        log::info!(
            "The total keys were {:?}",
            already_used_as_exit_nodes.keys().len()
        );
        // We'll go through all the nodes in the graph (i.e the Relays that are in the NetDir and that are
        // not in the NetDir) that we had stored
        //
        // Only those Nodes that are in the RelaysPool will be considered here, because it has the
        // corresponding ```tor_netdir::Relay```
        for node_1 in &nodes {
            if let Some((relay_1, _)) =
                relays_pool.get_relay(node_1.fingerprint.as_str())
            {
                // Adding relays to the ignore list because they are in the same subnet as relay_1
                if !circuit_with_relay_on_same_subnet {
                    for (relay_2, node_2) in
                        relays_pool.relays_hashmap.values()
                    {
                        // If we are not allowed to make a circuit with a relay on the same subnet
                        // then we add that relay to the ignore list

                        if relay_1
                            .low_level_details()
                            .in_same_subnet(relay_2, &subnet_config)
                        {
                            invalid_pairs.insert((
                                node_1.fingerprint.clone(),
                                node_2.fingerprint.clone(),
                            ));
                            node_1
                                .add_a_node_in_same_subnet_or_family(
                                    node_2.clone(),
                                )
                                .await;
                        }
                    }
                }

                // Adding relays to the ignore list because they are in the same family as relay_1
                if !circuit_with_relay_on_same_family {
                    let md = relay_1.md().family().members();
                    for rsa_identity in md {
                        if let Some((_, node_2)) =
                            relays_pool.get_relay(rsa_identity.to_string())
                        {
                            invalid_pairs.insert((
                                node_1.fingerprint.clone(),
                                node_2.fingerprint.clone(),
                            ));
                            node_1
                                .add_a_node_in_same_subnet_or_family(
                                    node_2.clone(),
                                )
                                .await;
                        }
                    }
                }

                // Now that we have set the ignore list(in_same_subnet_or_family) of all the
                // relays(Node). Each Node will try to add a Node that's not been already added
                // (checking the to_be_used_as_exit and already_used_as_exit) and that's not
                // in the ignore list

                let _nodes = nodes.clone();
                // Collect the exits this relay has already used (from resume
                // data); empty when there is no resume entry for it
                let node_already_used_as_exit_nodes = {
                    let mut v = vec![];
                    if let Some(_already_used_as_exit_nodes) =
                        already_used_as_exit_nodes.get(&node_1.fingerprint)
                    {
                        v = _already_used_as_exit_nodes
                            .iter()
                            .cloned()
                            .collect();
                    }
                    v
                };

                let _node_1 = node_1.clone();
                tokio::spawn(async move {
                    // The to_be_used_as_exit lock gets acquired, so we don't have to worry about
                    // the Node getting started before to_be_used_as_exit is filled
                    _node_1
                        .add_relays_in_already_used_as_exit(
                            node_already_used_as_exit_nodes,
                        )
                        .await;
                    _node_1.add_relays_in_to_be_used_as_exit(_nodes).await;
                });
            }
        }

        let end_time = Instant::now();
        info!(
            "Finished filtering in {:?} ",
            end_time.duration_since(start_time)
        );

        // Calculate total_possible circuits, including
        // the circuit between a relay and itself
        let total_possible = total_relays * total_relays;
        // Calculate valid circuits
        let valid_circuits = total_possible - invalid_pairs.len();

        // We estimated the average build time for a 2-hop circuit to be approximately 0.8 seconds.
        let circuit_creation_time = (valid_circuits as f64 * 0.8)
            / (no_of_parallel_circuit_primary
                + no_of_parallel_circuit_secondary) as f64;
        info!(
            r"
            Circuit Metrics:
            - Total relays: {}
            - Invalid pairs: {}
            - Valid Circuit: {}
            - Circuit creation time: {:.2}s",
            total_relays,
            invalid_pairs.len(),
            valid_circuits,
            circuit_creation_time
        );
    }

    /// Insert every unique relay from the given ```RelaysPool``` into the
    /// graph as a [Node], creating no edges between them.
    ///
    /// Uniqueness is keyed on the relay fingerprint: a [Node] whose
    /// fingerprint is already present is skipped, so every [Node] in the
    /// graph is unique.
    ///
    /// **NOTE** :
    /// - Nodes are written to the **petgraph** and (if turned on) to the
    ///   **Neo4j Database**
    /// - Nothing is written to the sqlite3 database here; it is table based
    ///   with no concept of nodes, so only edges (circuit creation attempts,
    ///   either **failed** or **success**) are stored there
    pub async fn store_nodes(&self, relays_pool: &RelaysPool<'_, Arc<Node>>) {
        let neo4j_allowed = self.primary_worker_config.primary.neo4j_allowed;
        info!(
            "Attempting to add unique relays from the NetDir in the internal petgraph {} ",
            if neo4j_allowed { "and Neo4j Database" } else { "" },
        );

        // Count only the relays that were actually new to the graph.
        let mut added: u64 = 0;
        for (_, node) in relays_pool.relays_hashmap.values() {
            let was_inserted = self
                .store_node_checked_with_metadata(node.clone(), relays_pool)
                .await;
            if was_inserted {
                added += 1;
            }
        }

        info!(
            "Added {added} unique relays in the internal petgraph {}",
            if neo4j_allowed {
                "and spawned tokio task to add unique relays in neo4j graph database"
            } else {
                ""
            },
        );

        // Report the processed-relay count to the heartbeat metrics.
        self.update_relay_metrics(added).await;
    }

    /// Record the outcome of a single circuit creation attempt
    ///
    /// The result is written to the internal petgraph, the graph database
    /// (if it's allowed) and the sqlite3 database (if it's allowed)
    pub async fn store_edge(&self, completed_work: CompletedWork) {
        // Persist to sqlite3 in a background task, when a client exists
        if let Some(ref sqlite3_client) = self.sqlite3_client {
            let client = sqlite3_client.clone();
            let work = completed_work.clone();
            tokio::spawn(async move {
                client.add_completed_work(work);
            });
        }

        // Persist to Neo4j in a background task, when a client exists
        if let Some(ref neo4j_client) = self.neo4j_client {
            let client = neo4j_client.clone();
            let work = completed_work.clone();
            tokio::spawn(async move {
                client.add_completed_work(work).await;
            });
        }

        // Record the edge in the in-memory petgraph. The graph write lock
        // is taken before the fingerprint hashmap lock, the same order used
        // in store_node_checked_with_metadata.
        let mut graph = self.graph.write().await;
        let fingerprint_node_hashmap =
            self.fingerprint_node_hashmap.read().await;

        let source =
            fingerprint_node_hashmap.get(&completed_work.source_relay);
        let destination =
            fingerprint_node_hashmap.get(&completed_work.destination_relay);

        // Only add the edge when both endpoints are known to the graph
        if let (Some((_, source_index)), Some((_, destination_index))) =
            (source, destination)
        {
            graph.add_edge(
                *source_index,
                *destination_index,
                Edge {
                    status: completed_work.status,
                },
            );
        }
    }

    pub fn store_onionperf_analysis_time_data(
        &self,
        host_name: String,
        date: String,
    ) {
        if let Some(ref sqlite3_client) = self.sqlite3_client {
            let sqlite3_client = sqlite3_client.clone();
            sqlite3_client
                .add_onionperf_analysisfile_time_data(host_name, date);
        }
    }

    /// Store node with metadata enrichment capability
    ///
    /// Inserts the [Node] into the petgraph and fingerprint hashmap only if
    /// no node with the same fingerprint exists yet. When a Neo4j client is
    /// configured, the node is mirrored there in a spawned task — enriched
    /// with metadata extracted from the ```RelaysPool``` when metadata
    /// enrichment is enabled.
    ///
    /// Returns `true` when the node was newly inserted, `false` otherwise.
    pub async fn store_node_checked_with_metadata(
        &self,
        node: Arc<Node>,
        relays_pool: &RelaysPool<'_, Arc<Node>>,
    ) -> bool {
        let fingerprint = node.fingerprint();

        // Lock order: graph first, then the fingerprint hashmap (the same
        // order store_edge uses), to avoid deadlocks.
        let mut graph = self.graph.write().await;
        let mut fingerprint_node_hashmap =
            self.fingerprint_node_hashmap.write().await;

        if fingerprint_node_hashmap.contains_key(&fingerprint) {
            // Duplicate fingerprint: nothing to add.
            return false;
        }

        // Register the node in the petgraph and remember its index.
        let node_index = graph.add_node(node.clone());
        fingerprint_node_hashmap
            .insert(fingerprint.clone(), (node.clone(), node_index));

        // Mirror the node into the Neo4j Database, when configured.
        if let Some(ref neo4j_client) = self.neo4j_client {
            let neo4j_client = neo4j_client.clone();
            let node_clone = node.clone();

            // Extract metadata up-front (only when enrichment is enabled
            // and the relay exists in the pool) so the spawned task does
            // not need to borrow `relays_pool`.
            let metadata = self
                .primary_worker_config
                .primary
                .relay_metadata_enrichment_enabled
                .then(|| relays_pool.get_relay(&fingerprint))
                .flatten()
                .map(|(relay, _)| Self::extract_relay_metadata(relay));

            tokio::spawn(async move {
                match metadata {
                    Some(metadata) => {
                        neo4j_client
                            .add_node_with_metadata(node_clone, metadata)
                            .await
                    }
                    None => neo4j_client.add_node(node_clone).await,
                }
            });
        }

        true
    }

    /// Install the channel used to push heartbeat metric events
    pub async fn set_metrics_sender(
        &self,
        sender: mpsc::UnboundedSender<super::heartbeat::MetricEvent>,
    ) {
        // Replace whatever sender (if any) was installed previously.
        *self.metrics_sender.lock().await = Some(sender);
    }

    /// Emit a relays-processed metric event, when a sender has been set
    ///
    /// A failed send is only logged as a warning, not propagated — metrics
    /// delivery is best-effort.
    pub async fn update_relay_metrics(&self, relay_count: u64) {
        let guard = self.metrics_sender.lock().await;
        if let Some(sender) = guard.as_ref() {
            let event =
                super::heartbeat::MetricEvent::RelaysProcessed(relay_count);
            if let Err(e) = sender.send(event) {
                log::warn!("Failed to send relay metrics: {}", e);
            }
        }
    }

    /// Extract metadata from a tor_netdir::Relay object using Onionoo data
    ///
    /// Onionoo cache data (country, ASN, family, flags, weights, selection
    /// probabilities) is preferred when the relay is found there; otherwise
    /// the fields the consensus can supply (address/port, family, flags,
    /// weight) are derived from the consensus documents and the Onionoo-only
    /// fields are left as `None`.
    fn extract_relay_metadata(relay: &tor_netdir::Relay<'_>) -> RelayMetadata {
        let fingerprint = relay.rsa_id().to_string();

        // Look up relay data in Onionoo cache
        let onionoo_data = lookup_relay_metadata_by_fingerprint(&fingerprint);

        // Country and ASN are Onionoo-only; the consensus does not carry them
        let (country, asn) = if let Some(data) = onionoo_data {
            let country = data.country.clone();
            // Onionoo reports the AS as a string (e.g. "AS13335"); parse the
            // numeric part out of it
            let asn = data
                .asn_string
                .as_ref()
                .and_then(|s| parse_asn_from_string(s));
            trace!(
                "Relay {} found in Onionoo: country={:?}, ASN={:?}",
                fingerprint,
                country,
                asn
            );
            (country, asn)
        } else {
            trace!("Relay {} not found in Onionoo cache", fingerprint);
            (None, None)
        };

        // Extract address and port information from consensus (the first
        // listed address, if any)
        let (address, or_port) =
            if let Some(addr) = relay.rs().rs.addrs.first() {
                let addr_str = addr.ip().to_string();
                let port = addr.port();
                trace!(
                    "Extracted address {} port {} for relay {}",
                    addr_str,
                    port,
                    fingerprint
                );
                (Some(addr_str), Some(port))
            } else {
                (None, None)
            };

        // Use Onionoo effective_family data if available, otherwise use consensus data
        let family = if let Some(data) = onionoo_data {
            data.effective_family.clone()
        } else {
            let family_members: Vec<String> = relay
                .md()
                .family()
                .members()
                .map(|id| id.to_string())
                .collect();
            // An empty family collapses to None rather than Some(vec![])
            if family_members.is_empty() {
                None
            } else {
                Some(family_members)
            }
        };

        // Use Onionoo flags if available, otherwise extract from consensus
        let flags = if let Some(data) = onionoo_data {
            data.flags.clone()
        } else {
            // Only the flags exposed by the router-status accessors below
            // can be reconstructed from the consensus
            let mut flag_list = Vec::new();
            if relay.rs().is_flagged_bad_exit() {
                flag_list.push("BadExit".to_string());
            }
            if relay.rs().is_flagged_exit() {
                flag_list.push("Exit".to_string());
            }
            if relay.rs().is_flagged_fast() {
                flag_list.push("Fast".to_string());
            }
            if relay.rs().is_flagged_guard() {
                flag_list.push("Guard".to_string());
            }
            if relay.rs().is_flagged_hsdir() {
                flag_list.push("HSDir".to_string());
            }
            if relay.rs().is_flagged_stable() {
                flag_list.push("Stable".to_string());
            }
            if relay.rs().is_flagged_v2dir() {
                flag_list.push("V2Dir".to_string());
            }

            if flag_list.is_empty() {
                None
            } else {
                Some(flag_list)
            }
        };

        // Use Onionoo consensus weight if available, otherwise extract from consensus
        let bandwidth_weight = if let Some(data) = onionoo_data {
            // NOTE(review): `w as u32` truncates values outside u32 range —
            // presumably consensus weights always fit; confirm
            data.consensus_weight.map(|w| w as u32)
        } else {
            match relay.rs().rs.weight {
                RelayWeight::Measured(weight) => Some(weight),
                RelayWeight::Unmeasured(weight) => Some(weight),
                _ => None,
            }
        };

        // Use Onionoo observed bandwidth if available
        let measured_bandwidth =
            onionoo_data.and_then(|data| data.observed_bandwidth);

        // Extract selection probabilities from Onionoo data
        let guard_probability =
            onionoo_data.and_then(|data| data.guard_probability);
        let middle_probability =
            onionoo_data.and_then(|data| data.middle_probability);
        let exit_probability =
            onionoo_data.and_then(|data| data.exit_probability);

        trace!(
            "Extracted metadata for relay {}: addr={:?}, port={:?}, \
            family_size={:?}, flags_count={:?}, guard_prob={:?}",
            fingerprint,
            address,
            or_port,
            family.as_ref().map(|f| f.len()),
            flags.as_ref().map(|f| f.len()),
            guard_probability
        );

        RelayMetadata {
            country,
            asn,
            family,
            flags,
            bandwidth_weight,
            measured_bandwidth,
            address,
            or_port,
            guard_probability,
            middle_probability,
            exit_probability,
        }
    }
}

/// Represents the status of the [TorNetwork] i.e
/// whether it's producing [IncompleteWork] and receiving [CompletedWork]
#[derive(Debug, Clone, Copy)]
pub enum TorNetworkStatus {
    /// [TorNetwork] is not started yet and it's not producing [IncompleteWork]
    NotStarted,

    /// [TorNetwork] is running and it's producing [IncompleteWork]
    Running,

    /// [TorNetwork] is paused and it's not producing [IncompleteWork]
    #[allow(dead_code)]
    Paused,
}

/// The Edge between two [Node]s in the Tor Network graph, which represents
/// the outcome of a circuit creation attempt between the two relays
///
/// (A [Node] itself just holds the data needed to index a relay in the
/// Relays Pool through its RSA ID.)
///
/// If there's no edge between two relays then it means that they were in the
/// same family or in the same /16 subnet
///
/// (See : https://gitlab.torproject.org/tpo/network-health/erpc/-/issues/17)
#[derive(Debug)]
pub struct Edge {
    // Currently never read; kept so the circuit's outcome travels with the
    // edge (it is also persisted via the database clients)
    #[allow(dead_code)]
    status: CompletedWorkStatus,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test metadata extraction functionality with comprehensive data
    #[test]
    fn test_relay_metadata_structure() {
        // A fully-populated RelayMetadata, mirroring what
        // extract_relay_metadata would produce for a well-known relay
        let metadata = RelayMetadata {
            country: Some("DE".to_string()),
            asn: Some(210558),
            family: Some(vec![
                "5ADC776E831EC4609D8D5AC0D5A757B00FC6BFDAE".to_string(),
                "02FCF62C0AD3AD1D208D7F27E12E8840EF553C7E0".to_string(),
            ]),
            flags: Some(vec![
                "Exit".to_string(),
                "Fast".to_string(),
                "Guard".to_string(),
                "Running".to_string(),
            ]),
            bandwidth_weight: Some(10000),
            measured_bandwidth: Some(13238143),
            address: Some("45.141.215.17".to_string()),
            or_port: Some(7100),
            guard_probability: Some(0.000010992514),
            middle_probability: Some(0.000010992395),
            exit_probability: Some(0.000017676482),
        };

        // Verify essential fields are populated
        assert!(metadata.country.is_some());
        assert!(metadata.asn.is_some());
        assert!(metadata.family.is_some());
        assert!(metadata.flags.is_some());
        assert!(metadata.bandwidth_weight.is_some());
        assert!(metadata.measured_bandwidth.is_some());
        assert!(metadata.address.is_some());
        assert!(metadata.or_port.is_some());

        // Verify probability values lie strictly inside (0, 1)
        assert!(
            metadata.guard_probability.unwrap() > 0.0
                && metadata.guard_probability.unwrap() < 1.0
        );
        assert!(
            metadata.middle_probability.unwrap() > 0.0
                && metadata.middle_probability.unwrap() < 1.0
        );
        assert!(
            metadata.exit_probability.unwrap() > 0.0
                && metadata.exit_probability.unwrap() < 1.0
        );

        // Verify ASN and port values
        assert_eq!(metadata.asn.unwrap(), 210558);
        assert_eq!(metadata.or_port.unwrap(), 7100);
    }

    /// Test Onionoo metadata lookup functionality
    #[test]
    fn test_onionoo_lookup() {
        // Test ASN parsing function: accepts both "AS<n>" and bare digits
        assert_eq!(parse_asn_from_string("AS13335"), Some(13335));
        assert_eq!(parse_asn_from_string("13335"), Some(13335));
        assert_eq!(parse_asn_from_string("invalid"), None);

        // Test fingerprint normalization for Onionoo lookup: strips any
        // leading '$' and uppercases
        assert_eq!(
            normalize_fingerprint_for_onionoo("$aa51c355347671cc66ae88"),
            "AA51C355347671CC66AE88"
        );
        assert_eq!(
            normalize_fingerprint_for_onionoo("aa51c355347671cc66ae88"),
            "AA51C355347671CC66AE88"
        );
        assert_eq!(
            normalize_fingerprint_for_onionoo("AA51C355347671CC66AE88"),
            "AA51C355347671CC66AE88"
        );
    }

    /// Integration test to verify Onionoo API connectivity and data parsing
    ///
    /// Requires network access, hence ignored by default.
    #[tokio::test]
    #[ignore] // Use --ignored flag to run this test
    async fn test_onionoo_integration() {
        // Test that we can fetch and parse Onionoo data
        let result = fetch_onionoo_data().await;
        assert!(
            result.is_ok(),
            "Failed to fetch Onionoo data: {:?}",
            result.err()
        );

        // Test that we can initialize the cache
        let cache = init_onionoo_cache().await;
        assert!(cache.is_some(), "Failed to initialize Onionoo cache");

        let cache = cache.unwrap();
        assert!(!cache.is_empty(), "Onionoo cache should not be empty");

        // Test a specific relay lookup (using a long-running relay fingerprint)
        // This is the moria1 directory authority fingerprint in Onionoo format (uppercase)
        let test_fingerprint = "9695DFC35FFEB861329B9F1AB04C46397020CE31";
        if let Some(relay) = cache.get(test_fingerprint) {
            info!(
                "Found test relay in Onionoo cache: fingerprint={:?}",
                relay.fingerprint
            );
            assert_eq!(relay.fingerprint.as_deref(), Some(test_fingerprint));
            // Verify we have useful metadata
            assert!(
                relay.country.is_some()
                    || relay.asn_string.is_some()
                    || relay.flags.is_some()
            );
        } else {
            info!("Test relay not found in cache (this is okay, it might not be running)");
        }

        // Test fingerprint normalization with consensus format
        let consensus_format = "$9695dfc35ffeb861329b9f1ab04c46397020ce31"; // lowercase with $
        let normalized = normalize_fingerprint_for_onionoo(consensus_format);
        assert_eq!(normalized, test_fingerprint);
        info!(
            "Fingerprint normalization test passed: {} -> {}",
            consensus_format, normalized
        );

        info!(
            "Onionoo integration test passed! Cache contains {} relays",
            cache.len()
        );
    }

    /// Test Onionoo data structure parsing
    #[test]
    fn test_onionoo_relay_parsing() {
        // A representative Onionoo relay document; note the "as" JSON key
        // maps to the asn_string field
        let json_data = r#"{
            "fingerprint": "ABCD1234567890ABCDEF1234567890ABCD123456",
            "country": "us",
            "as": "AS13335",
            "flags": ["Fast", "Guard", "Running", "Stable", "V2Dir"],
            "consensus_weight": 10000,
            "effective_family": ["EFGH1234567890ABCDEF1234567890ABCD123456"],
            "observed_bandwidth": 1048576,
            "guard_probability": 0.025,
            "middle_probability": 0.015,
            "exit_probability": 0.005
        }"#;

        let relay: Result<OnionooRelay, _> = serde_json::from_str(json_data);
        assert!(relay.is_ok(), "Failed to parse OnionooRelay JSON");

        let relay = relay.unwrap();
        assert_eq!(
            relay.fingerprint,
            Some("ABCD1234567890ABCDEF1234567890ABCD123456".to_string())
        );
        assert_eq!(relay.country, Some("us".to_string()));
        assert_eq!(relay.asn_string, Some("AS13335".to_string()));
        assert_eq!(parse_asn_from_string("AS13335"), Some(13335));
        assert_eq!(relay.consensus_weight, Some(10000));
        assert!(relay.flags.is_some());
        assert_eq!(relay.flags.as_ref().unwrap().len(), 5);
        assert_eq!(relay.guard_probability, Some(0.025));
        assert_eq!(relay.middle_probability, Some(0.015));
        assert_eq!(relay.exit_probability, Some(0.005));
    }

    /// Test fingerprint normalization and ASN parsing functions
    #[test]
    fn test_metadata_pipeline_functions() {
        // Test fingerprint normalization for various formats
        // ('$'-prefixed, lowercase, already normalized)
        assert_eq!(
            normalize_fingerprint_for_onionoo(
                "$000A10D43011EA4928A35F610405F92B4433B4DC"
            ),
            "000A10D43011EA4928A35F610405F92B4433B4DC"
        );
        assert_eq!(
            normalize_fingerprint_for_onionoo(
                "000a10d43011ea4928a35f610405f92b4433b4dc"
            ),
            "000A10D43011EA4928A35F610405F92B4433B4DC"
        );
        assert_eq!(
            normalize_fingerprint_for_onionoo(
                "000A10D43011EA4928A35F610405F92B4433B4DC"
            ),
            "000A10D43011EA4928A35F610405F92B4433B4DC"
        );

        // Test ASN parsing from Onionoo format
        assert_eq!(parse_asn_from_string("AS7018"), Some(7018));
        assert_eq!(parse_asn_from_string("AS13335"), Some(13335));
        assert_eq!(parse_asn_from_string("AS210558"), Some(210558));
        assert_eq!(parse_asn_from_string("7018"), Some(7018));
        assert_eq!(parse_asn_from_string("invalid"), None);

        info!("Metadata pipeline functions test passed!");
    }
}