diff --git a/httpd.c b/httpd.c index c5e8ae3..cb386e7 100644 --- a/httpd.c +++ b/httpd.c @@ -95,7 +95,8 @@ void stop_serve(int dummy) { ERR_remove_state(0); ERR_free_strings(); #endif - close(server_sock); + if(server_sock != -1) + close(server_sock); sigprocmask(SIG_UNBLOCK, &mask, NULL); } int main(int argc, char* argv[]) @@ -130,23 +131,28 @@ int main(int argc, char* argv[]) LOG("httpd running on port %d\n", port); // default to 4 workers antd_scheduler_init(&scheduler, config()->n_workers); - set_nonblock(server_sock); - int stat = 0; - struct timespec ts_sleep; + // use blocking server_sock + // make the scheduler wait for event on another thread + // this allow to ged rid of high cpu usage on + // endless loop without doing anything + // set_nonblock(server_sock); + pthread_t scheduler_th; + if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_wait, (void*)&scheduler) != 0) + { + perror("pthread_create: cannot create worker\n"); + stop_serve(0); + exit(1); + } + else + { + // reclaim data when exit + pthread_detach(scheduler_th); + } while (scheduler.status) { - stat = antd_task_schedule(&scheduler); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { - if(!stat) - { - // sleep for 500usec if - // there is nothing todo - ts_sleep.tv_sec = 0; - ts_sleep.tv_nsec = 500000; - nanosleep(&ts_sleep, NULL); - } continue; } antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); diff --git a/libs/scheduler.c b/libs/scheduler.c index a8c6e91..97bc7cc 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -22,13 +22,20 @@ static void enqueue(antd_task_queue_t* q, antd_task_t* task) static void stop(antd_scheduler_t* scheduler) { scheduler->status = 0; + // unlock all idle workers if any for (int i = 0; i < scheduler->n_workers; i++) - pthread_join(scheduler->workers[i], NULL); + sem_post(&scheduler->worker_sem); + sem_post(&scheduler->scheduler_sem); + for (int i = 0; i < scheduler->n_workers; i++) + if(scheduler->workers[i].id != -1) + pthread_join(scheduler->workers[i].tid, NULL); if(scheduler->workers) free(scheduler->workers); // destroy all the mutex pthread_mutex_destroy(&scheduler->scheduler_lock); pthread_mutex_destroy(&scheduler->worker_lock); pthread_mutex_destroy(&scheduler->pending_lock); + sem_destroy(&scheduler->scheduler_sem); + sem_destroy(&scheduler->worker_sem); } static antd_task_item_t dequeue(antd_task_queue_t* q) @@ -108,8 +115,9 @@ static void destroy_queue(antd_task_queue_t q) free(curr); } } -static void work(antd_scheduler_t* scheduler) +static void work(antd_worker_t* worker) { + antd_scheduler_t* scheduler = (antd_scheduler_t*) worker->manager; while(scheduler->status) { antd_task_item_t it; @@ -118,16 +126,18 @@ static void work(antd_scheduler_t* scheduler) pthread_mutex_unlock(&scheduler->worker_lock); // execute the task //LOG("task executed by worker %d\n", worker->pid); - // no task to execute, just sleep for 500usec + // no task to execute, just sleep wait if(!it) { - struct timespec ts_sleep; - ts_sleep.tv_sec = 0; - ts_sleep.tv_nsec = 500000; - nanosleep(&ts_sleep, NULL); - continue; + //LOG("Worker %d goes to idle state\n", worker->id); + sem_wait(&scheduler->worker_sem); } - antd_execute_task(scheduler, it); + else + { + //LOG("task executed by worker %d\n", worker->id); + antd_execute_task(scheduler, it); + } + } } @@ -142,7 +152,18 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n) scheduler->status = 1; scheduler->workers_queue = NULL; scheduler->pending_task = 0 ; - // init lock + // init semaphore + if (sem_init(&scheduler->scheduler_sem, 0, 0) == -1) + { + LOG("Cannot init semaphore for scheduler\n"); + exit(-1); + } + if (sem_init(&scheduler->worker_sem, 0, 0) == -1) + { + LOG("Cannot init semaphore for workers\n"); + exit(-1); + } + // init lock pthread_mutex_init(&scheduler->scheduler_lock,NULL); pthread_mutex_init(&scheduler->worker_lock, NULL); pthread_mutex_init(&scheduler->pending_lock, NULL); @@ -150,7 +171,7 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n) // create scheduler.workers if(n > 0) { - scheduler->workers = (pthread_t*)malloc(n*(sizeof(pthread_t))); + scheduler->workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); if(!scheduler->workers) { LOG("Cannot allocate memory for worker\n"); @@ -158,10 +179,16 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n) } for(int i = 0; i < scheduler->n_workers;i++) { - if (pthread_create(&scheduler->workers[i], NULL,(void *(*)(void *))work, (void*)scheduler) != 0) + scheduler->workers[i].id = -1; + scheduler->workers[i].manager = (void*)scheduler; + if (pthread_create(&scheduler->workers[i].tid, NULL,(void *(*)(void *))work, (void*)&scheduler->workers[i]) != 0) { perror("pthread_create: cannot create worker\n"); } + else + { + scheduler->workers[i].id = i; + } } } LOG("Antd scheduler initialized with %d worker\n", scheduler->n_workers); @@ -211,6 +238,8 @@ void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) pthread_mutex_lock(&scheduler->pending_lock); scheduler->pending_task++; pthread_mutex_unlock(&scheduler->pending_lock); + // wake up the scheduler if idle + sem_post(&scheduler->scheduler_sem); } @@ -283,7 +312,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler) return 0; } // has the task now - // check the type of tas + // check the type of task if(it->task->type == LIGHT || scheduler->n_workers <= 0) { // do it by myself @@ -296,7 +325,22 @@ int antd_task_schedule(antd_scheduler_t* scheduler) pthread_mutex_lock(&scheduler->worker_lock); enqueue(&scheduler->workers_queue, it->task); pthread_mutex_unlock(&scheduler->worker_lock); + // wake up idle worker + sem_post(&scheduler->worker_sem); free(it); } return 1; +} +void antd_wait(antd_scheduler_t* scheduler) +{ + int stat; + while(scheduler->status) + { + stat = antd_task_schedule(scheduler); + if(!stat) + { + // no task found, go to idle state + sem_wait(&scheduler->scheduler_sem); + } + } } \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h index 349c1f2..4b3cc55 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -3,6 +3,7 @@ #include "utils.h" #include +#include #define N_PRIORITY 10 #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) @@ -52,13 +53,24 @@ typedef struct __task_item_t{ typedef antd_task_item_t antd_task_queue_t; typedef struct { + int id; + pthread_t tid; + void* manager; +} antd_worker_t; + +typedef struct { + // data lock pthread_mutex_t scheduler_lock; pthread_mutex_t worker_lock; pthread_mutex_t pending_lock; + // event handle + sem_t scheduler_sem; + sem_t worker_sem; + // worker and data antd_task_queue_t task_queue[N_PRIORITY]; antd_task_queue_t workers_queue; uint8_t status; // 0 stop, 1 working - pthread_t* workers; + antd_worker_t* workers; int n_workers; int pending_task; } antd_scheduler_t; @@ -93,4 +105,8 @@ int antd_scheduler_busy(antd_scheduler_t*); schedule a task */ int antd_task_schedule(antd_scheduler_t*); +/* +wait for event +*/ +void antd_wait(antd_scheduler_t*); #endif \ No newline at end of file