# Source code for onbrisca.models.bridge_scanner

# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause
""""""
import asyncio
import functools
import logging
from datetime import datetime

from asgiref.sync import async_to_sync, sync_to_async
from stem.control import EventType

from onbasca.onbasca.models.relaydesc import RelayDesc
from onbasca.onbasca.models.scanner import Scanner
from onbasca.onbasca.models.webserver import WebServer
from onbasca.onbasca.util import bytes_range_from_head
from onbrisca import config
from onbrisca.bridge_torcontrol import BridgeTorControl
from onbrisca.http_client import HttpClient
from onbrisca.models.bridge import Bridge
from onbrisca.models.bridge_heartbeat import BridgeHeartbeat
from onbrisca.models.bridge_measurement import BridgeMeasurement

logger = logging.getLogger(__name__)


class BridgeScanner(Scanner):
    """Singleton class that manages the measurers.

    It also initializes all the needed objects: the heartbeat, the tor
    controller, the HTTP client and the asyncio queue that holds the bridges
    waiting to be measured.

    Bridges reach the queue in two ways:

    - :meth:`handle_new_descriptor_event` enqueues a bridge when tor reports
      a new descriptor for it;
    - :meth:`_scan_bridges` enqueues all the selected bridges when the
      heartbeat reports no progress since the previous loop.

    Worker coroutines (:meth:`scan_bridge`) consume the queue and measure
    each bridge with :meth:`ameasure_bridge`.

    """

    class Meta:
        proxy = True

    def init(self, port=None, socket=None):  # , *args, **kwargs):
        """Create and store every object the scanner needs.

        :param port: forwarded to
            ``BridgeTorControl.launch_or_connect_tor``.
        :param socket: forwarded to
            ``BridgeTorControl.launch_or_connect_tor``.

        """
        # Same init as Scanner
        heartbeat = BridgeHeartbeat()
        heartbeat.save()
        self.heartbeat = heartbeat
        logger.info("Heartbeat %s created.", self.heartbeat)
        WebServer.objects.update_from_dict(config.WEB_SERVERS)

        # Bridge specific.
        # The loop and the queue are created *before* the event listener is
        # registered: the handler uses both, so an early NEWDESC event must
        # not find them missing.
        self.loop = asyncio.get_event_loop()
        self.bridges_queue = asyncio.Queue()

        self.tor_control = BridgeTorControl()
        self.tor_control.launch_or_connect_tor(port=port, socket=socket)
        # Setting the event listener here and not in BridgeTorcontrol so that
        # the event handler knows about the queue.
        self.tor_control.controller.add_event_listener(
            self.handle_new_descriptor_event,
            EventType.NEWDESC,
        )
        self.socks_address = self.tor_control.get_socks_address()
        self.tor_version = str(self.tor_control.controller.get_version())
        self.save()
        logger.info("Tor version %s.", self.tor_version)
        self.session_kwargs = {
            "nickname": self.nickname,
            "uuid": self.uuid,
            "tor_version": self.tor_version,
        }
        # with asyncio we can reuse the same session to do not have to set
        # headers, etc. every time, even if the underlying TCP connection will
        # be different
        self.http_client = HttpClient(
            self.socks_address,
            **self.session_kwargs,
        )
        logger.info("Scanner initialized.")
        # Only the 1st time
        self.consensus = self.tor_control.obtain_relays()

    def scan_bridges(self):
        """Run one measurement round on the scanner's event loop.

        Exceptions raised by the round are logged as warnings and not
        propagated.

        """
        logger.debug("Starting new loop.")
        try:
            self.loop.run_until_complete(self._scan_bridges())
        except Exception as e:
            logger.warning("%s", e)
        finally:
            logger.debug("loop running? %s", self.loop.is_running())

    async def _scan_bridges(self):
        """Select bridges, hand them to tor and measure the queued ones.

        Returns early when there are no bridges to measure. Otherwise it
        starts ``config.NUM_THREADS`` workers, waits until the queue has been
        fully processed and then stops the workers.

        """
        logger.debug("Selecting bridges to measure.")
        # Do not try to measure all bridges at once, just a subset to give time
        # to obtain their descriptors.
        bridges = await sync_to_async(Bridge.objects.ordered)()
        if not bridges:
            return
        await sync_to_async(self.tor_control.set_bridgelines)(bridges)
        # Instead of adding here the bridges to the queue, they're set in tor
        # and added to the queue when their descriptor is received.
        # Unless there isn't progress, then put them in the queue to measure
        # them again.
        if (
            self.heartbeat.measured_percent
            == self.heartbeat.previous_measured_percent
        ):
            logger.info("Measuring bridges already measured.")
            for bridge in bridges:
                await self.bridges_queue.put(bridge)
        logger.debug("Starting workers.")
        workers = [
            asyncio.create_task(self.scan_bridge())
            for _ in range(config.NUM_THREADS)
        ]
        try:
            await self.bridges_queue.join()
        finally:
            # Always stop the workers, also when waiting on the queue fails,
            # so that they do not stay pending on the loop for the next round.
            logger.debug("Stopping workers.")
            for worker in workers:
                worker.cancel()
            # Let the cancellations be processed before returning.
            await asyncio.gather(*workers, return_exceptions=True)

    def handle_new_descriptor_event(self, event):
        """Queue for measurement the known bridges listed in a NEWDESC event.

        :param event: the event passed by the tor controller; its ``relays``
            attribute is iterated as ``(fingerprint, nickname)`` pairs.

        """
        logger.info("New desc event.")
        for fingerprint, _nickname in event.relays:
            sd = self.tor_control.controller.get_server_descriptor(fingerprint)
            # To know whether the descriptor is a bridge descriptor, it could
            # also be checked `getattr(sd, "bridge_distribution", None)`,
            # but this way it is also known that the bridge was add via the
            # web server.
            # A single query both checks that the bridge exists and fetches it.
            bridge = Bridge.objects.filter(fingerprint=sd.fingerprint).first()
            if bridge is None:
                continue
            logger.debug("Got descriptor for bridge %s", sd.fingerprint)
            # Add it to the queue to measure.
            # asyncio queues are not thread-safe, so the put is handed over
            # to the loop that owns the queue instead of being run from here.
            # NOTE(review): stem is documented to invoke event listeners from
            # its own thread - confirm.
            self.loop.call_soon_threadsafe(
                self.bridges_queue.put_nowait, bridge
            )
            # NOTE(review): assumed to be stored only for known bridges -
            # confirm against the upstream source.
            _ = RelayDesc.objects.from_relay_desc(sd)

    async def scan_bridge(self):
        """Worker: take bridges from the queue forever and measure them.

        Errors are logged and the worker keeps running; it only stops when
        its task is cancelled.

        """
        while True:
            try:
                bridge = await self.bridges_queue.get()
            except Exception as e:
                logger.error("%s", e)
                continue
            logger.info("Scanning bridge %s", bridge)
            try:
                await self.ameasure_bridge(bridge)
            # Catch any exception
            except Exception as e:
                logger.error("%s", e)
                continue

    async def ameasure_bridge(self, bridge: Bridge):
        """Measure one bridge taken from the queue.

        A ``BridgeMeasurement`` is created for the bridge, an HTTP HEAD
        request is sent to a randomly selected web server and, if it succeeds,
        the bandwidth is measured. When the HEAD request fails the measurement
        is finished with the error instead.

        The queue item is always marked as done, also when an exception is
        raised, so that ``bridges_queue.join()`` can not block forever.

        :param bridge: the bridge to measure.

        """
        try:
            bridge_measurement = await BridgeMeasurement.objects.acreate(
                bridge=bridge,
            )
            webserver = await WebServer.objects.aselect_random()
            bridge_measurement.webserver = webserver
            await bridge_measurement.asave()
            bridge._last_measured = datetime.utcnow()
            await bridge.asave()
            path = await bridge.helper_path(self.consensus)
            circuit_id, response = await self.tor_control.afetch_http_head(
                path, self.http_client, webserver.url
            )
            if (
                not circuit_id
                or isinstance(response, (Exception, str))
                or response.status != 200
            ):
                await bridge_measurement.finish_with_error(response)
                return
            bridge_measurement._circuit_id = circuit_id
            await bridge_measurement.asave()
            bytes_range = bytes_range_from_head()
            _ = await bridge_measurement.ameasure_bandwidth(
                self.http_client, bytes_range
            )
        finally:
            self.bridges_queue.task_done()

    def delete_old_objects(self, days=config.BRIDGE_OLDEST_DATA_DAYS):
        """Delete old descriptors, measurements, bridges and heartbeats.

        :param days: passed to ``delete_old`` of the bridge related managers.

        """
        RelayDesc.objects.delete_old()
        # Old consensuses will get deleted when new ones arrive.
        BridgeMeasurement.objects.delete_old(days)
        Bridge.objects.delete_old(days)
        BridgeHeartbeat.objects.delete_old(days)

    def scan(self):
        """Main loop: measure bridges round after round.

        It only returns when tor is configured with ``TestingTorNetwork`` set
        to ``1`` and the heartbeat has counted at least 3 loops.

        """
        # Only the first time to remove invalid bridges that were added before
        # checking bridgelines.
        Bridge.objects.delete_invalid()
        while True:
            if (
                self.tor_control.controller.get_conf("TestingTorNetwork")
                == "1"
                and self.heartbeat.loops_count >= 3
            ):
                logger.debug("Testing network and 3 loops, exiting.")
                return
            self.delete_old_objects()
            try:
                self.scan_bridges()
            except Exception as e:
                logger.warning("%s", e)
            self.heartbeat.increment_loops()
            self.heartbeat.log_status()
            logger.info("Finished a loop.")
            # In case tor has kill itself (see #160), try launching it again.
            if not self.tor_control.controller.is_alive():
                logger.warning("Something killed tor. Trying to re-connect")
                # NOTE(review): the NEWDESC listener is not registered again
                # here - confirm whether the controller obtained after the
                # re-connection keeps it.
                self.tor_control.controller = (
                    self.tor_control.launch_or_connect_tor()
                )

    def run(self):
        """Run :meth:`scan` and release the resources if it is interrupted.

        Both a ``KeyboardInterrupt`` and any other exception are logged and
        followed by the same shutdown sequence; neither is propagated.

        """
        logger.info("Starting bridge scanner.")
        try:
            self.scan()
        except KeyboardInterrupt:
            logger.info("Interrupted by the user.")
            self._shutdown()
        except Exception as e:
            logger.error(e)
            self._shutdown()

    def _shutdown(self):
        """Close the event loop, the tor controller and the HTTP session."""
        # Give the loop one iteration to process what is pending before
        # closing it.
        self.loop.run_until_complete(asyncio.sleep(0))
        self.loop.close()
        self.tor_control.controller.close()
        async_to_sync(self.http_client.session.close)()