erpc_scanner/relay.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
//! Relay storage: a provider of the latest `NetDir` and a pool of relays keyed by fingerprint
use futures::stream::StreamExt;
use log::debug;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{
broadcast::{channel, Receiver, Sender},
RwLock,
};
use tor_dirmgr::DirProvider;
use tor_netdir::{DirEvent, NetDir, Relay};
/// Wrapper around a [`DirProvider`] trait object that always holds the latest [`NetDir`].
///
/// "Latest" means that whenever the directory manager reports a new consensus, a fresh
/// `NetDir` is fetched from it, stored here, and broadcast to subscribers as a
/// [`NetDirProviderEvent`].
///
/// The `NetDir` is then used to build a [`RelaysPool`].
pub struct NetDirProvider {
/// The trait object used to get the latest NetDir
dirmgr: Arc<dyn DirProvider>,
/// The current NetDir in use
netdir: Arc<RwLock<Arc<NetDir>>>,
/// The sender half of the broadcast channel used to publish each changed NetDir.
/// Use `netdirevents_sender.subscribe()` to get a receiving handle.
netdirevents_sender: Sender<NetDirProviderEvent>,
/// The receiver half of the broadcast channel that carries the latest NetDir.
// Nothing reads from this receiver. It is stored only so that the channel always has at
// least one live receiver and therefore never counts as closed.
#[allow(unused, dead_code)]
netdirevents_receiver: Receiver<NetDirProviderEvent>,
}
impl NetDirProvider {
    /// Create a new `NetDirProvider` from a `DirProvider` trait object.
    ///
    /// The current timely `NetDir` of `dirmgr` becomes the initial value. A background tokio
    /// task is then spawned that runs `auto_update`, which keeps the stored `NetDir` fresh and
    /// broadcasts every replacement as a [`NetDirProviderEvent`].
    ///
    /// # Errors
    ///
    /// Returns an error if `dirmgr.timely_netdir()` fails, i.e. no usable `NetDir` is
    /// available at construction time.
    pub async fn from_dirmgr(
        dirmgr: Arc<dyn DirProvider>,
    ) -> anyhow::Result<Arc<Self>> {
        let netdir = dirmgr.timely_netdir()?;

        // Channel that turns DirMgr events into NetDirProviderEvents (carrying the new NetDir).
        let (netdirevents_sender, netdirevents_receiver) =
            channel::<NetDirProviderEvent>(1);
        let task_sender = netdirevents_sender.clone();

        let netdir_provider = Arc::new(Self {
            dirmgr,
            netdir: Arc::new(RwLock::new(netdir)),
            netdirevents_sender,
            // Stored so the channel always has at least one live receiver.
            netdirevents_receiver,
        });

        // Background task: listen to the events produced by DirMgr, refresh `self.netdir`
        // and publish the new NetDir through the broadcast channel.
        let task_provider = netdir_provider.clone();
        tokio::task::spawn(async move {
            task_provider.auto_update(task_sender).await;
        });

        Ok(netdir_provider)
    }

    /// Auto update the NetDir by waiting on the events from DirMgr.
    ///
    /// For every `DirEvent::NewConsensus`, a new timely `NetDir` is requested from the
    /// directory manager. On success it replaces the stored one and is sent through
    /// `netdirevents_sender` as `NetDirProviderEvent::NetDirChanged`. All other events are
    /// only logged. The method returns when the event stream ends.
    ///
    /// # Panics
    ///
    /// Panics if sending on the broadcast channel fails. This is not expected, because
    /// `self` keeps a receiver alive for as long as this method can run.
    async fn auto_update(
        &self,
        netdirevents_sender: Sender<NetDirProviderEvent>,
    ) {
        // Subscribe exactly once and keep polling the same stream. Calling `events()` anew on
        // every iteration would drop the previous subscription, so an event published while
        // the last one was still being handled could be lost.
        let mut dir_events = self.dirmgr.events();

        while let Some(dir_event) = dir_events.next().await {
            debug!("Received a DirEvent {dir_event:?} from DirMgr");
            if dir_event != DirEvent::NewConsensus {
                continue;
            }

            // Fetch the new NetDir before taking the write lock so that readers are not
            // blocked while the directory manager is being queried.
            let new_netdir = match self.dirmgr.timely_netdir() {
                Ok(new_netdir) => new_netdir,
                Err(err) => {
                    debug!("No timely NetDir available after NewConsensus: {err}");
                    continue;
                }
            };

            {
                // Hold the write lock only for the assignment itself.
                let mut current_netdir = self.netdir.write().await;
                *current_netdir = Arc::clone(&new_netdir);
            }

            netdirevents_sender
                .send(NetDirProviderEvent::NetDirChanged(new_netdir))
                .expect("The channel to send the new NetDir seems to be closed");
        }
    }

