42#include "ext/tor_queue.h"
43#include <event2/event.h>
/* Inclusive bounds of the priority range that the per-priority loops in
 * this file walk (from FIRST up to and including LAST): WQ_PRI_HIGH is
 * the first priority and WQ_PRI_LOW is the last. */
46#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
47#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
/* The integer value of the last priority plus one; used as the length of
 * the per-priority work[] array.
 * NOTE(review): this is the number of priority levels only if the
 * priority values are numbered from 0 -- confirm against the enum, which
 * is not visible here. */
48#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
51typedef struct work_tailq_t work_tailq_t;
63 work_tailq_t
work[WORKQUEUE_N_PRIORITIES];
90 void *(*new_thread_state_fn)(
void*);
91 void (*free_thread_state_fn)(
void*);
92 void *new_thread_state_arg;
/* Spelling of the priority type for use in a bitfield declaration:
 * expands to ENUM_BF(workqueue_priority_t). ENUM_BF is defined outside
 * this view. */
101#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/* NOTE(review): presumably the width, in bits, of the bitfield that holds
 * a priority -- the field declaration is not visible here; confirm, and
 * confirm that 2 bits can represent every priority value. */
103#define WORKQUEUE_PRIORITY_BITS 2
120 void (*reply_fn)(
void *arg);
/* Release a workerthread_t: hands the type, the real destructor
 * workerthread_free_(), and the pointer expression to FREE_AND_NULL.
 * NOTE(review): FREE_AND_NULL is defined elsewhere; presumably it calls
 * the destructor and then sets `thread` to NULL -- confirm. */
154#define workerthread_free(thread) \
155 FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
/* Release a replyqueue_t: hands the type, the real destructor
 * replyqueue_free_(), and the pointer expression to FREE_AND_NULL.
 * NOTE(review): FREE_AND_NULL is defined elsewhere; presumably it calls
 * the destructor and then sets `queue` to NULL -- confirm. */
157#define replyqueue_free(queue) \
158 FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))
165 void (*reply_fn)(
void*),
170 ent->reply_fn = reply_fn;
172 ent->priority = WQ_PRI_HIGH;
/* Release a workqueue_entry_t: hands the type, the real destructor
 * workqueue_entry_free_(), and the pointer expression to FREE_AND_NULL.
 * NOTE(review): FREE_AND_NULL is defined elsewhere; presumably it calls
 * the destructor and then sets `ent` to NULL -- confirm. */
