42#include "ext/tor_queue.h"
43#include <event2/event.h>
46#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
47#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
48#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
51typedef struct work_tailq_t work_tailq_t;
63 work_tailq_t
work[WORKQUEUE_N_PRIORITIES];
88 void *(*new_thread_state_fn)(
void*);
89 void (*free_thread_state_fn)(
void*);
90 void *new_thread_state_arg;
94#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
96#define WORKQUEUE_PRIORITY_BITS 2
113 void (*reply_fn)(
void *arg);
152 void (*reply_fn)(
void*),
157 ent->reply_fn = reply_fn;
159 ent->priority = WQ_PRI_HIGH;
163#define workqueue_entry_free(ent) \
164 FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
175 memset(ent, 0xf0,
sizeof(*ent));
201 TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
208 workqueue_entry_free(ent);
220 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
235 work_tailq_t *queue = NULL, *this_queue;
237 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
238 this_queue = &pool->
work[i];
239 if (!TOR_TAILQ_EMPTY(this_queue)) {
256 TOR_TAILQ_REMOVE(queue,
work, next_work);
287 if (r != WQ_RPL_REPLY) {
295 if (BUG(
work == NULL))
307 if (result != WQ_RPL_REPLY) {
331 was_empty = TOR_TAILQ_EMPTY(&queue->answers);
332 TOR_TAILQ_INSERT_TAIL(&queue->answers,
work, next_work);
336 if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
357 log_err(
LD_GENERAL,
"Can't launch worker thread.");
392 void (*reply_fn)(
void *),
395 tor_assert(((
int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
396 ((
int)prio) <= WORKQUEUE_PRIORITY_LAST);
401 ent->priority = prio;
405 TOR_TAILQ_INSERT_TAIL(&pool->
work[prio], ent, next_work);
418 void (*reply_fn)(
void *),
441 void *(*dup_fn)(
void *),
443 void (*free_fn)(
void *),
447 void (*old_args_free_fn)(
void *arg);
456 new_args = tor_calloc(
n_threads,
sizeof(
void*));
459 new_args[i] = dup_fn(arg);
475 if (old_args[i] && old_args_free_fn)
476 old_args_free_fn(old_args[i]);
485#define MAX_THREADS 1024
492#define CHANCE_PERMISSIVE 37
493#define CHANCE_STRICT INT32_MAX
523 pool->free_thread_state_fn(state);
547 void (*free_thread_state_fn)(
void*),
555 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
556 TOR_TAILQ_INIT(&pool->
work[i]);
560 pool->new_thread_state_arg = arg;
561 pool->free_thread_state_fn = free_thread_state_fn;
602 TOR_TAILQ_INIT(&rq->answers);
652 int r = queue->alert.drain_fn(queue->alert.read_fd);
655 static ratelim_t warn_limit = RATELIM_INIT(7200);
657 "Failure from drain_fd: %s",
658 tor_socket_strerror(-r));
663 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
666 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
668 work->on_pool = NULL;
670 work->reply_fn(work->arg);
671 workqueue_entry_free(work);
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
struct event_base * tor_libevent_get_base(void)
Header for compat_libevent.c.
void tor_mutex_init_nonrecursive(tor_mutex_t *m)
void tor_mutex_release(tor_mutex_t *m)
void tor_mutex_init(tor_mutex_t *m)
void tor_mutex_acquire(tor_mutex_t *m)
void tor_mutex_uninit(tor_mutex_t *m)
void tor_cond_signal_all(tor_cond_t *cond)
int tor_cond_init(tor_cond_t *cond)
int spawn_func(void(*func)(void *), void *data)
void tor_cond_uninit(tor_cond_t *cond)
void tor_cond_signal_one(tor_cond_t *cond)
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
Common functions for using (pseudo-)random number generators.
#define crypto_fast_rng_one_in_n(rng, n)
crypto_fast_rng_t * get_thread_fast_rng(void)
#define log_fn_ratelim(ratelim, severity, domain, args,...)
Summarize similar messages that would otherwise flood the logs.
void *(* new_thread_state_fn)(void *)
work_tailq_t work[WORKQUEUE_N_PRIORITIES]
void(* free_update_arg_fn)(void *)
struct event * reply_event
workqueue_reply_t(* update_fn)(void *, void *)
struct workerthread_t ** threads
replyqueue_t * reply_queue
int32_t lower_priority_chance
struct threadpool_t * in_pool
replyqueue_t * reply_queue
Macros to manage assertions, fatal and non-fatal.
#define tor_assert_nonfatal_unreached()
#define workqueue_priority_bitfield_t
static int worker_thread_has_work(workerthread_t *thread)
replyqueue_t * threadpool_get_replyqueue(threadpool_t *tp)
static int threadpool_start_threads(threadpool_t *pool, int n)
static void workqueue_entry_free_(workqueue_entry_t *ent)
void replyqueue_process(replyqueue_t *queue)
void * workqueue_entry_cancel(workqueue_entry_t *ent)
static workqueue_entry_t * worker_thread_extract_next_work(workerthread_t *thread)
static void worker_thread_main(void *thread_)
static void reply_event_cb(evutil_socket_t sock, short events, void *arg)
workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
replyqueue_t * replyqueue_new(uint32_t alertsocks_flags)
int threadpool_register_reply_event(threadpool_t *tp, void(*cb)(threadpool_t *tp))
int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg)
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
#define CHANCE_PERMISSIVE
threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg)
static workerthread_t * workerthread_new(int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue)
unsigned int threadpool_get_n_threads(threadpool_t *tp)
static workqueue_entry_t * workqueue_entry_new(workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
#define WORKQUEUE_PRIORITY_BITS