    /// Get the latest NetDir.
    ///
    /// Returns a clone of the stored `Arc`, so the caller does not hold the lock afterwards.
    pub async fn current_netdir(&self) -> Arc<NetDir> {
        let netdir = self.netdir.read().await;
        Arc::clone(&netdir)
    }

    /// Gives you a new receiver handle of the channel to listen for the NetDirProviderEvent.
    ///
    /// The handle is created by subscribing to the stored sender.
    pub fn get_netdirprodiver_event_receiver(
        &self,
    ) -> Receiver<NetDirProviderEvent> {
        self.netdirevents_sender.subscribe()
    }
}
/// Event broadcast by [`NetDirProvider`] to signal the arrival of a new NetDir
#[derive(Debug, Clone)]
pub enum NetDirProviderEvent {
/// The NetDir has changed; the variant carries the latest NetDir
NetDirChanged(Arc<NetDir>),
}
/// RelaysPool stores a hashmap from the fingerprint of a relay to the relay itself (paired
/// with a caller-chosen value of type `T`) and provides lookup through the
/// `get_relay(fingerprint)` method.
///
/// The RelaysPool holds the hashmap of <RELAY_FINGERPRINT, (tor_netdir::Relay, T)>, so that the
/// Tor Network graph only needs to store enough data to index a relay, i.e. the
/// RELAY_FINGERPRINT. Whenever it needs data about a certain relay, it asks this pool for a
/// reference to the entry through the `get_relay(RELAY_FINGERPRINT)` method, where
/// RELAY_FINGERPRINT is the stringified RSA ID of the relay.
pub struct RelaysPool<'a, T> {
/// Maps <Fingerprint, (Relay, T)>
pub relays_hashmap: HashMap<String, (Relay<'a>, T)>,
}
impl<'a, T: Clone> RelaysPool<'a, T> {
    /// Build a `RelaysPool` that holds no relays.
    pub fn empty() -> Self {
        Self {
            relays_hashmap: HashMap::new(),
        }
    }

    /// Build a [RelaysPool] out of a slice of `(Relay, T)` pairs.
    ///
    /// Every pair is cloned into the pool and keyed by the string form of the relay's RSA
    /// identity. If several pairs share the same identity, the last one wins.
    ///
    /// The relays are passed in directly instead of an `Arc<NetDir>`: a `Relay<'a>` taken
    /// from a NetDir parameter would have its lifetime tied to that parameter, and the
    /// resulting [RelaysPool] could then not be returned from this function.
    pub fn from_relays(relays: &[(Relay<'a>, T)]) -> Self {
        let relays_hashmap = relays
            .iter()
            .map(|(relay, value)| {
                let fingerprint = relay.rsa_id().to_string();
                (fingerprint, (relay.clone(), value.clone()))
            })
            .collect::<HashMap<_, _>>();

        Self { relays_hashmap }
    }

    /// Look up the `(Relay, T)` entry stored under `fingerprint`.
    ///
    /// Returns `None` when no relay with that fingerprint is in the pool.
    pub fn get_relay<S: AsRef<str>>(
        &self,
        fingerprint: S,
    ) -> Option<&(Relay<'a>, T)> {
        let key = fingerprint.as_ref();
        self.relays_hashmap.get(key)
    }

    /// Store a `tor_netdir::Relay<'_>` together with `v` under the given fingerprint.
    ///
    /// An entry already stored under the same fingerprint is replaced.
    pub fn add_relay<S: ToString>(
        &mut self,
        fingerprint: S,
        relay: Relay<'a>,
        v: T,
    ) {
        let key = fingerprint.to_string();
        self.relays_hashmap.insert(key, (relay, v));
    }

    /// Number of relays currently held by the `RelaysPool`.
    pub fn total_relays(&self) -> usize {
        self.relays_hashmap.len()
    }
}