# Source code for onbrisca.models.bridge_scanner
# SPDX-FileCopyrightText: 2022 The Tor Project, Inc.
#
# SPDX-License-Identifier: BSD-3-Clause
""""""
import asyncio
import functools
import logging
from datetime import datetime
from asgiref.sync import async_to_sync, sync_to_async
from stem.control import EventType
from onbasca.onbasca.models.relaydesc import RelayDesc
from onbasca.onbasca.models.scanner import Scanner
from onbasca.onbasca.models.webserver import WebServer
from onbasca.onbasca.util import bytes_range_from_head
from onbrisca import config
from onbrisca.bridge_torcontrol import BridgeTorControl
from onbrisca.http_client import HttpClient
from onbrisca.models.bridge import Bridge
from onbrisca.models.bridge_heartbeat import BridgeHeartbeat
from onbrisca.models.bridge_measurement import BridgeMeasurement
logger = logging.getLogger(__name__)
[docs]
class BridgeScanner(Scanner):
"""Scanner specialisation that measures bridges.

Described by its authors as a singleton that manages the measurers and
initializes all the needed objects (tor controller, HTTP client, heartbeat,
asyncio loop and bridges queue; see ``init``).

Declared as a proxy of ``Scanner`` through ``Meta.proxy``.
"""
class Meta:
# Proxy of ``Scanner``; presumably a Django proxy model that reuses the
# parent's table -- confirm against the ``Scanner`` definition.
proxy = True
[docs]
def init(self, port=None, socket=None):  # , *args, **kwargs):
    """Create every runtime object the bridge scanner needs.

    Sets ``heartbeat``, ``tor_control``, ``loop``, ``bridges_queue``,
    ``socks_address``, ``tor_version``, ``session_kwargs``, ``http_client``
    and ``consensus`` on the instance, saves the scanner and registers the
    NEWDESC event listener on the tor controller.

    :param port: forwarded to ``BridgeTorControl.launch_or_connect_tor``.
    :param socket: forwarded to ``BridgeTorControl.launch_or_connect_tor``.
    """
    # Same init as Scanner
    heartbeat = BridgeHeartbeat()
    heartbeat.save()
    self.heartbeat = heartbeat
    logger.info("Heartbeat %s created.", self.heartbeat)
    WebServer.objects.update_from_dict(config.WEB_SERVERS)
    self.tor_control = BridgeTorControl()
    self.tor_control.launch_or_connect_tor(port=port, socket=socket)

    # Bridge specific.
    # The loop and the queue must exist *before* the event listener is
    # registered: the handler uses both, and a NEWDESC event may arrive as
    # soon as the listener is added.
    try:
        self.loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop (e.g. called outside the main thread, or
        # on Python versions that no longer create one implicitly).
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
    self.bridges_queue = asyncio.Queue()

    # Setting the event listener here and not in BridgeTorcontrol so that
    # the event handler knows about the queue.
    self.tor_control.controller.add_event_listener(
        self.handle_new_descriptor_event,
        EventType.NEWDESC,
    )
    self.socks_address = self.tor_control.get_socks_address()
    self.tor_version = str(self.tor_control.controller.get_version())
    self.save()
    logger.info("Tor version %s.", self.tor_version)
    self.session_kwargs = {
        "nickname": self.nickname,
        "uuid": self.uuid,
        "tor_version": self.tor_version,
    }
    # With asyncio the same session can be reused, so headers, etc. do not
    # have to be set every time, even if the underlying TCP connection
    # will be different.
    self.http_client = HttpClient(
        self.socks_address,
        **self.session_kwargs,
    )
    logger.info("Scanner initialized.")
    # Only the 1st time
    self.consensus = self.tor_control.obtain_relays()
[docs]
def scan_bridges(self):
    """Run one bridge-scanning pass to completion on ``self.loop``.

    Blocks until the ``_scan_bridges`` coroutine finishes. Any
    ``Exception`` raised while running it is logged as a warning and not
    propagated; the loop's running state is logged afterwards either way.
    """
    logger.debug("Starting new loop.")
    try:
        run_to_completion = self.loop.run_until_complete
        run_to_completion(self._scan_bridges())
    except Exception as error:
        logger.warning("%s", error)
    finally:
        logger.debug("loop running? %s", self.loop.is_running())
async def _scan_bridges(self):
    """Configure the bridges in tor and measure whatever gets queued.

    Fetches the bridges from ``Bridge.objects.ordered``, sets them as
    bridgelines in tor, starts ``config.NUM_THREADS`` worker tasks running
    ``scan_bridge`` and waits until the queue has been fully processed.
    Returns immediately when there are no bridges.
    """
    logger.debug("Selecting bridges to measure.")
    # Do not try to measure all bridges at once, just a subset to give time
    # to obtain their descriptors.
    bridges = await sync_to_async(Bridge.objects.ordered)()
    if not bridges:
        return
    await sync_to_async(self.tor_control.set_bridgelines)(bridges)
    # Instead of adding here the bridges to the queue, they're set in tor
    # and added to the queue when their descriptor is received.
    # Unless there isn't progress, then put them in the queue to measure
    # them again.
    if (
        self.heartbeat.measured_percent
        == self.heartbeat.previous_measured_percent
    ):
        logger.info("Measuring bridges already measured.")
        for bridge in bridges:
            await self.bridges_queue.put(bridge)
    logger.debug("Starting workers.")
    workers = [
        asyncio.create_task(self.scan_bridge())
        for _ in range(config.NUM_THREADS)
    ]
    try:
        await self.bridges_queue.join()
    finally:
        # Always stop the workers, even if waiting on the queue was
        # interrupted, and wait for the cancellations to be processed so
        # no worker task is left pending when this coroutine returns.
        logger.debug("Stopping workers.")
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
[docs]
def handle_new_descriptor_event(self, event):
    """Queue for measurement the known bridges named in a NEWDESC event.

    For every ``(fingerprint, nickname)`` pair in ``event.relays`` the
    server descriptor is fetched from the tor controller. When a ``Bridge``
    with that fingerprint exists it is added to ``self.bridges_queue``.

    This is registered as a tor controller event listener, so it is
    presumably invoked from the controller's own thread rather than from
    the thread running ``self.loop``. ``asyncio.Queue`` is not thread-safe,
    so the item is handed over with ``call_soon_threadsafe`` and inserted
    by the loop itself.

    :param event: NEWDESC event exposing a ``relays`` iterable.
    """
    logger.info("New desc event.")
    for fingerprint, _nickname in event.relays:
        sd = self.tor_control.controller.get_server_descriptor(fingerprint)
        # To know whether the descriptor is a bridge descriptor, it could
        # also be checked `getattr(sd, "bridge_distribution", None)`,
        # but this way it is also known that the bridge was added via the
        # web server.
        # A single query instead of exists() + get(): avoids the second
        # round-trip and the race between the two.
        bridge = Bridge.objects.filter(fingerprint=sd.fingerprint).first()
        if bridge is not None:
            logger.debug("Got descriptor for bridge %s", sd.fingerprint)
            # Add it to the queue to measure. The queue is unbounded, so
            # put_nowait cannot fail with QueueFull.
            self.loop.call_soon_threadsafe(
                self.bridges_queue.put_nowait, bridge
            )
        # NOTE(review): the descriptor is stored for every fingerprint in
        # the event, whether or not it matched a known bridge -- confirm
        # this is intended (vs. storing only for known bridges).
        _ = RelayDesc.objects.from_relay_desc(sd)
[docs]
async def scan_bridge(self):
    """Worker coroutine: take bridges from the queue and measure them.

    Loops forever; it only ends when the task running it is cancelled.
    Any ``Exception`` raised while getting an item or while measuring it is
    logged as an error and the worker moves on to the next item. Marking
    the queue item as done is left to ``ameasure_bridge``.
    """
    while True:
        try:
            bridge = await self.bridges_queue.get()
        except Exception as error:
            logger.error("%s", error)
        else:
            logger.info("Scanning bridge %s", bridge)
            try:
                await self.ameasure_bridge(bridge)
            # Catch any exception
            except Exception as error:
                logger.error("%s", error)
[docs]
async def ameasure_bridge(self, bridge: Bridge):
    """Measure one bridge and mark its queue item as done.

    Creates a ``BridgeMeasurement`` for the bridge, assigns it a random web
    server, records the measurement time on the bridge, builds a path with
    ``bridge.helper_path`` and issues an HTTP HEAD through it. When the
    HEAD fails (no circuit, an exception or string as response, or a status
    other than 200) the measurement is finished with the error. Otherwise
    the circuit id is stored and the bandwidth is measured.

    ``self.bridges_queue.task_done()`` is called exactly once per call,
    including when an exception is raised; otherwise
    ``self.bridges_queue.join()`` would wait forever for this item.

    :param bridge: the bridge taken from ``self.bridges_queue``.
    """
    try:
        bridge_measurement = await BridgeMeasurement.objects.acreate(
            bridge=bridge,
        )
        webserver = await WebServer.objects.aselect_random()
        bridge_measurement.webserver = webserver
        await bridge_measurement.asave()
        bridge._last_measured = datetime.utcnow()
        await bridge.asave()
        path = await bridge.helper_path(self.consensus)
        circuit_id, response = await self.tor_control.afetch_http_head(
            path, self.http_client, webserver.url
        )
        head_failed = (
            not circuit_id
            or isinstance(response, (Exception, str))
            or response.status != 200
        )
        if head_failed:
            await bridge_measurement.finish_with_error(response)
            return
        bridge_measurement._circuit_id = circuit_id
        await bridge_measurement.asave()
        bytes_range = bytes_range_from_head()
        await bridge_measurement.ameasure_bandwidth(
            self.http_client, bytes_range
        )
    finally:
        # Runs on success, on the early error return and on exceptions.
        self.bridges_queue.task_done()
[docs]
def delete_old_objects(self, days=config.BRIDGE_OLDEST_DATA_DAYS):
    """Delete stored objects that are considered old.

    ``RelayDesc`` uses its manager's own default age; bridge measurements,
    bridges and bridge heartbeats are deleted, in that order, using
    ``days``.

    :param days: age threshold passed to each ``delete_old`` call;
        defaults to ``config.BRIDGE_OLDEST_DATA_DAYS``.
    """
    RelayDesc.objects.delete_old()
    # Old consensuses will get deleted when new ones arrive.
    for model in (BridgeMeasurement, Bridge, BridgeHeartbeat):
        model.objects.delete_old(days)
[docs]
def scan(self):
    """Run scanning passes in an endless loop.

    Each iteration deletes old objects, runs ``scan_bridges``, updates and
    logs the heartbeat and, when the tor controller is no longer alive,
    tries to launch or connect to tor again. The loop only returns when tor
    reports ``TestingTorNetwork`` as ``"1"`` and at least 3 loops have been
    counted by the heartbeat.
    """
    # Only the first time to remove invalid bridges that were added before
    # checking bridgelines.
    Bridge.objects.delete_invalid()
    while True:
        is_testing_network = (
            self.tor_control.controller.get_conf("TestingTorNetwork") == "1"
        )
        if is_testing_network and self.heartbeat.loops_count >= 3:
            logger.debug("Testing network and 3 loops, exiting.")
            return
        self.delete_old_objects()
        try:
            self.scan_bridges()
        except Exception as error:
            logger.warning("%s", error)
        self.heartbeat.increment_loops()
        self.heartbeat.log_status()
        logger.info("Finished a loop.")
        # In case tor has kill itself (see #160), try launching it again.
        if self.tor_control.controller.is_alive():
            continue
        logger.warning("Something killed tor. Trying to re-connect")
        # NOTE(review): this call passes no port/socket, relies on
        # launch_or_connect_tor returning the controller, and does not
        # re-register the NEWDESC listener added in init -- confirm.
        self.tor_control.controller = (
            self.tor_control.launch_or_connect_tor()
        )
[docs]
def run(self):
    """Entry point: run ``scan`` and release resources when it ends.

    ``KeyboardInterrupt`` is logged at info level and any other
    ``Exception`` at error level; neither is propagated. The event loop,
    the tor controller and the HTTP session are closed in every case,
    including when ``scan`` returns normally.
    """
    logger.info("Starting bridge scanner.")
    try:
        self.scan()
    except KeyboardInterrupt:
        logger.info("Interrupted by the user.")
    except Exception as e:
        logger.error(e)
    finally:
        # Single cleanup path instead of one copy per handler, so it also
        # runs when scan() returns without raising.
        # Run the loop once more before closing it, presumably so that
        # already-scheduled callbacks (e.g. task cancellations) execute.
        self.loop.run_until_complete(asyncio.sleep(0))
        self.loop.close()
        self.tor_control.controller.close()
        async_to_sync(self.http_client.session.close)()