Tor 0.4.9.0-alpha-dev
workqueue.c
/* Copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t : it manages a set of worker
 * threads, a queue of pending work, and a reply queue. Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable. The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers. This is useful for updating state that all the workers share.
 *
 * In Tor today, there is only one thread pool, managed in cpuworker.c and
 * handling a variety of types of work, from the original "onion skin"
 * circuit handshakes, to consensus diff computation, to client-side onion
 * service PoW generation.
 */
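
/*
 * A minimal usage sketch (illustrative only): my_state_new(),
 * my_state_free(), my_work_fn(), my_reply_fn(), and my_arg below are
 * hypothetical and not part of this file. A caller typically creates a
 * reply queue, builds a pool on top of it, registers the reply event with
 * the libevent mainloop, and then queues work:
 *
 *   replyqueue_t *rq = replyqueue_new(0);
 *   threadpool_t *tp = threadpool_new(4, rq,
 *                                     my_state_new, my_state_free, NULL);
 *   threadpool_register_reply_event(tp, NULL); // NULL: no extra callback
 *   threadpool_queue_work(tp, my_work_fn, my_reply_fn, my_arg);
 */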

#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>

#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool. Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item. In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}
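
/*
 * Illustrative use (hypothetical caller code, not part of this file):
 * cancelling an entry returns ownership of the argument to the caller.
 *
 *   void *arg = workqueue_entry_cancel(ent);
 *   if (arg) {
 *     // The item never ran in a worker; we must free the argument ourselves.
 *     my_job_free(arg);   // hypothetical free function for the argument
 *   }
 *   // If arg is NULL, a worker already picked the item up, and the normal
 *   // reply path will run.
 */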

/** Return true iff <b>thread</b> has work to do: either queued work in its
 * pool, or an update that it has not yet run.
 *
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queue and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}
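
/*
 * A note on the probability above (an illustration, not code from this
 * file): crypto_fast_rng_one_in_n(rng, n) is true with probability 1/n, so
 * a thread whose lower_priority_chance is CHANCE_PERMISSIVE (37) takes the
 * first nonempty queue with probability 36/37, and keeps scanning toward
 * lower-priority queues with probability 1/37 per nonempty queue it passes.
 * A thread using CHANCE_STRICT (INT32_MAX) effectively always takes the
 * highest-priority nonempty queue.
 */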

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }
      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "tor_cond_wait() failed.");
    }
  }
}

/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    tor_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}

/**
 * Queue an item of work for a thread in a thread pool. The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object. It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}

/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
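
/*
 * A sketch of a conforming work/reply pair (hypothetical names; the job_t
 * type and its helpers are not part of Tor):
 *
 *   static workqueue_reply_t
 *   my_work_fn(void *state, void *arg)
 *   {
 *     job_t *job = arg;
 *     job->answer = compute(state, job->input); // runs in a worker thread
 *     return WQ_RPL_REPLY;                      // keep the worker running
 *   }
 *
 *   static void
 *   my_reply_fn(void *arg)
 *   {
 *     job_t *job = arg;
 *     consume(job->answer);  // runs in the main thread
 *     job_free(job);         // the reply function must free the work object
 *   }
 */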

/**
 * Queue a copy of a work item for every thread in a pool. This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per thread to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
 * will be run. If a new update is scheduled before the old update finishes
 * running, then the new one will replace the old one in any threads that
 * haven't run it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}
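
/*
 * A sketch of an idempotent update (hypothetical my_state_t and key
 * helpers): swapping a shared key into every thread's state. The update
 * function takes ownership of this thread's copy, so being superseded by a
 * newer update remains safe.
 *
 *   static void *
 *   dup_key(void *arg)
 *   {
 *     return key_copy(arg);        // one copy per thread
 *   }
 *
 *   static workqueue_reply_t
 *   update_key(void *state, void *arg)
 *   {
 *     my_state_t *st = state;
 *     key_free(st->key);
 *     st->key = arg;               // take ownership of this copy
 *     return WQ_RPL_REPLY;
 *   }
 *
 *   threadpool_queue_update(pool, dup_key, update_key, key_free, new_key);
 */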

/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * CHANCE_PERMISSIVE is too high (making that probability too small), we have
 * a risk of low-priority tasks stalling forever. If it's too low, we have a
 * risk of low-priority tasks grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX

/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }
  tor_mutex_release(&pool->lock);

  return 0;
}

/**
 * Construct a new thread pool with <b>n_threads</b> worker threads,
 * configured to send their output to <b>replyqueue</b>. The threads' states
 * will be constructed with the <b>new_thread_state_fn</b> call, receiving
 * <b>arg</b> as its argument. When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    tor_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}

/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}

/** Allocate a new reply queue. Reply queues are used to pass results from
 * worker threads to the main thread. Since the main thread is running an
 * IO-centric event loop, it needs to get woken up by means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    tor_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}

/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}

/** Return the number of threads configured for the given pool. */
unsigned int
threadpool_get_n_threads(threadpool_t *tp)
{
  tor_assert(tp);
  return tp->n_threads;
}
Definition: workqueue.h:31