erpc_scanner/
pool.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::{
    client::Client,
    relay::NetDirProvider,
    utils::CombinedUnboundedChannel,
    work::{CompletedWork, IncompleteWork},
};
use std::sync::Arc;
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedSender},
    Mutex,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tor_circmgr::CircMgr;
use tor_rtcompat::{PreferredRuntime, Runtime};

pub(crate) struct ClientPool<R: Runtime> {
    /// The list of Worker [Client]s among whom the [IncompleteWork] is going to be distributed
    /// properly
    worker_clients: Vec<Arc<Client<R>>>,

    /// Internal buffer of the entire pool, it's a unbounded channel that receives the
    /// [IncompleteWork] from the [Scanner] through it's Sending half and the [IncompleteWork] is
    /// then distributed among the [Client] through the receiving half
    incomplete_work_buffer: Arc<CombinedUnboundedChannel<IncompleteWork>>,

    /// Stream to receive the Completed Work from the [Client]s
    completed_work_receiver_stream:
        Mutex<UnboundedReceiverStream<CompletedWork>>,
}

impl ClientPool<PreferredRuntime> {
    pub(crate) async fn new(
        no_of_clients: u32,
        netdir_provider: Arc<NetDirProvider>,
        circmgr: Arc<CircMgr<PreferredRuntime>>,
    ) -> anyhow::Result<Self> {
        // The channels to push [IncompleteWork] into the [Client]s and get [CompletedWork] out of them
        let incomplete_work_buffer =
            Arc::new(CombinedUnboundedChannel::<IncompleteWork>::new());
        let (completed_work_sender, completed_work_receiver) =
            unbounded_channel::<CompletedWork>();

        let netdir = netdir_provider.current_netdir().await;

        // All the Worker [Client]
        let worker_clients = {
            let mut clients = Vec::new();
            for _ in 0..no_of_clients {
                clients.push(
                    Client::new(
                        circmgr.clone(),
                        netdir_provider.clone(),
                        netdir.clone(),
                    )
                    .await?,
                );
            }
            clients
        };

        // The receiving stream for [CompletedWork]
        let completed_work_receiver_stream =
            Mutex::new(UnboundedReceiverStream::new(completed_work_receiver));

        let pool = Self {
            worker_clients,
            incomplete_work_buffer,
            completed_work_receiver_stream,
        };

        pool.start_workers(completed_work_sender);

        Ok(pool)
    }

    /// Send the work into the internal channel of the pool, which then shall be then received by
    /// the receiver running in the [self.start_work] function, where work is going to be performed
    pub(crate) fn push_incomplete_work(
        &self,
        work: IncompleteWork,
    ) -> anyhow::Result<()> {
        self.incomplete_work_buffer.sender.send(work)?;
        Ok(())
    }

    pub(crate) async fn recv_completed_work(&self) -> Option<CompletedWork> {
        let mut completed_work_receiver_stream_lock =
            self.completed_work_receiver_stream.lock().await;
        //completed_work_receiver_stream_lock.
        //TODO: only take the required work from the stream
        //completed_work_receiver_stream_lock.try_next
        completed_work_receiver_stream_lock.next().await
    }

    /// Starts the work(i.e circuit creation and testing) and sends it to the
    /// inner UnboundedReceiverStream through sender half, UnboundedSender<CompletedWork> in the argument
    fn start_workers(
        &self,
        completed_work_sender: UnboundedSender<CompletedWork>,
    ) {
        let mut worker_clients = self.worker_clients.clone();
        let incomplete_work_buffer = self.incomplete_work_buffer.clone();

        // Continuosly listen for completed work in all the [Client]s and push them into
        // this pool's [CompletedWork] Sender half
        for worker_client in &worker_clients {
            let __completed_work_sender = completed_work_sender.clone();
            let __worker_client = worker_client.clone();
            tokio::spawn(async move {
                while let Some(completed_work) =
                    __worker_client.recv_completed_work().await
                {
                    __completed_work_sender.send(completed_work).unwrap();
                }
            });
        }

        tokio::spawn(async move {
            let mut worker_buffer_receiver_lock =
                incomplete_work_buffer.receiver.lock().await;
            while let Some(incomplete_work) =
                worker_buffer_receiver_lock.recv().await
            {
                // Sort the relays by increasing order of weight
                // lower weight means, less no of works were assigned and higher weight higher no
                // of works
                worker_clients.sort();

                // As the relays are sorted by lowest load to highest load, send until it is
                // successfully received by one of the lowest load worker
                for worker_client in &worker_clients {
                    if worker_client
                        .push_incomplete_work(&incomplete_work)
                        .is_ok()
                    {
                        break;
                    }
                }
            }
        });
    }
}