/* Copyright (c) 2013-2024, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t : it manages a set of worker
 * threads, a queue of pending work, and a reply queue. Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable. The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers. This is useful for updating state that all the workers share.
 *
 * In Tor today there is only one thread pool: it is managed in cpuworker.c
 * and handles a variety of types of work, from the original "onion skin"
 * circuit handshakes, to consensus diff computation, to client-side onion
 * service PoW generation.
 */

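/*
 * A minimal usage sketch of the API in this file (illustrative only: this is
 * not code from cpuworker.c, and the example_* names are hypothetical
 * placeholders for caller-supplied functions):
 *
 *   static void *example_state_new(void *arg) { (void)arg; return NULL; }
 *   static void example_state_free(void *state) { (void)state; }
 *   static workqueue_reply_t example_work(void *state, void *job)
 *   { (void)state; (void)job; return WQ_RPL_REPLY; }
 *   static void example_reply(void *job) { tor_free(job); }
 *
 *   replyqueue_t *rq = replyqueue_new(0); // 0 = default alert-socket flags
 *   threadpool_t *tp = threadpool_new(4, rq, example_state_new,
 *                                     example_state_free, NULL);
 *   threadpool_register_reply_event(tp, NULL); // wake main loop on replies
 *   threadpool_queue_work(tp, example_work, example_reply,
 *                         tor_malloc_zero(16));
 */
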
#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"
#include "lib/time/compat_time.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>

#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool. Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  /** Callback to run on the main thread after replies have been processed. */
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Number of elements to be created in threads. */
  int n_threads_max;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;

  /** Used for signalling the worker threads to exit. */
  int exit;
  /** Mutex for controlling worker threads' startup and exit. */
  tor_mutex_t control_lock;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
static void workerthread_free_(workerthread_t *thread);
#define workerthread_free(thread) \
  FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
static void replyqueue_free_(replyqueue_t *queue);
#define replyqueue_free(queue) \
  FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  /* Poison the freed entry so that use-after-free bugs fail loudly. */
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item. In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}

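/*
 * Illustrative only (my_job_free is a hypothetical cleanup function for the
 * caller's own argument type): a caller that queued work and later finds it
 * unnecessary might do something like this.
 *
 *   void *arg = workqueue_entry_cancel(ent);
 *   if (arg) {
 *     my_job_free(arg); // never ran; we own the argument again
 *   } else {
 *     // Too late: a worker already took it, and reply_fn will still run.
 *   }
 */
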
/** Return true iff <b>thread</b> has any pending work, or needs to run an
 * update function.
 *
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  static int n_worker_threads_running = 0;
  static unsigned long control_lock_owner = 0;
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->control_lock);
  log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
            n_worker_threads_running + 1, pool->n_threads_max,
            tor_get_thread_id());

  if (++n_worker_threads_running == pool->n_threads_max)
    tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->control_lock);

  /* Wait until all worker threads have started.
   * pool->lock must be prelocked here. */
  tor_mutex_acquire(&pool->lock);

  if (control_lock_owner == 0) {
    /* pool->control_lock stays locked. This is required for the main thread
     * to wait for the worker threads to exit on shutdown, so the memory
     * clean up won't begin before all threads have exited. */
    tor_mutex_acquire(&pool->control_lock);
    control_lock_owner = tor_get_thread_id();
  }

  log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
            tor_get_thread_id());

  while (1) {
    /* Exit thread when signaled to exit */
    if (pool->exit)
      goto exit;

    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        tor_mutex_acquire(&pool->lock);

        /* We may need to exit the thread. */
        if (r != WQ_RPL_REPLY)
          goto exit;

        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      tor_mutex_acquire(&pool->lock);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY)
        goto exit;
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }

exit:
  /* At this point pool->lock must be held */

  log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
            pool->n_threads_max - n_worker_threads_running + 1,
            pool->n_threads_max, tor_get_thread_id());

  if (tor_get_thread_id() == control_lock_owner) {
    /* Wait for the other worker threads to exit so we
     * can safely unlock pool->control_lock. */
    while (n_worker_threads_running > 1) {
      tor_mutex_release(&pool->lock);
      tor_sleep_msec(10);
      tor_mutex_acquire(&pool->lock);
    }

    tor_mutex_release(&pool->lock);
    /* Let the main thread know that the last worker thread has exited. */
    tor_mutex_release(&pool->control_lock);
  } else {
    --n_worker_threads_running;
    tor_mutex_release(&pool->lock);
  }
}

/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    workerthread_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}

/**
 * Free up the resources allocated by a worker thread.
 */
static void
workerthread_free_(workerthread_t *thread)
{
  tor_free(thread);
}

/**
 * Queue an item of work for a thread in a thread pool. The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object. It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}

/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}

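/*
 * Sketch of queueing the same kind of job at two priorities (illustrative
 * only; handle_job and handle_reply are hypothetical caller functions, and
 * the WQ_PRI_* values come from workqueue.h):
 *
 *   // Latency-sensitive work goes on the high-priority queue.
 *   threadpool_queue_work(pool, handle_job, handle_reply, job);
 *
 *   // Bulk background work can wait, but the occasional low-priority pull
 *   // in worker_thread_extract_next_work() keeps it from starving.
 *   threadpool_queue_work_priority(pool, WQ_PRI_LOW,
 *                                  handle_job, handle_reply, batch_job);
 */
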
/**
 * Queue a copy of a work item for every thread in a pool. This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per thread to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
 * will be run. If a new update is scheduled before the old update finishes
 * running, then the new will replace the old in any threads that haven't run
 * it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}

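/*
 * Sketch of broadcasting new shared state to every worker (illustrative only;
 * dup_keys, update_keys, and free_keys are hypothetical caller functions).
 * Remember that update functions must be idempotent, and that an update may
 * be skipped entirely if a newer one replaces it first:
 *
 *   threadpool_queue_update(pool,
 *                           dup_keys,     // copies <new_keys> once per thread
 *                           update_keys,  // runs in each worker thread
 *                           free_keys,    // frees copies that never ran
 *                           new_keys);
 */
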
/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever. If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX

/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->control_lock);
  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  int status = 0;
  pool->n_threads_max = n;
  log_debug(LD_GENERAL, "Starting worker threads...");

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      status = -1;
      goto check_status;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }

  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};

  /* Wait for the last launched thread to confirm that it has started.
   * Wait at most 30 seconds. */
  status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);

check_status:
  switch (status) {
    case 0:
      log_debug(LD_GENERAL, "Starting worker threads finished.");
      break;
    case -1:
      log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
      break;
    case 1:
      log_warn(LD_GENERAL, "Failed to confirm worker threads' "
                           "start up after timeout.");
      FALLTHROUGH;
    default:
      status = -1;
  }

  log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");

  /* If we had an error, let the worker threads (if any) exit directly. */
  if (status != 0) {
    pool->exit = 1;
    log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
  }

  /* Let worker threads enter the work loop. */
  tor_mutex_release(&pool->lock);
  /* Let one of the worker threads take ownership of pool->control_lock.
   * This is required for compliance with POSIX. */
  tor_mutex_release(&pool->control_lock);

  return status;
}

/** Stop all worker threads */
static void
threadpool_stop_threads(threadpool_t *pool)
{
  tor_mutex_acquire(&pool->lock);

  if (pool->exit == 0) {
    /* Signal the worker threads to exit */
    pool->exit = 1;
    /* If worker threads are waiting for work, let them continue to exit */
    tor_cond_signal_all(&pool->condition);

    log_debug(LD_GENERAL, "Signaled worker threads to exit. "
                          "Waiting for them to exit...");
  }

  tor_mutex_release(&pool->lock);

  /* Wait until all worker threads have exited.
   * pool->control_lock must be prelocked here. */
  tor_mutex_acquire(&pool->control_lock);
  /* Unlock required, else main thread hangs on mutex uninit. */
  tor_mutex_release(&pool->control_lock);

  /* If this message appears in the log before all threads have confirmed
   * their exit, then pool->control_lock wasn't prelocked for some reason. */
  log_debug(LD_GENERAL, "All worker threads have exited.");
}

/**
 * Construct a new thread pool with <b>n</b> worker threads, configured to
 * send their output to <b>replyqueue</b>. The threads' states will be
 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
 * as its argument. When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  tor_mutex_init(&pool->control_lock);
  pool->exit = 0;

  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    threadpool_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}

/**
 * Free up the resources allocated by the worker threads and the thread pool
 * itself.
 */
void
threadpool_free_(threadpool_t *pool)
{
  if (!pool)
    return;

  threadpool_stop_threads(pool);

  log_debug(LD_GENERAL, "Beginning to clean up...");

  tor_cond_uninit(&pool->condition);
  tor_mutex_uninit(&pool->lock);
  tor_mutex_uninit(&pool->control_lock);

  if (pool->threads) {
    for (int i = 0; i != pool->n_threads; ++i)
      workerthread_free(pool->threads[i]);

    tor_free(pool->threads);
  }

  if (pool->update_args) {
    if (!pool->free_update_arg_fn)
      log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
                           "pool->free_update_arg_fn is not set.");
    else
      pool->free_update_arg_fn(pool->update_args);
  }

  if (pool->reply_event) {
    if (tor_event_del(pool->reply_event) == -1)
      log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
    else
      tor_event_free(pool->reply_event);
  }

  if (pool->reply_queue)
    replyqueue_free(pool->reply_queue);

  if (pool->new_thread_state_arg) {
    if (!pool->free_thread_state_fn)
      log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
                           "pool->free_thread_state_fn is not set.");
    else
      pool->free_thread_state_fn(pool->new_thread_state_arg);
  }

  tor_free(pool);

  log_debug(LD_GENERAL, "Cleanup finished.");
}

/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}

/** Allocate a new reply queue. Reply queues are used to pass results from
 * worker threads to the main thread. Since the main thread is running an
 * IO-centric event loop, it needs to get woken up by means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    replyqueue_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

/**
 * Free up the resources allocated by a reply queue.
 */
static void
replyqueue_free_(replyqueue_t *queue)
{
  if (!queue)
    return;

  workqueue_entry_t *work;

  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    workqueue_entry_free(work);
  }

  tor_free(queue);
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}

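/*
 * Sketch of wiring a pool's replies into the main loop (illustrative only;
 * on_replies is a hypothetical callback, and Tor's libevent base is assumed
 * to be initialized already). reply_event_cb() runs replyqueue_process()
 * first, so the callback only sees work that has already been handled:
 *
 *   static void on_replies(threadpool_t *pool) { (void)pool; }
 *   ...
 *   if (threadpool_register_reply_event(tp, on_replies) < 0)
 *     log_warn(LD_GENERAL, "Couldn't register the reply event.");
 */
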
/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}

/** Return the number of threads configured for the given pool. */
unsigned int
threadpool_get_n_threads(threadpool_t *tp)
{
  tor_assert(tp);
  return tp->n_threads;
}