naev 0.10.4
threadpool.c
1/*
2 * See Licensing and Copyright notice in threadpool.h
3 */
4/*
5 * @brief A simple threadpool implementation using a single queue.
6 *
7 * The queue is inspired by this paper (look for the queue with two locks):
8 *
9 * Maged M. Michael and Michael L. Scott. 1998. Nonblocking algorithms and
10 * preemption-safe locking on multiprogrammed shared memory multiprocessors. J.
11 * Parallel Distrib. Comput. 51, 1 (May 1998), 1-26. DOI=10.1006/jpdc.1998.1446
12 * http://dx.doi.org/10.1006/jpdc.1998.1446
13 *
14 * @ARTICLE{Michael98non-blockingalgorithms,
15 * author = {Maged M. Michael and Michael L. Scott},
16 * title = {Non-Blocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors},
17 * journal = {Journal of Parallel and Distributed Computing},
18 * year = {1998},
19 * volume = {51},
20 * pages = {1--26},
21 * }
22 *
23 * @note The algorithm/strategy for killing idle workers should be moved into
24 * the threadhandler and it should also be improved (the current strategy
25 * is probably not very good).
26 */
27
28
30#include <stdlib.h>
31#include "SDL.h"
32#include "SDL_error.h"
33#include "SDL_thread.h"
36#include "threadpool.h"
37
38#include "log.h"
39
40
/* Timeout in ms used when waiting on a semaphore for new jobs. */
#define THREADPOOL_TIMEOUT (5 * 100)

/* Values stored in the 'signal' member of ThreadData. */
enum {
   THREADSIG_RUN  = 0, /* The worker thread is running / should keep running. */
   THREADSIG_STOP = 1  /* The worker thread should stop. */
};


/* Upper bound on worker threads; this is only the initial value, it is
 * recomputed from the CPU count in threadpool_init(). */
static int MAXTHREADS = 8;
50
51
/**
 * @brief Node in the thread queue (singly linked list element).
 */
typedef struct Node_ {
   void *data;          /* Payload carried by this node (unused in the dummy head). */
   struct Node_ *next;  /* Following node, or NULL when this is the tail. */
} Node;
59
64 Node *first; /* The first node */
65 Node *last; /* The second node */
66 /* A semaphore to ensure reads only happen when the queue is not empty */
67 SDL_sem *semaphore;
68 SDL_mutex *t_lock; /* Tail lock. Lock when reading/updating tail */
69 SDL_mutex *h_lock; /* Same as tail lock, except it's head lock */
70};
71
/**
 * @brief Data for the threadqueue: one job, i.e. a function and its argument.
 */