176#define workqueue_entry_free(ent) \
177 FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
188 memset(ent, 0xf0,
sizeof(*ent));
214 TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
221 workqueue_entry_free(ent);
233 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
248 work_tailq_t *queue = NULL, *this_queue;
250 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
251 this_queue = &pool->
work[i];
252 if (!TOR_TAILQ_EMPTY(this_queue)) {
269 TOR_TAILQ_REMOVE(queue,
work, next_work);
280 static int n_worker_threads_running = 0;
287 log_debug(
LD_GENERAL,
"Worker thread %u/%u has started [TID: %lu].",
300 log_debug(
LD_GENERAL,
"Worker thread has entered the work loop [TID: %lu].",
324 if (r != WQ_RPL_REPLY)
330 if (BUG(
work == NULL))
344 if (result != WQ_RPL_REPLY)
361 log_debug(
LD_GENERAL,
"Worker thread %u/%u has exited [TID: %lu].",
365 if (--n_worker_threads_running == 0)
379 was_empty = TOR_TAILQ_EMPTY(&queue->answers);
380 TOR_TAILQ_INSERT_TAIL(&queue->answers,
work, next_work);
384 if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
405 log_err(
LD_GENERAL,
"Can't launch worker thread.");
406 workerthread_free(thr);
449 void (*reply_fn)(
void *),
452 tor_assert(((
int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
453 ((
int)prio) <= WORKQUEUE_PRIORITY_LAST);
458 ent->priority = prio;
462 TOR_TAILQ_INSERT_TAIL(&pool->
work[prio], ent, next_work);
475 void (*reply_fn)(
void *),
498 void *(*dup_fn)(
void *),
500 void (*free_fn)(
void *),
504 void (*old_args_free_fn)(
void *arg);
513 new_args = tor_calloc(
n_threads,
sizeof(
void*));
516 new_args[i] = dup_fn(arg);
532 if (old_args[i] && old_args_free_fn)
533 old_args_free_fn(old_args[i]);
/* NOTE(review): presumably the upper bound on the number of worker
 * threads a pool may hold -- the code that uses this constant is not
 * visible here; confirm. */
542#define MAX_THREADS 1024
/* NOTE(review): these look like the two values passed as a worker
 * thread's int32_t lower_priority_chance, presumably a "one in N" chance
 * of picking lower-priority work (37 = relatively often, INT32_MAX =
 * almost never). The code that consumes them is not visible here;
 * confirm before relying on this reading. */
549#define CHANCE_PERMISSIVE 37
550#define CHANCE_STRICT INT32_MAX
570 log_debug(
LD_GENERAL,
"Starting worker threads...");
585 pool->free_thread_state_fn(state);
594 struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
603 log_debug(
LD_GENERAL,
"Starting worker threads finished.");
606 log_warn(
LD_GENERAL,
"Failed to confirm worker threads' start up.");
609 log_warn(
LD_GENERAL,
"Failed to confirm worker threads' "
610 "start up after timeout.");
616 log_debug(
LD_GENERAL,
"Signaled the worker threads to enter the work loop.");
621 log_debug(
LD_GENERAL,
"Signaled the worker threads to exit...");
639 if (pool->
exit == 0) {
645 log_debug(
LD_GENERAL,
"Signaled worker threads to exit. "
646 "Waiting for them to exit...");
659 log_debug(
LD_GENERAL,
"All worker threads have exited.");
672 void *(*new_thread_state_fn)(
void*),
673 void (*free_thread_state_fn)(
void*),
684 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
685 TOR_TAILQ_INIT(&pool->
work[i]);
689 pool->new_thread_state_arg = arg;
690 pool->free_thread_state_fn = free_thread_state_fn;
696 threadpool_free(pool);
715 log_debug(
LD_GENERAL,
"Beginning to clean up...");
722 for (
int i = 0; i != pool->
n_threads; ++i)
723 workerthread_free(pool->
threads[i]);
730 log_warn(
LD_GENERAL,
"Freeing pool->update_args not possible. "
731 "pool->free_update_arg_fn is not set.");
738 log_warn(
LD_GENERAL,
"libevent error: deleting reply event failed.");
746 if (pool->new_thread_state_arg) {
747 if (!pool->free_thread_state_fn)
748 log_warn(
LD_GENERAL,
"Freeing pool->new_thread_state_arg not possible. "
749 "pool->free_thread_state_fn is not set.");
751 pool->free_thread_state_fn(pool->new_thread_state_arg);
784 TOR_TAILQ_INIT(&rq->answers);
800 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
801 work = TOR_TAILQ_FIRST(&queue->answers);
802 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
803 workqueue_entry_free(work);
854 int r = queue->alert.drain_fn(queue->alert.read_fd);
857 static ratelim_t warn_limit = RATELIM_INIT(7200);
859 "Failure from drain_fd: %s",
860 tor_socket_strerror(-r));
865 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
868 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
870 work->on_pool = NULL;
872 work->reply_fn(work->arg);
873 workqueue_entry_free(work);
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
struct event_base * tor_libevent_get_base(void)
Header for compat_libevent.c.
void tor_mutex_init_nonrecursive(tor_mutex_t *m)
void tor_mutex_release(tor_mutex_t *m)
void tor_mutex_init(tor_mutex_t *m)
void tor_mutex_acquire(tor_mutex_t *m)
void tor_mutex_uninit(tor_mutex_t *m)
void tor_cond_signal_all(tor_cond_t *cond)
int tor_cond_init(tor_cond_t *cond)
int spawn_func(void(*func)(void *), void *data)
void tor_cond_uninit(tor_cond_t *cond)
void tor_cond_signal_one(tor_cond_t *cond)
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
unsigned long tor_get_thread_id(void)
Common functions for using (pseudo-)random number generators.
#define crypto_fast_rng_one_in_n(rng, n)
crypto_fast_rng_t * get_thread_fast_rng(void)
#define log_fn_ratelim(ratelim, severity, domain, args,...)
Summarize similar messages that would otherwise flood the logs.
void *(* new_thread_state_fn)(void *)
work_tailq_t work[WORKQUEUE_N_PRIORITIES]
void(* free_update_arg_fn)(void *)
struct event * reply_event
workqueue_reply_t(* update_fn)(void *, void *)
struct workerthread_t ** threads
replyqueue_t * reply_queue
int32_t lower_priority_chance
struct threadpool_t * in_pool
replyqueue_t * reply_queue
Macros to manage assertions, fatal and non-fatal.
#define tor_assert_nonfatal_unreached()
#define workqueue_priority_bitfield_t
void threadpool_free_(threadpool_t *pool)
static int worker_thread_has_work(workerthread_t *thread)
static void threadpool_stop_threads(threadpool_t *pool)
replyqueue_t * threadpool_get_replyqueue(threadpool_t *tp)
static int threadpool_start_threads(threadpool_t *pool, int n)
static void workqueue_entry_free_(workqueue_entry_t *ent)
void replyqueue_process(replyqueue_t *queue)
void * workqueue_entry_cancel(workqueue_entry_t *ent)
static workqueue_entry_t * worker_thread_extract_next_work(workerthread_t *thread)
static void worker_thread_main(void *thread_)
static void reply_event_cb(evutil_socket_t sock, short events, void *arg)
static void replyqueue_free_(replyqueue_t *queue)
workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
replyqueue_t * replyqueue_new(uint32_t alertsocks_flags)
int threadpool_register_reply_event(threadpool_t *tp, void(*cb)(threadpool_t *tp))
int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg)
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
#define CHANCE_PERMISSIVE
threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg)
static workerthread_t * workerthread_new(int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue)
unsigned int threadpool_get_n_threads(threadpool_t *tp)
static workqueue_entry_t * workqueue_entry_new(workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
#define WORKQUEUE_PRIORITY_BITS
static void workerthread_free_(workerthread_t *thread)