use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use log::{debug, error, info, warn};
use neo4rs::{Error as Neo4rsDriverError, Graph, Query, RowStream};
use crate::config::Neo4jConfig;
use crate::graph::projections::{
build_gds_drop_cypher, build_gds_project_cypher,
};
use crate::models::metrics::{
CentralityAnalysisResult, CentralityDistribution, CentralityMetrics,
GraphMetrics, NodeMetrics, PathAnalysisResult, PathResult,
};
use crate::db_trait::{
AnalysisDatabase, AnalysisError, GraphProjectionParams,
};
use crate::models::partitions::{ComponentAnalysisResult, ConnectedComponent};
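/// Neo4j-backed analysis client. All graph algorithms run through the
/// Graph Data Science (GDS) plugin against named in-memory projections
/// built from persisted `:Relay` nodes and `CIRCUIT_SUCCESS` relationships.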
pub struct Neo4jAnalysisClient {
graph: Arc<Graph>,
}
impl Neo4jAnalysisClient {
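    /// Connects to Neo4j with the URI and credentials from `config`.
    ///
    /// A minimal usage sketch, assuming `Neo4jConfig` exposes just the
    /// `uri`, `username`, and `password` fields used below:
    ///
    /// ```ignore
    /// let config = Neo4jConfig {
    ///     uri: "neo4j://localhost:7687".into(),
    ///     username: "neo4j".into(),
    ///     password: "secret".into(),
    /// };
    /// let client = Neo4jAnalysisClient::new(&config).await?;
    /// ```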
pub async fn new(config: &Neo4jConfig) -> Result<Self, AnalysisError> {
Graph::new(&config.uri, &config.username, &config.password)
.await
.map_err(|e: Neo4rsDriverError| {
AnalysisError::ConnectionFailed(format!(
"Failed to connect to Neo4j at URI '{}': {}",
config.uri, e
))
})
.map(|graph_conn| Self {
graph: Arc::new(graph_conn),
})
}
}
#[async_trait]
impl AnalysisDatabase for Neo4jAnalysisClient {
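    /// Creates (or recreates) a named GDS projection: any existing
    /// projection with the same name is dropped first, and the report
    /// returned by `gds.graph.project` is validated field by field.
    ///
    /// A minimal sketch of the parameters, matching the fields accessed
    /// below (exact types live in `db_trait::GraphProjectionParams`):
    ///
    /// ```ignore
    /// let params = GraphProjectionParams {
    ///     projection_name: "relay_graph".into(),
    ///     node_label: "Relay".into(),
    ///     relationship_types: vec!["CIRCUIT_SUCCESS".into()],
    ///     relationship_properties_to_project: None,
    /// };
    /// client.create_graph_projection(&params).await?;
    /// ```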
async fn create_graph_projection(
&self,
params: &GraphProjectionParams,
) -> Result<(), AnalysisError> {
        if let Err(e) =
            self.delete_graph_projection(&params.projection_name).await
{
warn!(
"Attempt to delete old projection '{}' failed before \
creation: {:?}. Proceeding with creation attempt.",
params.projection_name, e
);
}
        let project_query_cypher = build_gds_project_cypher(
            &params.projection_name,
            &params.node_label,
            &params.relationship_types,
            params.relationship_properties_to_project.as_deref(),
        );
info!(
"Executing GDS Projection for graph '{}' with node label '{}'",
params.projection_name, params.node_label
);
debug!("GDS Projection Query: {}", project_query_cypher);
let project_query = Query::new(project_query_cypher);
let mut result_stream: RowStream = self
.graph
.execute(project_query)
.await
.map_err(AnalysisError::from)?;
match result_stream.next().await {
Ok(Some(row)) => {
let projected_graph_name: String =
row.get::<String>("graphName").ok_or_else(|| {
AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error:
"GDS projection report missing 'graphName'"
.to_string(),
}
})?;
let node_count: i64 =
row.get::<i64>("nodeCount").ok_or_else(|| {
AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error:
"GDS projection report missing 'nodeCount'"
.to_string(),
}
})?;
let relationship_count: i64 =
row.get::<i64>("relationshipCount").ok_or_else(|| {
AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error: "GDS projection report \
missing 'relationshipCount'"
.to_string(),
}
})?;
let project_millis: i64 =
row.get::<i64>("projectMillis").ok_or_else(|| {
AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error:
"GDS projection report missing 'projectMillis'"
.to_string(),
}
})?;
info!(
"GDS Reported: Projected graph '{}' with {} nodes, \
{} relationships in {} ms.",
projected_graph_name,
node_count,
relationship_count,
project_millis
);
if projected_graph_name != params.projection_name {
warn!(
"Projected graph name from GDS ('{}') does not \
match requested name ('{}').",
projected_graph_name, params.projection_name
);
}
}
Ok(None) => {
return Err(AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error: "GDS graph project query executed \
but returned no rows."
.to_string(),
});
}
Err(e) => {
return Err(AnalysisError::ProjectionCreationFailed {
projection_name: params.projection_name.clone(),
source_error: format!(
"Failed to process result from GDS graph project \
query: {}",
e
),
});
}
}
info!(
"Graph projection command for '{}' completed.",
params.projection_name
);
Ok(())
}
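    /// Drops the named GDS projection if it exists. A missing projection
    /// is not an error; only a failed drop of an existing one is.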
async fn delete_graph_projection(
&self,
projection_name: &str,
) -> Result<(), AnalysisError> {
let exists =
self.check_graph_projection_exists(projection_name).await?;
if exists {
info!(
"GDS graph projection '{}' exists. Attempting to drop it.",
projection_name
);
let drop_query_cypher = build_gds_drop_cypher(projection_name);
let drop_query = Query::new(drop_query_cypher);
match self.graph.execute(drop_query).await {
Ok(mut drop_stream) => match drop_stream.next().await {
Ok(Some(row)) => {
let dropped_name_opt: Option<String> =
row.get::<String>("graphName");
info!(
"Successfully dropped GDS graph projection: '{}'. \
GDS returned: '{}'",
projection_name,
dropped_name_opt
.unwrap_or_else(|| "N/A".to_string())
);
}
Ok(None) => {
info!(
"GDS graph drop for '{}' returned no rows, \
assuming drop was successful.",
projection_name
);
}
Err(e) => {
warn!(
"Error processing result stream from GDS graph \
drop for '{}': {}. Assuming dropped.",
projection_name, e
);
}
},
Err(e_n4rs) => {
error!(
"Failed to execute GDS graph drop query for '{}': {}",
projection_name, e_n4rs
);
return Err(AnalysisError::ProjectionDropFailed {
projection_name: projection_name.to_string(),
source_error: format!(
"Failed to execute GDS graph drop query: {}",
e_n4rs
),
});
}
}
} else {
info!(
"GDS graph projection '{}' does not exist. No deletion \
needed.",
projection_name
);
}
Ok(())
}
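    /// Returns whether a GDS projection with this name currently exists,
    /// via `gds.graph.exists`.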
async fn check_graph_projection_exists(
&self,
projection_name: &str,
) -> Result<bool, AnalysisError> {
debug!(
"Checking if GDS graph projection '{}' exists.",
projection_name
);
let query_str = "CALL gds.graph.exists($projection_name) YIELD \
graphName, exists RETURN exists";
let query = Query::new(query_str.to_string())
.param("projection_name", projection_name);
let mut stream: RowStream = self
.graph
.execute(query)
.await
.map_err(AnalysisError::from)?;
let row = stream
.next()
.await
.map_err(AnalysisError::from)?
.ok_or_else(|| {
AnalysisError::QueryFailed(format!(
"gds.graph.exists query for '{}' returned no rows",
projection_name
))
})?;
let exists: bool = row.get::<bool>("exists").ok_or_else(|| {
AnalysisError::QueryFailed(format!(
"Failed to get 'exists' field from gds.graph.exists \
for '{}' or field was null",
projection_name
))
})?;
debug!(
"GDS graph projection '{}' exists status: {}",
projection_name, exists
);
Ok(exists)
}
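    /// Computes node/relationship counts from `gds.graph.list` plus a
    /// degree distribution derived from per-node degrees.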
async fn calculate_graph_metrics(
&self,
projection_name: &str,
) -> Result<GraphMetrics, AnalysisError> {
info!(
"Calculating graph metrics for GDS projection: '{}'",
projection_name
);
let query_str = "CALL gds.graph.list($projection_name) YIELD \
graphName, nodeCount, relationshipCount \
RETURN nodeCount, relationshipCount";
let query = Query::new(query_str.to_string())
.param("projection_name", projection_name);
let mut stream: RowStream = self
.graph
.execute(query)
.await
.map_err(AnalysisError::from)?;
if let Some(row) = stream.next().await.map_err(AnalysisError::from)? {
let node_count: i64 =
row.get::<i64>("nodeCount").ok_or_else(|| {
AnalysisError::QueryFailed(format!(
"Failed to get 'nodeCount' for projection '{}' or field \
was null",
projection_name
))
})?;
let relationship_count: i64 =
row.get::<i64>("relationshipCount").ok_or_else(|| {
AnalysisError::QueryFailed(format!(
"Failed to get 'relationshipCount' for projection \
'{}' or field was null",
projection_name
))
})?;
info!(
"Basic metrics retrieved: {} nodes, {} relationships",
node_count, relationship_count
);
let node_degrees =
self.calculate_node_degrees(projection_name).await?;
if node_degrees.is_empty() {
return Ok(GraphMetrics {
node_count: Some(node_count),
relationship_count: Some(relationship_count),
degree_distribution: Some(HashMap::new()),
average_degree: Some(0.0),
max_degree: Some(0),
min_degree: Some(0),
});
}
let mut degree_distribution = HashMap::new();
let mut total_degree_sum = 0i64;
let mut max_degree = 0i64;
let mut min_degree = i64::MAX;
for node in &node_degrees {
let degree = node.total_degree;
*degree_distribution.entry(degree).or_insert(0) += 1;
total_degree_sum += degree;
max_degree = max_degree.max(degree);
min_degree = min_degree.min(degree);
}
            // `node_degrees` is non-empty here; the empty case returned
            // early above, so `min_degree` is always set by the loop.
            let average_degree =
                total_degree_sum as f64 / node_degrees.len() as f64;
info!(
"Comprehensive metrics calculated: avg_degree={:.2}, \
max_degree={}, min_degree={}, distribution_size={}",
average_degree,
max_degree,
min_degree,
degree_distribution.len()
);
Ok(GraphMetrics {
node_count: Some(node_count),
relationship_count: Some(relationship_count),
degree_distribution: Some(degree_distribution),
average_degree: Some(average_degree),
max_degree: Some(max_degree),
min_degree: Some(min_degree),
})
} else {
Err(AnalysisError::ProjectionNotFound(
projection_name.to_string(),
))
}
}
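    /// Computes in-, out-, and total degree per relay. Note that this
    /// queries the persisted `:Relay`/`CIRCUIT_SUCCESS` graph directly
    /// rather than the GDS projection named by `projection_name`.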
async fn calculate_node_degrees(
&self,
projection_name: &str,
) -> Result<Vec<NodeMetrics>, AnalysisError> {
info!(
"Calculating node metrics for GDS projection: '{}'",
projection_name
);
let query_str = "
MATCH (r:Relay)
OPTIONAL MATCH (r)-[out:CIRCUIT_SUCCESS]->()
OPTIONAL MATCH ()-[in:CIRCUIT_SUCCESS]->(r)
RETURN r.fingerprint as fingerprint,
count(DISTINCT in) as in_degree,
count(DISTINCT out) as out_degree,
count(DISTINCT in) + count(DISTINCT out) as total_degree
ORDER BY total_degree DESC
";
let query = Query::new(query_str.to_string());
let mut stream: RowStream = self
.graph
.execute(query)
.await
.map_err(AnalysisError::from)?;
let mut node_metrics = Vec::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let fingerprint: String =
row.get::<String>("fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'fingerprint' field from node degree \
calculation"
.to_string(),
)
})?;
let in_degree: i64 =
row.get::<i64>("in_degree").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'in_degree' field from node degree \
calculation"
.to_string(),
)
})?;
let out_degree: i64 =
row.get::<i64>("out_degree").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'out_degree' field from node degree \
calculation"
.to_string(),
)
})?;
let total_degree: i64 =
row.get::<i64>("total_degree").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'total_degree' field from node degree \
calculation"
.to_string(),
)
})?;
node_metrics.push(NodeMetrics {
fingerprint,
in_degree,
out_degree,
total_degree,
});
}
Ok(node_metrics)
}
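    /// Streams `gds.wcc` results and groups relays into weakly connected
    /// components, with summary statistics over the component sizes.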
async fn calculate_weakly_connected_components(
&self,
projection_name: &str,
) -> Result<ComponentAnalysisResult, AnalysisError> {
info!(
"Calculating weakly connected components for projection: '{}'",
projection_name
);
let wcc_query = format!(
"CALL gds.wcc.stream('{}')
YIELD nodeId, componentId
RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
componentId
ORDER BY componentId, relay_fingerprint",
projection_name
);
debug!("WCC Query: {}", wcc_query);
let query = Query::new(wcc_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute WCC query: {:?}", e);
AnalysisError::AlgorithmError(format!(
"WCC algorithm execution failed for projection '{}': {}",
projection_name, e
))
})?;
let mut component_map: HashMap<i64, Vec<String>> = HashMap::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let relay_fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'relay_fingerprint' from WCC result"
.to_string(),
)
})?;
let component_id: i64 =
row.get::<i64>("componentId").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'componentId' from WCC result"
.to_string(),
)
})?;
component_map
.entry(component_id)
.or_default()
.push(relay_fingerprint);
}
let mut components: Vec<ConnectedComponent> = component_map
.into_iter()
.map(|(component_id, relay_fingerprints)| {
let size = relay_fingerprints.len();
ConnectedComponent {
component_id,
relay_fingerprints,
size,
}
})
.collect();
components.sort_by(|a, b| b.size.cmp(&a.size));
let total_components = components.len();
let largest_component_size =
components.first().map(|c| c.size).unwrap_or(0);
let smallest_component_size =
components.last().map(|c| c.size).unwrap_or(0);
let mut component_size_distribution = HashMap::new();
for component in &components {
*component_size_distribution
.entry(component.size)
.or_insert(0) += 1;
}
let total_nodes: usize = components.iter().map(|c| c.size).sum();
let isolation_ratio = if total_nodes > 0 {
(largest_component_size as f64 / total_nodes as f64) * 100.0
} else {
0.0
};
info!(
"WCC analysis complete: {} components, largest: {}, \
smallest: {}, isolation ratio: {:.2}%",
total_components,
largest_component_size,
smallest_component_size,
isolation_ratio
);
Ok(ComponentAnalysisResult {
components,
total_components: Some(total_components),
largest_component_size: Some(largest_component_size),
smallest_component_size: Some(smallest_component_size),
component_size_distribution: Some(component_size_distribution),
isolation_ratio: Some(isolation_ratio),
            modularity: None,
        })
}
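    /// Streams `gds.scc` results and groups relays into strongly connected
    /// components, mirroring the WCC aggregation above.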
async fn calculate_strongly_connected_components(
&self,
projection_name: &str,
) -> Result<ComponentAnalysisResult, AnalysisError> {
info!(
"Calculating strongly connected components for projection: '{}'",
projection_name
);
let scc_query = format!(
"CALL gds.scc.stream('{}')
YIELD nodeId, componentId
RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
componentId
ORDER BY componentId, relay_fingerprint",
projection_name
);
debug!("SCC Query: {}", scc_query);
let query = Query::new(scc_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute SCC query: {:?}", e);
AnalysisError::AlgorithmError(format!(
"SCC algorithm execution failed for projection '{}': {}",
projection_name, e
))
})?;
let mut component_map: HashMap<i64, Vec<String>> = HashMap::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let relay_fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'relay_fingerprint' from SCC result"
.to_string(),
)
})?;
let component_id: i64 =
row.get::<i64>("componentId").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'componentId' from SCC result"
.to_string(),
)
})?;
component_map
.entry(component_id)
.or_default()
.push(relay_fingerprint);
}
let mut components: Vec<ConnectedComponent> = component_map
.into_iter()
.map(|(component_id, relay_fingerprints)| {
let size = relay_fingerprints.len();
ConnectedComponent {
component_id,
relay_fingerprints,
size,
}
})
.collect();
components.sort_by(|a, b| b.size.cmp(&a.size));
let total_components = components.len();
let largest_component_size =
components.first().map(|c| c.size).unwrap_or(0);
let smallest_component_size =
components.last().map(|c| c.size).unwrap_or(0);
let mut component_size_distribution = HashMap::new();
for component in &components {
*component_size_distribution
.entry(component.size)
.or_insert(0) += 1;
}
let total_nodes: usize = components.iter().map(|c| c.size).sum();
let isolation_ratio = if total_nodes > 0 {
(largest_component_size as f64 / total_nodes as f64) * 100.0
} else {
0.0
};
info!(
"SCC analysis complete: {} components, largest: {}, \
smallest: {}, isolation ratio: {:.2}%",
total_components,
largest_component_size,
smallest_component_size,
isolation_ratio
);
Ok(ComponentAnalysisResult {
components,
total_components: Some(total_components),
largest_component_size: Some(largest_component_size),
smallest_component_size: Some(smallest_component_size),
component_size_distribution: Some(component_size_distribution),
isolation_ratio: Some(isolation_ratio),
            modularity: None,
        })
}
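    /// Runs Louvain in write mode (so GDS reports modularity), streams the
    /// written community property back from the database, and removes the
    /// property again afterwards.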
async fn calculate_louvain_communities(
&self,
projection_name: &str,
params: &crate::config::LouvainConfig,
) -> Result<ComponentAnalysisResult, AnalysisError> {
info!(
"Calculating Louvain communities for projection: '{}'",
projection_name
);
let community_property = "louvainCommunity";
let louvain_write_query = format!(
"CALL gds.louvain.write('{}', {{
writeProperty: '{}',
maxIterations: {},
tolerance: {},
includeIntermediateCommunities: {}
}})
YIELD communityCount, ranLevels, modularity
RETURN communityCount, ranLevels, modularity",
projection_name,
community_property,
params.max_iterations,
params.tolerance,
params.include_intermediate_communities
);
debug!("Louvain Write Query: {}", louvain_write_query);
let write_query = Query::new(louvain_write_query);
let mut write_stream: RowStream =
self.graph.execute(write_query).await.map_err(|e| {
error!("Failed to execute Louvain write query: {:?}", e);
AnalysisError::AlgorithmError(format!(
"Louvain algorithm execution failed for projection '{}': {}",
projection_name, e
))
})?;
let mut modularity_score = None;
if let Some(row) =
write_stream.next().await.map_err(AnalysisError::from)?
{
let community_count: i64 =
row.get::<i64>("communityCount").unwrap_or(0);
let ran_levels: i64 = row.get::<i64>("ranLevels").unwrap_or(0);
let modularity: f64 = row.get::<f64>("modularity").unwrap_or(0.0);
info!(
"Louvain algorithm completed: {} communities, {} levels, \
modularity: {:.4}",
community_count, ran_levels, modularity
);
modularity_score = Some(modularity);
}
let louvain_stream_query = format!(
"MATCH (n:Relay)
WHERE n.{} IS NOT NULL
RETURN n.fingerprint AS relay_fingerprint, n.{} AS communityId
ORDER BY communityId, relay_fingerprint",
community_property, community_property
);
debug!("Louvain Stream Query: {}", louvain_stream_query);
let query = Query::new(louvain_stream_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute Louvain stream query: {:?}", e);
AnalysisError::AlgorithmError(format!(
"Louvain results streaming failed for projection '{}': {}",
projection_name, e
))
})?;
let mut community_map: HashMap<i64, Vec<String>> = HashMap::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let relay_fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'relay_fingerprint' from Louvain result"
.to_string(),
)
})?;
let community_id: i64 =
row.get::<i64>("communityId").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'communityId' from Louvain result"
.to_string(),
)
})?;
community_map
.entry(community_id)
.or_default()
.push(relay_fingerprint);
}
let mut communities: Vec<ConnectedComponent> = community_map
.into_iter()
.map(|(community_id, relay_fingerprints)| {
let size = relay_fingerprints.len();
ConnectedComponent {
component_id: community_id,
relay_fingerprints,
size,
}
})
.collect();
communities.sort_by(|a, b| b.size.cmp(&a.size));
let total_communities = communities.len();
let largest_community_size =
communities.first().map(|c| c.size).unwrap_or(0);
let smallest_community_size =
communities.last().map(|c| c.size).unwrap_or(0);
let mut community_size_distribution = HashMap::new();
for community in &communities {
*community_size_distribution
.entry(community.size)
.or_insert(0) += 1;
}
let total_nodes: usize = communities.iter().map(|c| c.size).sum();
let isolation_ratio = if total_nodes > 0 {
(largest_community_size as f64 / total_nodes as f64) * 100.0
} else {
0.0
};
let cleanup_query =
format!("MATCH (n:Relay) REMOVE n.{}", community_property);
let _ = self.graph.execute(Query::new(cleanup_query)).await;
info!(
"Louvain analysis complete: {} communities, largest: {}, \
smallest: {}, isolation ratio: {:.2}%, modularity: {:.4}",
total_communities,
largest_community_size,
smallest_community_size,
isolation_ratio,
modularity_score.unwrap_or(0.0)
);
Ok(ComponentAnalysisResult {
components: communities,
total_components: Some(total_communities),
largest_component_size: Some(largest_community_size),
smallest_component_size: Some(smallest_community_size),
component_size_distribution: Some(community_size_distribution),
isolation_ratio: Some(isolation_ratio),
modularity: modularity_score,
})
}
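    /// Runs Label Propagation in mutate mode with a timestamped property
    /// name to avoid collisions between runs, computes modularity from the
    /// mutated projection, then streams the communities back.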
async fn calculate_label_propagation_communities(
&self,
projection_name: &str,
params: &crate::config::LabelPropagationConfig,
) -> Result<ComponentAnalysisResult, AnalysisError> {
info!(
"Calculating Label Propagation communities for projection: '{}'",
projection_name
);
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let community_property = format!("lpaCommunity_{}", timestamp);
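        // Best-effort drop of an un-suffixed 'lpaCommunity' property,
        // presumably left over from earlier runs; failures are ignored.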
let cleanup_query = format!(
"CALL gds.graph.nodeProperties.drop('{}', ['lpaCommunity'], \
{{failIfMissing: false}}) YIELD propertiesRemoved",
projection_name
);
let _ = self.graph.execute(Query::new(cleanup_query)).await;
let lpa_mutate_query = format!(
"CALL gds.labelPropagation.mutate('{}', {{
maxIterations: {},
mutateProperty: '{}'
}})
YIELD communityCount, ranIterations
RETURN communityCount, ranIterations",
projection_name, params.max_iterations, community_property
);
debug!("Label Propagation Mutate Query: {}", lpa_mutate_query);
let mutate_query = Query::new(lpa_mutate_query);
let mut mutate_stream: RowStream =
self.graph.execute(mutate_query).await.map_err(|e| {
error!(
"Failed to execute Label Propagation mutate query: {:?}",
e
);
AnalysisError::AlgorithmError(format!(
"Label Propagation algorithm execution failed for \
projection '{}': {}",
projection_name, e
))
})?;
if let Some(row) =
mutate_stream.next().await.map_err(AnalysisError::from)?
{
let community_count: i64 =
row.get::<i64>("communityCount").unwrap_or(0);
let ran_iterations: i64 =
row.get::<i64>("ranIterations").unwrap_or(0);
info!(
"Label Propagation algorithm completed: {} communities, \
{} iterations",
community_count, ran_iterations
);
}
let modularity_score = match self
.calculate_modularity(projection_name, &community_property)
.await
{
Ok(modularity) => {
info!(
"Successfully calculated modularity for LPA \
communities: {:.4}",
modularity
);
Some(modularity)
}
            Err(e) => {
                warn!(
                    "Failed to calculate modularity for LPA communities \
                     (property '{}' is stored in the projection): {}",
                    community_property, e
                );
                None
            }
};
let lpa_stream_query = format!(
"CALL gds.graph.nodeProperty.stream('{}', '{}')
YIELD nodeId, propertyValue
RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
propertyValue AS communityId
ORDER BY communityId, relay_fingerprint",
projection_name, community_property
);
debug!("Label Propagation Stream Query: {}", lpa_stream_query);
let query = Query::new(lpa_stream_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!(
"Failed to execute Label Propagation stream query: {:?}",
e
);
AnalysisError::AlgorithmError(format!(
"Label Propagation results streaming failed for \
projection '{}': {}",
projection_name, e
))
})?;
let mut community_map: HashMap<i64, Vec<String>> = HashMap::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let relay_fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'relay_fingerprint' from Label \
Propagation result"
.to_string(),
)
})?;
let community_id: i64 =
row.get::<i64>("communityId").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'communityId' from Label Propagation \
result"
.to_string(),
)
})?;
community_map
.entry(community_id)
.or_default()
.push(relay_fingerprint);
}
let mut communities: Vec<ConnectedComponent> = community_map
.into_iter()
.map(|(community_id, relay_fingerprints)| {
let size = relay_fingerprints.len();
ConnectedComponent {
component_id: community_id,
relay_fingerprints,
size,
}
})
.collect();
communities.sort_by(|a, b| b.size.cmp(&a.size));
let total_communities = communities.len();
let largest_community_size =
communities.first().map(|c| c.size).unwrap_or(0);
let smallest_community_size =
communities.last().map(|c| c.size).unwrap_or(0);
let mut community_size_distribution = HashMap::new();
for community in &communities {
*community_size_distribution
.entry(community.size)
.or_insert(0) += 1;
}
let total_nodes: usize = communities.iter().map(|c| c.size).sum();
let isolation_ratio = if total_nodes > 0 {
(largest_community_size as f64 / total_nodes as f64) * 100.0
} else {
0.0
};
info!(
"Label Propagation analysis complete: {} communities, \
largest: {}, smallest: {}, isolation ratio: {:.2}%, \
modularity: {}",
total_communities,
largest_community_size,
smallest_community_size,
isolation_ratio,
modularity_score
.map(|m| format!("{:.4}", m))
.unwrap_or_else(|| "Not calculated".to_string())
);
Ok(ComponentAnalysisResult {
components: communities,
total_components: Some(total_communities),
largest_component_size: Some(largest_community_size),
smallest_component_size: Some(smallest_community_size),
component_size_distribution: Some(community_size_distribution),
isolation_ratio: Some(isolation_ratio),
modularity: modularity_score,
})
}
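    /// Computes modularity for an existing community property on the
    /// projection via `gds.modularity.stats`.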
async fn calculate_modularity(
&self,
projection_name: &str,
community_property: &str,
) -> Result<f64, AnalysisError> {
info!(
"Calculating modularity for projection: '{}' with community \
property: '{}'",
projection_name, community_property
);
let modularity_query = format!(
"CALL gds.modularity.stats('{}', {{
communityProperty: '{}'
}})
YIELD modularity
RETURN modularity",
projection_name, community_property
);
debug!("Modularity Query: {}", modularity_query);
let query = Query::new(modularity_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute modularity query: {:?}", e);
AnalysisError::AlgorithmError(format!(
"Modularity calculation failed for projection '{}' with \
community property '{}': {}",
projection_name, community_property, e
))
})?;
if let Some(row) = stream.next().await.map_err(AnalysisError::from)? {
let modularity: f64 =
row.get::<f64>("modularity").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get 'modularity' field from modularity stats \
result"
.to_string(),
)
})?;
info!(
"Modularity calculation complete: {:.4} for community \
property: '{}'",
modularity, community_property
);
Ok(modularity)
} else {
Err(AnalysisError::AlgorithmError(
"No modularity result returned from gds.modularity.stats"
.to_string(),
))
}
}
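    /// Groups component members by relay country (`r.country`), batching
    /// fingerprint lookups, and scores each country by the share of its
    /// relays that fall outside its largest component.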
async fn classify_components_by_geography(
&self,
components: &[ConnectedComponent],
) -> Result<
crate::models::partitions::PartitionClassificationResult,
AnalysisError,
> {
info!("Classifying {} components by geography", components.len());
if components.is_empty() {
return Ok(
crate::models::partitions::PartitionClassificationResult {
classification_type: crate::models::partitions::
ClassificationType::Geographic,
groups: Vec::new(),
metrics: crate::models::partitions::ClassificationMetrics {
total_groups: 0,
groups_with_partitions: 0,
classification_coverage: 0.0,
largest_group_size: 0,
average_group_size: 0.0,
diversity_score: 0.0,
partition_correlation: 0.0,
},
unclassified_relays: Vec::new(),
},
);
}
let mut all_fingerprints = Vec::new();
for component in components {
all_fingerprints
.extend(component.relay_fingerprints.iter().cloned());
}
let total_relays = all_fingerprints.len();
const BATCH_SIZE: usize = 1000;
let mut geography_map: HashMap<String, String> = HashMap::new();
let mut total_processed = 0;
for batch in all_fingerprints.chunks(BATCH_SIZE) {
let geography_query = "UNWIND $fingerprints AS fingerprint
MATCH (r:Relay {fingerprint: fingerprint})
RETURN r.fingerprint AS relay_fingerprint,
r.country AS country
ORDER BY r.country, r.fingerprint"
.to_string();
debug!(
"Geography Query batch {}: {} relays",
total_processed / BATCH_SIZE + 1,
batch.len()
);
let query = Query::new(geography_query)
.param("fingerprints", batch.to_vec());
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute geography query: {:?}", e);
AnalysisError::QueryFailed(format!(
"Geography classification query failed: {}",
e
))
})?;
let mut batch_count = 0;
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
batch_count += 1;
let fingerprint: String = row
.get::<String>("relay_fingerprint")
.ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get relay_fingerprint from result"
.to_string(),
)
})?;
let country: Option<String> = row.get::<String>("country");
if let Some(country_code) = country {
geography_map.insert(fingerprint, country_code);
}
}
total_processed += batch.len();
debug!(
"Processed batch: {} rows, total processed: {}",
batch_count, total_processed
);
}
debug!(
"Geography classification: processed {} relays total, \
found {} with country data",
total_processed,
geography_map.len()
);
let mut country_groups: HashMap<String, Vec<String>> = HashMap::new();
let mut component_mapping: HashMap<String, HashMap<i64, usize>> =
HashMap::new();
let mut unclassified_relays = Vec::new();
for component in components {
let mut country_relays: HashMap<String, Vec<String>> =
HashMap::new();
for fingerprint in &component.relay_fingerprints {
if let Some(country) = geography_map.get(fingerprint) {
country_relays
.entry(country.clone())
.or_default()
.push(fingerprint.clone());
} else {
unclassified_relays.push(fingerprint.clone());
}
}
for (country, relays) in country_relays {
country_groups
.entry(country.clone())
.or_default()
.extend(relays.clone());
*component_mapping
.entry(country.clone())
.or_default()
.entry(component.component_id)
.or_insert(0) += relays.len();
}
}
let mut groups = Vec::new();
let mut isolation_scores = Vec::new();
for (country, relays) in country_groups {
let isolation_score =
if let Some(component_map) = component_mapping.get(&country) {
let total_relays_in_group = relays.len();
let largest_component_size =
component_map.values().max().unwrap_or(&0);
if total_relays_in_group > 0 {
let non_largest_component_relays =
total_relays_in_group - largest_component_size;
(non_largest_component_relays as f64
/ total_relays_in_group as f64)
* 100.0
} else {
0.0
}
} else {
0.0
};
isolation_scores.push(isolation_score);
groups.push(crate::models::partitions::ClassificationGroup {
identifier: country.clone(),
relay_fingerprints: relays.clone(),
component_mapping: component_mapping
.get(&country)
.cloned()
.unwrap_or_default(),
isolation_score,
});
}
let metrics = self.calculate_classification_metrics(
&groups,
total_relays,
&isolation_scores,
);
info!(
"Geographic classification complete: {} countries, {:.1}% \
coverage, {} with partitions",
groups.len(),
metrics.classification_coverage,
metrics.groups_with_partitions
);
Ok(crate::models::partitions::PartitionClassificationResult {
classification_type:
crate::models::partitions::ClassificationType::Geographic,
groups,
metrics,
unclassified_relays,
})
}
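    /// Same scheme as the geographic classification, keyed on `r.asn`
    /// instead of country.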
async fn classify_components_by_asn(
&self,
components: &[ConnectedComponent],
) -> Result<
crate::models::partitions::PartitionClassificationResult,
AnalysisError,
> {
info!("Classifying {} components by ASN", components.len());
if components.is_empty() {
return Ok(
crate::models::partitions::PartitionClassificationResult {
classification_type:
crate::models::partitions::ClassificationType::ASN,
groups: Vec::new(),
metrics:
crate::models::partitions::ClassificationMetrics {
total_groups: 0,
groups_with_partitions: 0,
classification_coverage: 0.0,
largest_group_size: 0,
average_group_size: 0.0,
diversity_score: 0.0,
partition_correlation: 0.0,
},
unclassified_relays: Vec::new(),
},
);
}
let mut all_fingerprints = Vec::new();
for component in components {
all_fingerprints
.extend(component.relay_fingerprints.iter().cloned());
}
let total_relays = all_fingerprints.len();
const BATCH_SIZE: usize = 1000;
let mut asn_map: HashMap<String, String> = HashMap::new();
let mut total_processed = 0;
for batch in all_fingerprints.chunks(BATCH_SIZE) {
let asn_query = "UNWIND $fingerprints AS fingerprint
MATCH (r:Relay {fingerprint: fingerprint})
RETURN r.fingerprint AS relay_fingerprint, r.asn AS asn
ORDER BY r.asn, r.fingerprint"
.to_string();
debug!(
"ASN Query batch {}: {} relays",
total_processed / BATCH_SIZE + 1,
batch.len()
);
let query =
Query::new(asn_query).param("fingerprints", batch.to_vec());
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute ASN query: {:?}", e);
AnalysisError::QueryFailed(format!(
"ASN classification query failed: {}",
e
))
})?;
let mut batch_count = 0;
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
batch_count += 1;
let fingerprint: String = row
.get::<String>("relay_fingerprint")
.ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get relay_fingerprint from ASN result"
.to_string(),
)
})?;
let asn: Option<String> =
row.get::<i64>("asn").map(|v| v.to_string());
                if let Some(asn_number) = asn {
                    // The string is rendered from an i64, so it can never
                    // be empty or "null"; only the sentinel ASN 0 is
                    // filtered out.
                    if asn_number != "0" {
                        asn_map.insert(fingerprint, asn_number);
                    }
                }
}
total_processed += batch.len();
debug!(
"Processed batch: {} rows, total processed: {}",
batch_count, total_processed
);
}
debug!(
"ASN classification: processed {} relays total, \
found {} with ASN data",
total_processed,
asn_map.len()
);
let mut asn_groups: HashMap<String, Vec<String>> = HashMap::new();
let mut component_mapping: HashMap<String, HashMap<i64, usize>> =
HashMap::new();
let mut unclassified_relays = Vec::new();
for component in components {
let mut asn_relays: HashMap<String, Vec<String>> = HashMap::new();
for fingerprint in &component.relay_fingerprints {
if let Some(asn) = asn_map.get(fingerprint) {
asn_relays
.entry(asn.clone())
.or_default()
.push(fingerprint.clone());
} else {
unclassified_relays.push(fingerprint.clone());
}
}
for (asn, relays) in asn_relays {
asn_groups
.entry(asn.clone())
.or_default()
.extend(relays.clone());
*component_mapping
.entry(asn.clone())
.or_default()
.entry(component.component_id)
.or_insert(0) += relays.len();
}
}
let mut groups = Vec::new();
let mut isolation_scores = Vec::new();
for (asn, relays) in asn_groups {
let isolation_score =
if let Some(component_map) = component_mapping.get(&asn) {
let total_relays_in_group = relays.len();
let largest_component_size =
component_map.values().max().unwrap_or(&0);
if total_relays_in_group > 0 {
let non_largest_component_relays =
total_relays_in_group - largest_component_size;
(non_largest_component_relays as f64
/ total_relays_in_group as f64)
* 100.0
} else {
0.0
}
} else {
0.0
};
isolation_scores.push(isolation_score);
groups.push(crate::models::partitions::ClassificationGroup {
identifier: asn.clone(),
relay_fingerprints: relays.clone(),
component_mapping: component_mapping
.get(&asn)
.cloned()
.unwrap_or_default(),
isolation_score,
});
}
let metrics = self.calculate_classification_metrics(
&groups,
total_relays,
&isolation_scores,
);
info!(
"ASN classification complete: {} ASNs, {:.1}% coverage, \
{} with partitions",
groups.len(),
metrics.classification_coverage,
metrics.groups_with_partitions
);
Ok(crate::models::partitions::PartitionClassificationResult {
classification_type:
crate::models::partitions::ClassificationType::ASN,
groups,
metrics,
unclassified_relays,
})
}
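    /// Classifies relays by declared family. Family arrays are collected
    /// in batches, merged into groups via union-find (see
    /// `build_family_connected_components`), and each group is scored the
    /// same way as the geographic/ASN classifications.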
async fn classify_components_by_family(
&self,
components: &[ConnectedComponent],
) -> Result<
crate::models::partitions::PartitionClassificationResult,
AnalysisError,
> {
info!("Classifying {} components by family", components.len());
if components.is_empty() {
return Ok(
crate::models::partitions::PartitionClassificationResult {
classification_type:
crate::models::partitions::ClassificationType::Family,
groups: Vec::new(),
metrics:
crate::models::partitions::ClassificationMetrics {
total_groups: 0,
groups_with_partitions: 0,
classification_coverage: 0.0,
largest_group_size: 0,
average_group_size: 0.0,
diversity_score: 0.0,
partition_correlation: 0.0,
},
unclassified_relays: Vec::new(),
},
);
}
let mut all_fingerprints = Vec::new();
for component in components {
all_fingerprints
.extend(component.relay_fingerprints.iter().cloned());
}
let total_relays = all_fingerprints.len();
const BATCH_SIZE: usize = 1000;
let mut relay_families: HashMap<String, Vec<String>> = HashMap::new();
let mut total_processed = 0;
debug!(
"Starting family data collection for {} relays",
total_relays
);
for batch in all_fingerprints.chunks(BATCH_SIZE) {
let family_query = "UNWIND $fingerprints AS fingerprint
MATCH (r:Relay {fingerprint: fingerprint})
WHERE r.family IS NOT NULL AND size(r.family) > 0
RETURN r.fingerprint AS relay_fingerprint,
r.family AS family_array
ORDER BY r.fingerprint"
.to_string();
debug!(
"Family Query batch {}: {} relays",
total_processed / BATCH_SIZE + 1,
batch.len()
);
let query =
Query::new(family_query).param("fingerprints", batch.to_vec());
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!("Failed to execute family query: {:?}", e);
AnalysisError::QueryFailed(format!(
"Family classification query failed: {}",
e
))
})?;
let mut batch_count = 0;
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
batch_count += 1;
let fingerprint: String = row
.get::<String>("relay_fingerprint")
.ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get relay_fingerprint from result"
.to_string(),
)
})?;
let family_array: Option<Vec<String>> =
row.get::<Vec<String>>("family_array");
if let Some(family_members) = family_array {
if !family_members.is_empty() {
relay_families.insert(fingerprint, family_members);
}
}
}
total_processed += batch.len();
debug!(
"Processed batch: {} rows, total processed: {}",
batch_count, total_processed
);
}
info!(
"Family data collection complete: {} relays with family data \
out of {} total",
relay_families.len(),
total_relays
);
let family_groups =
self.build_family_connected_components(&relay_families);
debug!(
"Found {} family groups using connected components analysis",
family_groups.len()
);
let mut final_groups = Vec::new();
let mut isolation_scores = Vec::new();
let mut unclassified_relays = Vec::new();
let mut relay_to_family: HashMap<String, usize> = HashMap::new();
for (family_idx, family_relays) in family_groups.iter().enumerate() {
for relay in family_relays {
relay_to_family.insert(relay.clone(), family_idx);
}
}
for family_relays in family_groups.iter() {
let mut component_mapping: HashMap<i64, usize> = HashMap::new();
let mut group_relay_fingerprints = Vec::new();
for component in components {
for fingerprint in &component.relay_fingerprints {
if family_relays.contains(fingerprint) {
group_relay_fingerprints.push(fingerprint.clone());
*component_mapping
.entry(component.component_id)
.or_insert(0) += 1;
}
}
}
if !group_relay_fingerprints.is_empty() {
let isolation_score = if !component_mapping.is_empty() {
let total_relays_in_group = group_relay_fingerprints.len();
let largest_component_size =
component_mapping.values().max().unwrap_or(&0);
if total_relays_in_group > 0 {
let non_largest_component_relays =
total_relays_in_group - largest_component_size;
(non_largest_component_relays as f64
/ total_relays_in_group as f64)
* 100.0
} else {
0.0
}
} else {
0.0
};
isolation_scores.push(isolation_score);
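                // Small families are named after their first fingerprints;
                // larger ones get "<size>relays_<hash>" from a fold over
                // the (already sorted, hence stable) member list.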
let family_identifier = if family_relays.len() <= 3 {
let mut sorted_relays = family_relays.clone();
sorted_relays.sort();
format!(
"family_{}",
sorted_relays
.iter()
.take(2)
.map(|r| &r[0..8.min(r.len())])
.collect::<Vec<_>>()
.join("_")
)
} else {
format!(
"family_{}relays_{:x}",
family_relays.len(),
family_relays.iter().map(|s| s.as_bytes()).fold(
0u64,
|acc, bytes| {
acc.wrapping_mul(31).wrapping_add(
bytes.iter().fold(0u64, |a, b| {
a.wrapping_mul(31)
.wrapping_add(*b as u64)
}),
)
}
)
)
};
final_groups.push(
crate::models::partitions::ClassificationGroup {
identifier: family_identifier,
relay_fingerprints: group_relay_fingerprints,
component_mapping,
isolation_score,
},
);
}
}
for component in components {
for fingerprint in &component.relay_fingerprints {
if !relay_to_family.contains_key(fingerprint) {
unclassified_relays.push(fingerprint.clone());
}
}
}
let metrics = self.calculate_classification_metrics(
&final_groups,
total_relays,
&isolation_scores,
);
info!(
"Family classification complete: {} families, {:.1}% coverage, \
{} with partitions",
final_groups.len(),
metrics.classification_coverage,
metrics.groups_with_partitions
);
info!(
"Family analysis: {} unclassified relays, avg family size: \
{:.1}, largest family: {}",
unclassified_relays.len(),
metrics.average_group_size,
metrics.largest_group_size
);
Ok(crate::models::partitions::PartitionClassificationResult {
classification_type:
crate::models::partitions::ClassificationType::Family,
groups: final_groups,
metrics,
unclassified_relays,
})
}
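    /// Streams `gds.betweenness` scores, optionally with sampling
    /// (`samplingSize`/`samplingSeed`), and summarizes them into a
    /// percentile distribution.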
async fn calculate_betweenness_centrality(
&self,
projection_name: &str,
sampling_size: Option<usize>,
sampling_seed: Option<u64>,
) -> Result<CentralityAnalysisResult, AnalysisError> {
info!(
"Calculating betweenness centrality for projection: '{}'",
projection_name
);
let config_params = if let Some(sample_size) = sampling_size {
let seed = sampling_seed.unwrap_or(42);
format!(
", {{samplingSize: {}, samplingSeed: {}}}",
sample_size, seed
)
} else {
", {}".to_string()
};
let betweenness_query = format!(
"CALL gds.betweenness.stream('{}'{})
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
score AS betweenness_centrality
ORDER BY score DESC",
projection_name, config_params
);
debug!("Betweenness Centrality Query: {}", betweenness_query);
let query = Query::new(betweenness_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!(
"Failed to execute betweenness centrality query: {:?}",
e
);
AnalysisError::AlgorithmError(format!(
"Betweenness centrality failed for projection '{}': {}",
projection_name, e
))
})?;
let mut centrality_metrics = Vec::new();
let mut scores = Vec::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get relay_fingerprint".to_string(),
)
})?;
let score: f64 =
row.get::<f64>("betweenness_centrality").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get betweenness_centrality score"
.to_string(),
)
})?;
scores.push(score);
centrality_metrics.push(CentralityMetrics {
fingerprint,
betweenness_centrality: Some(score),
closeness_centrality: None,
});
}
let betweenness_distribution = if !scores.is_empty() {
let min = scores.iter().copied().fold(f64::INFINITY, f64::min);
let max = scores.iter().copied().fold(f64::NEG_INFINITY, f64::max);
let mean = scores.iter().sum::<f64>() / scores.len() as f64;
let mut sorted_scores = scores.clone();
sorted_scores.sort_by(|a, b| a.partial_cmp(b).unwrap());
let len = sorted_scores.len();
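            // Nearest-rank percentile over the ascending-sorted scores.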
let get_percentile = |p: f64| -> f64 {
let index = ((len as f64 - 1.0) * p).round() as usize;
sorted_scores[index.min(len - 1)]
};
Some(CentralityDistribution {
min,
max,
mean,
p50: get_percentile(0.50),
p75: get_percentile(0.75),
p90: get_percentile(0.90),
p95: get_percentile(0.95),
p99: get_percentile(0.99),
p999: get_percentile(0.999),
})
} else {
None
};
let total_nodes = centrality_metrics.len();
info!(
"Betweenness centrality complete: {} nodes analyzed",
total_nodes
);
Ok(CentralityAnalysisResult {
centrality_metrics,
total_nodes_analyzed: Some(total_nodes),
betweenness_distribution,
closeness_distribution: None,
})
}
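    /// Streams `gds.closeness` scores, optionally using the
    /// Wasserman-Faust normalization, with the same percentile summary
    /// as betweenness.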
async fn calculate_closeness_centrality(
&self,
projection_name: &str,
use_wasserman_faust: Option<bool>,
) -> Result<CentralityAnalysisResult, AnalysisError> {
info!(
"Calculating closeness centrality for projection: '{}'",
projection_name
);
let config_params = if let Some(use_wf) = use_wasserman_faust {
format!(", {{useWassermanFaust: {}}}", use_wf)
} else {
", {}".to_string()
};
let closeness_query = format!(
"CALL gds.closeness.stream('{}'{})
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
score AS closeness_centrality
ORDER BY score DESC",
projection_name, config_params
);
debug!("Closeness Centrality Query: {}", closeness_query);
let query = Query::new(closeness_query);
let mut stream: RowStream =
self.graph.execute(query).await.map_err(|e| {
error!(
"Failed to execute closeness centrality query: {:?}",
e
);
AnalysisError::AlgorithmError(format!(
"Closeness centrality failed for projection '{}': {}",
projection_name, e
))
})?;
let mut centrality_metrics = Vec::new();
let mut scores = Vec::new();
while let Some(row) =
stream.next().await.map_err(AnalysisError::from)?
{
let fingerprint: String =
row.get::<String>("relay_fingerprint").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get relay_fingerprint".to_string(),
)
})?;
let score: f64 =
row.get::<f64>("closeness_centrality").ok_or_else(|| {
AnalysisError::QueryFailed(
"Failed to get closeness_centrality score".to_string(),
)
})?;
scores.push(score);
centrality_metrics.push(CentralityMetrics {
fingerprint,
betweenness_centrality: None,
closeness_centrality: Some(score),
});
}
let closeness_distribution = if !scores.is_empty() {
let min = scores.iter().copied().fold(f64::INFINITY, f64::min);
let max = scores.iter().copied().fold(f64::NEG_INFINITY, f64::max);
let mean = scores.iter().sum::<f64>() / scores.len() as f64;
let mut sorted_scores = scores.clone();
sorted_scores.sort_by(|a, b| a.partial_cmp(b).unwrap());
let len = sorted_scores.len();
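            // Nearest-rank percentile over the ascending-sorted scores.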
let get_percentile = |p: f64| -> f64 {
let index = ((len as f64 - 1.0) * p).round() as usize;
sorted_scores[index.min(len - 1)]
};
Some(CentralityDistribution {
min,
max,
mean,
p50: get_percentile(0.50),
p75: get_percentile(0.75),
p90: get_percentile(0.90),
p95: get_percentile(0.95),
p99: get_percentile(0.99),
p999: get_percentile(0.999),
})
} else {
None
};
let total_nodes = centrality_metrics.len();
info!(
"Closeness centrality complete: {} nodes analyzed",
total_nodes
);
Ok(CentralityAnalysisResult {
centrality_metrics,
total_nodes_analyzed: Some(total_nodes),
betweenness_distribution: None,
closeness_distribution,
})
}
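    /// Runs both centrality algorithms and merges their per-relay scores
    /// by fingerprint.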
async fn calculate_combined_centrality(
&self,
projection_name: &str,
betweenness_sampling_size: Option<usize>,
betweenness_sampling_seed: Option<u64>,
use_wasserman_faust: Option<bool>,
) -> Result<CentralityAnalysisResult, AnalysisError> {
info!(
"Calculating combined centrality for projection: '{}'",
projection_name
);
let betweenness_result = self
.calculate_betweenness_centrality(
projection_name,
betweenness_sampling_size,
betweenness_sampling_seed,
)
.await?;
let closeness_result = self
.calculate_closeness_centrality(
projection_name,
use_wasserman_faust,
)
.await?;
let mut combined_metrics = HashMap::new();
for metric in betweenness_result.centrality_metrics {
combined_metrics.insert(metric.fingerprint.clone(), metric);
}
for metric in closeness_result.centrality_metrics {
if let Some(existing) =
combined_metrics.get_mut(&metric.fingerprint)
{
existing.closeness_centrality = metric.closeness_centrality;
} else {
combined_metrics.insert(metric.fingerprint.clone(), metric);
}
}
let final_metrics: Vec<CentralityMetrics> =
combined_metrics.into_values().collect();
let total_nodes = final_metrics.len();
Ok(CentralityAnalysisResult {
centrality_metrics: final_metrics,
total_nodes_analyzed: Some(total_nodes),
betweenness_distribution: betweenness_result
.betweenness_distribution,
closeness_distribution: closeness_result.closeness_distribution,
})
}
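    /// Runs Dijkstra over all source/target pairs in one query and fills
    /// in explicit "no path" results for pairs the stream never returned.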
async fn analyze_paths_between_communities(
&self,
projection_name: &str,
source_nodes: &[String],
target_nodes: &[String],
) -> Result<PathAnalysisResult, AnalysisError> {
info!(
"Analyzing paths between {} sources and {} targets",
source_nodes.len(),
target_nodes.len()
);
let mut path_results = Vec::new();
let mut connected_pairs = 0;
let path_query = format!(
"UNWIND $sources as source_fingerprint
UNWIND $targets as target_fingerprint
WITH source_fingerprint, target_fingerprint
WHERE source_fingerprint <> target_fingerprint
MATCH (s:Relay {{fingerprint: source_fingerprint}})
MATCH (t:Relay {{fingerprint: target_fingerprint}})
WITH s, t, source_fingerprint, target_fingerprint
CALL gds.shortestPath.dijkstra.stream('{}', {{
sourceNode: id(s),
targetNode: id(t)
}})
YIELD totalCost, nodeIds
RETURN source_fingerprint, target_fingerprint,
totalCost, size(nodeIds) as pathLength,
[nodeId in nodeIds | gds.util.asNode(nodeId).fingerprint] as
pathNodes",
projection_name
);
        let query = Query::new(path_query)
            .param("sources", source_nodes.to_vec())
            .param("targets", target_nodes.to_vec());
match self.graph.execute(query).await {
Ok(mut stream) => {
let mut found_pairs = std::collections::HashSet::new();
while let Ok(Some(row)) = stream.next().await {
let source_fingerprint: String = row
.get("source_fingerprint")
.unwrap_or_else(|| "unknown".to_string());
let target_fingerprint: String = row
.get("target_fingerprint")
.unwrap_or_else(|| "unknown".to_string());
let total_cost: f64 = row.get("totalCost").unwrap_or(0.0);
let path_length: i64 = row.get("pathLength").unwrap_or(0);
let path_nodes: Vec<String> =
row.get("pathNodes").unwrap_or_else(Vec::new);
found_pairs.insert((
source_fingerprint.clone(),
target_fingerprint.clone(),
));
if total_cost > 0.0 {
connected_pairs += 1;
}
path_results.push(PathResult {
source_fingerprint,
target_fingerprint,
path_exists: total_cost > 0.0,
path_length: Some(path_length as usize),
path_cost: Some(total_cost),
intermediate_nodes: Some(path_nodes),
});
}
for source in source_nodes {
for target in target_nodes {
if source != target
&& !found_pairs
.contains(&(source.clone(), target.clone()))
{
path_results.push(PathResult {
source_fingerprint: source.clone(),
target_fingerprint: target.clone(),
path_exists: false,
path_length: None,
path_cost: None,
intermediate_nodes: None,
});
}
}
}
let total_pairs = source_nodes.len() * target_nodes.len()
- source_nodes
.iter()
.filter(|s| target_nodes.contains(s))
.count();
let avg_path_length = if connected_pairs > 0 {
path_results
.iter()
.filter(|p| p.path_exists)
.map(|p| p.path_length.unwrap_or(0))
.sum::<usize>() as f64
/ connected_pairs as f64
} else {
0.0
};
let max_path_length = path_results
.iter()
.filter(|p| p.path_exists)
.map(|p| p.path_length.unwrap_or(0))
.max();
let min_path_length = path_results
.iter()
.filter(|p| p.path_exists)
.map(|p| p.path_length.unwrap_or(0))
.min();
info!(
"Path analysis complete: {}/{} paths exist, avg: {:.2}",
connected_pairs, total_pairs, avg_path_length
);
Ok(PathAnalysisResult {
path_results,
total_paths_analyzed: Some(total_pairs),
connected_community_pairs: Some(connected_pairs),
disconnected_community_pairs: Some(
total_pairs - connected_pairs,
),
average_path_length: Some(avg_path_length),
max_path_length,
min_path_length,
})
}
Err(e) => {
error!("Path analysis query failed: {:?}", e);
Err(AnalysisError::AlgorithmError(format!(
"Path analysis failed for projection '{}': {}. \
This likely indicates a graph projection issue.",
projection_name, e
)))
}
}
}
}
impl Neo4jAnalysisClient {
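    /// Merges relays into family groups with a union-find over shared
    /// family members: two relays land in the same group whenever any
    /// declared family member links them, directly or transitively.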
fn build_family_connected_components(
&self,
relay_families: &HashMap<String, Vec<String>>,
) -> Vec<Vec<String>> {
debug!(
"Building family connected components from {} relays \
with family data",
relay_families.len()
);
let mut parent: HashMap<String, String> = HashMap::new();
let mut rank: HashMap<String, usize> = HashMap::new();
for relay in relay_families.keys() {
parent.insert(relay.clone(), relay.clone());
rank.insert(relay.clone(), 0);
}
fn find_root(x: &str, parent: &mut HashMap<String, String>) -> String {
let parent_x = parent.get(x).unwrap().clone();
if parent_x != *x {
let root = find_root(&parent_x, parent);
parent.insert(x.to_string(), root.clone());
root
} else {
x.to_string()
}
}
fn union_sets(
x: &str,
y: &str,
parent: &mut HashMap<String, String>,
rank: &mut HashMap<String, usize>,
) {
let root_x = find_root(x, parent);
let root_y = find_root(y, parent);
if root_x != root_y {
let rank_x = rank.get(&root_x).unwrap_or(&0);
let rank_y = rank.get(&root_y).unwrap_or(&0);
match rank_x.cmp(rank_y) {
std::cmp::Ordering::Less => {
parent.insert(root_x, root_y);
}
std::cmp::Ordering::Greater => {
parent.insert(root_y, root_x);
}
std::cmp::Ordering::Equal => {
parent.insert(root_y, root_x.clone());
rank.insert(root_x, rank_x + 1);
}
}
}
}
let mut family_member_to_relays: HashMap<String, Vec<String>> =
HashMap::new();
for (relay, family_members) in relay_families {
for family_member in family_members {
family_member_to_relays
.entry(family_member.clone())
.or_default()
.push(relay.clone());
}
}
debug!(
"Built family member index with {} unique family members",
family_member_to_relays.len()
);
let mut connections_made = 0;
let mut large_member_groups = 0;
let mut total_family_groups = 0;
for relays_in_family in family_member_to_relays.values() {
if relays_in_family.len() > 1 {
total_family_groups += 1;
if relays_in_family.len() >= 50 {
large_member_groups += 1;
}
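                // Union every pair of relays sharing this family member;
                // quadratic per group, but cheap thanks to union-find.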
for i in 0..relays_in_family.len() {
for j in (i + 1)..relays_in_family.len() {
let relay1 = &relays_in_family[i];
let relay2 = &relays_in_family[j];
union_sets(relay1, relay2, &mut parent, &mut rank);
connections_made += 1;
}
}
}
}
        info!(
            "Family relationship analysis: {} total family groups, \
             {} large member groups (≥50 relays), {} connections made",
            total_family_groups, large_member_groups, connections_made
        );
let mut components: HashMap<String, Vec<String>> = HashMap::new();
for relay in relay_families.keys() {
let root = find_root(relay, &mut parent);
components.entry(root).or_default().push(relay.clone());
}
let mut result: Vec<Vec<String>> = components.into_values().collect();
for component in &mut result {
component.sort();
}
result.sort_by_key(|b| std::cmp::Reverse(b.len()));
info!(
"Connected components analysis complete: {} family groups found, \
largest: {} relays",
result.len(),
result.first().map(|c| c.len()).unwrap_or(0)
);
let single_relay_families =
result.iter().filter(|c| c.len() == 1).count();
let medium_families = result.iter().filter(|c| c.len() >= 10).count();
let large_families = result.iter().filter(|c| c.len() >= 50).count();
        info!(
            "Family group statistics: {} single-relay, {} medium \
             (≥10 relays), {} large (≥50 relays)",
            single_relay_families, medium_families, large_families
        );
result
}
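    /// Aggregates per-group results into coverage, size, and diversity
    /// metrics: the diversity score is Shannon entropy over group sizes
    /// normalized by `ln(group count)`, and partition correlation is the
    /// mean isolation score rescaled to [0, 1].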
fn calculate_classification_metrics(
&self,
groups: &[crate::models::partitions::ClassificationGroup],
total_relays: usize,
isolation_scores: &[f64],
) -> crate::models::partitions::ClassificationMetrics {
let total_classified = groups
.iter()
.map(|g| g.relay_fingerprints.len())
.sum::<usize>();
let coverage = if total_relays > 0 {
(total_classified as f64 / total_relays as f64) * 100.0
} else {
0.0
};
let groups_with_partitions = groups
.iter()
.filter(|g| g.component_mapping.len() > 1)
.count();
let largest_group_size = groups
.iter()
.map(|g| g.relay_fingerprints.len())
.max()
.unwrap_or(0);
let average_group_size = if !groups.is_empty() {
total_classified as f64 / groups.len() as f64
} else {
0.0
};
let partition_correlation = if !isolation_scores.is_empty() {
isolation_scores.iter().sum::<f64>()
/ isolation_scores.len() as f64
/ 100.0
} else {
0.0
};
        // Require more than one group: with a single group the normalizer
        // `ln(1)` is zero and the division would yield NaN.
        let diversity_score = if groups.len() > 1 && total_classified > 0 {
let total_relays_f64 = total_classified as f64;
let entropy = groups
.iter()
.map(|g| g.relay_fingerprints.len() as f64 / total_relays_f64)
.filter(|&p| p > 0.0)
.map(|p| -p * p.ln())
.sum::<f64>();
entropy / (groups.len() as f64).ln()
} else {
0.0
};
crate::models::partitions::ClassificationMetrics {
total_groups: groups.len(),
groups_with_partitions,
classification_coverage: coverage,
largest_group_size,
average_group_size,
diversity_score,
partition_correlation,
}
}
}