From 90d44d782ffeb7077e356e6376925ec841003b7a Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Wed, 26 Sep 2018 10:30:04 +0200 Subject: [PATCH] use single thread scheduler and worker --- Makefile | 2 +- relayd => forward | 0 libs/scheduler.c | 79 ++++++++++++++++++++++++++--------------------- libs/scheduler.h | 1 + relay.c | 22 ++++++++----- 5 files changed, 60 insertions(+), 44 deletions(-) rename relayd => forward (100%) diff --git a/Makefile b/Makefile index 068ff55..bbb9af4 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ httpd: lib $(SERVER_O) relay: lib $(SERVER_O) $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/relay relay.c $(SERVERLIB) - cp relayd $(BUILDIRD) + cp forward $(BUILDIRD) lib: $(LIBOBJS) $(CC) $(CFLAGS) $(DB_LIB) $(SSL_LIB) -shared -o $(LIB_NAME).$(EXT) $(LIBOBJS) cp $(LIB_NAME).$(EXT) $(LIB_PATH$)/ diff --git a/relayd b/forward similarity index 100% rename from relayd rename to forward diff --git a/libs/scheduler.c b/libs/scheduler.c index 0b04f1b..9751d31 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -80,28 +80,6 @@ static int available_workers() pthread_mutex_unlock(&scheduler.server_lock); return n; } - -static void work(void* data) -{ - antd_task_item_t it; - antd_worker_t* worker = (antd_worker_t*)data; - while(working()) - { - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 0; - pthread_mutex_unlock(&scheduler.server_lock); - // fetch the next in queue - it = next_task(); - if(!it) continue; - //LOG("worker processing \n"); - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 1; - pthread_mutex_unlock(&scheduler.server_lock); - // execute the task - antd_execute_task(it); - } -} - static antd_callback_t* callback_of( void* (*callback)(void*) ) { antd_callback_t* cb = NULL; @@ -151,33 +129,64 @@ static void execute_callback(antd_task_t* task) free(task); } } - +static void work(void* data) +{ + antd_worker_t* worker = (antd_worker_t*)data; + while(working()) + { + antd_attach_task(worker); + } +} /* Main API methods init the main scheduler */ + +/* +* assign task to a worker +*/ +void antd_attach_task(antd_worker_t* worker) +{ + antd_task_item_t it; + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + // fetch the next in queue + it = next_task(); + if(!it) return; + //LOG("worker processing \n"); + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 1; + pthread_mutex_unlock(&scheduler.server_lock); + // execute the task + antd_execute_task(it); +} + void antd_scheduler_init(int n) { time_t t; srand((unsigned) time(&t)); scheduler.n_workers = n; scheduler.status = 1; - scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); - if(!scheduler.workers) - { - LOG("Cannot allocate memory for worker\n"); - exit(-1); - } for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; // create scheduler.workers - for(int i = 0; i < scheduler.n_workers;i++) + if(n > 0) { - scheduler.workers[i].status = 0; - if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); + if(!scheduler.workers) { - scheduler.workers[i].status = -1; - perror("pthread_create: cannot create worker\n"); + LOG("Cannot allocate memory for worker\n"); + exit(-1); + } + for(int i = 0; i < scheduler.n_workers;i++) + { + scheduler.workers[i].status = 0; + if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + { + scheduler.workers[i].status = -1; + perror("pthread_create: cannot create worker\n"); + } } } LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); @@ -197,6 +206,7 @@ void antd_scheduler_destroy() { // free all the chains antd_task_item_t it, curr; + stop(); for(int i=0; i < N_PRIORITY; i++) { it = scheduler.task_queue[i]; @@ -211,7 +221,6 @@ void antd_scheduler_destroy() free(curr); } } - stop(); } /* diff --git a/libs/scheduler.h b/libs/scheduler.h index 9d1ff65..f304776 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -94,4 +94,5 @@ int antd_scheduler_status(); void antd_execute_task(antd_task_item_t); int antd_scheduler_busy(); +void antd_attach_task(antd_worker_t* worker); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 455df91..d76648c 100644 --- a/relay.c +++ b/relay.c @@ -1,5 +1,6 @@ #include "http_server.h" #include "libs/scheduler.h" +#include /* this node is a relay from the http to https @@ -65,7 +66,7 @@ void* antd_get_host(void * client) void** data = (void**)malloc(2*(sizeof *data)); data[0] = client; data[1] = (void*)host; - LOG("Host is %s\n", host); + LOG("[%s] Request for: %s --> https://%s\n", ((antd_client_t*)client)->ip, host, host); return antd_create_task(antd_redirect,data, NULL); } @@ -84,23 +85,28 @@ int main(int argc, char* argv[]) signal(SIGABRT, SIG_IGN); signal(SIGINT, stop_serve); server_sock = startup(&port); - // 1 worker is - antd_scheduler_init(1); - LOG("httpd running on port %d\n", port); + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + // 0 worker + antd_scheduler_init(0); + antd_worker_t worker; + worker.status = 0; + // set server socket to non blocking + fcntl(server_sock, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */ + LOG("relayd running on port %d\n", port); while (antd_scheduler_status()) { + // execute task + antd_attach_task(&worker); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { - perror("Cannot accept client request\n"); continue; } antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); // set timeout to socket - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 500; if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n");