typedef struct ThreadQueueData_ {
   int (*function)(void *); /* The function to be called */
   void *data; /* And its arguments */
} ThreadQueueData;
79
83typedef struct ThreadData_ {
84 int (*function)(void *); /* The function to be called */
85 void *data; /* Arguments to the above function */
86 int signal; /* Signals to the thread */
87 SDL_sem *semaphore; /* The semaphore to signal new jobs or new signal in the
88 'signal' variable */
89 ThreadQueue *idle; /* The queue with idle threads */
90 ThreadQueue *stopped; /* The queue with stopped threads */
92
96typedef struct vpoolThreadData_ {
97 SDL_cond *cond; /* Condition variable for signalling all jobs in the vpool
98 are done */
99 SDL_mutex *mutex; /* The mutex to use with the above condition variable */
100 int *count; /* Variable to count number of finished jobs in the vpool */
101 ThreadQueueData *node; /* The job to be done */
103
104/* The global threadpool queue */
105static ThreadQueue *global_queue = NULL;
106
107
108/*
109 * Prototypes.
110 */
111static ThreadQueue* tq_create (void);
112static void tq_enqueue( ThreadQueue *q, void *data );
113static void* tq_dequeue( ThreadQueue *q );
114static void tq_destroy( ThreadQueue *q );
115static int threadpool_worker( void *data );
116static int threadpool_handler( void *data );
117static int vpool_worker( void *data );
118
119
129static ThreadQueue* tq_create (void)
130{
131 ThreadQueue *q;
132 Node *n;
133
134 /* Queue memory allocation. */
135 q = calloc( 1, sizeof(ThreadQueue) );
136
137 /* Allocate and insert the dummy node */
138 n = calloc( 1, sizeof(Node) );
139 n->next = NULL;
140 q->first = n;
141 q->last = n;
142
143 /* Create locks. */
144 q->t_lock = SDL_CreateMutex();
145 q->h_lock = SDL_CreateMutex();
146 q->semaphore = SDL_CreateSemaphore( 0 );
147
148 return q;
149}
150
157static void tq_enqueue( ThreadQueue *q, void *data )
158{
159 Node *n;
160
161 /* Allocate new struct. */
162 n = calloc( 1, sizeof(Node) );
163 n->data = data;
164 n->next = NULL;
165
166 /* Lock */
167 SDL_mutexP( q->t_lock );
168
169 /* Enqueue. */
170 q->last->next = n;
171 q->last = n;
172
173 /* Signal and unlock. This wil break if someone tries to enqueue 2^32+1
174 * elements or something. */
175 SDL_SemPost( q->semaphore );
176 SDL_mutexV( q->t_lock );
177}
178
187static void* tq_dequeue( ThreadQueue *q )
188{
189 void *d;
190 Node *newhead, *node;
191
192 /* Lock the head. */
193 SDL_mutexP( q->h_lock );
194
195 /* Start running. */
196 node = q->first;
197 newhead = node->next;
198
199 /* Head not consistent. */
200 if (newhead == NULL) {
201 WARN(_("Tried to dequeue while the queue was empty!"));
202 /* Ugly fix :/ */
203 /*
204 SDL_mutexV(q->h_lock);
205 return NULL;
206 */
207 /* We prefer to wait until the cache updates :/ */
208 do {
209 node = q->first;
210 newhead = node->next;
211 } while (newhead == NULL);
212 }
213
214 /* Remember the value and assign newhead as the new dummy element. */
215 d = newhead->data;
216 q->first = newhead;
217
218 /* Unlock */
219 SDL_mutexV( q->h_lock );
220
221 free( node );
222 return d;
223}
224
232static void tq_destroy( ThreadQueue *q )
233{
234 /* Iterate through the list and free the nodes */
235 while (q->first->next != NULL)
236 free( tq_dequeue(q) ); /* Locks q->t_lock, so we must destroy mutex after. */
237
238 /* Clean up threading structures. */
239 SDL_DestroySemaphore( q->semaphore );
240 SDL_DestroyMutex( q->h_lock );
241 SDL_DestroyMutex( q->t_lock );
242
243 free( q->first );
244 free( q );
245}
246
247
258int threadpool_newJob( int (*function)(void *), void *data )
259{
260 ThreadQueueData *node;
261
262 if (global_queue == NULL) {
263 WARN(_("Threadpool has not been initialized yet!"));
264 return -2;
265 }
266
267 /* Allocate and set parameters. */
268 node = calloc( 1, sizeof(ThreadQueueData) );
269 node->data = data;
270 node->function = function;
271
272 /* Actually enque. */
273 tq_enqueue( global_queue, node );
274
275 return 0;
276}
277
287static int threadpool_worker( void *data )
288{
289 ThreadData *work;
290
291 work = (ThreadData*) data;
292
293 /* Work loop */
294 while (1) {
295 /* Wait for new signal */
296 while (SDL_SemWait( work->semaphore ) == -1) {
297 /* Putting this in a while-loop is probably a really bad idea, but I
298 * don't have any better ideas. */
299 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
300 }
301 /* Break if received signal to stop */
302 if (work->signal == THREADSIG_STOP)
303 break;
304
305 /* Do work :-) */
306 work->function( work->data );
307
308 /* Enqueue itself in the idle worker threads queue */
309 tq_enqueue( work->idle, work );
310 }
311 /* Enqueue itself in the stopped worker threads queue when stopped */
312 tq_enqueue( work->stopped, work );
313
314 return 0;
315}
316
337static int threadpool_handler( void *data )
338{
339 (void) data;
340 int i, nrunning, newthread;
341 ThreadData *threadargs, *threadarg;
342 /* Queues for idle workers and stopped workers */
343 ThreadQueue *idle, *stopped;
344 ThreadQueueData *node;
345
346 /* Initialize the idle and stopped queues. */
347 idle = tq_create();
348 stopped = tq_create();
349
350 /* Allocate threadargs to communicate with workers */
351 threadargs = calloc( MAXTHREADS, sizeof(ThreadData) );
352
353 /* Initialize threadargs */
354 for (i=0; i<MAXTHREADS; i++) {
355 threadargs[i].function = NULL;
356 threadargs[i].data = NULL;
357 threadargs[i].semaphore = SDL_CreateSemaphore( 0 ); /* Used to give orders. */
358 threadargs[i].idle = idle;
359 threadargs[i].stopped = stopped;
360 threadargs[i].signal = THREADSIG_RUN;
361 /* 'Workers' that do not have a thread are considered stopped */
362 tq_enqueue( stopped, &threadargs[i] );
363 }
364
365 /* Set the number of running threads to 0 */
366 nrunning = 0;
367
368 /*
369 * Thread handler main loop.
370 */
371 while (1) {
372 /*
373 * We must now wait, this shall be done on each active thread. However they will
374 * be put to sleep as time passes. When we receive a command we'll proceed to process
375 * it.
376 */
377 if (nrunning > 0) {
378 /*
379 * Here we'll wait until thread gets work to do. If it doesn't it will
380 * just stop a worker thread and wait until it gets something to do.
381 */
382 if (SDL_SemWaitTimeout( global_queue->semaphore, THREADPOOL_TIMEOUT ) != 0) {
383 /* There weren't any new jobs so we'll start killing threads ;) */
384 if (SDL_SemTryWait( idle->semaphore ) == 0) {
385 threadarg = tq_dequeue( idle );
386 /* Set signal to stop worker thread */
387 threadarg->signal = THREADSIG_STOP;
388 /* Signal thread and decrement running threads counter */
389 SDL_SemPost( threadarg->semaphore );
390 nrunning -= 1;
391 }
392
393 /* We just go back to waiting on a thread. */
394 continue;
395 }
396
397 /* We got work. Continue to handle work. */
398 }
399 else {
400 /*
401 * Here we wait for a new job. No threads are alive at this point and the
402 * threadpool is just patiently waiting for work to arrive.
403 */
404 if (SDL_SemWait( global_queue->semaphore ) == -1) {
405 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
406 continue;
407 }
408
409 /* We got work. Continue to handle work. */
410 }
411
412 /*
413 * Get a new job from the queue. This should be safe as we have received
414 * a permission from the global_queue->semaphore.
415 */
416 node = tq_dequeue( global_queue );
417 newthread = 0;
418
419 /*
420 * Choose where to get the thread. Either idle, revive stopped or block until
421 * another thread becomes idle.
422 */
423 /* Idle thread available */
424 if (SDL_SemTryWait(idle->semaphore) == 0)
425 threadarg = tq_dequeue( idle );
426 /* Make a new thread */
427 else if (SDL_SemTryWait(stopped->semaphore) == 0) {
428 threadarg = tq_dequeue( stopped );
429 threadarg->signal = THREADSIG_RUN;
430 newthread = 1;
431 }
432 /* Wait for idle thread */
433 else {
434 while (SDL_SemWait(idle->semaphore) == -1) {
435 /* Bad idea */
436 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
437 }
438 threadarg = tq_dequeue( idle );
439 }
440
441 /* Assign arguments for the thread */
442 threadarg->function = node->function;
443 threadarg->data = node->data;
444 /* Signal the thread that there's a new job */
445 SDL_SemPost( threadarg->semaphore );
446
447 /* Start a new thread and increment the thread counter */
448 if (newthread) {
449 SDL_CreateThread( threadpool_worker,
450 "threadpool_worker",
451 threadarg );
452 nrunning += 1;
453 }
454
455 /* Free the now unused job from the global_queue */
456 free(node);
457 }
460 /* Clean up. */
461 tq_destroy( idle );
462 tq_destroy( stopped );
463 free( threadargs );
464
465 return 0;
466}
467
473int threadpool_init (void)
474{
475 MAXTHREADS = SDL_GetCPUCount() + 1; /* SDL 1.3 is pretty cool. */
476
477 /* There's already a queue */
478 if (global_queue != NULL) {
479 WARN(_("Threadpool has already been initialized!"));
480 return -1;
481 }
482
483 /* Create the global queue queue */
484 global_queue = tq_create();
485
486 /* Initialize the threadpool handler. */
487 if ( SDL_CreateThread( threadpool_handler, "threadpool_handler", NULL ) == NULL ) {
488 ERR( _( "Threadpool init failed: %s" ), SDL_GetError() );
489 return -1;
490 }
491
492 return 0;
493}
494
509ThreadQueue* vpool_create (void)
510{
511 return tq_create();
512}
513
523void vpool_enqueue( ThreadQueue *queue, int (*function)(void *), void *data )
524{
525 ThreadQueueData *node;
526
527 /* Allocate and set up data. */
528 node = calloc( 1, sizeof(ThreadQueueData) );
529 node->data = data;
530 node->function = function;
531
532 /* Add to vpool. */
533 tq_enqueue( queue, node );
534}
535
542static int vpool_worker( void *data )
543{
544 int cnt;
545 vpoolThreadData *work;
546
547 work = (vpoolThreadData*) data;
548
549 /* Do work */
550 work->node->function( work->node->data );
551
552 /* Decrement the counter and signal vpool_wait if all threads are done */
553 SDL_mutexP( work->mutex );
554 cnt = *(work->count) - 1;
555 if (cnt <= 0) /* All jobs done. */
556 SDL_CondSignal( work->cond ); /* Signal waiting thread */
557 *(work->count) = cnt;
558 SDL_mutexV( work->mutex );
559
560 return 0;
561}
562
563/* @brief Run every job in the vpool queue and block until every job in the
564 * queue is done.
565 *
566 * @note It destroys the queue when it's done.
567 */
568void vpool_wait( ThreadQueue *queue )
569{
570 int i, cnt;
571 SDL_cond *cond;
572 SDL_mutex *mutex;
573 vpoolThreadData *arg;
574 ThreadQueueData *node;
575
576 /* Create temporary threading structures. */
577 cond = SDL_CreateCond();
578 mutex = SDL_CreateMutex();
579 /* This might be a little ugly (and inefficient?) */
580 cnt = SDL_SemValue( queue->semaphore );
581
582 /* Allocate all vpoolThreadData objects */
583 arg = calloc( cnt, sizeof(vpoolThreadData) );
584
585 SDL_mutexP( mutex );
586 /* Initialize the vpoolThreadData */
587 for (i=0; i<cnt; i++) {
588 /* This is needed to keep the invariants of the queue */
589 while (SDL_SemWait( queue->semaphore ) == -1) {
590 /* Again, a really bad idea */
591 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
592 }
593 node = tq_dequeue( queue );
594
595 /* Set up arguments. */
596 arg[i].node = node;
597 arg[i].cond = cond;
598 arg[i].mutex = mutex;
599 arg[i].count = &cnt;
600
601 /* Launch new job. */
602 threadpool_newJob( vpool_worker, &arg[i] );
603 }
604
605 /* Wait for the threads to finish */
606 SDL_CondWait( cond, mutex );
607 SDL_mutexV( mutex );
608
609 /* Clean up */
610 SDL_DestroyMutex( mutex );
611 SDL_DestroyCond( cond );
612 tq_destroy( queue );
613 free( arg );
614}
static const double d[]
Definition: rng.c:273
Node struct.
Definition: queue.c:25
void * data
Definition: queue.c:26
Node next
Definition: queue.c:27
Node in the thread queue.
Definition: threadpool.c:55
Thread data.
Definition: threadpool.c:83
Data for the threadqueue.
Definition: threadpool.c:75
Threadqueue itself.
Definition: threadpool.c:63
Virtual thread pool data.
Definition: threadpool.c:96