Tor 0.4.9.2-alpha-dev
|
Implements worker threads, queues of work for them, and mechanisms for them to send answers back to the main thread. More...
#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"
#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>
Go to the source code of this file.
Data Structures | |
struct | threadpool_t |
struct | workqueue_entry_t |
struct | replyqueue_t |
struct | workerthread_t |
Macros | |
#define | WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH |
#define | WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW |
#define | WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) |
#define | workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) |
#define | WORKQUEUE_PRIORITY_BITS 2 |
#define | workerthread_free(thread) FREE_AND_NULL(workerthread_t, workerthread_free_, (thread)) |
#define | replyqueue_free(queue) FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue)) |
#define | workqueue_entry_free(ent) FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent)) |
#define | MAX_THREADS 1024 |
#define | CHANCE_PERMISSIVE 37 |
#define | CHANCE_STRICT INT32_MAX |
Typedefs | |
typedef struct work_tailq_t | work_tailq_t |
Functions | |
TOR_TAILQ_HEAD (work_tailq_t, workqueue_entry_t) | |
static void | queue_reply (replyqueue_t *queue, workqueue_entry_t *work) |
static void | workerthread_free_ (workerthread_t *thread) |
static void | replyqueue_free_ (replyqueue_t *queue) |
static workqueue_entry_t * | workqueue_entry_new (workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg) |
static void | workqueue_entry_free_ (workqueue_entry_t *ent) |
void * | workqueue_entry_cancel (workqueue_entry_t *ent) |
static int | worker_thread_has_work (workerthread_t *thread) |
static workqueue_entry_t * | worker_thread_extract_next_work (workerthread_t *thread) |
static void | worker_thread_main (void *thread_) |
static workerthread_t * | workerthread_new (int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue) |
workqueue_entry_t * | threadpool_queue_work_priority (threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg) |
workqueue_entry_t * | threadpool_queue_work (threadpool_t *pool, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg) |
int | threadpool_queue_update (threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg) |
static int | threadpool_start_threads (threadpool_t *pool, int n) |
static void | threadpool_stop_threads (threadpool_t *pool) |
threadpool_t * | threadpool_new (int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg) |
void | threadpool_free_ (threadpool_t *pool) |
replyqueue_t * | threadpool_get_replyqueue (threadpool_t *tp) |
replyqueue_t * | replyqueue_new (uint32_t alertsocks_flags) |
static void | reply_event_cb (evutil_socket_t sock, short events, void *arg) |
int | threadpool_register_reply_event (threadpool_t *tp, void(*cb)(threadpool_t *tp)) |
void | replyqueue_process (replyqueue_t *queue) |
unsigned int | threadpool_get_n_threads (threadpool_t *tp) |
Implements worker threads, queues of work for them, and mechanisms for them to send answers back to the main thread.
The main structure here is a threadpool_t : it manages a set of worker threads, a queue of pending work, and a reply queue. Every piece of work is a workqueue_entry_t, containing data to process and a function to process it with.
The main thread informs the worker threads of pending work by using a condition variable. The workers inform the main thread of completed work by using an alert_sockets_t object, as implemented in net/alertsock.c.
The main thread can also queue an "update" that will be handled by all the workers. This is useful for updating state that all the workers share.
In Tor today, there is only one thread pool, managed in cpuworker.c and handling a variety of types of work, from the original "onion skin" circuit handshakes, to consensus diff computation, to client-side onion service PoW generation.
Definition in file workqueue.c.
#define CHANCE_PERMISSIVE 37 |
For half of our threads, choose lower priority queues with probability 1/N for each of these values. Both are chosen somewhat arbitrarily. If CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks stalling forever. If it's too high, we have a risk of low-priority tasks grabbing half of the threads.
Definition at line 549 of file workqueue.c.
#define CHANCE_STRICT INT32_MAX |
Definition at line 550 of file workqueue.c.
#define MAX_THREADS 1024 |
Don't have more than this many threads per pool.
Definition at line 542 of file workqueue.c.
#define replyqueue_free | ( | queue | ) | FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue)) |
Definition at line 157 of file workqueue.c.
#define workerthread_free | ( | thread | ) | FREE_AND_NULL(workerthread_t, workerthread_free_, (thread)) |
Definition at line 154 of file workqueue.c.
#define workqueue_entry_free | ( | ent | ) | FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent)) |
Definition at line 176 of file workqueue.c.
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1) |
Definition at line 48 of file workqueue.c.
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t) |
Used to put a workqueue_priority_t value into a bitfield.
Definition at line 101 of file workqueue.c.
#define WORKQUEUE_PRIORITY_BITS 2 |
Number of bits needed to hold all legal values of workqueue_priority_t
Definition at line 103 of file workqueue.c.
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH |
Definition at line 46 of file workqueue.c.
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW |
Definition at line 47 of file workqueue.c.
typedef struct work_tailq_t work_tailq_t |
Definition at line 51 of file workqueue.c.
|
static |
Put a reply on the reply queue. The reply must not currently be on any thread's work queue.
Definition at line 375 of file workqueue.c.
|
static |
Internal: Run from the libevent mainloop when there is work to handle in the reply queue handler.
Definition at line 812 of file workqueue.c.
|
static |
Free up the resources allocated by a reply queue.
Definition at line 793 of file workqueue.c.
replyqueue_t * replyqueue_new | ( | uint32_t | alertsocks_flags | ) |
Allocate a new reply queue. Reply queues are used to pass results from worker threads to the main thread. Since the main thread is running an IO-centric event loop, it needs to get woken up with means other than a condition variable.
Definition at line 771 of file workqueue.c.
void replyqueue_process | ( | replyqueue_t * | queue | ) |
Process all pending replies on a reply queue. The main thread should call this function every time the socket returned by replyqueue_get_socket() is readable.
Definition at line 852 of file workqueue.c.
Referenced by reply_event_cb().
void threadpool_free_ | ( | threadpool_t * | pool | ) |
Free up the resources allocated by worker threads, worker thread pool, ...
Definition at line 708 of file workqueue.c.
unsigned int threadpool_get_n_threads | ( | threadpool_t * | tp | ) |
Return the number of threads configured for the given pool.
Definition at line 883 of file workqueue.c.
replyqueue_t * threadpool_get_replyqueue | ( | threadpool_t * | tp | ) |
Return the reply queue associated with a given thread pool.
Definition at line 761 of file workqueue.c.
threadpool_t * threadpool_new | ( | int | n_threads, |
replyqueue_t * | replyqueue, | ||
void *(*)(void *) | new_thread_state_fn, | ||
void(*)(void *) | free_thread_state_fn, | ||
void * | arg | ||
) |
Construct a new thread pool with n worker threads, configured to send their output to replyqueue. The threads' states will be constructed with the new_thread_state_fn call, receiving arg as its argument. When the threads close, they will call free_thread_state_fn on their states.
Definition at line 670 of file workqueue.c.
int threadpool_queue_update | ( | threadpool_t * | pool, |
void *(*)(void *) | dup_fn, | ||
workqueue_reply_t(*)(void *, void *) | fn, | ||
void(*)(void *) | free_fn, | ||
void * | arg | ||
) |
Queue a copy of a work item for every thread in a pool. This can be used, for example, to tell the threads to update some parameter in their states.
Arguments are as for threadpool_queue_work, except that the arg value is passed to dup_fn once per each thread to make a copy of it.
UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update will be run. If a new update is scheduled before the old update finishes running, then the new will replace the old in any threads that haven't run it yet.
Return 0 on success, -1 on failure.
Definition at line 497 of file workqueue.c.
workqueue_entry_t * threadpool_queue_work | ( | threadpool_t * | pool, |
workqueue_reply_t(*)(void *, void *) | fn, | ||
void(*)(void *) | reply_fn, | ||
void * | arg | ||
) |
As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH
Definition at line 473 of file workqueue.c.
workqueue_entry_t * threadpool_queue_work_priority | ( | threadpool_t * | pool, |
workqueue_priority_t | prio, | ||
workqueue_reply_t(*)(void *, void *) | fn, | ||
void(*)(void *) | reply_fn, | ||
void * | arg | ||
) |
Queue an item of work for a thread in a thread pool. The function fn will be run in a worker thread, and will receive as arguments the thread's state object, and the provided object arg. It must return one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
Regardless of its return value, the function reply_fn will later be run in the main thread when it invokes replyqueue_process(), and will receive as its argument the same arg object. It's the reply function's responsibility to free the work object.
On success, return a workqueue_entry_t object that can be passed to workqueue_entry_cancel(). On failure, return NULL. (Failure is not currently possible, but callers should check anyway.)
Items are executed in a loose priority order – each thread will usually take from the queued work with the highest priority, but will occasionally visit lower-priority queues to keep them from starving completely.
Note that because of priorities and thread behavior, work items may not be executed strictly in order.
Definition at line 446 of file workqueue.c.
Referenced by threadpool_queue_work().
int threadpool_register_reply_event | ( | threadpool_t * | tp, |
void(*)(threadpool_t *tp) | cb | ||
) |
Register the threadpool tp's reply queue with Tor's global libevent mainloop. If cb is provided, it is run after each time there is work to process from the reply queue. Return 0 on success, -1 on failure.
Definition at line 828 of file workqueue.c.
|
static |
Launch threads until we have n.
Definition at line 554 of file workqueue.c.
|
static |
Stop all worker threads
Definition at line 635 of file workqueue.c.
Referenced by threadpool_free_().
|
static |
Extract the next workqueue_entry_t from the thread's pool, removing it from the relevant queues and marking it as non-pending.
The caller must hold the lock.
Definition at line 245 of file workqueue.c.
|
static |
|
static |
Main function for the worker thread.
Definition at line 278 of file workqueue.c.
|
static |
Free up the resources allocated by a worker thread.
Definition at line 418 of file workqueue.c.
|
static |
Allocate and start a new worker thread to use state object state, and send responses to replyqueue.
Definition at line 393 of file workqueue.c.
void * workqueue_entry_cancel | ( | workqueue_entry_t * | ent | ) |
Cancel a workqueue_entry_t that has been returned from threadpool_queue_work.
You must not call this function on any work whose reply function has been executed in the main thread; that will cause undefined behavior (probably, a crash).
If the work is cancelled, this function returns the argument passed to the work function. It is the caller's responsibility to free this storage.
This function will have no effect if the worker thread has already executed or begun to execute the work item. In that case, it will return NULL.
Definition at line 207 of file workqueue.c.
Referenced by cpuworker_cancel_circ_handshake().
|
static |
Release all storage held in ent. Call only when ent is not on any queue.
Definition at line 184 of file workqueue.c.
|
static |
Allocate and return a new workqueue_entry_t, set up to run the function fn in the worker thread, and reply_fn in the main thread. See threadpool_queue_work() for full documentation.
Definition at line 164 of file workqueue.c.