use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use log::{error, info, warn};
use crate::algorithms::{
CommunityAnalyzer, ComponentAnalyzer, PartitionClassifier,
};
use crate::args::Args;
use crate::db_trait::{
AnalysisDatabase, AnalysisError, GraphProjectionParams,
};
use crate::models::metrics::{GraphMetrics, NodeMetrics};
use crate::models::partitions::ComponentAnalysisResult;
/// Dispatches CLI analysis tasks against an `AnalysisDatabase` backend.
///
/// Holds the shared database client, the parsed command-line arguments
/// (which select the task via `task` and carry flags such as `force`), and
/// the analysis configuration whose parameters are forwarded to the
/// individual analyzers.
pub struct TaskHandler {
    /// Shared database backend; `Arc::clone`d into every analyzer.
    db_client: Arc<dyn AnalysisDatabase>,
    /// Parsed command-line arguments.
    args: Args,
    /// Analysis configuration (display limits and algorithm parameters).
    config: crate::config::AnalysisConfig,
}
impl TaskHandler {
pub fn new(
db_client: Arc<dyn AnalysisDatabase>,
args: Args,
config: crate::config::AnalysisConfig,
) -> Self {
Self {
db_client,
args,
config,
}
}
pub async fn execute(&self) -> Result<(), Box<dyn std::error::Error>> {
    // An explicit `--task` wins; otherwise fall back to the default workflow.
    if let Some(task) = &self.args.task {
        self.execute_specific_task(task).await
    } else {
        self.execute_default_analysis().await
    }
}
/// Dispatches a single named task to its handler.
///
/// # Errors
/// Propagates the handler's error. For an unrecognized task name the error is
/// logged, the task help text is printed to stdout, and an error is returned.
/// Returning (rather than calling `std::process::exit`) lets destructors such
/// as the database client's run and leaves the exit policy to the caller.
async fn execute_specific_task(
    &self,
    task: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    match task {
        "projection-create" => self.handle_projection_create().await,
        "projection-delete" => self.handle_projection_delete().await,
        "projection-exists" => self.handle_projection_exists().await,
        "metrics-basic" => self.handle_metrics_basic().await,
        "metrics-degrees" => self.handle_metrics_degrees().await,
        "metrics-distribution" => self.handle_metrics_distribution().await,
        "components-analysis" => self.handle_components_analysis().await,
        "community-louvain-consensus" => {
            self.handle_community_louvain_consensus().await
        }
        "community-lpa" => self.handle_community_lpa().await,
        "partition-classify-geography" => {
            self.handle_partition_classify_geography().await
        }
        "partition-classify-asn" => self.handle_partition_classify_asn().await,
        "partition-classify-family" => {
            self.handle_partition_classify_family().await
        }
        "partition-classify-all" => self.handle_partition_classify_all().await,
        "centrality-betweenness" => {
            self.handle_centrality_betweenness().await
        }
        "centrality-closeness" => self.handle_centrality_closeness().await,
        "centrality-combined" => self.handle_centrality_combined().await,
        "path-connectivity" => self.handle_path_connectivity().await,
        "advanced-path-analysis" => self.handle_advanced_path_analysis().await,
        "info-database" => self.handle_info_database().await,
        "web-server" => self.handle_web_server().await,
        "help" => self.handle_help().await,
        _ => {
            error!("Unknown task: {}", task);
            println!("{}", Args::task_help());
            Err(format!("Unknown task: {}", task).into())
        }
    }
}
async fn execute_default_analysis(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Running default analysis workflow...");
let proj_params = self.create_default_projection_params();
info!(
"Creating/updating GDS graph projection: '{}' with node \
label: '{}'",
proj_params.projection_name, proj_params.node_label
);
self.db_client
.create_graph_projection(&proj_params)
.await
.map_err(|e: AnalysisError| {
error!(
"Failed to create GDS graph projection '{}': {:?}",
proj_params.projection_name, e
);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!(
"Successfully created/updated GDS graph projection: '{}'",
proj_params.projection_name
);
self.calculate_and_display_metrics(&proj_params.projection_name)
.await?;
self.handle_components_analysis().await?;
Ok(())
}
pub async fn handle_projection_create(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    let params = self.create_projection_params("tor_erpc_projection");
    let name = params.projection_name.as_str();
    // Without --force, an existing projection is left untouched; the
    // existence check is skipped entirely when --force is given.
    if !self.args.force {
        let already_present =
            self.db_client.check_graph_projection_exists(name).await?;
        if already_present {
            println!(
                "Projection '{}' already exists. Use --force to recreate.",
                name
            );
            return Ok(());
        }
    }
    info!("Creating graph projection: '{}'", name);
    self.db_client.create_graph_projection(&params).await?;
    println!("✅ Successfully created projection: '{}'", name);
    Ok(())
}
pub async fn handle_projection_delete(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    const PROJECTION: &str = "tor_erpc_projection";
    info!("Deleting graph projection: {}", PROJECTION);
    self.db_client.delete_graph_projection(PROJECTION).await?;
    println!("✅ Successfully deleted projection: {}", PROJECTION);
    Ok(())
}
async fn handle_projection_exists(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    // Reports (on stdout) whether the main projection is present.
    let present = self
        .db_client
        .check_graph_projection_exists("tor_erpc_projection")
        .await?;
    let report = if present {
        "✅ Projection: tor_erpc_projection exists"
    } else {
        "❌ Projection: tor_erpc_projection does not exist"
    };
    println!("{}", report);
    Ok(())
}
async fn handle_metrics_basic(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Calculating basic metrics for projection: tor_erpc_projection");
let metrics = self
.db_client
.calculate_graph_metrics("tor_erpc_projection")
.await
.map_err(|e| {
error!("Failed to calculate graph metrics: {:?}", e);
e
})?;
info!("Successfully calculated metrics, displaying results...");
self.display_basic_metrics(&metrics)?;
Ok(())
}
async fn handle_metrics_degrees(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let node_degrees = self
.db_client
.calculate_node_degrees("tor_erpc_projection")
.await
.map_err(|e| {
error!("Failed to calculate node degrees: {:?}", e);
e
})?;
self.display_degree_metrics(&node_degrees)?;
Ok(())
}
async fn handle_metrics_distribution(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    let graph_metrics = match self
        .db_client
        .calculate_graph_metrics("tor_erpc_projection")
        .await
    {
        Ok(found) => found,
        Err(err) => {
            error!(
                "Failed to calculate graph metrics for distribution: {:?}",
                err
            );
            return Err(err.into());
        }
    };
    self.display_degree_distribution(&graph_metrics)
}
async fn handle_info_database(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    // Any successful round-trip (regardless of the boolean answer) is taken
    // as proof that the database is reachable.
    let probe = self
        .db_client
        .check_graph_projection_exists("test_connection")
        .await;
    let status = if probe.is_ok() {
        "Connected"
    } else {
        "Connection Failed"
    };
    println!("Database Information:");
    println!(" Type: Neo4j");
    println!(" Status: {}", status);
    Ok(())
}
async fn handle_help(&self) -> Result<(), Box<dyn std::error::Error>> {
    // Prints the CLI task list to stdout.
    let help_text = Args::task_help();
    println!("{}", help_text);
    Ok(())
}
async fn handle_web_server(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting eRPC Analysis web server...");
let host = "127.0.0.1".to_string();
let port = 8080;
println!("Starting eRPC Analysis web server");
println!(
" API endpoints available at http://{}:{}/erpc/",
host, port
);
println!();
println!("Analysis Endpoints (GET - Run & Return Results):");
println!(
" Components analysis: http://{}:{}/erpc/components",
host, port
);
println!(
" Partition classification: http://{}:{}/erpc/partitions",
host, port
);
println!(
" Centrality analysis: http://{}:{}/erpc/centrality",
host, port
);
println!(" Community detection (Louvain): http://{}:{}/erpc/community/louvain", host, port);
println!(" Graph metrics: http://{}:{}/erpc/metrics", host, port);
println!(
" Metrics degrees: http://{}:{}/erpc/metrics/degrees",
host, port
);
println!(
" Metrics distribution: http://{}:{}/erpc/metrics/distribution",
host, port
);
println!(
" Community LPA: http://{}:{}/erpc/community/lpa",
host, port
);
println!(
" Path connectivity: http://{}:{}/erpc/path/connectivity",
host, port
);
println!(
" Advanced path analysis: http://{}:{}/erpc/advanced-paths",
host, port
);
println!();
println!("Utility Endpoints:");
println!(" Health check: http://{}:{}/erpc/health", host, port);
println!(
" Database info: http://{}:{}/erpc/database/info",
host, port
);
println!();
println!("Press Ctrl+C to stop the server");
let web_server = crate::web::WebServer::new(
host,
port,
Arc::clone(&self.db_client),
self.config.clone(),
);
web_server.start().await.map_err(|e| {
error!("Failed to start web server: {}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
Ok(())
}
pub async fn handle_components_analysis(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    const PROJECTION: &str = "tor_erpc_connectivity_analysis";
    // Make sure the SUCCESS-only projection exists before running WCC/SCC.
    self.handle_connectivity_projection().await?;
    info!("Running weak connectivity analysis using WCC algorithm...");
    self.run_wcc_analysis(PROJECTION).await?;
    info!("Running strong connectivity analysis using SCC algorithm...");
    self.run_scc_analysis(PROJECTION).await
}
pub async fn handle_community_louvain_consensus(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    const PROJECTION: &str = "tor_erpc_connectivity_analysis";
    info!(
        "Running Louvain consensus community detection for projection: {}",
        PROJECTION
    );
    self.handle_connectivity_projection().await?;
    self.run_louvain_consensus_analysis(PROJECTION).await?;
    println!(
        "✅ Successfully completed Louvain consensus community detection analysis"
    );
    Ok(())
}
pub async fn handle_community_lpa(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    const PROJECTION: &str = "tor_erpc_connectivity_analysis";
    info!(
        "Running Label Propagation community detection for projection: {}",
        PROJECTION
    );
    self.handle_connectivity_projection().await?;
    self.run_lpa_analysis(PROJECTION).await?;
    println!(
        "✅ Successfully completed Label Propagation community detection analysis"
    );
    Ok(())
}
async fn handle_partition_classify_geography(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Running partition classification by geography...");
    self.handle_connectivity_projection().await?;
    // Classification operates on the weakly connected components.
    let wcc = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let weak = wcc
        .analyze_weakly_connected_components("tor_erpc_connectivity_analysis")
        .await?;
    let classifier = PartitionClassifier::new(Arc::clone(&self.db_client));
    let display_cfg = &self.config.analysis_params.analysis;
    let geography = classifier.classify_by_geography(&weak.components).await?;
    classifier.display_geographic_classification(&geography, display_cfg)?;
    println!("✅ Successfully completed geographic partition classification");
    Ok(())
}
async fn handle_partition_classify_asn(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Running partition classification by ASN...");
    self.handle_connectivity_projection().await?;
    // Classification operates on the weakly connected components.
    let wcc = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let weak = wcc
        .analyze_weakly_connected_components("tor_erpc_connectivity_analysis")
        .await?;
    let classifier = PartitionClassifier::new(Arc::clone(&self.db_client));
    let display_cfg = &self.config.analysis_params.analysis;
    let by_asn = classifier.classify_by_asn(&weak.components).await?;
    classifier.display_asn_classification(&by_asn, display_cfg)?;
    println!("✅ Successfully completed ASN partition classification");
    Ok(())
}
async fn handle_partition_classify_family(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Running partition classification by family...");
    self.handle_connectivity_projection().await?;
    // Classification operates on the weakly connected components.
    let wcc = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let weak = wcc
        .analyze_weakly_connected_components("tor_erpc_connectivity_analysis")
        .await?;
    let classifier = PartitionClassifier::new(Arc::clone(&self.db_client));
    let display_cfg = &self.config.analysis_params.analysis;
    let by_family = classifier.classify_by_family(&weak.components).await?;
    classifier.display_family_classification(&by_family, display_cfg)?;
    println!("✅ Successfully completed family partition classification");
    Ok(())
}
pub async fn handle_partition_classify_all(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Running comprehensive partition classification...");
    self.handle_connectivity_projection().await?;
    // One WCC pass feeds all three classifications.
    let wcc = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let weak = wcc
        .analyze_weakly_connected_components("tor_erpc_connectivity_analysis")
        .await?;
    let components = &weak.components;
    let classifier = PartitionClassifier::new(Arc::clone(&self.db_client));
    let display_cfg = &self.config.analysis_params.analysis;

    info!("Running geographic classification...");
    let geography = classifier.classify_by_geography(components).await?;
    classifier.display_geographic_classification(&geography, display_cfg)?;

    info!("Running ASN classification...");
    let by_asn = classifier.classify_by_asn(components).await?;
    classifier.display_asn_classification(&by_asn, display_cfg)?;

    info!("Running family classification...");
    let by_family = classifier.classify_by_family(components).await?;
    classifier.display_family_classification(&by_family, display_cfg)?;

    println!("✅ Successfully completed all partition classifications");
    Ok(())
}
async fn run_wcc_analysis(
    &self,
    projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Weakly connected components on `projection_name`, then display.
    let component_analyzer = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let weak_components = component_analyzer
        .analyze_weakly_connected_components(projection_name)
        .await?;
    let display_cfg = &self.config.analysis_params.analysis;
    component_analyzer.display_weak_connectivity_analysis(&weak_components, display_cfg)?;
    Ok(())
}
async fn run_scc_analysis(
    &self,
    projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Strongly connected components on `projection_name`, then display.
    let component_analyzer = ComponentAnalyzer::new(Arc::clone(&self.db_client));
    let strong_components = component_analyzer
        .analyze_strongly_connected_components(projection_name)
        .await?;
    let display_cfg = &self.config.analysis_params.analysis;
    component_analyzer.display_strong_connectivity_analysis(&strong_components, display_cfg)?;
    Ok(())
}
async fn run_louvain_consensus_analysis(
&self,
projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let consensus_runs = self
.config
.analysis_params
.community_detection
.consensus_runs;
info!(
"Starting Louvain consensus community detection analysis ({} \
runs)...",
consensus_runs
);
let analyzer = CommunityAnalyzer::new(self.db_client.clone());
let mut results = Vec::new();
for run_number in 1..=consensus_runs {
info!(
"Running Louvain iteration {}/{}...",
run_number, consensus_runs
);
let result = analyzer
.analyze_louvain_communities(
projection_name,
&self.config.analysis_params.community_detection.louvain,
)
.await?;
info!("=== Louvain Run {} Results ===", run_number);
analyzer.display_louvain_community_analysis(
&result,
&self.config.analysis_params.analysis,
)?;
results.push(result);
}
self.analyze_louvain_consensus(&results, consensus_runs);
info!("✅ Louvain consensus analysis completed");
Ok(())
}
fn analyze_louvain_consensus(
&self,
results: &[ComponentAnalysisResult],
consensus_runs: u32,
) {
info!(
"=== Louvain Consensus Analysis ({} Runs) ===",
consensus_runs
);
let mut community_counts = Vec::new();
let mut largest_sizes = Vec::new();
let mut isolation_ratios = Vec::new();
let mut modularity_scores = Vec::new();
for (i, result) in results.iter().enumerate() {
let count = result.total_components.unwrap_or(0);
let largest = result.largest_component_size.unwrap_or(0);
let isolation = result.isolation_ratio.unwrap_or(0.0);
let modularity = result.modularity.unwrap_or(0.0);
community_counts.push(count);
largest_sizes.push(largest);
isolation_ratios.push(isolation);
modularity_scores.push(modularity);
info!(
"Run {}: {} communities, largest: {}, isolation: {:.2}%, \
modularity: {:.4}",
i + 1,
count,
largest,
isolation,
modularity
);
}
let avg_communities = community_counts.iter().sum::<usize>() as f64
/ community_counts.len() as f64;
let min_communities = *community_counts.iter().min().unwrap();
let max_communities = *community_counts.iter().max().unwrap();
let avg_largest = largest_sizes.iter().sum::<usize>() as f64
/ largest_sizes.len() as f64;
let min_largest = *largest_sizes.iter().min().unwrap();
let max_largest = *largest_sizes.iter().max().unwrap();
let avg_isolation = isolation_ratios.iter().sum::<f64>()
/ isolation_ratios.len() as f64;
let min_isolation = isolation_ratios
.iter()
.min_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
let max_isolation = isolation_ratios
.iter()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
let avg_modularity = modularity_scores.iter().sum::<f64>()
/ modularity_scores.len() as f64;
let min_modularity = modularity_scores
.iter()
.min_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
let max_modularity = modularity_scores
.iter()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
info!("=== Stability Analysis ===");
info!(
"Community Count - Avg: {:.1}, Range: {} - {}",
avg_communities, min_communities, max_communities
);
info!(
"Largest Community - Avg: {:.1}, Range: {} - {}",
avg_largest, min_largest, max_largest
);
info!(
"Isolation Ratio - Avg: {:.2}%, Range: {:.2}% - {:.2}%",
avg_isolation, min_isolation, max_isolation
);
info!(
"Modularity Score - Avg: {:.4}, Range: {:.4} - {:.4}",
avg_modularity, min_modularity, max_modularity
);
let communities_variance = community_counts
.iter()
.map(|&x| (x as f64 - avg_communities).powi(2))
.sum::<f64>()
/ community_counts.len() as f64;
let communities_std = communities_variance.sqrt();
let communities_cv = communities_std / avg_communities * 100.0;
let isolation_variance = isolation_ratios
.iter()
.map(|&x| (x - avg_isolation).powi(2))
.sum::<f64>()
/ isolation_ratios.len() as f64;
let isolation_std = isolation_variance.sqrt();
let isolation_cv = isolation_std / avg_isolation * 100.0;
let modularity_variance = modularity_scores
.iter()
.map(|&x| (x - avg_modularity).powi(2))
.sum::<f64>()
/ modularity_scores.len() as f64;
let modularity_std = modularity_variance.sqrt();
let modularity_cv = if avg_modularity != 0.0 {
modularity_std / avg_modularity * 100.0
} else {
0.0
};
info!("=== Stability Metrics ===");
info!(
"Community Count Coefficient of Variation: {:.1}%",
communities_cv
);
info!(
"Isolation Ratio Coefficient of Variation: {:.1}%",
isolation_cv
);
info!(
"Modularity Score Coefficient of Variation: {:.1}%",
modularity_cv
);
if avg_modularity >= 0.3 {
info!("✅ STRONG community structure detected (modularity ≥ 0.3)");
} else if avg_modularity >= 0.1 {
info!(
"⚠️ MODERATE community structure detected (0.1 ≤ modularity \
< 0.3)"
);
} else {
info!("❌ WEAK community structure detected (modularity < 0.1)");
}
if communities_cv < 10.0 {
info!("✅ Community structure is STABLE (CV < 10%)");
} else if communities_cv < 20.0 {
info!(
"⚠️ Community structure is MODERATELY STABLE (10% ≤ CV < 20%)"
);
} else {
info!("❌ Community structure is UNSTABLE (CV ≥ 20%)");
}
let threshold = self
.config
.analysis_params
.analysis
.isolation_ratio_threshold;
if avg_isolation < threshold {
info!(
"⚠️ Network fragmentation detected: Average isolation ratio \
{:.2}% is below threshold {:.1}%",
avg_isolation, threshold
);
} else {
info!(
"✅ Network connectivity is healthy: Average isolation ratio \
{:.2}% is above threshold {:.1}%",
avg_isolation, threshold
);
}
}
async fn run_lpa_analysis(
    &self,
    projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Label Propagation on `projection_name`, then display.
    let lpa_params = &self
        .config
        .analysis_params
        .community_detection
        .label_propagation;
    let community_analyzer = CommunityAnalyzer::new(Arc::clone(&self.db_client));
    let lpa_outcome = community_analyzer
        .analyze_label_propagation_communities(projection_name, lpa_params)
        .await?;
    community_analyzer.display_label_propagation_community_analysis(
        &lpa_outcome,
        &self.config.analysis_params.analysis,
    )?;
    Ok(())
}
fn create_default_projection_params(&self) -> GraphProjectionParams {
let mut rel_types_map = HashMap::new();
rel_types_map
.insert("CIRCUIT_SUCCESS".to_string(), "NATURAL".to_string());
rel_types_map
.insert("CIRCUIT_FAILURE".to_string(), "NATURAL".to_string());
GraphProjectionParams {
projection_name: "tor_erpc_projection".to_string(),
node_label: "Relay".to_string(),
relationship_types: rel_types_map,
relationship_properties_to_project: None,
}
}
fn create_projection_params(&self, name: &str) -> GraphProjectionParams {
    // Both success and failure circuits, kept in their stored direction.
    let relationship_types: HashMap<String, String> = [
        ("CIRCUIT_SUCCESS", "NATURAL"),
        ("CIRCUIT_FAILURE", "NATURAL"),
    ]
    .iter()
    .map(|(rel, orientation)| (rel.to_string(), orientation.to_string()))
    .collect();
    GraphProjectionParams {
        projection_name: name.to_string(),
        node_label: "Relay".to_string(),
        relationship_types,
        relationship_properties_to_project: None,
    }
}
fn create_partition_detection_params(
    &self,
    name: &str,
) -> GraphProjectionParams {
    // Only successful circuits, directed as stored: used for WCC/SCC,
    // community detection and centrality on the connectivity projection.
    let relationship_types: HashMap<String, String> = std::iter::once((
        "CIRCUIT_SUCCESS".to_string(),
        "NATURAL".to_string(),
    ))
    .collect();
    GraphProjectionParams {
        projection_name: name.to_string(),
        node_label: "Relay".to_string(),
        relationship_types,
        relationship_properties_to_project: None,
    }
}
fn create_path_analysis_params(
    &self,
    name: &str,
) -> GraphProjectionParams {
    // Only successful circuits, projected without direction for path queries.
    let relationship_types: HashMap<String, String> = std::iter::once((
        "CIRCUIT_SUCCESS".to_string(),
        "UNDIRECTED".to_string(),
    ))
    .collect();
    GraphProjectionParams {
        projection_name: name.to_string(),
        node_label: "Relay".to_string(),
        relationship_types,
        relationship_properties_to_project: None,
    }
}
async fn handle_connectivity_projection(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let exists = self
.db_client
.check_graph_projection_exists("tor_erpc_connectivity_analysis")
.await?;
if !exists {
info!(
"❌ Connectivity projection 'tor_erpc_connectivity_analysis' \
does not exist."
);
info!("Creating projection with SUCCESS edges only...");
let partition_params = self.create_partition_detection_params(
"tor_erpc_connectivity_analysis",
);
self.db_client
.create_graph_projection(&partition_params)
.await
.map_err(|e| {
error!(
"Failed to create connectivity projection: {:?}",
e
);
e
})?;
info!(
"✅ Created connectivity projection - tor_erpc_connectivity_analysis"
);
}
Ok(())
}
pub async fn handle_path_analysis_projection(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let projection_name = "tor_erpc_path_analysis";
let exists = self
.db_client
.check_graph_projection_exists(projection_name)
.await?;
if !exists {
info!(
"❌ Path analysis projection '{}' does not exist.",
projection_name
);
info!("Creating projection with UNDIRECTED edges for analysis...");
let path_params =
self.create_path_analysis_params(projection_name);
self.db_client
.create_graph_projection(&path_params)
.await
.map_err(|e| {
error!(
"Failed to create path analysis projection: {:?}",
e
);
e
})?;
info!("✅ Created path analysis projection - {}", projection_name);
}
Ok(())
}
async fn calculate_and_display_metrics(
&self,
projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
info!("=== Starting Graph Metrics Calculation ===");
let graph_metrics = self
.db_client
.calculate_graph_metrics(projection_name)
.await
.map_err(|e: AnalysisError| {
error!("Failed to calculate graph metrics: {:?}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
self.display_basic_metrics(&graph_metrics)?;
self.display_degree_distribution(&graph_metrics)?;
let node_degrees = self
.db_client
.calculate_node_degrees(projection_name)
.await
.map_err(|e: AnalysisError| {
error!("Failed to calculate node degrees: {:?}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
self.display_degree_metrics(&node_degrees)?;
info!("=== Graph Metrics Calculation Complete ===");
Ok(())
}
fn display_basic_metrics(
    &self,
    metrics: &GraphMetrics,
) -> Result<(), Box<dyn std::error::Error>> {
    // Missing values are shown as 0.
    let nodes = metrics.node_count.unwrap_or(0);
    let relationships = metrics.relationship_count.unwrap_or(0);
    let avg_degree = metrics.average_degree.unwrap_or(0.0);
    let max_degree = metrics.max_degree.unwrap_or(0);
    let min_degree = metrics.min_degree.unwrap_or(0);
    info!("Basic Graph Metrics:");
    info!(" Nodes: {}", nodes);
    info!(" Relationships: {}", relationships);
    info!(" Average degree: {:.2}", avg_degree);
    info!(" Maximum degree: {}", max_degree);
    info!(" Minimum degree: {}", min_degree);
    Ok(())
}
/// Logs the `max_display_components` relays with the highest total degree.
///
/// It sorts borrowed references rather than cloning every `NodeMetrics`
/// (and its fingerprint string). The sort is stable, so relays with equal
/// total degree keep their input order.
fn display_degree_metrics(
    &self,
    node_degrees: &[NodeMetrics],
) -> Result<(), Box<dyn std::error::Error>> {
    let limit = self.config.analysis_params.analysis.max_display_components;
    let mut ranked: Vec<&NodeMetrics> = node_degrees.iter().collect();
    ranked.sort_by(|a, b| b.total_degree.cmp(&a.total_degree));
    info!("Top {} Most Connected Relays:", limit);
    for (i, node) in ranked.iter().take(limit).enumerate() {
        info!(
            " {}. {} - Total: {}, In: {}, Out: {}",
            i + 1,
            &node.fingerprint,
            node.total_degree,
            node.in_degree,
            node.out_degree
        );
    }
    Ok(())
}
/// Logs the most common degree values from `metrics.degree_distribution`.
///
/// Entries are ranked by relay count, descending. Ties are broken by
/// ascending degree, so the output is deterministic regardless of the
/// map's iteration order. At most `min(max_display_components, 5)` rows
/// are shown, and the header reports how many rows were actually printed.
/// If no distribution is present, a warning is logged.
fn display_degree_distribution(
    &self,
    metrics: &GraphMetrics,
) -> Result<(), Box<dyn std::error::Error>> {
    let degree_dist = match &metrics.degree_distribution {
        Some(dist) => dist,
        None => {
            warn!("No degree distribution data available");
            return Ok(());
        }
    };
    info!(
        "Degree Distribution ({} unique values):",
        degree_dist.len()
    );
    let mut dist_vec: Vec<(i64, i64)> =
        degree_dist.iter().map(|(&k, &v)| (k, v)).collect();
    dist_vec.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
    let shown = self
        .config
        .analysis_params
        .analysis
        .max_display_components
        .min(5)
        .min(dist_vec.len());
    info!(" Top {} Most Common Degrees:", shown);
    for (i, (degree, count)) in dist_vec.iter().take(shown).enumerate() {
        info!(" {}. {} relays have degree {}", i + 1, count, degree);
    }
    Ok(())
}
async fn handle_centrality_betweenness(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::algorithms::centrality::CentralityAnalyzer;
    info!("Running betweenness centrality analysis...");
    self.handle_connectivity_projection().await?;
    let analyzer = CentralityAnalyzer::new(Arc::clone(&self.db_client));
    let cfg = &self.config.analysis_params.centrality;
    let betweenness = analyzer
        .analyze_betweenness_centrality(
            "tor_erpc_connectivity_analysis",
            cfg.betweenness_sampling_size,
            cfg.betweenness_sampling_seed,
        )
        .await?;
    analyzer.display_centrality_results(
        &betweenness,
        "betweenness",
        &self.config.analysis_params.analysis,
    );
    println!("✅ Successfully completed betweenness centrality analysis");
    Ok(())
}
async fn handle_centrality_closeness(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::algorithms::centrality::CentralityAnalyzer;
    info!("Running closeness centrality analysis...");
    self.handle_connectivity_projection().await?;
    let analyzer = CentralityAnalyzer::new(Arc::clone(&self.db_client));
    let cfg = &self.config.analysis_params.centrality;
    let closeness = analyzer
        .analyze_closeness_centrality(
            "tor_erpc_connectivity_analysis",
            cfg.use_wasserman_faust,
        )
        .await?;
    analyzer.display_centrality_results(
        &closeness,
        "closeness",
        &self.config.analysis_params.analysis,
    );
    println!("✅ Successfully completed closeness centrality analysis");
    Ok(())
}
pub async fn handle_centrality_combined(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::algorithms::centrality::CentralityAnalyzer;
    info!("Running combined centrality analysis...");
    self.handle_connectivity_projection().await?;
    let analyzer = CentralityAnalyzer::new(Arc::clone(&self.db_client));
    let cfg = &self.config.analysis_params.centrality;
    let combined = analyzer
        .analyze_combined_centrality(
            "tor_erpc_connectivity_analysis",
            cfg.betweenness_sampling_size,
            cfg.betweenness_sampling_seed,
            cfg.use_wasserman_faust,
        )
        .await?;
    analyzer.display_centrality_results(
        &combined,
        "combined",
        &self.config.analysis_params.analysis,
    );
    println!("✅ Successfully completed combined centrality analysis");
    Ok(())
}
/// Measures internal path connectivity inside the first Louvain community.
///
/// Steps:
/// 1. Ensures the `tor_erpc_path_analysis` projection exists; errors if it is empty.
/// 2. Runs Louvain on that projection.
/// 3. Samples source/target relays from the first returned community and
///    reports the share of sampled pairs that are connected.
/// 4. Logs community-structure statistics.
///
/// The first/second components are treated as the largest/second-largest.
/// NOTE(review): this assumes the analyzer returns components sorted by
/// size, descending — confirm in `CommunityAnalyzer`.
///
/// # Errors
/// Returns an error if the projection is empty, or if Louvain reports
/// communities but returns no component members. The latter previously
/// panicked on `components[0]`. DB and analyzer errors are propagated.
pub async fn handle_path_connectivity(
    &self,
) -> Result<(), Box<dyn std::error::Error>> {
    const PROJECTION: &str = "tor_erpc_path_analysis";
    info!("Running internal connectivity analysis within communities...");
    self.handle_path_analysis_projection().await?;
    info!("Validating graph projection...");
    let projection_info = self.db_client.calculate_graph_metrics(PROJECTION).await?;
    let node_count = projection_info.node_count.unwrap_or(0);
    info!(
        "Projection has {} nodes and {} relationships",
        node_count,
        projection_info.relationship_count.unwrap_or(0)
    );
    if node_count == 0 {
        return Err(
            "Graph projection is empty! Cannot perform connectivity analysis."
                .into(),
        );
    }

    info!("🔍 Detecting communities using Louvain algorithm...");
    let community_analyzer =
        crate::algorithms::communities::CommunityAnalyzer::new(
            Arc::clone(&self.db_client),
        );
    let louvain_result = community_analyzer
        .analyze_louvain_communities(
            PROJECTION,
            &self.config.analysis_params.community_detection.louvain,
        )
        .await?;
    let total_communities = louvain_result.total_components.unwrap_or(0);
    if total_communities == 0 {
        info!("No communities found for connectivity analysis");
        return Ok(());
    }
    info!(
        "Found {} communities with modularity: {:.4}",
        total_communities,
        louvain_result.modularity.unwrap_or(0.0)
    );
    // `total_components` and `components` come from separate fields; guard
    // the list itself instead of indexing blindly.
    let largest_community = match louvain_result.components.first() {
        Some(component) => component,
        None => {
            return Err(format!(
                "Louvain reported {} communities but returned no community members",
                total_communities
            )
            .into());
        }
    };
    let largest_size = largest_community.relay_fingerprints.len();
    info!(
        "Analyzing internal connectivity of largest community with {} nodes",
        largest_size
    );

    let nodes_per_group = self
        .config
        .analysis_params
        .path_analysis
        .internal_component_sample_size;
    // Source and target sets are drawn independently from the same community.
    let (source_samples, target_samples) = self.sample_nodes_for_path_analysis(
        &largest_community.relay_fingerprints,
        &largest_community.relay_fingerprints,
        nodes_per_group,
    )?;
    info!(
        "Analyzing internal paths between {} source and {} target nodes",
        source_samples.len(),
        target_samples.len()
    );
    let path_analyzer =
        crate::algorithms::path::PathAnalyzer::new(Arc::clone(&self.db_client));
    let result = path_analyzer
        .analyze_inter_community_paths(PROJECTION, &source_samples, &target_samples)
        .await?;
    path_analyzer.display_path_results(&result);

    // Percentage of analyzed pairs that are connected; 0 when unknown.
    let connectivity_ratio = match (
        result.connected_community_pairs,
        result.total_paths_analyzed,
    ) {
        (Some(connected), Some(total)) if total > 0 => {
            connected as f64 / total as f64 * 100.0
        }
        _ => 0.0,
    };
    info!("=== Internal Community Connectivity Analysis ===");
    info!("Internal connectivity ratio: {:.2}%", connectivity_ratio);
    if connectivity_ratio > 90.0 {
        info!("✅ Good internal connectivity within largest community");
    } else if connectivity_ratio > 50.0 {
        info!("⚠️ Moderate internal connectivity within largest community");
    } else {
        info!("❌ Poor internal connectivity within largest community");
    }
    if let Some(avg_length) = result.average_path_length {
        info!("📏 Average internal path length: {:.2}", avg_length);
    }

    info!("=== Community Structure Analysis ===");
    info!("Network has {} communities total", total_communities);
    // node_count is non-zero here (checked above).
    info!(
        "Largest community: {} nodes ({:.2}% of network)",
        largest_size,
        (largest_size as f64 / node_count as f64) * 100.0
    );
    if let Some(second) = louvain_result.components.get(1) {
        info!(
            "Second largest community: {} nodes",
            second.relay_fingerprints.len()
        );
    }
    let modularity = louvain_result.modularity.unwrap_or(0.0);
    if modularity > 0.3 {
        info!(
            "✅ Strong community structure (modularity: {:.4})",
            modularity
        );
    } else {
        info!(
            "⚠️ Weak community structure (modularity: {:.4})",
            modularity
        );
    }
    println!("✅ Successfully completed internal connectivity analysis");
    Ok(())
}
pub async fn handle_advanced_path_analysis(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Running advanced path analysis using community detection...");
self.handle_path_analysis_projection().await?;
let projection_info = self
.db_client
.calculate_graph_metrics("tor_erpc_path_analysis")
.await?;
info!(
"Projection has {} nodes and {} relationships",
projection_info.node_count.unwrap_or(0),
projection_info.relationship_count.unwrap_or(0)
);
if projection_info.node_count.unwrap_or(0) == 0 {
return Err("Graph projection is empty!".into());
}
info!("🔍 Step 1: Detecting communities using Louvain algorithm...");
let community_analyzer =
crate::algorithms::communities::CommunityAnalyzer::new(
Arc::clone(&self.db_client),
);
let louvain_result = community_analyzer
.analyze_louvain_communities(
"tor_erpc_path_analysis",
&self.config.analysis_params.community_detection.louvain,
)
.await?;
let total_communities = louvain_result.total_components.unwrap_or(0);
if total_communities < 2 {
info!(
"Found only {} community - advanced path analysis requires >= 2",
total_communities
);
return Ok(());
}
info!(
"Found {} communities with modularity: {:.4}",
total_communities,
louvain_result.modularity.unwrap_or(0.0)
);
info!("🔍 Step 2: Analyzing paths between communities...");
let path_analyzer = crate::algorithms::path::PathAnalyzer::new(
Arc::clone(&self.db_client),
);
let num_top = self
.config
.analysis_params
.path_analysis
.num_top_communities;
let top_communities = louvain_result
.components
.iter()
.take(num_top)
.collect::<Vec<_>>();
for (i, source_community) in top_communities.iter().enumerate() {
for (j, target_community) in top_communities.iter().enumerate() {
if i >= j {
continue;
}
info!(
"Analyzing paths: Community {} ({} nodes) → Community {} ({} nodes)",
source_community.component_id,
source_community.relay_fingerprints.len(),
target_community.component_id,
target_community.relay_fingerprints.len()
);
let sample_size = self
.config
.analysis_params
.path_analysis
.sample_size_communities;
let (source_samples, target_samples) = self
.sample_nodes_for_path_analysis(
&source_community.relay_fingerprints,
&target_community.relay_fingerprints,
sample_size,
)?;
match path_analyzer
.analyze_inter_community_paths(
"tor_erpc_path_analysis",
&source_samples,
&target_samples,
)
.await
{
Ok(result) => {
let connectivity_ratio =
if let (Some(connected), Some(total)) = (
result.connected_community_pairs,
result.total_paths_analyzed,
) {
if total > 0 {
(connected as f64 / total as f64) * 100.0
} else {
0.0
}
} else {
0.0
};
info!(
" Inter-community connectivity: {:.2}%",
connectivity_ratio
);
if let Some(avg_length) = result.average_path_length {
info!(" Average path length: {:.2}", avg_length);
}
}
Err(e) => {
error!(" Path analysis failed: {}", e);
}
}
}
}
info!("🔍 Step 3: Analyzing paths by ASN partitioning...");
let classifier =
crate::algorithms::classification::PartitionClassifier::new(
Arc::clone(&self.db_client),
);
let asn_result = classifier
.classify_by_asn(&louvain_result.components)
.await?;
info!(
"Found {} ASN groups with {:.1}% coverage",
asn_result.metrics.total_groups,
asn_result.metrics.classification_coverage
);
let num_top_asn =
self.config.analysis_params.path_analysis.num_top_asn_groups;
let top_asn_groups = asn_result
.groups
.iter()
.take(num_top_asn)
.collect::<Vec<_>>();
for (i, source_asn) in top_asn_groups.iter().enumerate() {
for (j, target_asn) in top_asn_groups.iter().enumerate() {
if i >= j {
continue;
}
info!(
"Analyzing paths: ASN {} ({} relays) → ASN {} ({} relays)",
source_asn.identifier,
source_asn.relay_fingerprints.len(),
target_asn.identifier,
target_asn.relay_fingerprints.len()
);
let sample_size = self
.config
.analysis_params
.path_analysis
.sample_size_asn_groups;
let (source_samples, target_samples) = self
.sample_nodes_for_path_analysis(
&source_asn.relay_fingerprints,
&target_asn.relay_fingerprints,
sample_size,
)?;
match path_analyzer
.analyze_inter_community_paths(
"tor_erpc_path_analysis",
&source_samples,
&target_samples,
)
.await
{
Ok(result) => {
let connectivity_ratio =
if let (Some(connected), Some(total)) = (
result.connected_community_pairs,
result.total_paths_analyzed,
) {
if total > 0 {
(connected as f64 / total as f64) * 100.0
} else {
0.0
}
} else {
0.0
};
info!(
" Inter-ASN connectivity: {:.2}%",
connectivity_ratio
);
if connectivity_ratio > 0.0 {
if let Some(avg_length) =
result.average_path_length
{
info!(
" Average path length: {:.2}",
avg_length
);
}
}
}
Err(e) => {
error!(" Path analysis failed: {}", e);
}
}
}
}
info!("=== Advanced Path Analysis Summary ===");
info!(
"✅ Modularity score: {:.4} (>0.3 indicates strong community structure)",
louvain_result.modularity.unwrap_or(0.0)
);
println!("✅ Successfully completed advanced path analysis");
Ok(())
}
fn sample_nodes_for_path_analysis(
&self,
source_collection: &[String],
target_collection: &[String],
nodes_per_group: usize,
) -> Result<(Vec<String>, Vec<String>), Box<dyn std::error::Error>> {
use rand::prelude::*;
if source_collection.is_empty() || target_collection.is_empty() {
return Err("Cannot sample from empty collections".into());
}
let mut source_nodes = source_collection.to_vec();
source_nodes.shuffle(&mut thread_rng());
let source_samples =
source_nodes.into_iter().take(nodes_per_group).collect();
let mut target_nodes = target_collection.to_vec();
target_nodes.shuffle(&mut thread_rng());
let target_samples =
target_nodes.into_iter().take(nodes_per_group).collect();
Ok((source_samples, target_samples))
}
}