Add an interface abstraction to the WebP worker thread implementation

This allows custom implementations of threading mecanism.

Patch by Leonhard Gruenschloss.

Change-Id: Id8ea5917acd2f24fa8bce79748d1747de2751614
This commit is contained in:
skal
2014-06-12 11:35:44 +02:00
parent d6cd6358ff
commit 24e3080571
6 changed files with 165 additions and 103 deletions

View File

@ -14,11 +14,35 @@
#include <assert.h>
#include <string.h> // for memset()
#include "./thread.h"
#include "./utils.h"
#ifdef WEBP_USE_THREAD
#if defined(_WIN32)
#include <windows.h>
typedef HANDLE pthread_t;
typedef CRITICAL_SECTION pthread_mutex_t;
typedef struct {
HANDLE waiting_sem_;
HANDLE received_sem_;
HANDLE signal_event_;
} pthread_cond_t;
#else // !_WIN32
#include <pthread.h>
#endif // _WIN32
struct WebPWorkerImpl {
pthread_mutex_t mutex_;
pthread_cond_t condition_;
pthread_t thread_;
};
#if defined(_WIN32)
//------------------------------------------------------------------------------
// simplistic pthread emulation layer
@ -129,23 +153,25 @@ static int pthread_cond_wait(pthread_cond_t* const condition,
//------------------------------------------------------------------------------
static void Execute(WebPWorker* const worker); // Forward declaration.
static THREADFN ThreadLoop(void* ptr) {
WebPWorker* const worker = (WebPWorker*)ptr;
int done = 0;
while (!done) {
pthread_mutex_lock(&worker->mutex_);
pthread_mutex_lock(&worker->impl_->mutex_);
while (worker->status_ == OK) { // wait in idling mode
pthread_cond_wait(&worker->condition_, &worker->mutex_);
pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
}
if (worker->status_ == WORK) {
WebPWorkerExecute(worker);
Execute(worker);
worker->status_ = OK;
} else if (worker->status_ == NOT_OK) { // finish the worker
done = 1;
}
// signal to the main thread that we're done (for Sync())
pthread_cond_signal(&worker->condition_);
pthread_mutex_unlock(&worker->mutex_);
pthread_cond_signal(&worker->impl_->condition_);
pthread_mutex_unlock(&worker->impl_->mutex_);
}
return THREAD_RETURN(NULL); // Thread is finished
}
@ -153,32 +179,36 @@ static THREADFN ThreadLoop(void* ptr) {
// main thread state control
static void ChangeState(WebPWorker* const worker,
WebPWorkerStatus new_status) {
// no-op when attempting to change state on a thread that didn't come up
if (worker->status_ < OK) return;
// No-op when attempting to change state on a thread that didn't come up.
// Checking status_ without acquiring the lock first would result in a data
// race.
if (worker->impl_ == NULL) return;
pthread_mutex_lock(&worker->mutex_);
// wait for the worker to finish
while (worker->status_ != OK) {
pthread_cond_wait(&worker->condition_, &worker->mutex_);
pthread_mutex_lock(&worker->impl_->mutex_);
if (worker->status_ >= OK) {
// wait for the worker to finish
while (worker->status_ != OK) {
pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
}
// assign new status and release the working thread if needed
if (new_status != OK) {
worker->status_ = new_status;
pthread_cond_signal(&worker->impl_->condition_);
}
}
// assign new status and release the working thread if needed
if (new_status != OK) {
worker->status_ = new_status;
pthread_cond_signal(&worker->condition_);
}
pthread_mutex_unlock(&worker->mutex_);
pthread_mutex_unlock(&worker->impl_->mutex_);
}
#endif // WEBP_USE_THREAD
//------------------------------------------------------------------------------
void WebPWorkerInit(WebPWorker* const worker) {
static void Init(WebPWorker* const worker) {
memset(worker, 0, sizeof(*worker));
worker->status_ = NOT_OK;
}
int WebPWorkerSync(WebPWorker* const worker) {
static int Sync(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
ChangeState(worker, OK);
#endif
@ -186,56 +216,88 @@ int WebPWorkerSync(WebPWorker* const worker) {
return !worker->had_error;
}
int WebPWorkerReset(WebPWorker* const worker) {
static int Reset(WebPWorker* const worker) {
int ok = 1;
worker->had_error = 0;
if (worker->status_ < OK) {
#ifdef WEBP_USE_THREAD
if (pthread_mutex_init(&worker->mutex_, NULL) ||
pthread_cond_init(&worker->condition_, NULL)) {
worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_));
if (worker->impl_ == NULL) {
return 0;
}
pthread_mutex_lock(&worker->mutex_);
ok = !pthread_create(&worker->thread_, NULL, ThreadLoop, worker);
if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) {
goto Error;
}
if (pthread_cond_init(&worker->impl_->condition_, NULL)) {
pthread_mutex_destroy(&worker->impl_->mutex_);
goto Error;
}
pthread_mutex_lock(&worker->impl_->mutex_);
ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker);
if (ok) worker->status_ = OK;
pthread_mutex_unlock(&worker->mutex_);
pthread_mutex_unlock(&worker->impl_->mutex_);
if (!ok) {
pthread_mutex_destroy(&worker->impl_->mutex_);
pthread_cond_destroy(&worker->impl_->condition_);
Error:
WebPSafeFree(worker->impl_);
worker->impl_ = NULL;
return 0;
}
#else
worker->status_ = OK;
#endif
} else if (worker->status_ > OK) {
ok = WebPWorkerSync(worker);
ok = Sync(worker);
}
assert(!ok || (worker->status_ == OK));
return ok;
}
void WebPWorkerExecute(WebPWorker* const worker) {
static void Execute(WebPWorker* const worker) {
if (worker->hook != NULL) {
worker->had_error |= !worker->hook(worker->data1, worker->data2);
}
}
void WebPWorkerLaunch(WebPWorker* const worker) {
static void Launch(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
ChangeState(worker, WORK);
#else
WebPWorkerExecute(worker);
Execute(worker);
#endif
}
void WebPWorkerEnd(WebPWorker* const worker) {
static void End(WebPWorker* const worker) {
if (worker->status_ >= OK) {
#ifdef WEBP_USE_THREAD
ChangeState(worker, NOT_OK);
pthread_join(worker->thread_, NULL);
pthread_mutex_destroy(&worker->mutex_);
pthread_cond_destroy(&worker->condition_);
pthread_join(worker->impl_->thread_, NULL);
pthread_mutex_destroy(&worker->impl_->mutex_);
pthread_cond_destroy(&worker->impl_->condition_);
#else
worker->status_ = NOT_OK;
#endif
}
WebPSafeFree(worker->impl_);
worker->impl_ = NULL;
assert(worker->status_ == NOT_OK);
}
//------------------------------------------------------------------------------
static WebPWorkerInterface g_worker_interface = {
Init, Reset, Sync, Launch, Execute, End
};
void WebPSetWorkerInterface(const WebPWorkerInterface* const interface) {
assert(interface != NULL);
g_worker_interface = *interface;
}
const WebPWorkerInterface* WebPGetWorkerInterface(void) {
return &g_worker_interface;
}
//------------------------------------------------------------------------------

View File

@ -18,30 +18,12 @@
#include "../webp/config.h"
#endif
#include "../webp/types.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifdef WEBP_USE_THREAD
#if defined(_WIN32)
#include <windows.h>
typedef HANDLE pthread_t;
typedef CRITICAL_SECTION pthread_mutex_t;
typedef struct {
HANDLE waiting_sem_;
HANDLE received_sem_;
HANDLE signal_event_;
} pthread_cond_t;
#else
#include <pthread.h>
#endif /* _WIN32 */
#endif /* WEBP_USE_THREAD */
// State of the worker thread object
typedef enum {
NOT_OK = 0, // object is unusable
@ -53,13 +35,12 @@ typedef enum {
// arguments (data1 and data2), and should return false in case of error.
typedef int (*WebPWorkerHook)(void*, void*);
// Platform-dependent implementation details for the worker.
typedef struct WebPWorkerImpl WebPWorkerImpl;
// Synchronize object used to launch job in the worker thread
typedef struct {
#ifdef WEBP_USE_THREAD
pthread_mutex_t mutex_;
pthread_cond_t condition_;
pthread_t thread_;
#endif
WebPWorkerImpl* impl_;
WebPWorkerStatus status_;
WebPWorkerHook hook; // hook to call
void* data1; // first argument passed to 'hook'
@ -67,26 +48,41 @@ typedef struct {
int had_error; // return value of the last call to 'hook'
} WebPWorker;
// Must be called first, before any other method.
void WebPWorkerInit(WebPWorker* const worker);
// Must be called to initialize the object and spawn the thread. Re-entrant.
// Will potentially launch the thread. Returns false in case of error.
int WebPWorkerReset(WebPWorker* const worker);
// Makes sure the previous work is finished. Returns true if worker->had_error
// was not set and no error condition was triggered by the working thread.
int WebPWorkerSync(WebPWorker* const worker);
// Triggers the thread to call hook() with data1 and data2 argument. These
// hook/data1/data2 can be changed at any time before calling this function,
// but not be changed afterward until the next call to WebPWorkerSync().
void WebPWorkerLaunch(WebPWorker* const worker);
// This function is similar to WebPWorkerLaunch() except that it calls the
// hook directly instead of using a thread. Convenient to bypass the thread
// mechanism while still using the WebPWorker structs. WebPWorkerSync() must
// still be called afterward (for error reporting).
void WebPWorkerExecute(WebPWorker* const worker);
// Kill the thread and terminate the object. To use the object again, one
// must call WebPWorkerReset() again.
void WebPWorkerEnd(WebPWorker* const worker);
// The interface for all thread-worker related functions. All these functions
// must be implemented.
typedef struct {
// Must be called first, before any other method.
void (*Init)(WebPWorker* const worker);
// Must be called to initialize the object and spawn the thread. Re-entrant.
// Will potentially launch the thread. Returns false in case of error.
int (*Reset)(WebPWorker* const worker);
// Makes sure the previous work is finished. Returns true if worker->had_error
// was not set and no error condition was triggered by the working thread.
int (*Sync)(WebPWorker* const worker);
// Triggers the thread to call hook() with data1 and data2 argument. These
// hook/data1/data2 can be changed at any time before calling this function,
// but not be changed afterward until the next call to WebPWorkerSync().
void (*Launch)(WebPWorker* const worker);
// This function is similar to WebPWorkerLaunch() except that it calls the
// hook directly instead of using a thread. Convenient to bypass the thread
// mechanism while still using the WebPWorker structs. WebPWorkerSync() must
// still be called afterward (for error reporting).
void (*Execute)(WebPWorker* const worker);
// Kill the thread and terminate the object. To use the object again, one
// must call WebPWorkerReset() again.
void (*End)(WebPWorker* const worker);
} WebPWorkerInterface;
// Install a new set of threading functions, overriding the defaults. This
// should be done before any workers are started, i.e. before any encoding or
// decoding takes place. The contents of the interface struct are copied, it
// is safe to free the corresponding memory after this call. This function is
// not thread-safe.
WEBP_EXTERN(void) WebPSetWorkerInterface(
const WebPWorkerInterface* const interface);
// Retrieve the currently set thread worker interface.
WEBP_EXTERN(const WebPWorkerInterface*) WebPGetWorkerInterface(void);
//------------------------------------------------------------------------------