33#include "SDL_thread.h"
36#include "threadpool.h"
41#define THREADPOOL_TIMEOUT (5 * 100)
42#define THREADSIG_STOP (1)
43#define THREADSIG_RUN (0)
49static int MAXTHREADS = 8;
75typedef struct ThreadQueueData_ {
76 int (*function)(
void *);
83typedef struct ThreadData_ {
84 int (*function)(
void *);
96typedef struct vpoolThreadData_ {
105static ThreadQueue *global_queue = NULL;
111static ThreadQueue* tq_create (
void);
112static void tq_enqueue( ThreadQueue *q,
void *data );
113static void* tq_dequeue( ThreadQueue *q );
114static void tq_destroy( ThreadQueue *q );
115static int threadpool_worker(
void *data );
116static int threadpool_handler(
void *data );
117static int vpool_worker(
void *data );
129static ThreadQueue* tq_create (
void)
135 q = calloc( 1,
sizeof(ThreadQueue) );
138 n = calloc( 1,
sizeof(
Node) );
144 q->t_lock = SDL_CreateMutex();
145 q->h_lock = SDL_CreateMutex();
146 q->semaphore = SDL_CreateSemaphore( 0 );
157static void tq_enqueue( ThreadQueue *q,
void *data )
162 n = calloc( 1,
sizeof(
Node) );
167 SDL_mutexP( q->t_lock );
175 SDL_SemPost( q->semaphore );
176 SDL_mutexV( q->t_lock );
187static void* tq_dequeue( ThreadQueue *q )
190 Node *newhead, *node;
193 SDL_mutexP( q->h_lock );
197 newhead = node->next;
200 if (newhead == NULL) {
201 WARN(_(
"Tried to dequeue while the queue was empty!"));
210 newhead = node->next;
211 }
while (newhead == NULL);
219 SDL_mutexV( q->h_lock );
232static void tq_destroy( ThreadQueue *q )
235 while (q->first->next != NULL)
236 free( tq_dequeue(q) );
239 SDL_DestroySemaphore( q->semaphore );
240 SDL_DestroyMutex( q->h_lock );
241 SDL_DestroyMutex( q->t_lock );
258int threadpool_newJob(
int (*function)(
void *),
void *data )
262 if (global_queue == NULL) {
263 WARN(_(
"Threadpool has not been initialized yet!"));
270 node->function = function;
273 tq_enqueue( global_queue, node );
287static int threadpool_worker(
void *data )
296 while (SDL_SemWait( work->semaphore ) == -1) {
299 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
302 if (work->signal == THREADSIG_STOP)
306 work->function( work->data );
309 tq_enqueue( work->idle, work );
312 tq_enqueue( work->stopped, work );
337static int threadpool_handler(
void *data )
340 int i, nrunning, newthread;
343 ThreadQueue *idle, *stopped;
348 stopped = tq_create();
351 threadargs = calloc( MAXTHREADS,
sizeof(
ThreadData) );
354 for (i=0; i<MAXTHREADS; i++) {
355 threadargs[i].function = NULL;
356 threadargs[i].data = NULL;
357 threadargs[i].semaphore = SDL_CreateSemaphore( 0 );
358 threadargs[i].idle = idle;
359 threadargs[i].stopped = stopped;
360 threadargs[i].signal = THREADSIG_RUN;
362 tq_enqueue( stopped, &threadargs[i] );
382 if (SDL_SemWaitTimeout( global_queue->semaphore, THREADPOOL_TIMEOUT ) != 0) {
384 if (SDL_SemTryWait( idle->semaphore ) == 0) {
385 threadarg = tq_dequeue( idle );
387 threadarg->signal = THREADSIG_STOP;
389 SDL_SemPost( threadarg->semaphore );
404 if (SDL_SemWait( global_queue->semaphore ) == -1) {
405 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
416 node = tq_dequeue( global_queue );
424 if (SDL_SemTryWait(idle->semaphore) == 0)
425 threadarg = tq_dequeue( idle );
427 else if (SDL_SemTryWait(stopped->semaphore) == 0) {
428 threadarg = tq_dequeue( stopped );
429 threadarg->signal = THREADSIG_RUN;
434 while (SDL_SemWait(idle->semaphore) == -1) {
436 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
438 threadarg = tq_dequeue( idle );
442 threadarg->function = node->function;
443 threadarg->data = node->data;
445 SDL_SemPost( threadarg->semaphore );
449 SDL_CreateThread( threadpool_worker,
462 tq_destroy( stopped );
473int threadpool_init (
void)
475 MAXTHREADS = SDL_GetCPUCount() + 1;
478 if (global_queue != NULL) {
479 WARN(_(
"Threadpool has already been initialized!"));
484 global_queue = tq_create();
487 if ( SDL_CreateThread( threadpool_handler,
"threadpool_handler", NULL ) == NULL ) {
488 ERR( _(
"Threadpool init failed: %s" ), SDL_GetError() );
509ThreadQueue* vpool_create (
void)
523void vpool_enqueue( ThreadQueue *queue,
int (*function)(
void *),
void *data )
530 node->function = function;
533 tq_enqueue( queue, node );
542static int vpool_worker(
void *data )
550 work->node->function( work->node->data );
553 SDL_mutexP( work->mutex );
554 cnt = *(work->count) - 1;
556 SDL_CondSignal( work->cond );
557 *(work->count) = cnt;
558 SDL_mutexV( work->mutex );
568void vpool_wait( ThreadQueue *queue )
577 cond = SDL_CreateCond();
578 mutex = SDL_CreateMutex();
580 cnt = SDL_SemValue( queue->semaphore );
587 for (i=0; i<cnt; i++) {
589 while (SDL_SemWait( queue->semaphore ) == -1) {
591 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
593 node = tq_dequeue( queue );
598 arg[i].mutex = mutex;
602 threadpool_newJob( vpool_worker, &arg[i] );
606 SDL_CondWait( cond, mutex );
610 SDL_DestroyMutex( mutex );
611 SDL_DestroyCond( cond );
Node in the thread queue.
Data for the threadqueue.
Virtual thread pool data.