From 60a2298e62af40886e93efb6075aeacd9fa7a72b Mon Sep 17 00:00:00 2001 From: lxsang Date: Mon, 1 Oct 2018 22:49:20 +0200 Subject: [PATCH] fix sync --- libs/scheduler.c | 216 +++++++++++++++-------------------------------- libs/scheduler.h | 53 +++++------- relay.c | 11 +-- 3 files changed, 93 insertions(+), 187 deletions(-) diff --git a/libs/scheduler.c b/libs/scheduler.c index 3db1213..343fed7 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -1,20 +1,10 @@ #include "scheduler.h" -/* -private data & methods -*/ -static antd_scheduler_t scheduler; static void enqueue(antd_task_queue_t* q, antd_task_t* task) { antd_task_item_t it = *q; - while(it && it->task->id != task->id && it->next != NULL) + while(it && it->next != NULL) it = it->next; - if(it && it->task->id == task->id) - { - LOG("Task %d exists, ignore it\n", task->id); - //assert(it->task->id == task->id ); - return; - } antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); taski->task = task; taski->next = NULL; @@ -28,28 +18,17 @@ static void enqueue(antd_task_queue_t* q, antd_task_t* task) } } -static int working() -{ - int stat; - pthread_mutex_lock(&scheduler.scheduler_lock); - stat = scheduler.status; - pthread_mutex_unlock(&scheduler.scheduler_lock); - return stat; -} -static void stop() +static void stop(antd_scheduler_t* scheduler) { - pthread_mutex_lock(&scheduler.scheduler_lock); - scheduler.status = 0; - pthread_mutex_unlock(&scheduler.scheduler_lock); - for (int i = 0; i < scheduler.n_workers; i++) - pthread_join(scheduler.workers[i].pid, NULL); - if(scheduler.workers) free(scheduler.workers); + scheduler->status = 0; + for (int i = 0; i < scheduler->n_workers; i++) + pthread_join(scheduler->workers[i], NULL); + if(scheduler->workers) free(scheduler->workers); // destroy all the mutex - pthread_mutex_destroy(&scheduler.scheduler_lock); - pthread_mutex_destroy(&scheduler.task_lock); - pthread_mutex_destroy(&scheduler.queue_lock); - pthread_mutex_destroy(&scheduler.worker_lock); + pthread_mutex_destroy(&scheduler->scheduler_lock); + pthread_mutex_destroy(&scheduler->worker_lock); + pthread_mutex_destroy(&scheduler->pending_lock); } static antd_task_item_t dequeue(antd_task_queue_t* q) @@ -63,15 +42,6 @@ static antd_task_item_t dequeue(antd_task_queue_t* q) return it; } -static antd_task_item_t next_task() -{ - antd_task_item_t it = NULL; - pthread_mutex_lock(&scheduler.queue_lock); - it = dequeue(&scheduler.workers_queue); - pthread_mutex_unlock(&scheduler.queue_lock); - return it; -} - static antd_callback_t* callback_of( void* (*callback)(void*) ) { @@ -106,7 +76,7 @@ static void enqueue_callback(antd_callback_t* cb, antd_callback_t* el) it->next = el; } -static void execute_callback(antd_task_t* task) +static void execute_callback(antd_scheduler_t* scheduler, antd_task_t* task) { antd_callback_t* cb = task->callback; if(cb) @@ -115,21 +85,13 @@ static void execute_callback(antd_task_t* task) task->handle = cb->handle; task->callback = task->callback->next; free(cb); - antd_add_task(task); + antd_add_task(scheduler, task); } else { free(task); } } -static void work(void* data) -{ - antd_worker_t* worker = (antd_worker_t*)data; - while(working()) - { - antd_attach_task(worker); - } -} static void destroy_queue(antd_task_queue_t q) { @@ -146,108 +108,68 @@ static void destroy_queue(antd_task_queue_t q) free(curr); } } -static int antd_has_pending_task() +static void work(antd_scheduler_t* scheduler) { - int ret = 0; - - for(int i = 0; i < N_PRIORITY; i++) - if(scheduler.task_queue[i] != NULL) - { - ret = 1; - break; - } - if(!ret) + while(scheduler->status) { - ret = (scheduler.workers_queue != NULL); + antd_task_item_t it; + pthread_mutex_lock(&scheduler->worker_lock); + it = dequeue(&scheduler->workers_queue); + pthread_mutex_unlock(&scheduler->worker_lock); + // execute the task + //LOG("task executed by worker %d\n", worker->pid); + antd_execute_task(scheduler, it); } - - return ret; -} -static int antd_available_workers() -{ - int n = 0; - //pthread_mutex_lock(&scheduler.worker_lock); - for(int i=0; i < scheduler.n_workers; i++) - if(scheduler.workers[i].status == 0) n++; - //pthread_mutex_unlock(&scheduler.worker_lock); - return n; } + /* Main API methods init the main scheduler */ -/* -* assign task to a worker -*/ -void antd_attach_task(antd_worker_t* worker) -{ - antd_task_item_t it; - pthread_mutex_lock(&scheduler.worker_lock); - it = next_task(); - worker->status = 0; - if(it) - worker->status = 1; - pthread_mutex_unlock(&scheduler.worker_lock); - // execute the task - //LOG("task executed by worker %d\n", worker->pid); - antd_execute_task(it); -} -void antd_scheduler_init(int n) +void antd_scheduler_init(antd_scheduler_t* scheduler, int n) { - time_t t; - srand((unsigned) time(&t)); - scheduler.n_workers = n; - scheduler.status = 1; - scheduler.workers_queue = NULL; + scheduler->n_workers = n; + scheduler->status = 1; + scheduler->workers_queue = NULL; + scheduler->pending_task = 0 ; // init lock - pthread_mutex_init(&scheduler.scheduler_lock,NULL); - pthread_mutex_init(&scheduler.task_lock,NULL); - pthread_mutex_init(&scheduler.worker_lock,NULL); - pthread_mutex_init(&scheduler.queue_lock,NULL); - for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; + pthread_mutex_init(&scheduler->scheduler_lock,NULL); + pthread_mutex_init(&scheduler->worker_lock, NULL); + pthread_mutex_init(&scheduler->pending_lock, NULL); + for(int i = 0; i < N_PRIORITY; i++) scheduler->task_queue[i] = NULL; // create scheduler.workers if(n > 0) { - scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); - if(!scheduler.workers) + scheduler->workers = (pthread_t*)malloc(n*(sizeof(pthread_t))); + if(!scheduler->workers) { LOG("Cannot allocate memory for worker\n"); exit(-1); } - for(int i = 0; i < scheduler.n_workers;i++) + for(int i = 0; i < scheduler->n_workers;i++) { - scheduler.workers[i].status = 0; - if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + if (pthread_create(&scheduler->workers[i], NULL,(void *(*)(void *))work, (void*)scheduler) != 0) { - scheduler.workers[i].status = -1; perror("pthread_create: cannot create worker\n"); } } } - LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); -} -void antd_task_lock() -{ - pthread_mutex_lock(&scheduler.task_lock); -} -void antd_task_unlock() -{ - pthread_mutex_unlock(&scheduler.task_lock); + LOG("Antd scheduler initialized with %d worker\n", scheduler->n_workers); } /* destroy all pending task pthread_mutex_lock(&scheduler.queue_lock); */ -void antd_scheduler_destroy() +void antd_scheduler_destroy(antd_scheduler_t* scheduler) { // free all the chains - stop(); + stop(scheduler); for(int i=0; i < N_PRIORITY; i++) { - destroy_queue(scheduler.task_queue[i]); + destroy_queue(scheduler->task_queue[i]); } - destroy_queue(scheduler.workers_queue); + destroy_queue(scheduler->workers_queue); } /* @@ -257,7 +179,6 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba { antd_task_t* task = (antd_task_t*)malloc(sizeof *task); task->stamp = (unsigned long)time(NULL); - task->id = rand(); task->data = data; task->handle = handle; task->callback = callback_of(callback); @@ -269,26 +190,30 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba /* scheduling a task */ -void antd_add_task(antd_task_t* task) +void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) { // check if task is exist int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; - pthread_mutex_lock(&scheduler.scheduler_lock); - enqueue(&scheduler.task_queue[prio], task); - pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler->scheduler_lock); + enqueue(&scheduler->task_queue[prio], task); + pthread_mutex_unlock(&scheduler->scheduler_lock); + pthread_mutex_lock(&scheduler->pending_lock); + scheduler->pending_task++; + pthread_mutex_unlock(&scheduler->pending_lock); } -void antd_execute_task(antd_task_item_t taski) +void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) { - if(!taski) return; + if(!taski) + return; // execute the task void *ret = (*(taski->task->handle))(taski->task->data); // check the return data if it is a new task if(!ret) { // call the first callback - execute_callback(taski->task); + execute_callback(scheduler, taski->task); free(taski); } else @@ -308,65 +233,58 @@ void antd_execute_task(antd_task_item_t taski) if(!rtask->handle) { // call the first callback - execute_callback(rtask); + execute_callback(scheduler, rtask); free(taski->task); free(taski); } else { - antd_add_task(rtask); + antd_add_task(scheduler, rtask); free(taski->task); free(taski); } } + pthread_mutex_lock(&scheduler->pending_lock); + scheduler->pending_task--; + pthread_mutex_unlock(&scheduler->pending_lock); } -int antd_scheduler_busy() +int antd_scheduler_busy(antd_scheduler_t* scheduler) { - pthread_mutex_lock(&scheduler.worker_lock); - pthread_mutex_lock(&scheduler.scheduler_lock); - pthread_mutex_lock(&scheduler.queue_lock); - int ret = (antd_available_workers() != scheduler.n_workers) || antd_has_pending_task(); - pthread_mutex_unlock(&scheduler.queue_lock); - pthread_mutex_unlock(&scheduler.scheduler_lock); - pthread_mutex_unlock(&scheduler.worker_lock); - return ret; + return scheduler->pending_task != 0; } -int antd_scheduler_status() -{ - return scheduler.status; -} -void antd_task_schedule() + +void antd_task_schedule(antd_scheduler_t* scheduler) { // fetch next task from the task_queue antd_task_item_t it = NULL; - pthread_mutex_lock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler->scheduler_lock); for(int i = 0; i< N_PRIORITY; i++) { - it = dequeue(&scheduler.task_queue[i]); + it = dequeue(&scheduler->task_queue[i]); if(it) break; } - pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_unlock(&scheduler->scheduler_lock); if(!it) { return; } // has the task now // check the type of tas - if(it->task->type == LIGHT || scheduler.n_workers <= 0) + if(it->task->type == LIGHT || scheduler->n_workers <= 0) { // do it by myself - antd_execute_task(it); + antd_execute_task( scheduler, it); } else { // delegate to other workers by //pushing to the worker queue - pthread_mutex_lock(&scheduler.queue_lock); - enqueue(&scheduler.workers_queue, it->task); + pthread_mutex_lock(&scheduler->worker_lock); + enqueue(&scheduler->workers_queue, it->task); + pthread_mutex_unlock(&scheduler->worker_lock); free(it); - pthread_mutex_unlock(&scheduler.queue_lock); } } \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h index e025fa7..4c9747d 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -3,7 +3,7 @@ #include "utils.h" #include -// thread pool of workers + #define N_PRIORITY 10 #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define LOW_PRIORITY (N_PRIORITY - 1) @@ -22,11 +22,7 @@ typedef struct { */ unsigned long stamp; /* - unique id - */ - int id; - /* - priority from 0 to 9 + priority from 0 to N_PRIORITY - 1 higher value is lower priority */ uint8_t priority; @@ -41,18 +37,12 @@ typedef struct { void * data; /* type of a task - light task is executed directly by - the leader - heavy tasks is delegated to workers + light tasks are executed directly + heavy tasks are delegated to workers */ antd_task_type_t type; } antd_task_t; -typedef struct { - pthread_t pid; - uint8_t status; // -1 quit, 0 available, 1 busy -} antd_worker_t; - typedef struct __task_item_t{ antd_task_t* task; @@ -62,25 +52,25 @@ typedef struct __task_item_t{ typedef antd_task_item_t antd_task_queue_t; typedef struct { - pthread_mutex_t queue_lock; pthread_mutex_t scheduler_lock; pthread_mutex_t worker_lock; - pthread_mutex_t task_lock; + pthread_mutex_t pending_lock; antd_task_queue_t task_queue[N_PRIORITY]; antd_task_queue_t workers_queue; uint8_t status; // 0 stop, 1 working - antd_worker_t* workers; + pthread_t* workers; int n_workers; + int pending_task; } antd_scheduler_t; /* init the main scheduler */ -void antd_scheduler_init(); +void antd_scheduler_init(antd_scheduler_t*, int); /* destroy all pending task */ -void antd_scheduler_destroy(); +void antd_scheduler_destroy(antd_scheduler_t*); /* create a task @@ -88,22 +78,19 @@ void antd_scheduler_destroy(); antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)); /* - scheduling a task + add a task */ -void antd_add_task(antd_task_t*); - -void antd_task_lock(); -void antd_task_unlock(); -/* - Execute a task -*/ -int antd_scheduler_status(); +void antd_add_task(antd_scheduler_t*, antd_task_t*); /* execute and free a task a task */ -void antd_execute_task(antd_task_item_t); - -int antd_scheduler_busy(); -void antd_attach_task(antd_worker_t* worker); -void antd_task_schedule(); +void antd_execute_task(antd_scheduler_t*, antd_task_item_t); +/* + scheduler status +*/ +int antd_scheduler_busy(antd_scheduler_t*); +/* + schedule a task +*/ +void antd_task_schedule(antd_scheduler_t*); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 48b64e4..0fe8327 100644 --- a/relay.c +++ b/relay.c @@ -1,6 +1,7 @@ #include "http_server.h" #include "libs/scheduler.h" #include +static antd_scheduler_t scheduler; /* this node is a relay from the http to https @@ -9,7 +10,7 @@ to https int server_sock = -1; void stop_serve(int dummy) { UNUSED(dummy); - antd_scheduler_destroy(); + antd_scheduler_destroy(&scheduler); close(server_sock); } /* @@ -89,15 +90,15 @@ int main(int argc, char* argv[]) //timeout.tv_sec = 0; //timeout.tv_usec = 500; // 0 worker - antd_scheduler_init(0); + antd_scheduler_init(&scheduler, 0); // set server socket to non blocking fcntl(server_sock, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */ LOG("relayd running on port %d\n", port); - while (antd_scheduler_status()) + while (scheduler.status) { // execute task - antd_task_schedule(); + antd_task_schedule(&scheduler); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { @@ -120,7 +121,7 @@ int main(int argc, char* argv[]) client->ip = strdup(inet_ntoa(client_name.sin_addr)); client->sock = client_sock; //accept_request(&client); - antd_add_task(antd_create_task(antd_get_host,(void*)client, antd_free_client )); + antd_add_task(&scheduler, antd_create_task(antd_get_host,(void*)client, antd_free_client )); } return(0);