1
0
mirror of https://github.com/lxsang/ant-http synced 2024-07-03 13:39:46 +02:00

reduce cpu usage using semaphore

This commit is contained in:
lxsang 2018-10-10 12:42:47 +02:00
parent d60109fcf9
commit 592fc88587
3 changed files with 93 additions and 27 deletions

32
httpd.c
View File

@ -95,7 +95,8 @@ void stop_serve(int dummy) {
ERR_remove_state(0); ERR_remove_state(0);
ERR_free_strings(); ERR_free_strings();
#endif #endif
close(server_sock); if(server_sock != -1)
close(server_sock);
sigprocmask(SIG_UNBLOCK, &mask, NULL); sigprocmask(SIG_UNBLOCK, &mask, NULL);
} }
int main(int argc, char* argv[]) int main(int argc, char* argv[])
@ -130,23 +131,28 @@ int main(int argc, char* argv[])
LOG("httpd running on port %d\n", port); LOG("httpd running on port %d\n", port);
// default to 4 workers // default to 4 workers
antd_scheduler_init(&scheduler, config()->n_workers); antd_scheduler_init(&scheduler, config()->n_workers);
set_nonblock(server_sock); // use blocking server_sock
int stat = 0; // make the scheduler wait for event on another thread
struct timespec ts_sleep; // this allows us to get rid of the high CPU usage caused by
// an endless loop that does nothing
// set_nonblock(server_sock);
pthread_t scheduler_th;
if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_wait, (void*)&scheduler) != 0)
{
perror("pthread_create: cannot create worker\n");
stop_serve(0);
exit(1);
}
else
{
// detach so thread resources are reclaimed automatically on exit
pthread_detach(scheduler_th);
}
while (scheduler.status) while (scheduler.status)
{ {
stat = antd_task_schedule(&scheduler);
client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len);
if (client_sock == -1) if (client_sock == -1)
{ {
if(!stat)
{
// sleep for 500 usec if
// there is nothing to do
ts_sleep.tv_sec = 0;
ts_sleep.tv_nsec = 500000;
nanosleep(&ts_sleep, NULL);
}
continue; continue;
} }
antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t));

View File

@ -22,13 +22,20 @@ static void enqueue(antd_task_queue_t* q, antd_task_t* task)
static void stop(antd_scheduler_t* scheduler) static void stop(antd_scheduler_t* scheduler)
{ {
scheduler->status = 0; scheduler->status = 0;
// unlock all idle workers if any
for (int i = 0; i < scheduler->n_workers; i++) for (int i = 0; i < scheduler->n_workers; i++)
pthread_join(scheduler->workers[i], NULL); sem_post(&scheduler->worker_sem);
sem_post(&scheduler->scheduler_sem);
for (int i = 0; i < scheduler->n_workers; i++)
if(scheduler->workers[i].id != -1)
pthread_join(scheduler->workers[i].tid, NULL);
if(scheduler->workers) free(scheduler->workers); if(scheduler->workers) free(scheduler->workers);
// destroy all the mutex // destroy all the mutex
pthread_mutex_destroy(&scheduler->scheduler_lock); pthread_mutex_destroy(&scheduler->scheduler_lock);
pthread_mutex_destroy(&scheduler->worker_lock); pthread_mutex_destroy(&scheduler->worker_lock);
pthread_mutex_destroy(&scheduler->pending_lock); pthread_mutex_destroy(&scheduler->pending_lock);
sem_destroy(&scheduler->scheduler_sem);
sem_destroy(&scheduler->worker_sem);
} }
static antd_task_item_t dequeue(antd_task_queue_t* q) static antd_task_item_t dequeue(antd_task_queue_t* q)
@ -108,8 +115,9 @@ static void destroy_queue(antd_task_queue_t q)
free(curr); free(curr);
} }
} }
static void work(antd_scheduler_t* scheduler) static void work(antd_worker_t* worker)
{ {
antd_scheduler_t* scheduler = (antd_scheduler_t*) worker->manager;
while(scheduler->status) while(scheduler->status)
{ {
antd_task_item_t it; antd_task_item_t it;
@ -118,16 +126,18 @@ static void work(antd_scheduler_t* scheduler)
pthread_mutex_unlock(&scheduler->worker_lock); pthread_mutex_unlock(&scheduler->worker_lock);
// execute the task // execute the task
//LOG("task executed by worker %d\n", worker->pid); //LOG("task executed by worker %d\n", worker->pid);
// no task to execute, just sleep for 500usec // no task to execute, just wait on the semaphore
if(!it) if(!it)
{ {
struct timespec ts_sleep; //LOG("Worker %d goes to idle state\n", worker->id);
ts_sleep.tv_sec = 0; sem_wait(&scheduler->worker_sem);
ts_sleep.tv_nsec = 500000;
nanosleep(&ts_sleep, NULL);
continue;
} }
antd_execute_task(scheduler, it); else
{
//LOG("task executed by worker %d\n", worker->id);
antd_execute_task(scheduler, it);
}
} }
} }
@ -142,7 +152,18 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n)
scheduler->status = 1; scheduler->status = 1;
scheduler->workers_queue = NULL; scheduler->workers_queue = NULL;
scheduler->pending_task = 0 ; scheduler->pending_task = 0 ;
// init lock // init semaphore
if (sem_init(&scheduler->scheduler_sem, 0, 0) == -1)
{
LOG("Cannot init semaphore for scheduler\n");
exit(-1);
}
if (sem_init(&scheduler->worker_sem, 0, 0) == -1)
{
LOG("Cannot init semaphore for workers\n");
exit(-1);
}
// init lock
pthread_mutex_init(&scheduler->scheduler_lock,NULL); pthread_mutex_init(&scheduler->scheduler_lock,NULL);
pthread_mutex_init(&scheduler->worker_lock, NULL); pthread_mutex_init(&scheduler->worker_lock, NULL);
pthread_mutex_init(&scheduler->pending_lock, NULL); pthread_mutex_init(&scheduler->pending_lock, NULL);
@ -150,7 +171,7 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n)
// create scheduler.workers // create scheduler.workers
if(n > 0) if(n > 0)
{ {
scheduler->workers = (pthread_t*)malloc(n*(sizeof(pthread_t))); scheduler->workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t)));
if(!scheduler->workers) if(!scheduler->workers)
{ {
LOG("Cannot allocate memory for worker\n"); LOG("Cannot allocate memory for worker\n");
@ -158,10 +179,16 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n)
} }
for(int i = 0; i < scheduler->n_workers;i++) for(int i = 0; i < scheduler->n_workers;i++)
{ {
if (pthread_create(&scheduler->workers[i], NULL,(void *(*)(void *))work, (void*)scheduler) != 0) scheduler->workers[i].id = -1;
scheduler->workers[i].manager = (void*)scheduler;
if (pthread_create(&scheduler->workers[i].tid, NULL,(void *(*)(void *))work, (void*)&scheduler->workers[i]) != 0)
{ {
perror("pthread_create: cannot create worker\n"); perror("pthread_create: cannot create worker\n");
} }
else
{
scheduler->workers[i].id = i;
}
} }
} }
LOG("Antd scheduler initialized with %d worker\n", scheduler->n_workers); LOG("Antd scheduler initialized with %d worker\n", scheduler->n_workers);
@ -211,6 +238,8 @@ void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task)
pthread_mutex_lock(&scheduler->pending_lock); pthread_mutex_lock(&scheduler->pending_lock);
scheduler->pending_task++; scheduler->pending_task++;
pthread_mutex_unlock(&scheduler->pending_lock); pthread_mutex_unlock(&scheduler->pending_lock);
// wake up the scheduler if idle
sem_post(&scheduler->scheduler_sem);
} }
@ -283,7 +312,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler)
return 0; return 0;
} }
// has the task now // has the task now
// check the type of tas // check the type of task
if(it->task->type == LIGHT || scheduler->n_workers <= 0) if(it->task->type == LIGHT || scheduler->n_workers <= 0)
{ {
// do it by myself // do it by myself
@ -296,7 +325,22 @@ int antd_task_schedule(antd_scheduler_t* scheduler)
pthread_mutex_lock(&scheduler->worker_lock); pthread_mutex_lock(&scheduler->worker_lock);
enqueue(&scheduler->workers_queue, it->task); enqueue(&scheduler->workers_queue, it->task);
pthread_mutex_unlock(&scheduler->worker_lock); pthread_mutex_unlock(&scheduler->worker_lock);
// wake up idle worker
sem_post(&scheduler->worker_sem);
free(it); free(it);
} }
return 1; return 1;
}
void antd_wait(antd_scheduler_t* scheduler)
{
int stat;
while(scheduler->status)
{
stat = antd_task_schedule(scheduler);
if(!stat)
{
// no task found, go to idle state
sem_wait(&scheduler->scheduler_sem);
}
}
} }

View File

@ -3,6 +3,7 @@
#include "utils.h" #include "utils.h"
#include <pthread.h> #include <pthread.h>
#include <semaphore.h>
#define N_PRIORITY 10 #define N_PRIORITY 10
#define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2))
@ -52,13 +53,24 @@ typedef struct __task_item_t{
typedef antd_task_item_t antd_task_queue_t; typedef antd_task_item_t antd_task_queue_t;
typedef struct { typedef struct {
int id;
pthread_t tid;
void* manager;
} antd_worker_t;
typedef struct {
// data lock
pthread_mutex_t scheduler_lock; pthread_mutex_t scheduler_lock;
pthread_mutex_t worker_lock; pthread_mutex_t worker_lock;
pthread_mutex_t pending_lock; pthread_mutex_t pending_lock;
// event handle
sem_t scheduler_sem;
sem_t worker_sem;
// worker and data
antd_task_queue_t task_queue[N_PRIORITY]; antd_task_queue_t task_queue[N_PRIORITY];
antd_task_queue_t workers_queue; antd_task_queue_t workers_queue;
uint8_t status; // 0 stop, 1 working uint8_t status; // 0 stop, 1 working
pthread_t* workers; antd_worker_t* workers;
int n_workers; int n_workers;
int pending_task; int pending_task;
} antd_scheduler_t; } antd_scheduler_t;
@ -93,4 +105,8 @@ int antd_scheduler_busy(antd_scheduler_t*);
schedule a task schedule a task
*/ */
int antd_task_schedule(antd_scheduler_t*); int antd_task_schedule(antd_scheduler_t*);
/*
wait for event
*/
void antd_wait(antd_scheduler_t*);
#endif #endif